Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-6663476

FutureTask.get() may return null if set() is not called from run()

    XMLWordPrintable

Details

    • Bug
    • Resolution: Unresolved
    • P4
    • None
    • 6u4
    • core-libs

    Description

      SYNOPSIS:

      When FutureTask.set() is called directly rather than from run(), the FutureTask.Sync.runner variable is not managed in the same way that it is for FutureTask.run().

      OPERATING SYSTEM(S):
      Windows

      FULL JDK VERSION(S):
      Sun Java 1.5.0_14 and Sun Java 6 update4

      DESCRIPTION:

      The problem is observed only on multiprocessor machines, not on single-processor machines. The program was run across JDKs and the issue was observed.
      If needed, the output of the program runs can be provided.

      Sample Program:
      //package testFutureTask;

      import java.util.Random;
      import java.util.concurrent.Callable;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.FutureTask;
      import java.util.concurrent.Semaphore;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;

      /**
       * Demonstrates that {@link FutureTask#set} doesn't safely publish the
       * value. testSet failed on my Intellistation z-Pro (dual processor, each
       * hyperthreaded) with the following message:
       *
       * Failure counts by method (of 1642125 attempts):
       * get, with infinite wait: 8189
       * get, with timeout: 6268
       * get, with zero wait: 4194
       *
       * <p>How the harness works: the foreground thread repeatedly creates a
       * fresh {@link AFutureValue}, publishes it through the volatile field
       * {@link #mt_fv}, and completes it either by calling {@code set} directly
       * or by calling {@code run}. Background reader threads spin until they see
       * a new future, fetch its value using one of {@link #TOTAL_METHODS}
       * flavours of {@code get}, and count every {@code null} they receive.
       *
       * @author James Synge
       */
      public class TestFutureTaskSubClass {
          /**
           * Runs the scenario twice on fresh instances: first completing the
           * futures through {@code set}, then through {@code run}.
           */
          public static void main(String[] args) throws Exception {
              TestFutureTaskSubClass test = new TestFutureTaskSubClass();
              test.test(true);

              test = new TestFutureTaskSubClass();
              test.test(false);
          }

          /** Number of distinct ways the readers call {@code get}. */
          protected static final int TOTAL_METHODS = 3;

          /** How long the main loop runs, and the longest it waits for readers. */
          private static final int DURATION_MILLIS = 10 * 1000; // 10 seconds

          private Thread[] threads;
          private FutureTask<int[]>[] tasks;

          /** The instance the readers are expected to obtain in the current round. */
          protected volatile Integer mt_expectedV = null;
          /** The future of the current round; readers spin until it changes. */
          protected volatile AFutureValue mt_fv = null;
          /** Counted down once by each reader when it has finished a round. */
          protected volatile CountDownLatch mt_doneGetting = null;
          /** Set by a reader that obtained a non-null but wrong instance. */
          protected volatile boolean mt_failed = false;
          /** Tells the readers to leave their loop. */
          protected volatile boolean mt_doStop = false;

          /**
           * Test method for {@link java.util.concurrent.FutureTask#set(java.lang.Object)}.
           *
           * @param useSet {@code true} to complete each future with
           *        {@link AFutureValue#mySet}, {@code false} to complete it by
           *        calling {@code run()} on a future wrapping a {@link FixedResult}
           * @throws Exception if an assertion fails, a reader task failed, or the
           *         thread is interrupted while waiting
           */
          public void test(final boolean useSet) throws Exception {
              System.out.println("--------------------------------------------");
              if (useSet) {
                  System.out.println("Testing FutureValue.set");
              }
              else {
                  System.out.println("Testing FutureValue.run");
              }
              /*
               * Create background threads that will get the value
               * of a FutureTask provided by this foreground thread.
               */
              createTasksAndThreads();

              System.out.println("Started " + tasks.length + " threads to read from the FutureValue");

              /*
               * Run for some amount of time.
               */
              System.out.println("Starting main loop...");

              final long endTime = System.currentTimeMillis() + DURATION_MILLIS;

              int counter = 0;
              mt_failed = false;
              try {
                  while (!mt_failed) {
                      long now = System.currentTimeMillis();
                      if (now >= endTime) {
                          System.out.println("Main loop duration reached");
                          break;
                      }

                      /*
                       * Objects that the threads will need after they
                       * get. "new Integer" is deliberate: every check in this
                       * test compares by identity, so each round needs its own
                       * instance.
                       */
                      mt_expectedV = new Integer((int)(now & 0xffffff));
                      mt_doneGetting = new CountDownLatch(threads.length);

                      /*
                       * The FutureValue that the threads will try to get
                       * the expected value from.
                       */
                      if (useSet) {
                          mt_fv = new AFutureValue();
                      }
                      else {
                          mt_fv = new AFutureValue(new FixedResult<Integer>(mt_expectedV));
                      }

                      /*
                       * Yield the process before setting the value... sometimes.
                       */
                      if (counter % 10 == 0) {
                          Thread.yield();
                      }

                      /*
                       * Set the value.
                       */
                      if (useSet) {
                          assertTrue(mt_fv.mySet(mt_expectedV));
                      }
                      else {
                          mt_fv.run();
                          assertTrue(mt_fv.isDone());
                          assertTrue(!mt_fv.isCancelled());
                      }

                      assertSame(mt_expectedV, mt_fv.get());

                      /*
                       * Wait for the threads to finish getting the value.
                       */
                      if (!mt_doneGetting.await(DURATION_MILLIS, TimeUnit.MILLISECONDS)) {
                          throw new AssertionError("waited too long for reader threads to read");
                      }

                      counter++;
                      if (counter % 100000 == 0) {
                          System.out.println("Completed " + counter + " loops");
                      }
                  }
              }
              finally {
                  /*
                   * Release the readers even when the loop exits with an
                   * exception; they are non-daemon threads and would otherwise
                   * keep spinning and keep the JVM alive.
                   */
                  mt_doStop = true;
                  mt_fv = null;
              }

              /*
               * Wait for all threads to finish.
               */
              System.out.println("Waiting for reader threads to finish");
              for (Thread thread : threads) {
                  thread.join();
              }

              reportResults(counter);
          }

          /**
           * Collects the per-method failure counts from every reader task and
           * prints either a success line to stdout or a summary to stderr.
           *
           * @param counter number of completed main-loop rounds
           * @throws Exception if a reader task terminated with an exception
           */
          private void reportResults(int counter) throws Exception {
              /*
               * Did any of the threads fail?
               */
              boolean didFail = false;
              int[] sumFailuresByMethod = new int[TOTAL_METHODS];
              for (FutureTask<int[]> task : tasks) {
                  int[] failuresByMethod = task.get();
                  for (int method = 0; method < TOTAL_METHODS; method++) {
                      sumFailuresByMethod[method] += failuresByMethod[method];
                      if (failuresByMethod[method] != 0) {
                          didFail = true;
                      }
                  }
              }

              if (!didFail) {
                  System.out.println("No problems encountered");
                  return;
              }

              // Widen before multiplying so the product cannot overflow an int.
              long attempts = (long) threads.length * counter;
              String msg = String.format(
                      "Failure counts by method (of %d attempts):\n" +
                      " get, with infinite wait:\t%d\n" +
                      " get, with timeout:\t%d\n" +
                      " get, with zero wait:\t%d",
                      attempts,
                      sumFailuresByMethod[0],
                      sumFailuresByMethod[1],
                      sumFailuresByMethod[2]);
              System.err.println(msg);
          }

          /**
           * Fails unless both references denote the very same object.
           *
           * @throws AssertionError describing both objects when they differ
           */
          private void assertSame(Integer expected, Integer actual) {
              if (expected == actual) {
                  return;
              }

              String msg = String.format(
                      "Expected instances to be the same, but they aren't\n" +
                      " expected identityHashCode: %d\n" +
                      " actual identityHashCode: %d\n" +
                      " expected value: %s\n" +
                      " actual value: %s",
                      System.identityHashCode(expected),
                      System.identityHashCode(actual),
                      expected, actual);

              throw new AssertionError(msg);
          }

          /**
           * Fails unless {@code v} is {@code true}.
           *
           * @throws AssertionError when {@code v} is {@code false}
           */
          private void assertTrue(boolean v) {
              if (v) {
                  return;
              }
              throw new AssertionError("Expected true, was false");
          }

          /**
           * Need at least one background thread, and ideally want one thread
           * (including the foreground testing thread) on each processor.
           * Every reader is started immediately.
           */
          private void createTasksAndThreads() {
              int numProcessors = Runtime.getRuntime().availableProcessors();
              final int numThreads = (numProcessors > 1) ? (numProcessors - 1) : numProcessors;

              threads = new Thread[numThreads];
              // A generic array cannot be created directly; the raw array is
              // only ever filled with FutureTask<int[]> in the loop below.
              @SuppressWarnings("unchecked")
              FutureTask<int[]>[] readerTasks = new FutureTask[numThreads];
              tasks = readerTasks;

              for (int i = 0; i < threads.length; i++) {
                  BackgroundReader bgReader = new BackgroundReader();
                  FutureTask<int[]> task = new FutureTask<int[]>(bgReader);
                  tasks[i] = task;
                  Thread thread = new Thread(task);
                  thread.start();
                  threads[i] = thread;
              }
          }

          /**
           * Define the block that will be executed by the background threads.
           * The result of {@link #call()} is the number of {@code null} values
           * obtained, indexed by get method (0 = blocking get, 1 = get with a
           * 100 ms timeout, 2 = polling with a zero timeout).
           */
          class BackgroundReader implements Callable<int[]> {
              public int[] call() throws Exception {
                  int gets = 0;
                  FutureTask<Integer> prevFV = null;
                  // Start on a random method so the readers are not in lock step.
                  int method = new Random().nextInt(TOTAL_METHODS);
                  int[] failuresByMethod = new int[TOTAL_METHODS];
                  int spinLoops = 0;
                  while (!mt_doStop) {
                      /*
                       * Spin loop to get the next FutureValue.
                       */
                      FutureTask<Integer> fv = mt_fv;
                      if (fv == prevFV) {
                          spinLoops++;
                          if (spinLoops % 10000000 == 0) {
                              System.out.println(
                                      "Long FV update loop in thread " +
                                      Thread.currentThread().getName() +
                                      "; count = " +
                                      spinLoops +
                                      "; gets = " +
                                      gets);
                          }
                          continue;
                      }

                      if (fv == null) {
                          break;
                      }

                      prevFV = fv;
                      spinLoops = 0;
                      gets++;

                      /*
                       * The get itself is inside the try so that the latch is
                       * counted down even when it throws (e.g. the timed get
                       * timing out); otherwise the foreground thread would
                       * wait out its whole timeout for a reader that has died.
                       */
                      try {
                          Integer value = getValue(fv, method);
                          Integer expectedV = mt_expectedV;

                          if (value == null) {
                              // A null is the failure being hunted: count it and
                              // keep going rather than aborting the run.
                              failuresByMethod[method]++;
                          }
                          else if (expectedV != value) {
                              mt_failed = true;
                              throw new AssertionError(
                                      "Gets #" + gets +
                                      ", Method #" + method +
                                      ": wrong instance returned; expected object " +
                                      System.identityHashCode(expectedV) +
                                      ", but got " +
                                      System.identityHashCode(value));
                          }
                      }
                      finally {
                          mt_doneGetting.countDown();
                      }

                      // Rotate through the get methods.
                      method = (method + 1) % TOTAL_METHODS;
                  }
                  return failuresByMethod;
              }

              /**
               * Try several methods to get the value.
               *
               * @param fv the future to read
               * @param method 0 to block, 1 to wait up to 100 ms, 2 to poll with
               *        a zero timeout until the value is available
               * @return whatever the chosen {@code get} returned
               * @throws Exception whatever the chosen {@code get} throws; for
               *         method 2 a {@link TimeoutException} only triggers a retry
               */
              private Integer getValue(FutureTask<Integer> fv, int method) throws Exception {
                  if (method == 0) {
                      // Wait until value is set.
                      return fv.get();
                  }
                  if (method == 1) {
                      // Wait a reasonable amount of time.
                      return fv.get(100, TimeUnit.MILLISECONDS);
                  }
                  // Poll for the value (i.e. don't wait, but instead spin).
                  while (true) {
                      try {
                          return fv.get(0, TimeUnit.SECONDS);
                      }
                      catch (TimeoutException ex) {
                          // Not available yet; poll again.
                      }
                  }
              }
          }

          /**
           * A sub-class of {@link FutureTask} that attempts to use FutureTask.set.
           * It remembers the thread that created it: only that thread may call
           * {@link #mySet}, and that thread may not call {@code get} before the
           * task is done.
           */
          class AFutureValue extends FutureTask<Integer> {
              /** The creating thread; cleared once {@link #mySet} has run. */
              private volatile Thread creatorThread = Thread.currentThread();

              /** Creates a future meant to be completed through {@link #mySet}. */
              public AFutureValue() {
                  super(new NeverCalled<Integer>());
              }

              /** Creates a future meant to be completed by calling {@code run()}. */
              public AFutureValue(FixedResult<Integer> callable) {
                  super(callable);
              }

              /**
               * Completes this future with {@code value} via
               * {@link FutureTask#set}, bypassing {@code run()}.
               *
               * @return {@code false} if the task was already done or is
               *         cancelled after the set, {@code true} otherwise
               * @throws IllegalStateException if called from a thread other
               *         than the one that created this object
               */
              public boolean mySet(Integer value) {
                  if (isDone()) {
                      return false;
                  }
                  if (creatorThread != Thread.currentThread()) {
                      throw new IllegalStateException("Must only call from creator's thread");
                  }
                  try {
                      super.set(value);
                      return !isCancelled();
                  }
                  finally {
                      creatorThread = null;
                  }
              }

              /**
               * Same as {@link FutureTask#get()}, except that the creator thread
               * is rejected while the task is not done, and an
               * {@link ExecutionException} is rethrown unchecked.
               */
              @Override
              public Integer get() throws InterruptedException {
                  if (creatorThread == Thread.currentThread() && !isDone()) {
                      throw new IllegalStateException("Must not call from creator's thread");
                  }
                  try {
                      return super.get();
                  } catch (ExecutionException e) {
                      // Can't happen.
                      throw new IllegalStateException(e);
                  }
              }

              /**
               * Same as {@link FutureTask#get(long, TimeUnit)}, except that the
               * creator thread is rejected while the task is not done, and an
               * {@link ExecutionException} is rethrown unchecked.
               */
              @Override
              public Integer get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
                  if (creatorThread == Thread.currentThread() && !isDone()) {
                      throw new IllegalStateException("Must not call from creator's thread");
                  }
                  try {
                      return super.get(timeout, unit);
                  } catch (ExecutionException e) {
                      // Can't happen.
                      throw new IllegalStateException(e);
                  }
              }

          }

          /** Placeholder callable for futures that are completed through set. */
          class NeverCalled<V> implements Callable<V> {
              public V call() throws Exception {
                  throw new UnsupportedOperationException();
              }
          }

          /** Callable that returns the instance it was constructed with. */
          class FixedResult<V> implements Callable<V> {
              private final V result;
              FixedResult(V result) {
                  this.result = result;
              }
              public V call() throws Exception {
                  return result;
              }
          }
      }

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              elarsen Erik Larsen (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              0 Start watching this issue

              Dates

                Created:
                Updated:
                Imported:
                Indexed: