1 ForkJoinPool定义

ForkJoinPool是自java7开始,jvm提供的一个用于并行执行的任务框架。其主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果。得到最终的结果。其广泛用在java8的stream中。

1.1 分治法

分治法的基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。是一种分目标完成的程序算法。

image.png

1.2 工作窃取(work-stealing)

工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。在ForkJoinpool中,工作任务的队列都采用双端队列Deque容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上LIFO,而窃取其他线程的任务的时候,从队列头部取获取。

2 实战

public class HighPerformanceArraySum {
    // 创建线程池(从分配给虚拟机的处理器的数量)
   private static final ForkJoinPool POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
   
   public static long computeSum(int[] array) {

       return POOL.invoke(new ArraySumTask(array, 0, array.length));
   }

   // 创建递归任务
   private static class ArraySumTask extends RecursiveTask<Long> {

       private static final int THRESHOLD = 1000;
       private final int[] data;
       private final int start;
       private final int end;

       ArraySumTask(int[] data, int start, int end) {
           this.data = data;
           this.start = start;
           this.end = end;
       }

       @Override
       protected Long compute() {
           if (end - start <= THRESHOLD) {
               return computeDirectly();   // 小于拆分阈值,执行具体任务
           } else {
               // 大于拆分阈值,继续拆分
               int mid = (start + end) >>> 1;
               ArraySumTask leftTask = new ArraySumTask(data, start, mid);
               leftTask.fork(); // 异步执行左半部分
               ArraySumTask rightTask = new ArraySumTask(data, mid, end);
               return rightTask.compute() + leftTask.join(); // 合并结果
           }
       }
				// 具体任务
       private long computeDirectly() {
           long sum = 0;
           for (int i = start; i < end; i++) {
              sum += data[i];
           }
           return sum;
       }
   }
}