JDK-8281524

ForkJoinPool blocks thread instead of helping


    • b17
    • 17
    • generic
    • generic

      ADDITIONAL SYSTEM INFORMATION :
      All OS
      Java 17.0.2 + 18 + 19 ea

      A DESCRIPTION OF THE PROBLEM :
      When a ForkJoinTask (e.g. Arrays.parallelSort) is invoked from a thread in a ForkJoinPool, it blocks the thread instead of helping to execute subtasks.


      REGRESSION : Last worked in version 16

      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      Case 1) Try sorting an array inside a ForkJoinPool with parallelism 1:
      ForkJoinPool fjp = new ForkJoinPool(1);
      String[] objs = Stream.generate(() -> "").limit(10_000).toArray(String[]::new);
      fjp.invoke(ForkJoinTask.adapt(() -> Arrays.parallelSort(objs)));

      Case 2)
        Sort an array inside a ForkJoinPool with multiple threads

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      The array gets sorted using all available threads
      ACTUAL -
      Case 1)
        Sorting hangs indefinitely

      Case 2)
        Sorting does not use all threads (or deadlocks if multiple parallel sorts are run simultaneously)
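
The hang described for Case 1 can also be checked without JUnit and without blocking the caller forever. The following is a minimal sketch, not part of the original report: the class name ParallelSortHangCheck and the helper sortsWithinTimeout are hypothetical, and the timeout value is an arbitrary choice. It submits the nested sort to a dedicated pool and waits with a bounded ForkJoinTask.get, so a blocked worker shows up as a false return value rather than a stuck process.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

// Hypothetical helper class, introduced only for illustration.
class ParallelSortHangCheck {

    /**
     * Runs Arrays.parallelSort from inside a ForkJoinPool with the given
     * parallelism and reports whether it finished within timeoutMillis.
     */
    static boolean sortsWithinTimeout(int parallelism, long timeoutMillis) throws Exception {
        ForkJoinPool fjp = new ForkJoinPool(parallelism);
        String[] objs = Stream.generate(() -> "").limit(10_000).toArray(String[]::new);
        Runnable sort = () -> Arrays.parallelSort(objs);
        // The sort runs on a pool worker, as in the report; the caller only waits.
        ForkJoinTask<?> task = fjp.submit(sort);
        try {
            task.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // The worker did not complete the nested sort in time.
            return false;
        } finally {
            fjp.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Parallelism 1 corresponds to Case 1 of the report.
        System.out.println("completed in time: " + sortsWithinTimeout(1, 2000));
    }
}
```

If the behavior matches the report, an affected JDK would be expected to yield false for parallelism 1, while a JDK without the regression should yield true.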


      ---------- BEGIN SOURCE ----------
      import static org.junit.jupiter.api.Assertions.*;

      import java.util.Arrays;
      import java.util.Comparator;
      import java.util.Map;
      import java.util.Map.Entry;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.ForkJoinPool;
      import java.util.concurrent.ForkJoinTask;
      import java.util.concurrent.atomic.AtomicInteger;
      import java.util.stream.IntStream;
      import java.util.stream.Stream;

      import org.junit.jupiter.api.Test;

      class ForkJoinThreadCountTest {

          private static final int TIMEOUT = 2000;

          @Test
          void test1Thread() {
              ForkJoinPool fjp = new ForkJoinPool(1);
              Thread killThread = new Thread(() -> {
                  try {
                      Thread.sleep(TIMEOUT);
                      fjp.shutdownNow();
                      System.out.println("Stopping thread pool");
                  } catch (InterruptedException e) {
                      // Expected
                  }
              });
              String[] objs = Stream.generate(() -> "").limit(10_000).toArray(String[]::new);

              killThread.start();
              fjp.invoke(ForkJoinTask.adapt(() -> Arrays.parallelSort(objs)));
              killThread.interrupt();
          }

          @Test
          void test2Threads() {
              ForkJoinPool fjp = new ForkJoinPool(2);
              Map<Thread, AtomicInteger> comparisonCounts = new ConcurrentHashMap<>();
              Comparator<String> comp = (a, b) -> {
                  comparisonCounts.computeIfAbsent(Thread.currentThread(), t -> new AtomicInteger()).incrementAndGet();
                  return 0;
              };
              String[] objs = IntStream.range(0, 10_000_000).<String>mapToObj(i -> "").toArray(String[]::new);
              // Expected approximately same workload on both threads
              fjp.invoke(ForkJoinTask.adapt(() -> Arrays.parallelSort(objs, comp)));

              for (Entry<Thread, AtomicInteger> entry : comparisonCounts.entrySet()) {
                  int comparisonsOnThread = entry.getValue().get();
                  if (comparisonsOnThread < 500_000) {
                      fail("Work not distributed evenly: " + comparisonCounts);
                  }
              }
          }

          @Test
          void testCommonForkJoinPool() throws Exception {
              Thread testThread = new Thread(() -> {
                  int parallelism = ForkJoinPool.getCommonPoolParallelism();
                  IntStream.range(0, 2 * parallelism).parallel().forEach(
                      i -> Stream.generate(() -> "").limit(10_000).parallel().sorted().toArray());
              });
              testThread.start();
              testThread.join(TIMEOUT);
              if (testThread.isAlive()) {
                  fail("Sorting did not terminate");
              }
          }
      }
      ---------- END SOURCE ----------

      FREQUENCY : always


            dl Doug Lea
            webbuggrp Webbug Group