JDK-8211988

Improve parallelism of Stream.flatMap


      A DESCRIPTION OF THE PROBLEM :
      Stream.flatMap is currently implemented such that it processes the elements of each mapped stream sequentially, even if the Stream is parallel. Run the example below on a multicore machine.

      Expected: parallel processing should be faster than sequential processing. It should be roughly as fast as "poorManFlatMap".

      Actual: making the stream parallel does not improve performance over sequential processing. "poorManFlatMap" is significantly faster than the actual flatMap implementation.

      This was also discussed on Stack Overflow: https://stackoverflow.com/questions/45038120/parallel-flatmap-always-sequential

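      import java.time.Duration;
      import java.time.Instant;
      import java.util.Collection;
      import java.util.Collections;
      import java.util.Iterator;
      import java.util.List;
      import java.util.Random;
      import java.util.function.Function;
      import java.util.stream.Collectors;
      import java.util.stream.Stream;

      public final class Streams {
          public static void main(final String[] args) {
              // A single outer element, so any parallelism has to come from the inner stream.
              final Collection<List<Integer>> data = Collections.nCopies(1, getData());
              poorMan(data);
              sequential(data);
              normal(data);
          }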

          // Built-in flatMap on a parallel pipeline.
          private static void normal(final Collection<List<Integer>> data) {
              final Instant start = Instant.now();
              System.out.println("sum = " +
                                         data.stream()
                                                 .flatMap(Collection::stream)
                                                 .parallel()
                                                 .mapToDouble(Math::sqrt)
                                                 .sum());
              System.out.println("normal (parallel flatMap) took " + Duration.between(start, Instant.now()));
          }


          // Built-in flatMap on a sequential pipeline, as the baseline.
          private static void sequential(final Collection<List<Integer>> data) {
              final Instant start = Instant.now();
              System.out.println("sum = " +
                                         data.stream()
                                                 .flatMap(Collection::stream)
                                                 .mapToDouble(Math::sqrt)
                                                 .sum());
              System.out.println("sequential flatMap took " + Duration.between(start, Instant.now()));
          }


          // Hand-written flatMap replacement that keeps the inner streams parallel.
          private static void poorMan(final Collection<List<Integer>> data) {
              final Instant start = Instant.now();
              System.out.println("sum = " +
                                         poorManFlatMap(data.stream().parallel(), Collection::stream)
                                                 .mapToDouble(Math::sqrt)
                                                 .sum());
              System.out.println("poorManFlatMap took " + Duration.between(start, Instant.now()));
          }

          private static List<Integer> getData() {
              return new Random(10).ints(100_000_000, 4, 100).boxed().collect(Collectors.toList());
          }

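          // Concatenates the mapped streams instead of flattening them, so each inner
          // stream stays splittable. Stream.concat returns a parallel stream if either
          // input is parallel. Repeated concatenation builds a deep chain, so this is
          // only suitable for a small number of outer elements.
          static <T, U> Stream<T> poorManFlatMap(final Stream<U> arg, final Function<U, Stream<T>> mapper) {
              final Iterator<U> iterator = arg.iterator();
              Stream<T> result = Stream.empty();
              while (iterator.hasNext()) {
                  result = Stream.concat(result, mapper.apply(iterator.next()).parallel());
              }
              return result;
          }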
      }
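
      The behaviour can also be observed without timing, by recording which threads process the flattened elements. The following is a minimal sketch and not part of the original report: the class name FlatMapThreads and both helper methods are hypothetical names introduced for illustration, and it assumes a flatMap implementation that consumes each mapped stream sequentially, as described above.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, for illustration only.
public final class FlatMapThreads {

    // Names of the threads that see elements downstream of a parallel flatMap.
    static Set<String> threadsUsedByFlatMap(final List<List<Integer>> data) {
        final Set<String> threads = ConcurrentHashMap.newKeySet();
        data.stream()
                .parallel()
                .flatMap(List::stream)
                .forEach(i -> threads.add(Thread.currentThread().getName()));
        return threads;
    }

    // Names of the threads used when the data is flattened into one list first
    // and that list is then processed as a parallel stream.
    static Set<String> threadsUsedAfterFlattening(final List<List<Integer>> data) {
        final List<Integer> flat = data.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        final Set<String> threads = ConcurrentHashMap.newKeySet();
        flat.parallelStream()
                .forEach(i -> threads.add(Thread.currentThread().getName()));
        return threads;
    }

    public static void main(final String[] args) {
        // One outer element holding all the values, mirroring nCopies(1, getData()).
        final List<Integer> inner = IntStream.range(0, 1_000_000)
                .boxed()
                .collect(Collectors.toList());
        final List<List<Integer>> data = List.of(inner);

        System.out.println("flatMap threads:         " + threadsUsedByFlatMap(data));
        System.out.println("pre-flattened threads:   " + threadsUsedAfterFlattening(data));
    }
}
```

      With a single outer element, the first set is expected to contain only one thread under the assumption above. The second set may contain several threads on a multicore machine, depending on the size of the common fork/join pool.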


      Assignee: Unassigned
      Reporter: webbuggrp (Webbug Group)