Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8042758

Using a semaphore inside a parallel stream action may deadlock ANOTHER stream.

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Not an Issue
    • Icon: P4 P4
    • None
    • 8
    • core-libs

      FULL PRODUCT VERSION :
      java version "1.8.0_05"
      Java(TM) SE Runtime Environment (build 1.8.0_05-b13)
      Java HotSpot(TM) 64-Bit Server VM (build 25.5-b02, mixed mode)

      ADDITIONAL OS VERSION INFORMATION :
      Darwin Vesper.local 13.1.0 Darwin Kernel Version 13.1.0: Wed Apr 2 23:52:02 PDT 2014; root:xnu-2422.92.1~2/RELEASE_X86_64 x86_64

      A DESCRIPTION OF THE PROBLEM :
      Consider the following situation: We are using a Java 8 parallel stream to perform a parallel forEach loop, e.g.,

      IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})
      The number of parallel threads is controlled by the system property "java.util.concurrent.ForkJoinPool.common.parallelism" and usually equal to the number of processors.

      Now assume that we like to limit the number of parallel executions for a specific pice of work - e.g. because that part is memory intensive and memory constrain imply a limit of parallel executions.

      An obvious and elegant way to limit parallel executions is to use a Semaphore, e.g., the following pice of code limits the number of parallel executions to 5:

              final Semaphore concurrentExecutions = new Semaphore(5);
              IntStream.range(0,20).parallel().forEach(i -> {

                  concurrentExecutions.acquireUninterruptibly();

                  try {
                      /* WORK DONE HERE */
                  }
                  finally {
                      concurrentExecutions.release();
                  }
              });
      This works just fine.

      However: Using any other parallel stream inside the worker (at /* WORK DONE HERE */) may result in a deadlock.

      For me this is an unexpected behavior.


      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      Run the program provided, the test program is available at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/ForkJoinPoolTest.java

      Change the setting of the boolean isUseInnerStream to false -> the DEADLOCK will not occur.

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      I was expecting that the program finishes without deadlock, limited to 5 concurrent threads, while the inner loop uses all remaining threads.
      ACTUAL -
      Deadlock.

      REPRODUCIBILITY :
      This bug can be reproduced always.

      ---------- BEGIN SOURCE ----------
      /*
       * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christian-fries.de.
       *
       * Created on 03.05.2014
       */
      package net.finmath.experiments.concurrency;

      import java.util.concurrent.Semaphore;
      import java.util.stream.IntStream;

      /**
       * This is a test of Java 8 parallel streams.
       *
       * The idea behind this code is that the Semaphore concurrentExecutions
       * should limit the parallel executions of the outer forEach (which is an
       * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
       * the parallel executions of the outer forEach should be limited due to a
       * memory constrain).
       *
       * Inside the execution block of the outer forEach we use another parallel stream
       * to create an inner forEach. The number of concurrent
       * executions of the inner forEach is not limited by us (it is however limited by a
       * system property "java.util.concurrent.ForkJoinPool.common.parallelism").
       *
       * Problem: If the semaphore is used AND the inner forEach is active, then
       * the execution will be DEADLOCKED.
       *
       * Note: A practical application is the implementation of the parallel
       * LevenbergMarquardt optimizer in
       * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
       * In one application the number of tasks in the outer and inner loop is very large (>1000)
       * and due to memory limitation the outer loop should be limited to a small (5) number
       * of concurrent executions.
       *
       * @author Christian Fries
       */
      public class ForkJoinPoolTest {

      public static void main(String[] args) {

      // Any combination of the booleans works, except (true,true)
      final boolean isUseSemaphore = true;
      final boolean isUseInnerStream = true;

      final int numberOfTasksInOuterLoop = 20; // In real applications this can be a large number (e.g. > 1000).
      final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
      final int concurrentExecusionsLimitInOuterLoop = 5;
      final int concurrentExecutionsLimitForStreams = 10;

      final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);

      System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
      System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

      IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

      if(isUseSemaphore) {
      concurrentExecutions.acquireUninterruptibly();
      }

      try {
      System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

      if(isUseInnerStream) {
      runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
      }
      else {
      try {
      Thread.sleep(10*numberOfTasksInInnerLoop);
      } catch (Exception e) {
      }
      }
      }
      finally {
      if(isUseSemaphore) {
      concurrentExecutions.release();
      }
      }
      });

      System.out.println("D O N E");
      }

      /**
      * Runs code in a parallel forEach using streams.
      *
      * @param numberOfTasksInInnerLoop Number of tasks to execute.
      */
      private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
      IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
      try {
      Thread.sleep(10);
      } catch (Exception e) {
      }
      });
      }
      }
      ---------- END SOURCE ----------

      CUSTOMER SUBMITTED WORKAROUND :
      An obvious workaround is to use a dedicated Thread Pool for the outer loop.

            psandoz Paul Sandoz
            webbuggrp Webbug Group
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated:
              Resolved: