1 回答
TA贡献1936条经验 获得超6个赞
AnExecutorService
不会仅仅因为一项作业(及其子作业)完成而自行终止。线程池背后的整个想法是可重用。
因此,只有当应用程序调用shutdown()
它时,它才会终止。
您可以使用它isQuiescent()
来查找是否没有待处理的作业,这仅在所有提交的作业都属于您的特定任务时才有效。使用返回的 future 来submit
检查实际工作的完成情况要简洁得多。
在任何一种情况下,排队任务的完成状态都不会说明您正在轮询的队列。当您了解提交结束时,您仍然需要检查队列中是否有未决元素。
此外,建议使用线程安全BlockingQueue
实现而不是装饰LinkedList
withsynchronized
块。加上一些需要清理的其他内容,代码将如下所示:
public class TestForkJoinPoolEnd {
private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
ForkJoinTask<?> future = customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
QUEUE.offer("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!future.isDone()) try {
String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);
if (s != null) {
System.out.println(s);
counter--;
}
} catch (InterruptedException e) {}
for(;;) {
String s = QUEUE.poll();
if (s == null) break;
System.out.println(s);
counter--;
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
customThreadPool.shutdown();
}
static List<String> makeList() {
return IntStream.range(0, MAX_SIZE)
.mapToObj(i -> makeString())
.collect(Collectors.toList());
}
static String makeString() {
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
return s.chars().sum() / SPEED_UP;
}
static void process(String s) {
long start = System.nanoTime();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.nanoTime();
QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");
}
}
如果您sleep在接收端的呼叫应该模拟一些工作量而不是等待新项目,您也可以使用
int counter = MAX_SIZE + 1;
while (!future.isDone()) {
String s = QUEUE.poll();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {}
}
但逻辑没有改变。future.isDone()返回后true,我们必须重新检查队列中待处理的元素。我们只保证不会有新的项目到达,而不保证队列已经空了。
作为旁注,该makeString()方法可以进一步改进
static String makeString() {
int targetStringLength = 10;
ThreadLocalRandom random = ThreadLocalRandom.current();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('a', 'z' + 1);
buffer.append((char)randomLimitedInt);
}
return buffer.toString();
}
甚至
static String makeString() {
int targetStringLength = 10;
return ThreadLocalRandom.current()
.ints(targetStringLength, 'a', 'z'+1)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
添加回答
举报