到底什么是线程?什么是进程?
多线程的创建方式
使用匿名内部类创建线程
// 使用匿名内部类创建线程
@Test
public void test() {
// 创建线程
Thread thread = new Thread() {
@Override
public void run() {
log.info("子线程执行了:{}",Thread.currentThread().getName());
}
};
// start方法是用于启动线程的,启动线程后,会自动调用run方法
thread.start();
log.info("主线程执行了:{}",Thread.currentThread().getName());
}
特点:
直接继承Thread类
重写run()方法
通过start()启动线程
缺点:Java不支持多继承,继承Thread后就不能继承其他类
实现Runable接口创建线程
@Test
public void test2() {
// 方式2:实现Runnable接口
Runnable runnable = new Runnable() {
@Override
public void run() {
log.info("子线程执行了:{}", Thread.currentThread().getName());
}
};
Thread thread = new Thread(runnable);
thread.start(); // 正确:启动新线程
// thread.run(); // 错误:直接调用run()只是普通方法调用,不会启动新线程
log.info("主线程执行了:{}", Thread.currentThread().getName());
}
特点:
实现Runnable接口
可以继承其他类
更好地体现了面向对象的设计思想
使用lombda表达式创建线程
// 使用lambda表达式创建线程
@Test
public void test3() {
// 创建Runnable对象
Runnable runnable = () -> {
log.info("子线程执行了:{}", Thread.currentThread().getName());
};
// 创建线程
Thread thread = new Thread(runnable);
// start方法是用于启动线程的,启动线程后,会自动调用run方法
thread.start();
log.info("主线程执行了:{}", Thread.currentThread().getName());
}
特点:
代码更简洁
本质还是Runnable接口的实现
适合简单的线程任务
使用Callable和Future创建线程
@Test
public void test4() {
// 方式4:Callable + Future(可以有返回值)
Callable<String> callable = () -> {
log.info("子线程执行了:{}", Thread.currentThread().getName());
Thread.sleep(1000); // 模拟耗时操作
return "任务完成";
};
FutureTask<String> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
try {
String result = futureTask.get(); // 等待任务完成并获取结果
log.info("线程执行结果:{}", result);
} catch (Exception e) {
log.error("执行异常", e);
}
}
特点:
可以有返回值
可以抛出异常
get()方法会阻塞等待结果
使用线程池创建线程
@Test
public void test5() {
// 方式5:线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
try {
for (int i = 0; i < 10; i++) {
final int taskId = i;
executorService.submit(() -> {
log.info("任务{}在线程{}中执行", taskId, Thread.currentThread().getName());
});
}
} finally {
// 重点:记得关闭线程池
executorService.shutdown();
}
}
特点:
线程复用,性能更好
统一管理线程
控制并发数
注意要关闭线程池
使用Spring的@Async注解
// 配置类
@Configuration // 标识这是一个配置类
@EnableAsync // 开启Spring异步执行的功能
public class AsyncConfig {
/**
* 新任务提交
* ↓
* 是否有空闲的核心线程?
* ├── 有 → 直接执行
* └── 没有
* ↓
* 队列是否已满?
* ├── 未满 → 加入队列等待
* └── 已满
* ↓
* 是否达到最大线程数?
* ├── 未达到 → 创建新线程执行
* └── 已达到 → 触发拒绝策略
* ├── AbortPolicy:直接抛出异常
* ├── CallerRunsPolicy:在调用者线程中执行任务
* ├── DiscardOldestPolicy:丢弃队列中最旧的任务
* └── DiscardPolicy:直接丢弃任务
*
*/
@Bean // 将这个方法的返回值注册为Spring容器中的Bean
public Executor asyncExecutor() {
// 创建ThreadPoolTaskExecutor对象,这是Spring提供的线程池执行器
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
// 这些线程会一直存活,即使它们处于空闲状态
executor.setCorePoolSize(5);
// 最大线程数:线程池最大的线程数,只有在工作队列满了之后才会创建超出核心线程数的线程
// 当任务数增加时,线程池会创建新线程处理任务,直到达到maxPoolSize
executor.setMaxPoolSize(10);
// 队列容量:用于缓存任务的阻塞队列的大小
// 当核心线程都在工作时,新任务会被放到队列中等待
// 只有当队列满了后,才会创建新的线程(但不超过最大线程数)
executor.setQueueCapacity(25);
// 线程名前缀:设置线程名的前缀,方便查看日志时区分不同线程池
executor.setThreadNamePrefix("Async-");
// 拒绝策略:当线程池和队列都满了时的处理策略
// CALLER_RUNS:在调用者线程中执行任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务完成后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待任务完成的最长时间
executor.setAwaitTerminationSeconds(60);
// 初始化线程池
executor.initialize();
return executor;
}
}
// 服务类
@Service
public class SpringAsyncTest {
@Async
public void asyncMethod() {
log.info("异步方法执行在线程:{}", Thread.currentThread().getName());
}
@Async
public CompletableFuture<String> asyncMethodWithResult() {
log.info("异步方法执行在线程:{}", Thread.currentThread().getName());
return CompletableFuture.completedFuture("异步执行完成");
}
}
特点:
Spring框架提供的异步能力
配置简单,使用方便
可以与Spring的事务管理结合
支持返回CompletableFuture
重要注意点
start() vs run()
start():启动新线程
run():普通方法调用,不会启动新线程
线程池最佳实践
使用ThreadPoolExecutor自定义线程池
根据实际需求设置参数
记得关闭线程池
@Async使用注意
需要@EnableAsync开启
方法必须是public
在同一个类中调用是无效的
这些方式各有特点,建议:
简单场景:使用Runnable + Lambda
需要返回值:使用Callable + Future
并发场景:使用线程池
Spring项目:考虑使用@Async
手写异步注解
MyAsync
@Target(ElementType.METHOD) // 限定该注解只能用在方法上
@Retention(RetentionPolicy.RUNTIME) // 保留到运行时
@Documented // 生成javadoc时会包含注解
// 自写异步注解
public @interface MyAsync {
}
MyAsyncAop
/**
* 处理带有@MyAsync注解的方法的切面
* 核心功能:将同步方法转换为异步执行
* 支持两种返回类型:
* 1. void:直接在异步线程中执行,不关心结果
* 2. CompletableFuture:在异步线程中执行,并通过 CompletableFuture 返回结果
*/
@Slf4j
@Aspect
@Component
public class MyAsyncAop {
@Autowired
@Qualifier("MyAsyncExecutor")
private ThreadPoolTaskExecutor myAsyncExecutor;
/**
* 环绕通知:处理异步方法的核心逻辑
* 主要职责:
* 1. 将方法执行转移到异步线程池
* 2. 处理方法返回值
* 3. 确保异常被正确处理
*
* @param pjp 切点,包含了被代理方法的所有信息和执行能力
* @return 根据方法返回类型返回相应结果
* @throws IllegalArgumentException 如果方法签名无效或返回类型不支持
*/
@Around("@annotation(com.example.advancedlearning.annotation.MyAsync)")
public Object around(ProceedingJoinPoint pjp) {
// 获取方法签名,用于提取方法信息
if (!(pjp.getSignature() instanceof MethodSignature signature)) {
throw new IllegalArgumentException("该注解只能用于方法");
}
Class<?> returnType = signature.getReturnType();
String methodName = signature.getMethod().getName();
try {
// 处理无返回值的方法
if (void.class.equals(returnType)) {
/**
* void 方法处理策略:
* 1. 使用 execute 提交任务到线程池
* 2. 立即返回 null
* 3. 方法会在异步线程中执行,调用者不需要等待结果
*/
myAsyncExecutor.execute(() -> {
try {
pjp.proceed();
} catch (Throwable e) {
log.error("void方法执行异常: {}", methodName, e);
}
});
return null;
}
// 处理返回 CompletableFuture 的方法
else if (CompletableFuture.class.isAssignableFrom(returnType)) {
/**
* CompletableFuture 处理策略:
* 1. 创建新的 CompletableFuture 作为最终结果的载体
* 2. 在异步线程中执行原方法
* 3. 原方法返回的 CompletableFuture 结果传递给新创建的 CompletableFuture
*
* 为什么需要新的 CompletableFuture?
* - 确保方法执行在异步线程中
* - 处理原方法返回的 CompletableFuture 的结果
* - 提供统一的异常处理机制
*/
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
// 将任务提交到线程池执行
myAsyncExecutor.execute(() -> {
try {
// 执行原方法
Object result = pjp.proceed();
// 处理方法返回的 CompletableFuture
if (result instanceof CompletableFuture<?> future) {
/**
* whenComplete: 当 Future 完成时(无论成功还是异常)调用的回调方法
* 参数1 (value): 正常完成时的结果值
* 参数2 (ex): 异常完成时的异常对象
* 注意:这是一个非阻塞的操作,不会影响原来的 Future 结果
*/
future.whenComplete((value, ex) -> {
if (ex != null) {
/**
* completeExceptionally: 以异常完成这个 Future
* - 将异常传递给调用者
* - Future 的 get() 方法将抛出这个异常
* - 标记 Future 为完成状态(异常完成)
*/
resultFuture.completeExceptionally(ex);
} else {
/**
* complete: 正常完成这个 Future
* - 设置 Future 的完成值
* - 唤醒所有等待这个结果的线程
* - 标记 Future 为完成状态(正常完成)
* - 返回值可以通过 Future.get() 获取
*/
resultFuture.complete(value);
}
});
} else {
// 更明确的错误处理
String errorMsg = String.format(
"方法 %s 声明返回 CompletableFuture 但实际返回了 %s。请确保方法返回 CompletableFuture!",
methodName,
result != null ? result.getClass().getName() : "null"
);
resultFuture.completeExceptionally(new IllegalStateException(errorMsg));
}
} catch (Throwable e) {
// 异常处理:将异常传递给 CompletableFuture
log.error("异步方法执行异常: {}", methodName, e);
resultFuture.completeExceptionally(e);
}
});
return resultFuture;
}
// 不支持的返回类型处理
throw new IllegalArgumentException(
"MyAsync方法必须返回void或CompletableFuture,当前返回类型: " + returnType.getName());
} catch (Throwable e) {
// 处理 AOP 本身的异常
log.error("异步方法封装异常, 方法: {}", methodName, e);
throw new RuntimeException("异步处理失败", e);
}
}
}
AsyncDemoService
@Service
@Slf4j
public class AsyncDemoService {
/**
* 案例1: 最简单的异步方法 - void返回值
* 适用场景:发送通知邮件、记录日志等不需要返回结果的操作
*/
@MyAsync
public void sendEmail(String to, String content) {
// 模拟耗时操作
try {
// log.info("开始发送邮件给: {}", to);
System.out.println("开始发送邮件给: " + to + " 线程名称:" + Thread.currentThread().getName());
Thread.sleep(2000); // 模拟发送邮件耗时2秒
// log.info("邮件发送完成: {}", to);
System.out.println("邮件发送完成: " + to + " 线程名称:" + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("发送邮件被中断", e);
}
}
/**
* 案例2: 返回CompletableFuture的异步方法
* 适用场景:需要获取异步执行结果,且可能需要进一步处理的场景
*/
@MyAsync
public CompletableFuture<String> processOrder(String orderId) {
// log.info("开始处理订单: {}", orderId);
System.out.println("processOrder方法开始处理订单: " + orderId + " 线程名称:" + Thread.currentThread().getName());
// 模拟订单处理
try {
Thread.sleep(3000); // 模拟处理订单耗时3秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("订单处理被中断", e);
}
return CompletableFuture.completedFuture("订单 " + orderId + " 处理完成");
}
@MyAsync
public CompletableFuture<Integer> calculateSum(int start, int end) { // 这里必须是 CompletableFuture<Integer>
System.out.println("开始计算从 " + start + " 到 " + end + " 的和" + " 线程名称:" + Thread.currentThread().getName());
// 模拟耗时计算
try {
Thread.sleep(1500); // 模拟计算耗时1.5秒
int result = IntStream.rangeClosed(start, end).sum();
return CompletableFuture.completedFuture(result); // 返回包装后的结果
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("计算被中断", e);
return CompletableFuture.failedFuture(e); // 返回异常结果
}
}
}
AsyncDemoServiceTest
@SpringBootTest
@Slf4j
public class AsyncDemoServiceTest {
@Autowired
private AsyncDemoService asyncDemoService;
@Autowired
private SpringAsyncTest springAsyncTest;
@Test
public void springAsyncTest() throws Exception {
// void 方法会等待完成
springAsyncTest.sendEmail("test@example.com", "这是一封测试邮件");
// 有返回值的方法需要等待结果
CompletableFuture<String> orderFuture = springAsyncTest.processOrder("ORDER123");
CompletableFuture<Integer> sumFuture = springAsyncTest.calculateSum(1, 100);
// 等待有返回值的方法完成
String orderResult = orderFuture.get();
Integer sumResult = sumFuture.get();
System.out.println("订单结果: " + orderResult + " 线程名称:" + Thread.currentThread().getName());
System.out.println("计算结果: " + sumResult + " 线程名称:" + Thread.currentThread().getName());
}
@Test
public void testAsyncMethods() throws Exception {
// void 方法会等待完成
asyncDemoService.sendEmail("test@example.com", "这是一封测试邮件");
// 有返回值的方法需要等待结果
CompletableFuture<String> orderFuture = asyncDemoService.processOrder("ORDER123");
CompletableFuture<Integer> sumFuture = asyncDemoService.calculateSum(1, 100);
// 等待有返回值的方法完成
String orderResult = orderFuture.get();
Integer sumResult = sumFuture.get();
System.out.println("订单结果: " + orderResult + " 线程名称:" + Thread.currentThread().getName());
System.out.println("计算结果: " + sumResult + " 线程名称:" + Thread.currentThread().getName());
}
@Test
public void testMultipleAsyncCalls() {
// 模拟并发调用
log.info("开始并发测试");
for (int i = 0; i < 5; i++) {
final int index = i;
asyncDemoService.sendEmail(
"user" + index + "@example.com",
"并发测试邮件 " + index
);
}
log.info("并发调用完成");
// 等待异步操作完成
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// }
}
}
MyAsyncConfig
@Configuration
public class MyAsyncConfig {
/**
* 新任务提交
* ↓
* 是否有空闲的核心线程?
* ├── 有 → 直接执行
* └── 没有
* ↓
* 队列是否已满?
* ├── 未满 → 加入队列等待
* └── 已满
* ↓
* 是否达到最大线程数?
* ├── 未达到 → 创建新线程执行
* └── 已达到 → 触发拒绝策略
* ├── AbortPolicy:直接抛出异常
* ├── CallerRunsPolicy:在调用者线程中执行任务
* ├── DiscardOldestPolicy:丢弃队列中最旧的任务
* └── DiscardPolicy:直接丢弃任务
*
*/
@Bean("MyAsyncExecutor")
public ThreadPoolTaskExecutor asyncExecutor() {
// 创建ThreadPoolTaskExecutor对象,这是Spring提供的线程池执行器
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
// 这些线程会一直存活,即使它们处于空闲状态
executor.setCorePoolSize(5);
// 最大线程数:线程池最大的线程数,只有在工作队列满了之后才会创建超出核心线程数的线程
// 当任务数增加时,线程池会创建新线程处理任务,直到达到maxPoolSize
executor.setMaxPoolSize(10);
// 队列容量:用于缓存任务的阻塞队列的大小
// 当核心线程都在工作时,新任务会被放到队列中等待
// 只有当队列满了后,才会创建新的线程(但不超过最大线程数)
executor.setQueueCapacity(25);
// 线程名前缀:设置线程名的前缀,方便查看日志时区分不同线程池
executor.setThreadNamePrefix("Async-");
// 拒绝策略:当线程池和队列都满了时的处理策略
// CALLER_RUNS:在调用者线程中执行任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务完成后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待任务完成的最长时间
executor.setAwaitTerminationSeconds(60);
// 初始化线程池
executor.initialize();
return executor;
}
}
手写异步日志注解(简单案例)
MyAsyncLog
@Target({ElementType.METHOD}) // 仅用于方法
@Retention(RetentionPolicy.RUNTIME) // 运行时保留
@Documented
public @interface MyAsyncLog {
}
MyAsyncLogAspect
/**
* 依赖注入的最佳实践比较
* ─────────────────────────────────────────────────────────────────────────
* 1. 构造函数注入的优势:
* - 保证依赖不可变性(final 修饰)
* - 保证依赖不为空(编译期检查)
* - 更好的测试性(清晰的依赖关系)
* - Spring 官方推荐的方式
* 2. @Autowired 的劣势:
* - 可能出现 NPE(运行时才能发现)
* - 依赖关系不直观
* - 不利于测试(依赖注入不明确)
* ─────────────────────────────────────────────────────────────────────────
*/
@Slf4j
@Aspect
@Component
public class MyAsyncLogAspect {
// 使用 final 确保依赖不可变
private final RequestLogService requestLogService;
private final ThreadPoolTaskExecutor myAsyncExecutor;
// 构造函数注入
public MyAsyncLogAspect(RequestLogService logService,
@Qualifier("MyAsyncExecutor") ThreadPoolTaskExecutor asyncExecutor) {
this.requestLogService = logService;
this.myAsyncExecutor = asyncExecutor;
}
/**
* /**
* 环绕通知:处理带有 @AsyncLog 注解的方法
* ─────────────────────────────────────────────────────────
* 执行顺序:
* 1. 获取请求信息
* 2. 执行原方法
* 3. 异步记录日志
* ─────────────────────────────────────────────────────────
*/
@Around("@annotation(com.example.advancedlearning.annotation.MyAsyncLog)")
public Object around(ProceedingJoinPoint point) {
// 获取请求上下文
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
// 构建基础日志信息
RequestLog.RequestLogBuilder logBuilder = RequestLog.builder()
.operationTime(LocalDateTime.now());
// 处理请求信息
if (attributes != null) {
try {
HttpServletRequest request = attributes.getRequest();
logBuilder
.ipAddress(getIpAddress(request))
.requestPath(request.getRequestURI())
.requestMethod(request.getMethod())
.userAgent(request.getHeader("User-Agent"));
} catch (Exception e) {
// 记录异常但不影响主流程
log.warn("获取请求信息时发生异常", e);
logBuilder
.ipAddress("unknown")
.requestPath("unknown")
.requestMethod("unknown")
.userAgent("unknown");
}
} else {
// 非Web请求上下文的处理
log.debug("非Web请求上下文");
logBuilder
.ipAddress("non-web-request")
.requestPath("non-web-request")
.requestMethod("non-web-request")
.userAgent("non-web-request");
}
// 执行原方法
try {
Object result = point.proceed();
// 异步记录日志
myAsyncExecutor.execute(() -> requestLogService.saveLog(logBuilder.build()));
return result;
} catch (Throwable e) {
// 记录异常信息
log.error("方法执行异常", e);
// 仍然记录日志
myAsyncExecutor.execute(() -> requestLogService.saveLog(logBuilder.build()));
throw new RuntimeException(e);
}
}
/**
* 获取真实IP地址
* ────────────────────────────────────────────────────
* 依次从以下头信息中获取IP:
* 1. X-Forwarded-For
* 2. Proxy-Client-IP
* 3. WL-Proxy-Client-IP
* 4. 直接远程地址
* ────────────────────────────────────────────────────
*/
private String getIpAddress(HttpServletRequest request) {
String[] headers = {
"X-Forwarded-For",
"Proxy-Client-IP",
"WL-Proxy-Client-IP",
"HTTP_CLIENT_IP",
"HTTP_X_FORWARDED_FOR"
};
String ip;
for (String header : headers) {
ip = request.getHeader(header);
if (isValidIp(ip)) {
return ip;
}
}
return request.getRemoteAddr(); // 获取远程地址
}
/**
* 验证IP地址是否有效
*/
private boolean isValidIp(String ip) {
return ip != null && !ip.isEmpty() && !"unknown".equalsIgnoreCase(ip);
}
}
RequestLog
@Data
@Builder // 使用 Lombok 注解,自动生成构造器
public class RequestLog {
/**
* 操作时间(精确到秒)
* 示例:2024-12-22 15:30:45
*/
private LocalDateTime operationTime;
/**
* 请求IP地址
* 示例:192.168.1.1
*/
private String ipAddress;
/**
* 请求路径
* 示例:/api/users
*/
private String requestPath;
/**
* 请求方法
* 示例:GET、POST、PUT、DELETE
*/
private String requestMethod;
/**
* 用户代理信息
* 示例:Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...
*/
private String userAgent;
}
RequestLogService
@Service
@Slf4j
public class RequestLogService {
// 日志文件路径
private static final String LOG_FILE = "request_operation.log";
// 日期格式化
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 构造函数,确保日志文件存在
*/
public RequestLogService() {
try {
if (!Files.exists(Paths.get(LOG_FILE))) {
Files.createFile(Paths.get(LOG_FILE));
log.info("创建日志文件成功");
}
} catch (IOException e) {
log.error("创建日志文件失败", e);
}
}
/**
* 保存日志
* 格式:[时间] | IP | 方法 | 路径 | User-Agent
*/
public void saveLog(RequestLog requestLog) {
System.out.println("开始日志记录:" + requestLog + " 线程:" + Thread.currentThread().getName());
// 构建日志内容
String logContent = String.format("[%s] | %s | %s | %s | %s",
requestLog.getOperationTime().format(FORMATTER),
requestLog.getIpAddress(),
requestLog.getRequestMethod(),
requestLog.getRequestPath(),
requestLog.getUserAgent()
);
// 写入日志文件
try (FileWriter writer = new FileWriter(LOG_FILE, true)) {
writer.write(logContent + System.lineSeparator()); // 使用系统换行符
writer.flush(); // 立即写入文件
} catch (IOException e) {
log.error("写入日志文件失败", e);
}
}