-
Bug
-
Resolution: Fixed
-
P4
-
16
-
b07
There's a subtle race condition in AggregatePublisher.AggregateSubscription. The AggregatePublisher will subscribe to downstream publishers in turn, subscribing to the next publisher when the previous publisher onComplete() has been called.
The request() method passed to the upstream subscriber detects that all subscribers have completed if the current downstream publisher is null and the queue of publisher yet to subscribe to is empty.
The event loop is responsible for subscribing to the next publisher, and does so by polling the queue and assigning the returned value to this.publisher, and then subscribe to that publisher. However, when subscribing to the last publisher in the queue, there is a small time window where the queue can be observed to be empty, before this.publisher is assigned.
Some locking/synchronizing is needed to make sure we see a consistent state.
The request() method passed to the upstream subscriber detects that all subscribers have completed if the current downstream publisher is null and the queue of publisher yet to subscribe to is empty.
The event loop is responsible for subscribing to the next publisher, and does so by polling the queue and assigning the returned value to this.publisher, and then subscribe to that publisher. However, when subscribing to the last publisher in the queue, there is a small time window where the queue can be observed to be empty, before this.publisher is assigned.
Some locking/synchronizing is needed to make sure we see a consistent state.
- links to
-
Commit(master) openjdk/jdk/b720517c
-
Review(master) openjdk/jdk/23204