JAVA8中并发类CompletableFuture使用遇到的坑-守护线程
1. 前言
最近在看Java8中对并发的支持CompletableFuture类,觉得挺好的,在单一线程执行的时候可以省略很多代码,手动验证后,发现其中还是有一部分坑的,此处记录一下。此处主要描述其创建线程为守护线程的问题,会随着主线程消亡直接消亡,导致任务失败。
2. CompletableFuture优点
简洁,后面以代码为例
3. 案例简述
3.1 CompletableFuture创建一个线程,内部进行轮询读取任务(如:模拟kafka之类的),会发现,执行结束后,整个线程一起关闭了,无法打到一直轮询的目的。传统直接创建的线程(默认为非守护线程)会由于子线程未结束,阻止主线程关闭。
4. 代码
package com.hz.threadpool.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* @description:
* @author: pp_lan
* @date: 2022/3/24
*/
public class CompletableFutureTest {
/**
* java8并发接口
*/
public static void main(String[] args) throws InterruptedException {
System.out.println("============== testAsyncRunnable ===============");
CompletableFuture.runAsync((() -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("**************kafka读取消息结束**************");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}));
TimeUnit.SECONDS.sleep(3);
}
}
5. 执行结果验证
主线程等待了3秒,子线程内部第3秒还未执行完成,随着主线程一起关闭了
Connected to the target VM, address: '127.0.0.1:64615', transport: 'socket'
============== testAsyncRunnable ===============
**************kafka读取消息结束**************
**************kafka读取消息结束**************
Disconnected from the target VM, address: '127.0.0.1:64615', transport: 'socket'
6. 源码分析
默认的线程池为java.util.concurrent.ForkJoinPool
线程池内部创建线程的时候将线程设置为了守护线程
7. 解决方案
7.1 方案说明
既然问题出在了自定义的线程池创建Thread的时候,那么我们可以手动创建线程池(此处也可以不可以覆写ThreadFactory),规避该问题 ,代码如下:
import java.util.concurrent.*;
/**
* @description:
* @author: pp_lan
* @date: 2022/3/24
*/
public class CompletableFutureTest {
/**
* java8并发接口
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("============== testAsyncRunnable ===============");
// 使用默认线程池
// CompletableFuture<Void> future = CompletableFuture.runAsync((() -> {
// while (true) {
// try {
// TimeUnit.MILLISECONDS.sleep(1000);
// System.out.println("**************kafka读取消息结束**************");
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//
// }
// }));
// 使用自定义线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
r -> new Thread(r));
try {
CompletableFuture.runAsync((() -> {
int i = 0;
while (i++ < 2) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("**************kafka读取消息结束**************");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}), pool).whenComplete((r, e) -> {
System.out.println("执行完毕");
});
} finally {
pool.shutdown();
if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
}
}
}
CompletableFuture.runAsync和手动创建的线程池组合,虽然可以解决创建守护线程的问题,但是线程池还是需要手动去关闭。
7.2 源码分析
从源码上看该方法只是将传入的线程池进行一个简单调用。从本文的案例来说,可能直接使用线程池更为直观可靠。