ForkJoinPool,RecursiveAction,RecursiveTask並行運算

JDK1.7提供了一個將一個任務拆分成多個“小任務”並行計算,再把多個“小任務”的結果合併成總的計算結果或者不合並的功能:

Fork:分岔、分流、叉子的意思。Join:合併、加入的意思。加在一起就是分開算然後再合併起來。

ForkJoinPool:ForkJoin的線程池,主要管理這個創建的線程

RecursiveAction:用於沒有返回結果的任務(類似Runnable)

RecursiveTask:用於有返回結果的任務(類似Callable)

ForkJoinPool,RecursiveAction,RecursiveTask並行運算

1、示例代碼:

<code>
package com.example.forkjoinpool;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ForkjoinpoolApplication {


public static void main(String[] args) throws InterruptedException, ExecutionException {
PrintAction printAction = new PrintAction(1, 100);
ForkJoinPool pool = new ForkJoinPool();
pool.submit(printAction);
//線程阻塞,等待所有任務完成
pool.awaitTermination(2, TimeUnit.SECONDS);
pool.shutdown();

// PrintTask printTask = new PrintTask(1, 100);
// ForkJoinPool pool = new ForkJoinPool();
// pool.submit(printTask);
// System.out.println( printTask.get());
//線程阻塞,等待所有任務完成
// pool.awaitTermination(2, TimeUnit.SECONDS);
// pool.shutdown();

}
}

class PrintAction extends RecursiveAction {
private final int THRESHOLD = 50; //最多隻能打印50個數
private int start;
private int end;
public AtomicInteger atomicInteger = new AtomicInteger(0);

public PrintAction(int start, int end) {
super();
this.start = start;

this.end = end;
}


@Override
protected void compute() {
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
atomicInteger.addAndGet(i);
System.out.println(Thread.currentThread().getName() + "的i值:" + i);
System.out.println(Thread.currentThread().getName() + "的atomicInteger值:" + atomicInteger);
}
} else {
int middle = (start + end) / 2;
PrintAction left = new PrintAction(start, middle);
PrintAction right = new PrintAction(middle, end);
//並行執行兩個“小任務”
left.fork();
right.fork();
}

}

}

class PrintTask extends RecursiveTask<integer> {
private static final int THRESHOLD = 50; //最多隻能打印50個數
private int start;
private int end;


public PrintTask(int start, int end) {
super();
this.start = start;
this.end = end;
}


@Override
protected Integer compute() {
int sum = 0;
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
sum += i;
System.out.println(Thread.currentThread().getName() + "的i值:" + i);
}
return sum;
} else {

int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
//並行執行兩個“小任務”
left.fork();
right.fork();
return left.join() + right.join();
}
}
}
/<integer>/<code>

任務異常處理:

ForkJoinTask在執行的時候可能會拋出異常,但是和使用一般線程池一樣沒辦法在主線程中捕獲。

所以ForkJoinTask提供了isCompletedAbnormally來判斷任務是否已經拋出異常或者被取消了。可以通過getException來獲取異常,它會返回Throwable對象,如果任務沒有完成或者沒有拋出異常則返回null。


一些注意點:

<code>ForkJoinPool在執行過程中,會創建大量的子任務,導致GC進行垃圾回收,這些是需要注意的。
/<code>

個人覺得ForkJoinPool適用於需要將一個大任務拆分成多個子任務的情況,就是JAVA8的對集合進行parallelStream那樣。

parallelStream.forEach是不保證順序的,如果要保證順序正確,應該使用 forEachOrdered

改功能主要是用於很多小運算時,數量多,運算量很小,很多線程肯定不合適(線程的創建、上下文切換、銷燬是的時間可能都比運算的時間多),這時候就可以把很多小任務分類合併下,放一個線程中執行,減少線程數量,但要小心別創建太多線程了哦


分享到:


相關文章: