JDK-6459119

Explain how afterExecute can access a submitted job's Throwable

    • b08
    • x86
    • windows_xp
    • Not verified

      FULL PRODUCT VERSION :
      java version "1.6.0-beta2"
      Java(TM) SE Runtime Environment (build 1.6.0-beta2-b86)
      Java HotSpot(TM) Client VM (build 1.6.0-beta2-b86, mixed mode, sharing)

      ADDITIONAL OS VERSION INFORMATION :
      Microsoft Windows XP [Version 5.1.2600]

      A DESCRIPTION OF THE PROBLEM :
      ScheduledThreadPoolExecutor swallows Exceptions coming from Runnables' run() method submitted via execute(Runnable command), in contrast to ThreadPoolExecutor's same method, where afterExecute(Runnable r, Throwable t) will be invoked with the RuntimeException (not Errors, see bug #6450211) that happened in such cases.

      This is because ScheduledTPE's execute(Runnable) wraps the Runnable in a ScheduledFutureTask by passing it to schedule(command, 0, TimeUnit.NANOSECONDS). Since execute() returns void, the resulting future is discarded, and there is no way to get at the Exception thrown from the Runnable's run() method.

      Actually, it seems that afterExecute(...) will never get a non-null Throwable in the ScheduledTPE, and it will probably also count all tasks as "completed", as opposed to the normal TPE, which distinguishes between the two cases (but this might also be wrong, see bug #6450207).

      This seems inconsistent, and I haven't found a description of this in the javadoc. It would seem more correct to let afterExecute also know of all Throwables that happened in a ScheduledTPE, in particular since this seems to be the only way to "install" an (external) "exception listener" in the ThreadPool.
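
One way afterExecute can still reach the Throwable is to ask the task itself: in a ScheduledThreadPoolExecutor the Runnable handed to afterExecute is the wrapping future, so a failure captured there can be retrieved with get(). The following is a minimal sketch along the lines of the idiom shown in the ThreadPoolExecutor.afterExecute javadoc; the class names, the lastFailure field and the runFailingTask helper are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AfterExecuteDemo {

    /** Hypothetical subclass that records the last failure seen by afterExecute. */
    static class ReportingScheduledExecutor extends ScheduledThreadPoolExecutor {
        final AtomicReference<Throwable> lastFailure = new AtomicReference<Throwable>();

        ReportingScheduledExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            // Only query futures that are done: a periodic task that ran
            // successfully is not done yet, and get() would block on it.
            if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
                try {
                    ((Future<?>) r).get();
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause(); // the Throwable raised by run()/call()
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            if (t != null) {
                lastFailure.set(t);
            }
        }
    }

    /** Runs one failing task and returns what afterExecute recovered. */
    static Throwable runFailingTask() {
        ReportingScheduledExecutor pool = new ReportingScheduledExecutor(1);
        pool.execute(new Runnable() {
            public void run() {
                throw new RuntimeException("test");
            }
        });
        pool.shutdown(); // already queued zero-delay tasks still run by default
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return pool.lastFailure.get();
    }

    public static void main(String[] args) {
        System.out.println("afterExecute recovered: " + runFailingTask());
    }
}
```

The same check can be added to a plain ThreadPoolExecutor subclass to cover tasks passed to submit(), which are wrapped in a FutureTask in the same way.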

      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      run test case

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      ThreadPool Runnable's Exception caught!
      (Pool Thread's stacktrace, due to unwind)
      ScheduledThreadPool Runnable's Exception caught!
      (Maybe Pool Thread's stacktrace, due to unwind?)

      ACTUAL -
      ThreadPool Runnable's Exception caught!
      (Pool Thread's stacktrace, due to unwind)
      ScheduledThreadPool Runnable gave no Exception

      REPRODUCIBILITY :
      This bug can be reproduced always.

      ---------- BEGIN SOURCE ----------
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.ScheduledThreadPoolExecutor;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;

      public class DifferentHandling {

          public static void main(String[] args) {
              ThreadPool threadPool = new ThreadPool();
              ScheduledThreadPool scheduledThreadPool = new ScheduledThreadPool();
              threadPool.execute(new Task());
              scheduledThreadPool.execute(new Task());
          }

          private static class Task implements Runnable {
              public void run() {
                  throw new RuntimeException("test");
              }
          }

          private static class ThreadPool extends ThreadPoolExecutor {
              ThreadPool() {
                  super(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
              }

              @Override
              protected void afterExecute(Runnable r, Throwable t) {
                  if (t != null) {
                      System.out.println("ThreadPool Runnable's Exception caught!");
                  }
                  else {
                      System.out.println("ThreadPool Runnable gave no Exception");
                  }
              }
          }

          private static class ScheduledThreadPool extends ScheduledThreadPoolExecutor {
              ScheduledThreadPool() {
                  super(10);
              }

              @Override
              protected void afterExecute(Runnable r, Throwable t) {
                  if (t != null) {
                      System.out.println("ScheduledThreadPool Runnable's Exception caught!");
                  }
                  else {
                      System.out.println("ScheduledThreadPool Runnable gave no Exception");
                  }
              }
          }
      }

      ---------- END SOURCE ----------

      CUSTOMER SUBMITTED WORKAROUND :
      Possibly implement decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) to wrap the task in a new RunnableScheduledFuture that delegates every method directly except run(), which stores any Exception in a static ThreadLocal before rethrowing it; afterExecute then checks for (and clears) the stored Exception.
      The submitter added the following SDN comment:

      My point is exactly that afterExecute _cannot_ be used: the Throwable argument is always null when it is invoked, since the task is wrapped in an internal implementation of (Runnable)ScheduledFuture which _doesn't_ propagate any Exceptions back to be included in the afterExecute invocation. The actual problem is that if you just want to send off a task to be executed (or scheduled), and don't want to hang on to the returned Future to see what happens, any Exceptions or Errors happening within run()/call() are swallowed without any trace, and I haven't found any natural extension point in the class to handle this (which leads me to believe that afterExecute isn't working as (probably?) intended in ScheduledTPE).

      (Might be another bug:) In addition, I find it hard to see how one can easily employ the decorateTask methods: the RunnableScheduledFuture that you're provided with is an internal (non-static) class of ScheduledThreadPoolExecutor (called ScheduledFutureTask), without public accessors for its internal data. This, combined with the fact that the decorateTask methods do not provide all the information that was given to the actual schedule or submit invocation (delay, initialDelay, period, unit and result), makes it hard (I so far think it is impossible) to make an external implementation that will honor the contracts of the schedule methods. The methods run() and its private sibling runPeriodic() are problematic to implement externally because of this missing information, and in addition the accessed private field sequencer (and NANO_ORIGIN) of ScheduledTPE might make it even harder (at least to just copy'n'paste the implementation directly!).
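
For comparison, a decorateTask override does not have to reimplement the scheduling logic if it only delegates to the task it is given. The sketch below assumes that observing failures is the only goal; ObservedTask, ObservingExecutor and runFailingTask are hypothetical names, and every method except run() simply forwards to the provided RunnableScheduledFuture. It has not been checked against every cancellation or periodic-task corner case.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class DecorateTaskDemo {

    /** Hypothetical wrapper: forwards everything, inspects the outcome after run(). */
    static class ObservedTask<V> implements RunnableScheduledFuture<V> {
        private final RunnableScheduledFuture<V> delegate;
        private final AtomicReference<Throwable> sink;

        ObservedTask(RunnableScheduledFuture<V> delegate, AtomicReference<Throwable> sink) {
            this.delegate = delegate;
            this.sink = sink;
        }

        public void run() {
            delegate.run(); // the internal task keeps its own scheduling logic
            if (delegate.isDone() && !delegate.isCancelled()) {
                try {
                    delegate.get(); // does not block: the task is already done
                } catch (ExecutionException e) {
                    sink.set(e.getCause());
                } catch (CancellationException e) {
                    // cancelled concurrently; nothing to report
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        public boolean isPeriodic() { return delegate.isPeriodic(); }
        public long getDelay(TimeUnit unit) { return delegate.getDelay(unit); }
        public int compareTo(Delayed other) { return delegate.compareTo(other); }
        public boolean cancel(boolean mayInterrupt) { return delegate.cancel(mayInterrupt); }
        public boolean isCancelled() { return delegate.isCancelled(); }
        public boolean isDone() { return delegate.isDone(); }
        public V get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }
        public V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }
    }

    /** Hypothetical executor that decorates every task with ObservedTask. */
    static class ObservingExecutor extends ScheduledThreadPoolExecutor {
        final AtomicReference<Throwable> lastFailure = new AtomicReference<Throwable>();

        ObservingExecutor(int corePoolSize) { super(corePoolSize); }

        @Override
        protected <V> RunnableScheduledFuture<V> decorateTask(
                Runnable runnable, RunnableScheduledFuture<V> task) {
            return new ObservedTask<V>(task, lastFailure);
        }

        @Override
        protected <V> RunnableScheduledFuture<V> decorateTask(
                Callable<V> callable, RunnableScheduledFuture<V> task) {
            return new ObservedTask<V>(task, lastFailure);
        }
    }

    /** Runs one failing task and returns the failure the wrapper observed. */
    static Throwable runFailingTask() {
        ObservingExecutor pool = new ObservingExecutor(1);
        pool.execute(new Runnable() {
            public void run() { throw new IllegalStateException("boom"); }
        });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.lastFailure.get();
    }
}
```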

      (Might be yet another bug:) In addition to this, I tried to do some proxying magic with the actual provided instance of RunnableScheduledFuture. However, here too I found problems: the above-mentioned internal implementation of RunnableScheduledFuture extends FutureTask. This class uses an internal class called Sync which extends AbstractQueuedSynchronizer. The actual run() method invokes sync.innerRun(), which thus encapsulates any raised Exceptions (so that they can be set in the Future, obviously, but this makes this intervention point useless too). But, yes, there is also a method setException(Throwable t), whose javadoc states "This method is invoked internally by the run method upon failure of the computation.". There are, however, no calls to this method anywhere in the code: when the contained task raises an Exception within the innerRun() method, _inner_SetException() is invoked _directly_ (which most probably should have been an invocation of setException() instead, since by default that calls sync.innerSetException()). Further, there is also a "done()" method, which is invoked on both good and bad runs. But, again, there is no public way to get at the Exception that was set with setException (or rather innerSetException).
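
As a point of reference for the done() hook mentioned above: although FutureTask exposes no accessor for the stored exception, calling get() on a completed task rethrows it wrapped in an ExecutionException, and done() is only invoked once the task has completed. A minimal sketch, assuming a hypothetical ReportingFutureTask subclass and a runFailingTask helper that runs the task on the calling thread:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class DoneHookDemo {

    /** Hypothetical FutureTask subclass that records a failure in done(). */
    static class ReportingFutureTask<V> extends FutureTask<V> {
        volatile Throwable failure;

        ReportingFutureTask(Callable<V> callable) {
            super(callable);
        }

        @Override
        protected void done() {
            if (isCancelled()) {
                return; // get() would throw CancellationException
            }
            try {
                get(); // the task is complete here, so this returns at once
            } catch (ExecutionException e) {
                failure = e.getCause(); // the Throwable raised by call()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs a failing task directly and returns what done() recorded. */
    static Throwable runFailingTask() {
        ReportingFutureTask<String> task = new ReportingFutureTask<String>(
                new Callable<String>() {
                    public String call() throws Exception {
                        throw new IllegalStateException("boom");
                    }
                });
        task.run();
        return task.failure;
    }
}
```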

      The workaround-suggestion I came up with in "CUSTOMER SUBMITTED WORKAROUND" is obviously flawed, since the whole idea of a Worker-Pool is that _some other_ thread should run the run()/call() method, hence ThreadLocals won't cut it!

      However, after hours of banging my head while looking in vain for appropriate extension points or listener-registration possibilities, I finally came up with an (obvious?) extension that simply overrides all submit-methods by wrapping the submitted Callable or Runnable in a new instance of the same type, which invokes run/call in a try-catch and notifies the registered TaskExceptionListeners before rethrowing (which hopefully should preserve the contract: any Callables or Runnables lying in the queue are already "hidden" behind the internal RunnableScheduledFuture).

      // Assumption: Logger is org.apache.log4j.Logger, since isTraceEnabled(), trace() and
      // warn() are not methods of java.util.logging.Logger.
      import java.util.EventListener;
      import java.util.concurrent.Callable;
      import java.util.concurrent.CopyOnWriteArraySet;
      import java.util.concurrent.RejectedExecutionHandler;
      import java.util.concurrent.ScheduledFuture;
      import java.util.concurrent.ScheduledThreadPoolExecutor;
      import java.util.concurrent.ThreadFactory;
      import java.util.concurrent.TimeUnit;
      import org.apache.log4j.Logger;

      /**
       * Extension of {@link ScheduledThreadPoolExecutor} that makes it possible to install {@link TaskExceptionListener
       * TaskExceptionListeners} to get notified of Exceptions and Errors that happen while the
       * Runnable or Callable is executed.
       *
       * @author endre
       */
      public class ScheduledThreadPoolExecutorWithExceptionListening extends ScheduledThreadPoolExecutor {

          static final Logger log = Logger.getLogger("picorg.util.ScheduledExecutor");

          public ScheduledThreadPoolExecutorWithExceptionListening(int corePoolSize, RejectedExecutionHandler handler) {
              super(corePoolSize, handler);
          }

          public ScheduledThreadPoolExecutorWithExceptionListening(int corePoolSize, ThreadFactory threadFactory,
                  RejectedExecutionHandler handler) {
              super(corePoolSize, threadFactory, handler);
          }

          public ScheduledThreadPoolExecutorWithExceptionListening(int corePoolSize, ThreadFactory threadFactory) {
              super(corePoolSize, threadFactory);
          }

          public ScheduledThreadPoolExecutorWithExceptionListening(int corePoolSize) {
              super(corePoolSize);
          }

          private final CopyOnWriteArraySet<TaskExceptionListener> _taskExceptionListeners =
                  new CopyOnWriteArraySet<TaskExceptionListener>();

          public void addTaskExceptionListener(TaskExceptionListener tel) {
              _taskExceptionListeners.add(tel);
          }

          /**
           * "Rename-method" of
           * {@link #addTaskExceptionListener(com.picorg.util.ScheduledThreadPoolExecutorWithExceptionListening.TaskExceptionListener)},
           * so that it's easier to DI the {@link TaskExceptionListener}.
           *
           * @param tel the TaskExceptionListener to add.
           */
          public void setTaskExceptionListener(TaskExceptionListener tel) {
              _taskExceptionListeners.add(tel);
          }

          public void removeTaskExceptionListener(TaskExceptionListener tel) {
              _taskExceptionListeners.remove(tel);
          }

          // Notifies all registered listeners that a Runnable failed.
          private void exception(Runnable r, Throwable t) {
              for (TaskExceptionListener listener : _taskExceptionListeners) {
                  listener.taskRaisedException(r, t);
              }
          }

          // Notifies all registered listeners that a Callable failed.
          private void exception(Callable<?> c, Throwable t) {
              for (TaskExceptionListener listener : _taskExceptionListeners) {
                  listener.taskRaisedException(c, t);
              }
          }

          // Wraps the Runnable so that listeners are notified before the Throwable is rethrown.
          private Runnable newRunnable(final Runnable command) {
              return new Runnable() {
                  public void run() {
                      try {
                          command.run();
                      }
                      catch (RuntimeException e) {
                          exception(command, e);
                          throw e;
                      }
                      catch (Error e) {
                          exception(command, e);
                          throw e;
                      }
                  }
              };
          }

          // Wraps the Callable so that listeners are notified before the Throwable is rethrown.
          private <T> Callable<T> newCallable(final Callable<T> callable) {
              return new Callable<T>() {
                  public T call() throws Exception {
                      try {
                          return callable.call();
                      }
                      catch (Exception e) {
                          exception(callable, e);
                          throw e;
                      }
                      catch (Error e) {
                          exception(callable, e);
                          throw e;
                      }
                  }
              };
          }

          @Override
          public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
              if (log.isTraceEnabled())
                  log.trace("threadpool.schedule(Callable:[" + callable + "], delay:[" + delay + ' ' + unit + "]).");
              return super.schedule(newCallable(callable), delay, unit);
          }

          @Override
          public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
              if (log.isTraceEnabled())
                  log.trace("threadpool.schedule(Runnable:[" + command + "], delay:[" + delay + ' ' + unit + "]).");
              return super.schedule(newRunnable(command), delay, unit);
          }

          @Override
          public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
              if (log.isTraceEnabled())
                  log.trace("threadpool.scheduleAtFixedRate(Runnable:[" + command + "], initialDelay:[" + initialDelay
                          + "], period:[" + period + ' ' + unit + "]).");
              return super.scheduleAtFixedRate(newRunnable(command), initialDelay, period, unit);
          }

          @Override
          public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
              if (log.isTraceEnabled())
                  log.trace("threadpool.scheduleWithFixedDelay(Runnable:[" + command + "], initialDelay:[" + initialDelay
                          + "], delay:[" + delay + ' ' + unit + "]).");
              return super.scheduleWithFixedDelay(newRunnable(command), initialDelay, delay, unit);
          }

          /**
           * Listener for Exceptions to submitted and scheduled Runnables and Callables.
           *
           * @author endre
           */
          public interface TaskExceptionListener extends EventListener {
              void taskRaisedException(Runnable r, Throwable t);

              void taskRaisedException(Callable<?> c, Throwable t);
          }

          /**
           * A simple Logging implementation of {@link TaskExceptionListener}.
           *
           * @author endre
           */
          public static class LoggingTaskExceptionListener implements TaskExceptionListener {

              public void taskRaisedException(Runnable r, Throwable t) {
                  log.warn("submitted/scheduled Runnable [" + r + "] raised Exception while running.", t);
              }

              public void taskRaisedException(Callable<?> c, Throwable t) {
                  log.warn("submitted/scheduled Callable [" + c + "] raised Exception while running.", t);
              }
          }
      }

            martin Martin Buchholz
            rmandalasunw Ranjith Mandala (Inactive)