JDK-6431315

ThreadPoolExecutor.shutdownNow may hang ExecutorService.invokeAll


    • b89
    • generic
    • generic
    • Verified

      Doug Lea writes:

      Neither David nor I much liked the yield I put in FutureTask. We discussed
      some ways of changing it. The simplest is a cross between our suggestions:
      if set sees that the task is cancelled, it now nulls out runner, in case the
      canceller thread is stalled and would otherwise interrupt too late. There
      is still an unsolvable race here, so cancellers can still interrupt
      a thread after set completes, but this closes the gap as narrowly as would
      any other approach (modulo a few instruction cycles) and doesn't require
      any further changes.

      It does however look possible to do at least this well plus make
      set and get a bit faster by using a highly customized/hacked version
      of Bill Scherer and my dual stuff. This would be too much of a change
      for a Mustang bug fix though.

      Luckily Martin didn't pay attention the first time so hasn't
      filed a bug report. Martin, can you do this now? The other parts of
      the changes are below.

      David: nulling runner in set only on seeing CANCELLED, rather
      than unconditionally, allows the current sentinel use of runner,
      so it avoids the need to add a READY state while still nulling the thread
      about as soon as logically possible.

      I checked in these changes to FutureTask.
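
The runner-nulling idea described above can be illustrated with a minimal sketch. This is not the FutureTask source: the class `RunnerSketch`, its state constants, and the split of cancel into `markCancelled` and `interruptRunner` are hypothetical names introduced only so that a stalled canceller can be simulated step by step.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a cancellable task's state; not the real FutureTask. */
class RunnerSketch<V> {
    static final int RUNNING = 0, RAN = 1, CANCELLED = 2;

    private final AtomicInteger state = new AtomicInteger(RUNNING);
    private volatile Thread runner;   // thread executing the task, or null
    private volatile V result;

    /** Called by the executing thread before it runs the task body. */
    void startRun() { runner = Thread.currentThread(); }

    /** Called by the executing thread once the task body has produced a value. */
    void set(V v) {
        if (state.compareAndSet(RUNNING, RAN)) {
            result = v;
            runner = null;
        } else if (state.get() == CANCELLED) {
            // A canceller won the race but may be stalled before its interrupt:
            // drop the runner reference so it finds nobody to interrupt.
            runner = null;
        }
    }

    /** First half of cancel: publish the CANCELLED state. */
    boolean markCancelled() { return state.compareAndSet(RUNNING, CANCELLED); }

    /** Second half of cancel: interrupt whoever is still registered as runner. */
    void interruptRunner() {
        Thread r = runner;
        if (r != null) r.interrupt();   // still racy: r may be just past set()
        runner = null;
    }

    boolean cancel(boolean mayInterruptIfRunning) {
        if (!markCancelled()) return false;
        if (mayInterruptIfRunning) interruptRunner();
        return true;
    }

    int state() { return state.get(); }
    V result() { return result; }
}
```

Calling `startRun()`, `markCancelled()`, `set(...)` and then `interruptRunner()` on one thread simulates a canceller that stalls between changing the state and delivering the interrupt; because `set` has already cleared `runner`, the late interrupt has no target. The window between reading `runner` and calling `interrupt()` remains, which matches the residual race the message acknowledges.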


      Doug Lea wrote:
      >> Sorry that I had somehow completely forgotten about this one.
      >> It IS a bug, or at least a failure to meet reasonable expectations
      >> in the submitted not-very-reasonable usage.
      >>
      >> Martin: Can you file a bug:
      >>
      >> ThreadPoolExecutor.shutdownNow may hang ExecutorService.invokeAll.
      >>
      >> A test program is attached (it's just a simple adaptation of the posted
      >> program).
      >>
      >> Details....
      >> [... omitted ...]
      >>
      >>>>
      >>>> Now back to TPE and it dropping a task due to the check for shutdown.
      >>>> Here are the possible states as I see them after the worker has grabbed
      >>>> a task from the queue:
      >>>>
      >>>> 1. shutdown requested, thread interrupted
      >>>> 2. shutdown not-requested, thread interrupted
      >>>>
      >>>> For (1) we should just go ahead and execute the task leaving the
      >>>> interrupt set - that way if the task responds to interrupt it will
      >>>> return quickly.
      >>>>
      >>>> For (2) it means we probably have a late interrupt from a previous
      >>>> task and we should clear it (as we currently do).
      >>>>
      >>>> ...
      >>>>
      >>>> The problem is of course that in both TPE and FutureTask the
      >>>> setting/clearing of interrupt state is not atomic with respect to the
      >>>> TPE or FutureTask state. So in the above we may get interrupted due to
      >>>> a shutdown just after we checked for not being shutdown, and so will
      >>>> clear an interrupt we shouldn't have. We can fix that with a second check.
      >>
      >>
      >> Yes; I agree. This leads to the simple change:
      >>
      >> Index: ThreadPoolExecutor.java
      >> ===================================================================
      >> RCS file: /export/home/jsr166/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java,v
      >> retrieving revision 1.80
      >> diff -c -r1.80 ThreadPoolExecutor.java
      >> *** ThreadPoolExecutor.java 20 Apr 2006 07:20:42 -0000 1.80
      >> --- ThreadPoolExecutor.java 17 May 2006 12:04:52 -0000
      >> ***************
      >> *** 666,677 ****
      >>           final ReentrantLock runLock = this.runLock;
      >>           runLock.lock();
      >>           try {
      >> !             Thread.interrupted(); // clear interrupt status on entry
      >> !             // Abort now if immediate cancel. Otherwise, we have
      >> !             // committed to run this task.
      >> !             if (runState == STOP)
      >> !                 return;
      >> !
      >>               boolean ran = false;
      >>               beforeExecute(thread, task);
      >>               try {
      >> --- 666,676 ----
      >>           final ReentrantLock runLock = this.runLock;
      >>           runLock.lock();
      >>           try {
      >> !             // If not shutting down then clear an outstanding interrupt.
      >> !             if (runState != STOP &&
      >> !                 Thread.interrupted() &&
      >> !                 runState == STOP) // Re-interrupt if stopped after clearing
      >> !                 thread.interrupt();
      >>               boolean ran = false;
      >>               beforeExecute(thread, task);
      >>               try {
      >>
      >>
      >> ------------------------------------------------------------------------
      >>
      >>
      >> import java.util.*;
      >> import java.util.concurrent.*;
      >>
      >> /**
      >>  * Adapted from posting by Tom Sugden tom at epcc.ed.ac.uk
      >>  */
      >> public class BlockingTaskExecutorTest {
      >>
      >>     public static void main(String[] args) throws Exception {
      >>         for (int i = 1; i <= 100; i++) {
      >>             System.out.print(".");
      >>
      >>             final ExecutorService executor = Executors.newCachedThreadPool();
      >>
      >>             final NotificationReceiver notifier1 = new NotificationReceiver();
      >>             final NotificationReceiver notifier2 = new NotificationReceiver();
      >>
      >>             final Callable task1 = new BlockingTask(notifier1);
      >>             final Callable task2 = new BlockingTask(notifier2);
      >>             final Callable task3 = new NonBlockingTask(); // *** A ***
      >>
      >>             final List tasks = new ArrayList();
      >>             tasks.add(task1);
      >>             tasks.add(task2);
      >>             tasks.add(task3); // *** B ***
      >>
      >>             // start a thread to invoke the tasks
      >>             Thread thread = new Thread() {
      >>                 public void run() {
      >>                     try {
      >>                         executor.invokeAll(tasks);
      >>                     }
      >>                     catch (Throwable e) {
      >>                         // e.printStackTrace();
      >>                         // System.exit(-1);
      >>                     }
      >>                 }
      >>             };
      >>             thread.start();
      >>
      >>             // wait until tasks begin execution
      >>             notifier1.waitForNotification();
      >>             notifier2.waitForNotification();
      >>
      >>             // now try to shutdown the executor service while tasks
      >>             // are blocked. This should cause the tasks to be
      >>             // interrupted.
      >>             executor.shutdownNow();
      >>             boolean stopped = executor.awaitTermination(5, TimeUnit.SECONDS);
      >>             // System.out.println("Terminated? " + stopped);
      >>             if (!stopped) throw new Error("Executor stuck");
      >>
      >>             // wait for the invocation thread to complete
      >>             thread.join(1000);
      >>             if (thread.isAlive()) {
      >>                 thread.interrupt();
      >>                 thread.join(1000);
      >>                 throw new Error("invokeAll stuck");
      >>             }
      >>         }
      >>         System.out.println("\n done.");
      >>     }
      >> }
      >>
      >> /**
      >>  * A helper class with a method to wait for a notification. The notification
      >>  * is received via the <code>sendNotification</code> method.
      >>  */
      >> class NotificationReceiver
      >> {
      >>     /** Has the notifier been notified? */
      >>     boolean mNotified = false;
      >>
      >>     /**
      >>      * Notify the notification receiver.
      >>      */
      >>     public synchronized void sendNotification()
      >>     {
      >>         mNotified = true;
      >>         notifyAll();
      >>     }
      >>
      >>     /**
      >>      * Waits until a notification has been received.
      >>      *
      >>      * @throws InterruptedException
      >>      *             if the wait is interrupted
      >>      */
      >>     public synchronized void waitForNotification() throws InterruptedException
      >>     {
      >>         while (!mNotified)
      >>         {
      >>             wait();
      >>         }
      >>     }
      >> }
      >>
      >> /**
      >>  * A callable task that blocks until it is interrupted. This task sends a
      >>  * notification to a notification receiver when it is first called.
      >>  */
      >> class BlockingTask implements Callable
      >> {
      >>     private final NotificationReceiver mReceiver;
      >>
      >>     BlockingTask(NotificationReceiver notifier)
      >>     {
      >>         mReceiver = notifier;
      >>     }
      >>
      >>     public Object call() throws Exception
      >>     {
      >>         mReceiver.sendNotification();
      >>
      >>         // wait indefinitely until task is interrupted
      >>         while (true)
      >>         {
      >>             synchronized (this)
      >>             {
      >>                 wait();
      >>             }
      >>         }
      >>     }
      >> }
      >>
      >> /**
      >>  * A callable task that simply returns a string result.
      >>  */
      >> class NonBlockingTask implements Callable
      >> {
      >>     public Object call() throws Exception
      >>     {
      >>         return "NonBlockingTaskResult";
      >>     }
      >> }
      >>
      >>
      >> ------------------------------------------------------------------------
      >>
      >> _______________________________________________
      >> Concurrency-jsr mailing list
      >> ###@###.###
      >> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-jsr
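
The second check proposed in the quoted ThreadPoolExecutor diff can be isolated in a small sketch. Only the three-part condition is taken from the diff; the class `InterruptGate`, its `stopNow` and `beforeTask` methods, and the numeric state values are hypothetical stand-ins and are not part of ThreadPoolExecutor.

```java
/** Hypothetical holder for pool state; only the interrupt check mirrors the diff. */
class InterruptGate {
    static final int RUNNING = 0;
    static final int STOP = 2;

    private volatile int runState = RUNNING;

    /** Stand-in for the effect of shutdownNow on one worker: set STOP, then interrupt. */
    void stopNow(Thread worker) {
        runState = STOP;
        worker.interrupt();
    }

    /** Called by a worker thread just before it runs a dequeued task. */
    void beforeTask() {
        // Clear a stale interrupt only while not stopping. If STOP is observed
        // right after the clear, the cleared interrupt may have come from the
        // shutdown, so restore it.
        if (runState != STOP &&
            Thread.interrupted() &&
            runState == STOP)
            Thread.currentThread().interrupt();
    }

    public static void main(String[] args) {
        InterruptGate gate = new InterruptGate();

        // Case 2 from the discussion: not shutting down, interrupt left over.
        Thread.currentThread().interrupt();
        gate.beforeTask();
        System.out.println("stale interrupt kept? "
                           + Thread.currentThread().isInterrupted());

        // Case 1 from the discussion: shutdown requested, thread interrupted.
        gate.stopNow(Thread.currentThread());
        gate.beforeTask();
        System.out.println("stop interrupt kept? "
                           + Thread.currentThread().isInterrupted());
    }
}
```

Because `stopNow` writes STOP before interrupting, a worker that clears an interrupt sent by the shutdown is guaranteed to see STOP on the re-check and restore it. The task is then always run, with the interrupt set when stopping, instead of being silently dropped, which is what left invokeAll waiting on a future that never completed.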


            martin Martin Buchholz