3.6 KiB
3.6 KiB
并行流
- 将顺序流转换为并行流, 使用parallel方法
public static long parallelSum(long n) {
return Stream.interate(1L, i -> i +1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
通过sequential方法可以把并行流变成顺序流, 并行流内部使用了默认的ForkJoinPool,默认线程数量为处理器数量
// 测试运行性能
System.out.println("Parallel sum done in: "+ measureSumPerf(ParallelStreams::parallelSum, 10_000_000)+" msecs" );
使用正确的书据结构可以保证其最佳性能,避免装箱开销, 并行化本身并不是没有代价的,需要对流做递归划分,把每个子流的归纳操作分配到不同的线程
正确使用并行流
避免算法改变了某些共享状态,避免共享累加器这种共享可变状态
高效使用并行流
- 如果有疑问,测量
- 留意装箱
- 有些操作本身在并行流上的性能就比顺序流差
- 考虑操作流水线的总计算成本
- 对于较小的数据量,选择并行流几乎从来不是一个好的决定
- 要考虑流背后的数据结构是否易于分解
- 流自身的特点,以及流水线中的中间操作改变流的方式,都可能会改变分解过程的性能
- 考虑终端操作中合并步骤的代价是大是小
分支/合并框架
目的为以递归的方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果,是ExecutorService接口的一个实现,把子任务分配给线程池(ForkJoinPool)中工作线程
- 使用RecursiveTask
提交任务必须创建一个RecursiveTask<R>的一个子类, R是并行化执行任务(以及所有子任务)产生的结果类型, 如果不返回结果,是RecursiveAction类型, 要定义RecursiveTask,实现它唯一的抽象方法compute
protected abstract R compute();
这个方法定义了将任务拆分成子任务的逻辑,以及无法再拆或不便拆时生成单个子任务的逻辑
if(任务足够小或不可分){
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}
使用分支/合并框架的最佳做法
- 对一个任务调用join犯法会阻塞调用方,直到该任务作出结果
- 不应该在RescursiveTask内部使用ForkJoinPool的invoke方法,应该始终直接调用computer或fork方法
- 对子任务调用fork方法可以把它排进ForkJoinPool
- 调试使用分支/合并框架的并行计算可能有点棘手
- 和并行流一样,不应理所当然认为在多核处理器上使用分支/合并框架比顺序计算快
工作窃取
每个子任务的执行时间都不一样, 实际情况下可能会有一个线程早早完成了分配给它的任务,队列已近空了,而其他线程还在忙碌, 这时,这个线程从随机选择一个线程,从队列尾巴上“偷走一个任务”, 这个过程一直持续到所有任务执行完成
Spliterator
可分迭代器,为了并行执行而设计的
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
拆分过程
对第一个Spliterator调用trysplit,生成第二个Spliterator, 对两个Spliterator调用trysplit,生成4各spliterator, 不断调用trysplit,直到返回null。
Spliterator的特性
最后一个方法characteristics,返回一个int,代表Spliterator本身特性集的编码