JDK-8176155

SubmissionPublisher closeExceptionally() may override close()


    • Type: Bug
    • Resolution: Fixed
    • Priority: P3
    • 9
    • 9
    • core-libs
    • None

        As reported by Dávid Karnok on concurrency-interest
        http://markmail.org/thread/p2hlhe6i4fdntfpe


        The following test fails (after a few rounds for me) because concurrent calls to close() and closeExceptionally() race for the terminal condition, so different subscribers may see different terminal states:


        Flow.Subscriber<Integer> createSub(Object[] result, int index, CountDownLatch cdl) {
            return new Flow.Subscriber<Integer>() {
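                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // no demand is requested; only the terminal signal matters
                }

                @Override
                public void onNext(Integer item) {
                    // never called: the test submits no items
                }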

                @Override
                public void onError(Throwable throwable) {
                    result[index] = throwable;
                    cdl.countDown();
                }

                @Override
                public void onComplete() {
                    result[index] = "complete";
                    cdl.countDown();
                }
            };
        }

        @Test
        public void closeErrorRace() throws Exception {
            ExecutorService exec = Executors.newSingleThreadExecutor();
            try {
                for (int i = 0; i < 1000; i++) {
                    System.out.println("Round " + i);
                    SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

                    CountDownLatch cdl = new CountDownLatch(2);

                    Object[] result = { null, null };

                    sp.subscribe(createSub(result, 0, cdl));

                    Flow.Subscriber<Integer> sb2 = createSub(result, 1, cdl);

                    Throwable ex = new RuntimeException();

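                    // two-party spin barrier so both threads reach their close call together
                    AtomicInteger wip = new AtomicInteger(2);

                    Runnable r1 = () -> {
                        wip.decrementAndGet();
                        while (wip.get() != 0) ;

                        sp.closeExceptionally(ex);
                        // a late subscriber is signalled with the publisher's terminal state
                        sp.subscribe(sb2);
                    };

                    exec.submit(r1);

                    wip.decrementAndGet();
                    while (wip.get() != 0) ;

                    // races with closeExceptionally(ex) on the executor thread
                    sp.close();

                    assertTrue(cdl.await(5, TimeUnit.SECONDS));

                    // both subscribers should have observed the same terminal signal
                    assertEquals(result[0], result[1]);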
                }
            } finally {
                exec.shutdownNow();
            }
        }

        I'm assuming consistency is desirable here, and the fix could be an if statement inside the synchronized block of closeExceptionally():

        if (!closed) {
            BufferedSubscription b;
            synchronized (this) {
                b = clients;
                if (b == null) { // or if (closed) {
                    return;
                }
                clients = null;
                closed = true;
                closedException = error;
            }
            // the rest
        }
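
The "first terminal call wins" idea behind that check can be sketched in isolation. The class below is a hypothetical stand-in (TerminalState and its boolean return values are assumptions for illustration, not the SubmissionPublisher source); it only shows that re-checking closed under the same lock keeps a racing closeExceptionally() from clobbering an earlier close():

```java
/** Hypothetical stand-in for a publisher's terminal-state bookkeeping; not JDK code. */
final class TerminalState {
    private boolean closed;            // guarded by "this"
    private Throwable closedException; // guarded by "this"; null means normal completion

    /** Returns true only for the call that actually performed the close. */
    synchronized boolean close() {
        if (closed) {
            return false;              // an earlier terminal call already won
        }
        closed = true;
        return true;
    }

    /** Returns true only if this call set the terminal state to "error". */
    synchronized boolean closeExceptionally(Throwable error) {
        if (error == null) {
            throw new NullPointerException();
        }
        if (closed) {
            return false;              // do not overwrite a racing close()
        }
        closed = true;
        closedException = error;
        return true;
    }

    synchronized boolean isClosed() {
        return closed;
    }

    synchronized Throwable getClosedException() {
        return closedException;
    }
}
```

Because both methods test and set closed inside the same monitor, whichever call acquires the lock first fixes the terminal state, and every later subscriber would be told the same outcome.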

              martin Martin Buchholz