Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8187947

A race condition in SubmissionPublisher

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Fixed
    • Icon: P4 P4
    • 10
    • 10
    • core-libs
    • None

      http://cs.oswego.edu/pipermail/concurrency-interest/2017-September/016177.html
      --------------------------------------------------------------
      Consider the following example:

          public class SubmissionPublisherTest {

              private final static int N = 1 << 20;

              private final AtomicInteger numbers = new AtomicInteger();
              private final SubmissionPublisher<Integer> pub = new
      SubmissionPublisher<>();
              private final ExecutorService pubExecutor =
      Executors.newSingleThreadExecutor();
              private final CountDownLatch finished = new CountDownLatch(1);

              public static void main(String[] args) throws InterruptedException {
                  new SubmissionPublisherTest().run();
              }

              private void run() throws InterruptedException {
                  pub.subscribe(newSubscriber());
                  try {
                      finished.await(30, TimeUnit.SECONDS);
                  } finally {
                      pubExecutor.shutdownNow();
                  }
                  System.out.println("Finished");
              }

              private Flow.Subscriber<Integer> newSubscriber() {
                  return new Flow.Subscriber<>() {

                      Flow.Subscription sub;
                      int received;

                      @Override
                      public void onSubscribe(Flow.Subscription s) {
                          (this.sub = s).request(N);
                          publish();
                      }

                      @Override
                      public void onNext(Integer item) {
                          if (++received == N) finished.countDown();
                          publish();
                          System.out.println(item);
                      }

                      @Override public void onError(Throwable t) { }
                      @Override public void onComplete() { }
                  };
              }

              private void publish() {
                  int number = numbers.incrementAndGet();
                  BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                          onDropReportError = (s, i) -> { throw new
      InternalError(); };
                  pubExecutor.execute(() -> pub.offer(number, onDropReportError));
          // pub.offer(number, onDropReportError);
              }
          }

      What happens here?

      Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
      SubmissionPublisher. Subscriber.onNext prints the integer it has received and
      offers a single integer to the publisher. run() waits until all N integers have
      been received and then returns.

      onSubscribe offers an initial integer to the SubmissionPublisher. This
      kick-starts
      the "feedback loop".

      If you run this snippet, chances are good you will not see all 1048576 integers
      printed out to the console before the program terminates. Instead, the output
      will stop at some number m < 1048576 for no apparent reason.

      However, if you comment out the line:

          pubExecutor.execute(() -> pub.offer(number, onDropReportError));

      and uncomment the previously commented line, everything will work as expected.
      The difference is that in this case offers of integers happen synchronously
      rather than from a separate thread.

            dl Doug Lea
            prappo Pavel Rappo (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: