-
Enhancement
-
Resolution: Won't Fix
-
P4
-
None
-
11, 12
A DESCRIPTION OF THE PROBLEM :
Stream.flatMap is currently implemented such that it processes the elements of each mapped stream sequentially, even if the stream is parallel. Run the example below on a multicore machine.
Expected: parallel processing should be faster than sequential processing. It should be roughly as fast as "poorManFlatMap".
Actual: making the stream parallel does not improve performance over sequential processing. "poorManFlatMap" is significantly faster than the actual flatMap implementation.
This was also discussed here https://stackoverflow.com/questions/45038120/parallel-flatmap-always-sequential
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class Streams {
    public static void main(final String[] args) {
        // One outer element wrapping a single large list.
        final Collection<List<Integer>> data = Collections.nCopies(1, getData());
        poorMan(data);
        sequential(data);
        normal(data);
    }

    // flatMap on a stream that is switched to parallel mode.
    private static void normal(final Collection<List<Integer>> data) {
        final Instant start = Instant.now();
        System.out.println("data = " +
                data.stream()
                        .flatMap(Collection::stream)
                        .parallel()
                        .mapToDouble(Math::sqrt)
                        .sum());
        System.out.println("Duration.between(start, Instant.now()) = " + Duration.between(start, Instant.now()));
    }

    // flatMap on a sequential stream, as the baseline.
    private static void sequential(final Collection<List<Integer>> data) {
        final Instant start = Instant.now();
        System.out.println("data = " +
                data.stream()
                        .flatMap(Collection::stream)
                        .mapToDouble(Math::sqrt)
                        .sum());
        System.out.println("Duration.between(start, Instant.now()) = " + Duration.between(start, Instant.now()));
    }

    // Hand-written replacement for flatMap that keeps the inner streams parallel.
    private static void poorMan(final Collection<List<Integer>> data) {
        final Instant start = Instant.now();
        System.out.println("data = " +
                poorManFlatMap(data.stream().parallel(), Collection::stream)
                        .mapToDouble(Math::sqrt)
                        .sum());
        System.out.println("Duration.between(start, Instant.now()) = " + Duration.between(start, Instant.now()));
    }

    // 100 million boxed integers in [4, 100), generated from a fixed seed.
    private static List<Integer> getData() {
        return new Random(10).ints(100_000_000, 4, 100).boxed().collect(Collectors.toList());
    }

    // Concatenates the mapped streams, marking each one parallel.
    static <T, U> Stream<T> poorManFlatMap(final Stream<U> arg, final Function<U, Stream<T>> mapper) {
        final Iterator<U> iterator = arg.iterator();
        if (!iterator.hasNext()) {
            return Stream.empty();
        }
        Stream<T> result = Stream.empty();
        while (iterator.hasNext()) {
            result = Stream.concat(result, mapper.apply(iterator.next()).parallel());
        }
        return result;
    }
}
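The timing comparison above depends on hardware and JIT warm-up, so a complementary check is to count the threads that actually touch the elements. The following is a minimal sketch, not part of the original report: the class name FlatMapThreads and both helper methods are hypothetical. It assumes the behavior described above, namely that flatMap drains each mapped stream on the thread that received the outer element, so a single outer element should be handled by exactly one thread.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical helper class, for illustration only.
public final class FlatMapThreads {

    // Counts the distinct threads that process the elements of one inner list
    // when that list is reached through flatMap on a parallel stream.
    static int threadsSeenThroughFlatMap(final List<Integer> inner) {
        final Set<Thread> threads = ConcurrentHashMap.newKeySet();
        Stream.of(inner)               // exactly one outer element
                .flatMap(List::stream)
                .parallel()
                .forEach(i -> threads.add(Thread.currentThread()));
        return threads.size();
    }

    // Counts the distinct threads when the same list is streamed in parallel
    // directly. The result depends on the machine and the common pool size.
    static int threadsSeenDirectly(final List<Integer> inner) {
        final Set<Thread> threads = ConcurrentHashMap.newKeySet();
        inner.parallelStream()
                .forEach(i -> threads.add(Thread.currentThread()));
        return threads.size();
    }

    public static void main(final String[] args) {
        final List<Integer> inner =
                IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
        System.out.println("threads via flatMap = " + threadsSeenThroughFlatMap(inner));
        System.out.println("threads via parallelStream = " + threadsSeenDirectly(inner));
    }
}
```

On a multicore machine the second count is typically greater than one, while the first stays at one regardless of the size of the inner list. That matches the observation that the reproducer's "normal" and "sequential" variants take about the same time.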