Type: Enhancement
Resolution: Fixed
Priority: P4
Fixed in: b19
AbstractSpliterator and IteratorSpliterator are widely used in standard stream sources and in third-party code. Standard sources affected by this problem include:
Stream.iterate(seed, hasNext, next)
BufferedReader.lines()
Files.list()
Files.walk()
CharSequence.codePoints() (default implementation)
Scanner.tokens()
Iterable.spliterator() (default implementation)
and so on
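The unknown-size nature of these sources can be observed directly. A minimal sketch using Stream.iterate from the list above (the class name is illustrative):

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class UnknownSizeDemo {
    public static void main(String[] args) {
        // The source spliterator behind Stream.iterate(seed, hasNext, next)
        Spliterator<Integer> sp =
                Stream.iterate(0, i -> i < 20, i -> i + 1).spliterator();

        // Unknown size is reported as Long.MAX_VALUE, and SIZED is not set
        System.out.println(sp.estimateSize() == Long.MAX_VALUE);       // true
        System.out.println(sp.hasCharacteristics(Spliterator.SIZED));  // false
    }
}
```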
When AbstractSpliterator, IteratorSpliterator, or their primitive specializations are created with unknown size, their splitting behavior is tailored for quite long streams. However, situations where we have only a few elements and a long, CPU-intensive downstream operation are also common. In these situations, the standard Stream API performs exceptionally badly with these spliterators. In particular, if the stream contains 1024 elements or fewer, parallel() may have no effect at all: the stream is not parallelized regardless of how many CPU cores we have or how long the downstream operation takes. If the stream contains 1025-3072 elements, parallel() uses at most two cores, even if many more are available. Even with 10,000 elements we heavily underload typical modern hardware.
The problem can be easily demonstrated with this code:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

Stream.iterate(0, i -> i < 20, i -> i + 1)
      .parallel()
      .forEach(x -> {
          LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
          System.out.println(x);
      });
Here, we have a stream of 20 elements, and the processing of each one takes one second. You may observe that the whole operation takes at least 20 seconds and that the elements are always printed in ascending order. If you print the thread name, you'll see that it is always the same. Note that you may replace the source with something else (e.g. IntStream.range(0, 20).boxed()) and observe proper parallelization.
Analysis
===
The problem lies in the strategy used inside the Stream API implementation to decide whether we should split further or stop. It is implemented in several places in the same manner (e.g. java.util.stream.AbstractShortCircuitTask#compute). First, we estimate the size of the whole stream by calling estimateSize() on the original spliterator. Then we compute the target size (sizeThreshold) by dividing this value by (parallelism_level * 4). After that, we attempt to split only when the estimateSize() of a part is bigger than the sizeThreshold.
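In simplified form the decision looks like the sketch below (constants are illustrative; 8 stands for a typical parallelism level, and the exact JDK code differs in details):

```java
public class SplitThresholdSketch {
    public static void main(String[] args) {
        long parallelism = 8;                 // assumed ForkJoinPool parallelism
        long wholeEstimate = Long.MAX_VALUE;  // unknown-size source reports this
        long sizeThreshold = Math.max(wholeEstimate / (parallelism * 4), 1);

        long chunkEstimate = 1024;            // exact size of the first array chunk
        boolean splitFurther = chunkEstimate > sizeThreshold;
        System.out.println(splitFurther);     // false: the 1024-element chunk is never split
    }
}
```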
These spliterators report their initial estimateSize() as Long.MAX_VALUE (the conventional way to say that the size is unknown). However, when a part of the source is dumped into an array, a fresh ArraySpliterator is created which reports its size exactly. The current implementation starts with array size = 1024 (BATCH_UNIT). As this size is much less than Long.MAX_VALUE/(parallelism_level * 4) (for any reasonable parallelism_level), we never try to split this array further, even if there is no remainder at all.
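This batching can be observed directly: the first trySplit() on such a source buffers the elements into an array-backed prefix (here, all 20 of them), leaving the original spliterator with nothing left to split. A small sketch:

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class BatchSplitDemo {
    public static void main(String[] args) {
        Spliterator<Integer> source =
                Stream.iterate(0, i -> i < 20, i -> i + 1).spliterator();

        // trySplit() dumps the buffered elements into an array-backed prefix
        Spliterator<Integer> prefix = source.trySplit();

        AtomicInteger count = new AtomicInteger();
        prefix.forEachRemaining(x -> count.incrementAndGet());
        System.out.println(count.get());        // 20: the whole stream fit in one batch

        System.out.println(source.trySplit());  // null: nothing left in the source
    }
}
```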
Proposed solution
===
I propose a simple solution. Let's allow ArraySpliterator to be non-SIZED and report an artificial estimateSize(), much bigger than the real one. This will allow AbstractSpliterator and IteratorSpliterator to produce a prefix whose size is comparable to Long.MAX_VALUE (say, starting with Long.MAX_VALUE/2), and this will enable further splitting of the prefix. This change will drastically improve parallel streaming for affected streams of size <= 1024 and significantly improve it for streams of size 1025..20000. The cost is finer-grained splitting for huge streams of unknown size. This might add a minor overhead in such scenarios which, I believe, is completely tolerable.
No public API changes are necessary, and sequential processing should not be affected, except for an extra field in ArraySpliterator which increases its footprint by 8 bytes.
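A hypothetical sketch of the idea (names and numbers are illustrative, not the actual patch): an array-backed spliterator that withholds SIZED and reports an inflated estimate, halving the estimate on each split so the framework keeps splitting it:

```java
import java.util.Spliterator;
import java.util.function.Consumer;

// Illustrative only: not the real java.util.Spliterators.ArraySpliterator.
final class InflatedArraySpliterator<T> implements Spliterator<T> {
    private final Object[] array;
    private int index;        // current position
    private final int fence;  // one past the last element
    private long estimate;    // artificial estimate, much bigger than fence - index

    InflatedArraySpliterator(Object[] array, long estimate) {
        this(array, 0, array.length, estimate);
    }

    private InflatedArraySpliterator(Object[] array, int from, int to, long estimate) {
        this.array = array;
        this.index = from;
        this.fence = to;
        this.estimate = estimate;
    }

    @Override
    public Spliterator<T> trySplit() {
        int lo = index, mid = (lo + fence) >>> 1;
        if (lo >= mid) {
            return null;       // nothing left to split
        }
        estimate >>>= 1;       // each half keeps half of the inflated estimate
        index = mid;
        return new InflatedArraySpliterator<>(array, lo, mid, estimate);
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean tryAdvance(Consumer<? super T> action) {
        if (index < fence) {
            action.accept((T) array[index++]);
            return true;
        }
        return false;
    }

    @Override
    public long estimateSize() {
        return estimate;       // deliberately inflated above the real element count
    }

    @Override
    public int characteristics() {
        return ORDERED;        // deliberately not SIZED
    }
}
```

Because the estimate stays large relative to the size threshold, the splitting logic described in the analysis keeps subdividing the array down to individual elements instead of stopping at the first 1024-element chunk.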