-
Bug
-
Resolution: Fixed
-
P4
-
10
-
None
-
b35
-
x86
-
os_x
http://cs.oswego.edu/pipermail/concurrency-interest/2017-September/016177.html
--------------------------------------------------------------
Consider the following example:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;

public class SubmissionPublisherTest {
    private static final int N = 1 << 20;
    private final AtomicInteger numbers = new AtomicInteger();
    private final SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
    private final ExecutorService pubExecutor = Executors.newSingleThreadExecutor();
    private final CountDownLatch finished = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        new SubmissionPublisherTest().run();
    }

    private void run() throws InterruptedException {
        pub.subscribe(newSubscriber());
        try {
            // Wait until the subscriber has received N items, or give up after 30 seconds.
            finished.await(30, TimeUnit.SECONDS);
        } finally {
            pubExecutor.shutdownNow();
        }
        System.out.println("Finished");
    }

    private Flow.Subscriber<Integer> newSubscriber() {
        return new Flow.Subscriber<>() {
            Flow.Subscription sub;
            int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                (this.sub = s).request(N);
                publish(); // initial offer that starts the feedback loop
            }

            @Override
            public void onNext(Integer item) {
                if (++received == N) finished.countDown();
                publish(); // every received item triggers one more offer
                System.out.println(item);
            }

            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
    }

    private void publish() {
        int number = numbers.incrementAndGet();
        // No drop is expected here; fail loudly if one ever happens.
        BiPredicate<Flow.Subscriber<? super Integer>, Integer>
            onDropReportError = (s, i) -> { throw new InternalError(); };
        pubExecutor.execute(() -> pub.offer(number, onDropReportError));
        // pub.offer(number, onDropReportError);
    }
}
What happens here?
Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.
onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts the "feedback loop".
If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.
However, if you comment out the line:
pubExecutor.execute(() -> pub.offer(number, onDropReportError));
and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.
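To compare the two variants without reading console output, the loop can be reduced to a check that counts deliveries. The following is only a minimal sketch under stated assumptions, not part of the original report: the names FeedbackLoopCheck, run and async are hypothetical, N is shrunk to 1000, the drop handler simply returns false instead of throwing InternalError, and the subscriber stops offering after the n-th item so that nothing is handed to the executor after shutdown. On a build that contains the fix, both variants would be expected to return n; on an affected build the asynchronous variant may return fewer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not from the original report.
class FeedbackLoopCheck {

    /** Runs the feedback loop for n items and returns how many onNext calls were observed. */
    static int run(int n, boolean async) {
        AtomicInteger numbers = new AtomicInteger();
        AtomicInteger received = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1);
        ExecutorService pubExecutor = Executors.newSingleThreadExecutor();
        SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();

        // Offers one integer, either from the separate executor thread or from the caller.
        Runnable publish = () -> {
            int number = numbers.incrementAndGet();
            if (async) {
                pubExecutor.execute(() -> pub.offer(number, (s, i) -> false));
            } else {
                pub.offer(number, (s, i) -> false);
            }
        };

        pub.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                s.request(n);
                publish.run(); // initial offer that starts the loop
            }
            @Override public void onNext(Integer item) {
                // Assumption: stop after the n-th item instead of offering once more.
                if (received.incrementAndGet() == n) finished.countDown();
                else publish.run();
            }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });

        try {
            finished.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pubExecutor.shutdownNow();
            pub.close();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("async offers:       " + run(1000, true));
        System.out.println("synchronous offers: " + run(1000, false));
    }
}
```

A returned count below n after the 30-second timeout would correspond to the stalled output described above.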
- csr for: JDK-8192907 A race condition in SubmissionPublisher (Closed)
- relates to: JDK-8193174 SubmissionPublisher invokes the Subscriber's onComplete before all of its submitted items have been published (Resolved)