Bug
Resolution: Not an Issue
P4
10, 11
x86_64
os_x
ADDITIONAL SYSTEM INFORMATION :
MacBook Pro (15-inch, 2017)
3.1 GHz Intel Core i7
16 GB 2133 MHz LPDDR3
$uname -a
Darwin XXXXX 16.7.0 Darwin Kernel Version 16.7.0: Thu Jun 21 20:07:39 PDT 2018; root:xnu-3789.73.14~1/RELEASE_X86_64 x86_64
$ java --version
java 10.0.2 2018-07-17
Java(TM) SE Runtime Environment 18.3 (build 10.0.2+13)
Java HotSpot(TM) 64-Bit Server VM 18.3 (build 10.0.2+13, mixed mode)
A DESCRIPTION OF THE PROBLEM :
Nested Stream.parallel operations that update a ConcurrentHashMap deadlock, and the stack traces show the same compute call re-entered on a single thread.
I am building a data storage library in which I would like to use Stream.parallel. The application that uses this library also uses Stream.parallel and concurrent data structures to manipulate data. The combination deadlocks, with unusual stack traces in which the compute method called in the outer loop appears to be re-entered.
STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
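Run the attached source code in either of the two parallel execution modes that fail (lines 24 and 25 of the original file: the configurations listed under the DEADLOCKS comment in main).
Three additional modes of execution succeed (lines 28, 29 and 30 of the original file: the remaining configurations in main).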
EXPECTED VERSUS ACTUAL BEHAVIOR :
EXPECTED -
A thread's stack trace should never contain line 87 (the concurrentMap.compute call in streamParallelOuterTask) more than once, and the code should not deadlock.
ACTUAL -
Found one Java-level deadlock:
=============================
"outer pool1":
waiting to lock monitor 0x00007ff6ca8aba00 (object 0x00000006cfb8a370, a java.util.concurrent.ConcurrentHashMap$ReservationNode),
which is held by "outer pool2"
"outer pool2":
waiting to lock monitor 0x00007ff6ca085800 (object 0x00000006cfb30b58, a java.util.concurrent.ConcurrentHashMap$ReservationNode),
which is held by "outer pool1"
Java stack information for the threads listed above:
===================================================
"outer pool1":
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1938)
- waiting to lock <0x00000006cfb8a370> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1961)
- locked <0x00000006cf9a8c60> (a java.util.concurrent.ConcurrentHashMap$Node)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1961)
- locked <0x00000006cf81c3b0> (a java.util.concurrent.ConcurrentHashMap$Node)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1961)
- locked <0x00000006cf81c1c0> (a java.util.concurrent.ConcurrentHashMap$Node)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
- locked <0x00000006cfb31d80> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1961)
- locked <0x00000006cfa26d78> (a java.util.concurrent.ConcurrentHashMap$Node)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
- locked <0x00000006cfb30b58> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
- locked <0x00000006cfb195f0> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinTask.doInvoke(java.base@10.0.2/ForkJoinTask.java:408)
at java.util.concurrent.ForkJoinTask.invoke(java.base@10.0.2/ForkJoinTask.java:736)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(java.base@10.0.2/ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfLong.evaluateParallel(java.base@10.0.2/ForEachOps.java:209)
at java.util.stream.AbstractPipeline.evaluate(java.base@10.0.2/AbstractPipeline.java:233)
at java.util.stream.LongPipeline.forEach(java.base@10.0.2/LongPipeline.java:421)
at java.util.stream.LongPipeline$Head.forEach(java.base@10.0.2/LongPipeline.java:579)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$5(NestedParallel.java:84)
at com.upserve.NestedParallel$$Lambda$5/2101440631.run(Unknown Source)
at java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(java.base@10.0.2/ForkJoinTask.java:1407)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.runWorker(java.base@10.0.2/ForkJoinPool.java:1603)
at java.util.concurrent.ForkJoinWorkerThread.run(java.base@10.0.2/ForkJoinWorkerThread.java:177)
"outer pool2":
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1938)
- waiting to lock <0x00000006cfb30b58> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
- locked <0x00000006cfb8a370> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.runWorker(java.base@10.0.2/ForkJoinPool.java:1603)
at java.util.concurrent.ForkJoinWorkerThread.run(java.base@10.0.2/ForkJoinWorkerThread.java:177)
Found 1 deadlock.
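The report above is the deadlock section of a HotSpot thread dump. As a rough sketch only, the same condition can be probed from inside a running JVM with ThreadMXBean.findDeadlockedThreads(). The class and method names (DeadlockProbe, deadlockedThreadCount) are hypothetical, and the printed format is this sketch's own, not the JVM's.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DeadlockProbe {

    /**
     * Returns how many threads are currently deadlocked on monitors or
     * ownable synchronizers, printing one line per deadlocked thread.
     */
    static int deadlockedThreadCount() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // findDeadlockedThreads() returns null when no deadlock is detected.
        long[] ids = bean.findDeadlockedThreads();
        if (ids == null) {
            return 0;
        }
        for (ThreadInfo info : bean.getThreadInfo(ids)) {
            // An entry is null if the thread has terminated in the meantime.
            if (info != null) {
                System.out.println(info.getThreadName()
                        + " waits on " + info.getLockName()
                        + " held by " + info.getLockOwnerName());
            }
        }
        return ids.length;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + deadlockedThreadCount());
    }
}
```

Calling deadlockedThreadCount() periodically from a watchdog thread while the reproducer runs would be one way to notice the hang without taking a manual thread dump.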
---------- BEGIN SOURCE ----------
package com.upserve;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.stream.*;

public class NestedParallel implements Runnable {
    static final String STREAM_PARALLEL = "STREAM PARALLEL";
    static final String LOOP_PARALLEL = "LOOP PARALLEL";
    static final String SERIAL = "SERIAL";

    final ConcurrentMap<String, Long> concurrentMap;
    final ExecutorService outerPool;
    final ExecutorService innerPool;
    final Random random;
    final String innerLoop;
    final String outerLoop;

    public static void main(String[] args) {
        // Constructor arguments are (outerLoop, innerLoop).
        // DEADLOCKS
        NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, STREAM_PARALLEL);
        // NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, LOOP_PARALLEL);
        // SUCCESS
        // NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, SERIAL);
        // NestedParallel nestedParallel = new NestedParallel(LOOP_PARALLEL, STREAM_PARALLEL);
        // NestedParallel nestedParallel = new NestedParallel(SERIAL, LOOP_PARALLEL); // SLOW!
        System.out.println("starting");
        nestedParallel.run();
        System.out.println("finished");
    }

    public NestedParallel(String outerLoop, String innerLoop) {
        concurrentMap = new ConcurrentHashMap<>();
        this.innerLoop = innerLoop;
        this.outerLoop = outerLoop;

        // Names each worker thread after its pool so thread dumps are readable.
        Function<String, ForkJoinPool.ForkJoinWorkerThreadFactory> threadFactoryFunction = name -> pool -> {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName(name + worker.getPoolIndex());
            return worker;
        };
        Function<String, ForkJoinPool> forkJoinPoolFunction = name -> new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),
                threadFactoryFunction.apply(name),
                (t, e) -> {
                    System.out.println("In pool " + name + ", thread " + t + " threw exception: " + e);
                },
                true
        );
        outerPool = forkJoinPoolFunction.apply("outer pool");
        innerPool = forkJoinPoolFunction.apply("inner pool");
        random = new Random();
    }

    @Override
    public void run() {
        if (STREAM_PARALLEL.equals(outerLoop)) {
            streamParallelOuterTask();
        } else if (LOOP_PARALLEL.equals(outerLoop)) {
            loopParallelOuterTask();
        } else if (SERIAL.equals(outerLoop)) {
            serialOuterTask();
        } else {
            throw new RuntimeException("Invalid command: " + outerLoop);
        }
    }

    private void streamParallelOuterTask() {
        Future<?> future = outerPool.submit(() -> {
            random
                    .longs(1_000_000, 0, 1_000)
                    .parallel()
                    .forEach(val -> {
                        if (val % 1017 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                        String key = String.valueOf(val);
                        concurrentMap.compute(key, biFunction(val));
                    });
        });
        try {
            future.get();
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
        }
    }

    private void loopParallelOuterTask() {
        List<Long> vals = random.longs(1_000_000, 0, 1_000).boxed().collect(Collectors.toList());
        List<Future<?>> futures = new ArrayList<>();
        for (Long val : vals) {
            futures.add(outerPool.submit(() -> {
                if (val % 1000 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                String key = String.valueOf(val);
                concurrentMap.compute(key, biFunction(val));
            }));
        }
        futures.forEach(f -> {
            try {
                f.get();
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
            }
        });
    }

    private void serialOuterTask() {
        random
                .longs(1_000_000, 0, 1_000)
                .forEach(val -> {
                    if (val % 1000 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                    String key = String.valueOf(val);
                    concurrentMap.compute(key, biFunction(val));
                });
    }

    // Remapping function passed to compute(); it runs the inner task before returning.
    private BiFunction<String, Long, Long> biFunction(long val) {
        return (k, v) -> {
            if (v == null) {
                v = val;
            } else {
                v += val;
            }
            final Long vMod = v % 10_000;
            // Do some naive task
            if (STREAM_PARALLEL.equals(innerLoop)) {
                return streamParallelInnerTask(vMod);
            } else if (LOOP_PARALLEL.equals(innerLoop)) {
                return loopParallelInnerTask(vMod);
            } else if (SERIAL.equals(innerLoop)) {
                return serialInnerTask(vMod);
            } else {
                throw new RuntimeException("Invalid command: " + innerLoop);
            }
        };
    }

    private Long streamParallelInnerTask(long vMod) {
        LongAdder longAdder = new LongAdder();
        // If outerPool is used again here, there is no deadlock, but the process livelocks after a while
        Future<?> future = innerPool.submit(() -> {
            LongStream.range(1, vMod)
                    .parallel()
                    .forEach(value -> {
                        // System.out.println("Inner Task Thread: " + Thread.currentThread().getName());
                        longAdder.add(value);
                    });
        });
        try {
            future.get();
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
        }
        return longAdder.longValue();
    }

    private Long loopParallelInnerTask(long vMod) {
        LongAdder longAdder = new LongAdder();
        List<Future<?>> futures = new ArrayList<>();
        for (long i = 0; i < vMod; i++) {
            final Long innerVal = i;
            // Cannot use outerPool again here: it causes a livelock with all threads waiting
            futures.add(innerPool.submit(() -> {
                // System.out.println("Inner Task Thread: " + Thread.currentThread().getName());
                longAdder.add(innerVal);
            }));
        }
        futures.forEach(f -> {
            try {
                f.get();
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
            }
        });
        return longAdder.longValue();
    }

    private Long serialInnerTask(long vMod) {
        LongAdder longAdder = new LongAdder();
        LongStream.range(1, vMod)
                .forEach(value -> {
                    // System.out.println("Inner Task Thread: " + Thread.currentThread().getName());
                    longAdder.add(value);
                });
        return longAdder.longValue();
    }
}
---------- END SOURCE ----------
CUSTOMER SUBMITTED WORKAROUND :
Be aware of where a library may use Stream.parallel internally.
When an application calls such a library, parallelize the application's higher levels by some means other than Stream.parallel.
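Another possible direction, offered only as a sketch, is to keep the remapping function short so that no thread waits on parallel work while holding a map bin lock. The ConcurrentHashMap.compute documentation asks for this: the computation should be short and simple. The code below is not the reporter's code. It assumes the inner work can be derived from the input value alone and then folded into the map with merge, which changes the semantics of the original biFunction (there, the inner work depends on the value already stored). The names ShortComputeSketch, innerSum, and run are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.LongStream;

public class ShortComputeSketch {

    // Hypothetical stand-in for the inner task: sums 1..vMod-1 in parallel.
    // It is called before the map is touched, so no bin lock is held here.
    static long innerSum(long vMod) {
        return LongStream.range(1, vMod).parallel().sum();
    }

    // Processes the values in parallel and accumulates results per key.
    static ConcurrentMap<String, Long> run(long[] vals) {
        ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();
        Arrays.stream(vals).parallel().forEach(val -> {
            // Assumption: the work depends only on val, not on the stored value.
            long work = innerSum(val % 10_000);
            // The remapping function is a plain addition: short and non-blocking.
            map.merge(String.valueOf(val), work, Long::sum);
        });
        return map;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[] {3, 3, 5}));
    }
}
```

If the inner work really must depend on the stored value, this restructuring does not apply as written, and the parallel step would need to be separated from the map update in some other way.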
FREQUENCY : always
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
- locked <0x00000006cfb195f0> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinTask.doInvoke(java.base@10.0.2/ForkJoinTask.java:408)
at java.util.concurrent.ForkJoinTask.invoke(java.base@10.0.2/ForkJoinTask.java:736)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(java.base@10.0.2/ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfLong.evaluateParallel(java.base@10.0.2/ForEachOps.java:209)
at java.util.stream.AbstractPipeline.evaluate(java.base@10.0.2/AbstractPipeline.java:233)
at java.util.stream.LongPipeline.forEach(java.base@10.0.2/LongPipeline.java:421)
at java.util.stream.LongPipeline$Head.forEach(java.base@10.0.2/LongPipeline.java:579)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$5(NestedParallel.java:84)
at com.upserve.NestedParallel$$Lambda$5/2101440631.run(Unknown Source)
at java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(java.base@10.0.2/ForkJoinTask.java:1407)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.runWorker(java.base@10.0.2/ForkJoinPool.java:1603)
at java.util.concurrent.ForkJoinWorkerThread.run(java.base@10.0.2/ForkJoinWorkerThread.java:177)
"outer pool2":
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1938)
- waiting to lock <0x00000006cfb30b58> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.awaitJoin(java.base@10.0.2/ForkJoinPool.java:1713)
at java.util.concurrent.ForkJoinTask.doJoin(java.base@10.0.2/ForkJoinTask.java:397)
at java.util.concurrent.ForkJoinTask.get(java.base@10.0.2/ForkJoinTask.java:1004)
at com.upserve.NestedParallel.streamParallelInnerTask(NestedParallel.java:172)
at com.upserve.NestedParallel.lambda$biFunction$9(NestedParallel.java:147)
at com.upserve.NestedParallel$$Lambda$12/1333040169.apply(Unknown Source)
at java.util.concurrent.ConcurrentHashMap.compute(java.base@10.0.2/ConcurrentHashMap.java:1922)
- locked <0x00000006cfb8a370> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at com.upserve.NestedParallel.lambda$streamParallelOuterTask$4(NestedParallel.java:87)
at com.upserve.NestedParallel$$Lambda$6/354291670.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfLong.accept(java.base@10.0.2/ForEachOps.java:225)
at java.util.Random$RandomLongsSpliterator.forEachRemaining(java.base@10.0.2/Random.java:1099)
at java.util.Spliterator$OfLong.forEachRemaining(java.base@10.0.2/Spliterator.java:763)
at java.util.stream.AbstractPipeline.copyInto(java.base@10.0.2/AbstractPipeline.java:484)
at java.util.stream.ForEachOps$ForEachTask.compute(java.base@10.0.2/ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(java.base@10.0.2/CountedCompleter.java:746)
at java.util.concurrent.ForkJoinTask.doExec(java.base@10.0.2/ForkJoinTask.java:290)
at java.util.concurrent.ForkJoinPool.runWorker(java.base@10.0.2/ForkJoinPool.java:1603)
at java.util.concurrent.ForkJoinWorkerThread.run(java.base@10.0.2/ForkJoinWorkerThread.java:177)
Found 1 deadlock.
---------- BEGIN SOURCE ----------
package com.upserve;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.stream.*;

public class NestedParallel implements Runnable {
    static final String STREAM_PARALLEL = "STREAM PARALLEL";
    static final String LOOP_PARALLEL = "LOOP PARALLEL";
    static final String SERIAL = "SERIAL";

    final ConcurrentMap<String, Long> concurrentMap;
    final ExecutorService outerPool;
    final ExecutorService innerPool;
    final Random random;
    final String innerLoop;
    final String outerLoop;

    public static void main(String[] args) {
        // DEADLOCKS
        NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, STREAM_PARALLEL);
        // NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, LOOP_PARALLEL);
        // SUCCESS
        // NestedParallel nestedParallel = new NestedParallel(STREAM_PARALLEL, SERIAL);
        // NestedParallel nestedParallel = new NestedParallel(LOOP_PARALLEL, STREAM_PARALLEL);
        // NestedParallel nestedParallel = new NestedParallel(SERIAL, LOOP_PARALLEL); // SLOW!
        System.out.println("starting");
        nestedParallel.run();
        System.out.println("finished");
    }

    public NestedParallel(String outerLoop, String innerLoop) {
        concurrentMap = new ConcurrentHashMap<>();
        this.innerLoop = innerLoop;
        this.outerLoop = outerLoop;
        // Names each worker after its pool so the thread dump shows which pool it belongs to.
        Function<String, ForkJoinPool.ForkJoinWorkerThreadFactory> threadFactoryFunction = name -> pool ->
        {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName(name + worker.getPoolIndex());
            return worker;
        };
        Function<String, ForkJoinPool> forkJoinPoolFunction = name -> new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),
                threadFactoryFunction.apply(name),
                (t, e) -> {
                    System.out.println("In pool " + name + ", thread " + t + " threw exception: " + e);
                },
                true
        );
        outerPool = forkJoinPoolFunction.apply("outer pool");
        innerPool = forkJoinPoolFunction.apply("inner pool");
        random = new Random();
    }
    @Override
    public void run() {
        if (STREAM_PARALLEL.equals(outerLoop)) {
            streamParallelOuterTask();
        } else if (LOOP_PARALLEL.equals(outerLoop)) {
            loopParallelOuterTask();
        } else if (SERIAL.equals(outerLoop)) {
            serialOuterTask();
        } else {
            throw new RuntimeException("Invalid command: " + outerLoop);
        }
    }
    private void streamParallelOuterTask() {
        Future<?> future = outerPool.submit(() -> {
            random
                    .longs(1_000_000, 0, 1_000)
                    .parallel()
                    .forEach(val -> {
                                if (val % 1017 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                                String key = String.valueOf(val);
                                concurrentMap.compute(key, biFunction(val));
                            }
                    );
        });
        try {
            future.get();
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
        }
    }
    private void loopParallelOuterTask() {
        List<Long> vals = random.longs(1_000_000, 0, 1_000).boxed().collect(Collectors.toList());
        List<Future<?>> futures = new ArrayList<>();
        for (Long val : vals) {
            futures.add(outerPool.submit(() -> {
                if (val % 1000 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                String key = String.valueOf(val);
                concurrentMap.compute(key, biFunction(val));
            }));
        }
        futures.forEach(f -> {
            try {
                f.get();
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
            }
        });
    }
    private void serialOuterTask() {
        random
                .longs(1_000_000, 0, 1_000)
                .forEach(val -> {
                            if (val % 1000 == 0) System.out.println("Outer Task Thread:" + Thread.currentThread().getName());
                            String key = String.valueOf(val);
                            concurrentMap.compute(key, biFunction(val));
                        }
                );
    }
    private BiFunction<String, Long, Long> biFunction(long val) {
        return (k, v) -> {
            if (v == null) {
                v = val;
            } else {
                v += val;
            }
            final Long vMod = v % 10_000;
            // Do some naive task
            if (STREAM_PARALLEL.equals(innerLoop)) {
                return streamParallelInnerTask(vMod);
            } else if (LOOP_PARALLEL.equals(innerLoop)) {
                return loopParallelInnerTask(vMod);
            } else if (SERIAL.equals(innerLoop)) {
                return serialInnerTask(vMod);
            } else {
                throw new RuntimeException("Invalid command: " + innerLoop);
            }
        };
    }
    private Long streamParallelInnerTask(long vMod) {
        LongAdder longAdder = new LongAdder();
        // If outerPool is used again here, there is no deadlock, but the process livelocks after a while
        Future<?> future = innerPool.submit(() -> {
            LongStream.range(1, vMod)
                    .parallel()
                    .forEach(value -> {
                        // System.out.println("Inner Task Thread: " + Thread.currentThread().getName());
                        longAdder.add(value);
                    });
        });
        try {
            future.get();
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
        }
        return longAdder.longValue();
    }
    private Long loopParallelInnerTask(long vMod) {
        LongAdder longAdder = new LongAdder();
        List<Future<?>> futures = new ArrayList<>();
        for (long i = 0; i < vMod; i++) {
            final Long innerVal = i;
            // Cannot use outerPool again here, as it causes livelock with all threads waiting
            futures.add(innerPool.submit(() -> {
                // System.out.println("Inner Task Thread: " + Thread.currentThread().getName());
                longAdder.add(innerVal);
            }));
        }
        futures.forEach(f -> {
            try {
                f.get();
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted " + Thread.currentThread().getName(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Failed " + Thread.currentThread().getName(), e);
            }
        });
        return longAdder.longValue();
    }
    private Long serialInnerTask(long vMod) {
        LongAdder longAdder = new LongAdder();
        LongStream.range(1, vMod)
                .forEach(value -> {
                    // System.out.println("Inner Task Thread: " + Thread.currentThread().getName());
                    longAdder.add(value);
                });
        return longAdder.longValue();
    }
}
---------- END SOURCE ----------
CUSTOMER SUBMITTED WORKAROUND :
Be aware that a library may use Stream.parallel internally.
When an application calls such a library, parallelize the application's higher levels by some means other than Stream.parallel.
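A further option, not part of the original submission, is to keep blocking work out of the remapping function altogether. The ConcurrentHashMap.compute documentation notes that other threads' updates may be blocked while the computation is in progress, so the computation should be short and simple. The sketch below is only an illustration under a stated assumption: the inner work depends on the input value alone, not on the value already stored in the map (in the reproducer's biFunction it does depend on the stored value, so this is not a drop-in replacement). The class ComputeOutsideLock and the methods innerSum and accumulate are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.LongStream;

public class ComputeOutsideLock {

    // Hypothetical stand-in for the inner task: sums 1..(bound - 1) with a parallel stream.
    static long innerSum(long bound) {
        return LongStream.range(1, bound).parallel().sum();
    }

    // Outer parallel loop. The expensive, possibly blocking work runs BEFORE the map is
    // touched, so no map bin lock is held while a worker thread waits or helps run other tasks.
    static ConcurrentMap<String, Long> accumulate(long[] vals) {
        ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();
        Arrays.stream(vals).parallel().forEach(val -> {
            long contribution = innerSum(val);
            // The remapping function is a plain addition: short, simple, non-blocking.
            map.merge(String.valueOf(val), contribution, Long::sum);
        });
        return map;
    }

    public static void main(String[] args) {
        System.out.println(accumulate(new long[] {3, 3, 5, 10}));
    }
}
```

With this shape the parallel inner work and the map update never overlap on the same thread, which is the condition the stack traces above show being violated at NestedParallel.java:87.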
FREQUENCY : always