-
Bug
-
Resolution: Fixed
-
P3
-
9
-
None
-
b161
Issue | Fix Version | Assignee | Priority | Status | Resolution | Resolved In Build |
---|---|---|---|---|---|---|
JDK-8176968 | 10 | Doug Lea | P3 | Resolved | Fixed | b03 |
As reported by Dávid Karnok on concurrency-interest
http://markmail.org/thread/p2hlhe6i4fdntfpe
The following test fails (after a few rounds for me) because a concurrent call to close() and closeExceptionally() race for the terminal condition and different subscribers may see different terminal state:
Flow.Subscriber<Integer> createSub(Object[] result, int index, CountDownLatch cdl) {
return new Flow.Subscriber<Integer>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
}
@Override
public void onNext(Integer item) {
}
@Override
public void onError(Throwable throwable) {
result[index] = throwable;
cdl.countDown();
}
@Override
public void onComplete() {
result[index] = "complete";
cdl.countDown();
}
};
}
@Test
public void closeErrorRace() throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 1000; i++) {
System.out.println("Round " + i);
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
CountDownLatch cdl = new CountDownLatch(2);
Object[] result = { null, null };
sp.subscribe(createSub(result, 0, cdl));
Flow.Subscriber<Integer> sb2 = createSub(result, 1, cdl);
Throwable ex = new RuntimeException();
AtomicInteger wip = new AtomicInteger(2);
Runnable r1 = () -> {
wip.decrementAndGet();
while (wip.get() != 0) ;
sp.closeExceptionally(ex);
sp.subscribe(sb2);
};
exec.submit(r1);
wip.decrementAndGet();
while (wip.get() != 0) ;
sp.close();
assertTrue(cdl.await(5, TimeUnit.SECONDS));
assertEquals(result[0], result[1]);
}
} finally {
exec.shutdownNow();
}
}
I'm assuming consistency is desirable here and the fix could be an if statement inside the synchronized block of closeExceptionally():
if (!closed) {
BufferedSubscription b;
synchronized (this) {
b = clients;
if (b == null) { // or if (closed) {
return;
}
clients = null;
closed = true;
closedException = error;
}
// the rest
}
http://markmail.org/thread/p2hlhe6i4fdntfpe
The following test fails (after a few rounds for me) because a concurrent call to close() and closeExceptionally() race for the terminal condition and different subscribers may see different terminal state:
Flow.Subscriber<Integer> createSub(Object[] result, int index, CountDownLatch cdl) {
return new Flow.Subscriber<Integer>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
}
@Override
public void onNext(Integer item) {
}
@Override
public void onError(Throwable throwable) {
result[index] = throwable;
cdl.countDown();
}
@Override
public void onComplete() {
result[index] = "complete";
cdl.countDown();
}
};
}
@Test
public void closeErrorRace() throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 1000; i++) {
System.out.println("Round " + i);
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
CountDownLatch cdl = new CountDownLatch(2);
Object[] result = { null, null };
sp.subscribe(createSub(result, 0, cdl));
Flow.Subscriber<Integer> sb2 = createSub(result, 1, cdl);
Throwable ex = new RuntimeException();
AtomicInteger wip = new AtomicInteger(2);
Runnable r1 = () -> {
wip.decrementAndGet();
while (wip.get() != 0) ;
sp.closeExceptionally(ex);
sp.subscribe(sb2);
};
exec.submit(r1);
wip.decrementAndGet();
while (wip.get() != 0) ;
sp.close();
assertTrue(cdl.await(5, TimeUnit.SECONDS));
assertEquals(result[0], result[1]);
}
} finally {
exec.shutdownNow();
}
}
I'm assuming consistency is desirable here and the fix could be an if statement inside the synchronized block of closeExceptionally():
if (!closed) {
BufferedSubscription b;
synchronized (this) {
b = clients;
if (b == null) { // or if (closed) {
return;
}
clients = null;
closed = true;
closedException = error;
}
// the rest
}
- backported by
-
JDK-8176968 SubmissionPublisher closeExceptionally() may override close()
-
- Resolved
-