流能够并行履行,以添加很多输入元素的运行时性能。并行流ForkJoinPool经过静态ForkJoinPool.commonPool()办法运用公共可用的流。底层线程池的大小最多运用五个线程 – 具体取决于可用物理CPU中心的数量:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
在我的机器上,公共池初始化为默认值为3的并行度。经过设置以下JVM参数能够减小或添加此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合支撑创建并行元素流的办法parallelStream()。或许,您能够在给定流上调用中心办法parallel(),以将次序流通换为并行流。
为了评价并行流的并行履行行为,下一个示例将有关当前线程的信息打印出来:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
经过查询调试输出,咱们应该更好地理解哪些线程实践用于履行流操作:
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
如您所见,并行流利用公共中的一切可用线程ForkJoinPool来履行流操作。输出在接连运行中或许不同,由于实践运用的特定线程的行为是非确定性的。
让咱们经过一个额外的流操作来扩展该示例:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
成果或许开始看起来很奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
似乎sort只在主线程上次序履行。实践上,sort在并行流上运用新的Java 8办法Arrays.parallelSort()。如Javadoc中所述,假如排序将按次序或并行履行,则此办法决定数组的长度:
假如指定数组的长度小于最小粒度,则运用适当的Arrays.sort办法对其进行排序。
回到reduce一节的比如。咱们现已发现组合器函数只是并行调用,而不是次序流调用。让咱们看看实践触及哪些线程:
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
控制台输出显现累加器和组合器函数在一切可用线程上并行履行:
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
总之,并行流能够为具有很多输入元素的流带来良好的性能提高。但请记住,某些并行流操作reduce
,collect
需求额外的核算(组合操作),这在次序履行时是不需求的。
此外,咱们了解到一切并行流操作同享相同的JVM范围ForkJoinPool。因此,您或许希望防止实施慢速堵塞流操作,由于这或许会减慢严峻依赖并行流的应用程序的其他部分。