-
Enhancement
-
Resolution: Unresolved
-
P4
-
None
-
8, 9
-
None
When a stream is evaluated in parallel and a behavioural parameter throws a runtime exception, the evaluation is halted and the exception is re-thrown to the caller of the stream's terminal operation. However, the exception may be thrown before the stream computation has "quiesced", that is, before all associated tasks have completed (exceptionally or otherwise). This is contrary to the behaviour of short-circuiting operations such as findFirst or findAny, where the result is returned only after all associated tasks have completed, even if the result is known earlier. Waiting in this way ensures there are no straggling tasks that could affect common pool resource utilization for subsequent stream computations.
The same behaviour will also occur for bulk operations performed on a ConcurrentHashMap.
In some cases it is arguable that straggling tasks are acceptable when an unexpected exception is thrown, since some code is evidently not functioning correctly. In other cases an exception might be thrown explicitly to cancel the computation, and it may be the only means of doing so (e.g. for stream computations or ConcurrentHashMap bulk operations, where the use of Fork/Join is mostly an implementation detail).
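As an illustration of the second case, the following is a minimal sketch of cancelling a ConcurrentHashMap bulk operation by throwing an exception. The class name ChmCancelSketch, the marker exception Cancelled and the helper containsValue are hypothetical names introduced only for this example; the sketch assumes the exception that reaches the caller has the same type as the one thrown by the action.

```java
import java.util.concurrent.ConcurrentHashMap;

class ChmCancelSketch {
    /** Hypothetical marker exception, thrown only to stop the bulk operation early. */
    static final class Cancelled extends RuntimeException {}

    /** Returns true if some entry maps to target; uses an exception to stop the traversal. */
    static boolean containsValue(ConcurrentHashMap<Integer, Integer> map, int target) {
        try {
            // A parallelismThreshold of 1 requests maximal parallelism for the bulk operation.
            map.forEach(1L, (k, v) -> {
                if (v == target)
                    throw new Cancelled(); // the only way to abandon the traversal from inside the action
            });
            return false; // traversal completed without a match
        } catch (Cancelled c) {
            // Sub-tasks that were already forked may still be running at this point.
            return true;
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        for (int i = 0; i < (1 << 14); i++)
            map.put(i, i * 2);
        System.out.println(containsValue(map, 2000)); // prints "true" (1000 maps to 2000)
        System.out.println(containsValue(map, 2001)); // prints "false" (all values are even)
    }
}
```

Because the caller only sees the bulk operation, not the underlying Fork/Join tasks, it has no handle with which to cancel or join the remaining work once the exception has propagated.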
A supported mode in ForkJoinTask, in which an exception does not result in straggling tasks, seems preferable.
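Until such a mode exists, a caller can roughly approximate it by waiting for the pool to quiesce before letting the exception escape. The sketch below is not the proposed ForkJoinTask mode, only a caller-side workaround under stated assumptions: the stream runs in the common pool, and the class QuiescentForEach and method forEachQuiescing are hypothetical names. Note that awaitQuiescence also waits for any unrelated tasks in the common pool, which is one reason a supported mode would be preferable.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

class QuiescentForEach {
    /**
     * Applies action to every int in [0, size) in parallel. If the action throws,
     * waits (up to one minute) for the common pool to quiesce before re-throwing,
     * so that stragglers have finished by the time the caller sees the exception.
     */
    static void forEachQuiescing(int size, IntConsumer action) {
        try {
            IntStream.range(0, size).parallel().forEach(action);
        } catch (RuntimeException e) {
            // Assumption: the stream was evaluated in the common pool.
            if (!ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES)) {
                e.addSuppressed(new IllegalStateException("common pool did not quiesce in time"));
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            forEachQuiescing(1024 * 16, e -> {
                if (e == 4242)
                    throw new IllegalStateException("cancel");
            });
        } catch (IllegalStateException expected) {
            System.out.println("exception observed after waiting for quiescence");
        }
    }
}
```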
See the following class that reproduces the straggling task behaviour:
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;

public class StreamEx3 {
    public static void main(String[] args) throws Exception {
        // Repeat the trial until a straggling task is observed.
        for (int i = 0; i < 10000; ++i) {
            int idx = test();
            if (idx >= 0) {
                System.out.println("late exec idx " + idx + " trial " + i);
                break;
            }
        }
    }

    // Returns the index that threw if any element was processed after the
    // exception reached the caller, otherwise -1.
    static int test() {
        int s = 1024 * 16;
        int rnd = ThreadLocalRandom.current().nextInt(s);
        AtomicBoolean done = new AtomicBoolean();
        AtomicInteger straggle = new AtomicInteger();
        IntStream str = IntStream.range(0, s).parallel();
        try {
            str.forEach(e -> {
                if (e == rnd)
                    throw new RuntimeException();
                else if (done.get())
                    straggle.getAndIncrement(); // ran after forEach had already returned
            });
        } catch (RuntimeException e) {
            // Expected: thrown by the element equal to rnd.
        } finally {
            done.set(true);
        }
        // Let any remaining tasks finish so that stragglers are counted.
        if (!ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES))
            System.out.println("stuck");
        return straggle.get() != 0 ? rnd : -1;
    }
}