JDK-8267196

ScheduledThreadPoolExecutor Future never finishes if submitted before shutdown


      ADDITIONAL SYSTEM INFORMATION :
      Tested on Windows; reproducible on JDK 8 through 16.

      A DESCRIPTION OF THE PROBLEM :
      If a Runnable/Callable is submitted to a ScheduledThreadPoolExecutor and shutdownNow() is called immediately afterwards, the task may never start, yet its Future is never marked as cancelled or done, so it stays in limbo.

      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      Submit a Runnable/Callable to a ScheduledThreadPoolExecutor, immediately call executor.shutdownNow(), then call executor.awaitTermination(...) to make sure the executor has terminated. Finally, check whether every returned Future is either done or cancelled; some of them might not be.

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      The returned Future should be either done or cancelled.
      ACTUAL -
      The task never starts, and its Future is neither done nor cancelled.
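
      For illustration only, here is a minimal sketch of one documented way a Future can end up in this state: the task is still queued when shutdownNow() is called, so it is returned in the drained list instead of being run. This may or may not be the path the reproduction below hits. The class name DrainedTaskSketch, the helper cancelDrained, and the one-hour delay (used only to keep the task queued deterministically) are assumptions that do not appear in the report. The sketch relies on the ScheduledThreadPoolExecutor documentation stating that each element returned by shutdownNow() is a ScheduledFuture, so the caller can cancel it explicitly.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DrainedTaskSketch {

    /**
     * Hypothetical helper: cancels every drained task that is also a Future
     * and returns how many cancel() calls succeeded.
     */
    static int cancelDrained(List<Runnable> drained) {
        int cancelled = 0;
        for (Runnable task : drained) {
            if (task instanceof Future && ((Future<?>) task).cancel(false)) {
                cancelled++;
            }
        }
        return cancelled;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        // Assumption: a long delay guarantees the task is still queued
        // when shutdownNow() runs, which makes the outcome deterministic.
        Callable<Integer> task = () -> 0;
        Future<Integer> future = executor.schedule(task, 1, TimeUnit.HOURS);

        List<Runnable> drained = executor.shutdownNow();
        boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);

        System.out.println("terminated = " + terminated);
        System.out.println("drained count = " + drained.size());
        System.out.println("isDone before cancel = " + future.isDone());
        System.out.println("isCancelled before cancel = " + future.isCancelled());

        // Explicitly cancel whatever shutdownNow() handed back.
        int cancelled = cancelDrained(drained);

        System.out.println("cancelled count = " + cancelled);
        System.out.println("isDone after cancel = " + future.isDone());
        System.out.println("isCancelled after cancel = " + future.isCancelled());
    }
}
```

      Passing false to cancel() is sufficient here because a drained task has never been handed to a worker thread, so there is nothing to interrupt.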

      ---------- BEGIN SOURCE ----------
      package javafixes.concurrency;

      import org.junit.Test;

      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.*;
      import java.util.concurrent.atomic.AtomicBoolean;
      import java.util.function.Function;

      import static java.util.stream.Collectors.toList;
      import static java.util.stream.IntStream.range;

      public class JdkSchedulerIssue {

          @Test
          public void futureNeverStartsNorFinishesIfSubmittedOnScheduledExecutorBeforeShutdownNow() throws Exception {
              int threadCount = 3;

              for (int retryIndex = 0; retryIndex < 100_000; retryIndex++) {

                  System.out.println((retryIndex + 1) + ". attempt");

                  ExecutorService executor = Executors.newScheduledThreadPool(threadCount); // problematic executor
                  // ExecutorService executor = Executors.newFixedThreadPool(threadCount); // fixed thread pool executor has no such issue

                  List<AtomicBoolean> hasStartedFlags = range(0, threadCount).mapToObj(__ -> new AtomicBoolean(false)).collect(toList());
                  List<Future<Integer>> futures = new ArrayList<>();

                  for (int threadIndex = 0; threadIndex < threadCount; threadIndex++) {
                      int index = threadIndex;

                      Callable<Integer> callable = () -> {
                          hasStartedFlags.get(index).set(true);

                          do {
                              Thread.sleep(randomInt(500, 750));
                          } while (!Thread.interrupted());

                          return 0;
                      };

                      futures.add(
                              executor.submit(callable)
                      );
                  }

                  // Thread.sleep(200); // this fixes it

                  List<Runnable> drainedRunnables = executor.shutdownNow();

                  executor.awaitTermination(10, TimeUnit.SECONDS); // so this should guarantee that all submitted tasks have ended

                  if (drainedRunnables.size() < threadCount) { // this is not necessary

                      System.out.println("drained runnables count = " + drainedRunnables.size());

                      for (int threadIndex = 0; threadIndex < threadCount; threadIndex++) {
                          Future<Integer> future = futures.get(threadIndex);
                          try {

                              future.get(15, TimeUnit.SECONDS); // 15 seconds is an overkill, just to make sure that there wasn't some delay while cancelling or finishing the Callable

                          } catch (TimeoutException e) { // There should be no timeout
                              boolean hasStarted = hasStartedFlags.get(threadIndex).get();
                              boolean isCancelled = future.isCancelled();
                              boolean isDone = future.isDone();

                              if (!hasStarted && !isCancelled && !isDone) {

                                  System.out.println("hasStarted = " + hasStarted);
                                  System.out.println("isCancelled = " + isCancelled);
                                  System.out.println("isDone = " + isDone);
                                  throw new IllegalStateException();
                              }
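                          } catch (Exception e) {
                              // Any other outcome (e.g. CancellationException or ExecutionException)
                              // means the Future completed, which is the expected behavior.
                          }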
                      }
                  }
              }
          }

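          // Returns a pseudo-random int in the inclusive range [from, to].
          // The unused varargs parameter was dropped; it only caused an unchecked generic array warning.
          public static int randomInt(int from, int to) {
              return (int) ((long) (Math.random() * ((long) to - (long) from + 1L)) + (long) from);
          }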
      }

      ---------- END SOURCE ----------

      FREQUENCY : occasionally


            dl Doug Lea
            webbuggrp Webbug Group