Summary
Add a BodySubscribers::ofPublisher() method that makes it possible to subscribe to an HttpResponse body through a Flow.Publisher<List<ByteBuffer>>, and add a corresponding BodyHandlers::ofPublisher() method.
Problem
This new API was suggested during the implementation review of JEP 321. See http://mail.openjdk.java.net/pipermail/net-dev/2018-March/011311.html and http://mail.openjdk.java.net/pipermail/net-dev/2018-April/011314.html.
Some Reactive Streams implementations make it possible to assemble chains of Publishers out of the box. Making it possible to subscribe to the bytes composing the body of an HttpResponse through an out-of-the-box Flow.Publisher, instead of having to code an implementation of BodyHandler/BodySubscriber to do so, will make integration with such Reactive Streams implementations smoother.
Solution
Provide a public static BodyHandlers::ofPublisher() factory method that returns a BodyHandler<Publisher<List<ByteBuffer>>>, and a corresponding BodySubscribers::ofPublisher() method that returns a BodySubscriber<Publisher<List<ByteBuffer>>>.
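As an illustration of how the proposed handler might be used, here is a minimal sketch written against the java.net.http package names under which the API was later standardized. The class OfPublisherDemo, its CollectingSubscriber helper, and the fetchFromLocalServer method are hypothetical names introduced for this example only. It starts a throwaway local server with the JDK's built-in com.sun.net.httpserver.HttpServer so that it does not depend on any external host.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

public class OfPublisherDemo {

    /** Hypothetical helper: gathers every published buffer into one UTF-8 string. */
    public static final class CollectingSubscriber
            implements Flow.Subscriber<List<ByteBuffer>> {
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        public final CompletableFuture<String> result = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                       // ask for the first batch of buffers
        }
        @Override public void onNext(List<ByteBuffer> buffers) {
            for (ByteBuffer b : buffers) {
                byte[] chunk = new byte[b.remaining()];
                b.get(chunk);
                bytes.write(chunk, 0, chunk.length);
            }
            subscription.request(1);            // keep draining until onComplete
        }
        @Override public void onError(Throwable t) {
            result.completeExceptionally(t);
        }
        @Override public void onComplete() {
            result.complete(new String(bytes.toByteArray(), StandardCharsets.UTF_8));
        }
    }

    /** Serves a non-empty payload from a local server and reads it back via ofPublisher(). */
    public static String fetchFromLocalServer(String payload) {
        HttpServer server = null;
        try {
            byte[] data = payload.getBytes(StandardCharsets.UTF_8);
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/", exchange -> {
                exchange.sendResponseHeaders(200, data.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(data);
                }
            });
            server.start();

            HttpClient client = HttpClient.newBuilder()
                    .proxy(HttpClient.Builder.NO_PROXY)   // talk to loopback directly
                    .build();
            HttpRequest request = HttpRequest.newBuilder(
                    URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/"))
                    .build();

            // send() returns once the headers are read; the body arrives via the publisher.
            HttpResponse<Flow.Publisher<List<ByteBuffer>>> response =
                    client.send(request, BodyHandlers.ofPublisher());

            CollectingSubscriber subscriber = new CollectingSubscriber();
            response.body().subscribe(subscriber);        // subscribe exactly once
            return subscriber.result.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (server != null) server.stop(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchFromLocalServer("hello publisher"));
    }
}
```

In a real integration the hand-written subscriber would typically be replaced by an adapter from a Reactive Streams library, which is the use case motivating this change.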
Specification
The API documentation of the two new methods is detailed below. For the reader's convenience, a live webrev link based on the sandbox http-client-branch is also provided: http://cr.openjdk.java.net/~dfuchs/webrev_8201186/webrev.00/
HttpResponse.BodyHandlers:
/**
* Returns a {@code BodyHandler<Publisher<List<ByteBuffer>>>} that creates a
* {@link BodySubscriber BodySubscriber}{@code <Publisher<List<ByteBuffer>>>}
* obtained from {@link BodySubscribers#ofPublisher()
* BodySubscribers.ofPublisher()}.
*
* <p> When the {@code HttpResponse} object is returned, the response
* headers will have been completely read, but the body may not have
* been fully received yet. The {@link #body()} method returns a
* {@link Publisher Publisher<List<ByteBuffer>>} from which the body
* response bytes can be obtained as they are received. The publisher
* can and must be subscribed to only once.
*
* @apiNote See {@link BodySubscribers#ofPublisher()} for more
* information.
*
* @return a response body handler
*/
public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() {
    return (status, headers) -> BodySubscribers.ofPublisher();
}
HttpResponse.BodySubscribers:
/**
* Returns a response subscriber which publishes the response body
* through a {@link Publisher Publisher<List<ByteBuffer>>}.
*
* <p> The {@link HttpResponse} using this subscriber is available
* immediately after the response headers have been read, without
* requiring to wait for the entire body to be processed. The response
* body bytes can then be obtained by subscribing to the publisher
* returned by the {@code HttpResponse} {@link HttpResponse#body() body}
* method.
*
* <p>The publisher returned by the {@link HttpResponse#body() body}
* method can be subscribed to only once. The first subscriber will
* receive the body response bytes if successfully subscribed, or will
* cause the subscription to be cancelled otherwise.
* If more subscriptions are attempted, the subsequent subscribers will
* be immediately subscribed with an empty subscription and their
* {@link Subscriber#onError(Throwable) onError} method
* will be invoked with an {@code IllegalStateException}.
*
* @apiNote To ensure that all resources associated with the
* corresponding exchange are properly released the caller must
* ensure that the provided publisher is subscribed once, and either
* {@linkplain Subscription#request(long) requests} all bytes
* until {@link Subscriber#onComplete() onComplete} or
* {@link Subscriber#onError(Throwable) onError} are invoked, or
* cancel the provided {@linkplain Subscriber#onSubscribe(Subscription)
* subscription} if it is unable or unwilling to do so.
* Note that depending on the actual HTTP protocol {@linkplain
* HttpClient.Version version} used for the exchange, cancelling the
* subscription instead of exhausting the flow may cause the underlying
* HTTP connection to be closed and prevent it from being reused for
* subsequent operations.
*
* @return A {@code BodySubscriber} which publishes the response body
* through a {@code Publisher<List<ByteBuffer>>}.
*/
public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
    return ResponseSubscribers.createPublisher();
}
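The @apiNote above asks the caller either to request all bytes until onComplete or onError, or to cancel the subscription. A minimal sketch of a subscriber following that contract is shown below. The class name BoundedBodySubscriber and its demo method are hypothetical, and a java.util.concurrent.SubmissionPublisher is used only as a stand-in for the publisher returned by HttpResponse::body so that the example runs without an HTTP exchange.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical subscriber: drains the body, or cancels once a byte limit is exceeded. */
public class BoundedBodySubscriber implements Flow.Subscriber<List<ByteBuffer>> {
    private final long maxBytes;
    private final CompletableFuture<Long> done = new CompletableFuture<>();
    private Flow.Subscription subscription;
    private long received;

    public BoundedBodySubscriber(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Completes with the number of bytes seen when the flow ends or is cancelled. */
    public CompletableFuture<Long> done() {
        return done;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override public void onNext(List<ByteBuffer> buffers) {
        for (ByteBuffer b : buffers) {
            received += b.remaining();
        }
        if (received > maxBytes) {
            // Unwilling to read further: cancel so that resources can be released.
            subscription.cancel();
            done.complete(received);
        } else {
            subscription.request(1);            // otherwise keep exhausting the flow
        }
    }

    @Override public void onError(Throwable t) {
        done.completeExceptionally(t);
    }

    @Override public void onComplete() {
        done.complete(received);
    }

    /** Feeds `chunks` buffers of `chunkSize` bytes through a stand-in publisher. */
    public static long demo(long maxBytes, int chunks, int chunkSize) {
        BoundedBodySubscriber subscriber = new BoundedBodySubscriber(maxBytes);
        try (SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < chunks; i++) {
                publisher.submit(List.of(ByteBuffer.allocate(chunkSize)));
            }
        } // close() signals onComplete to subscribers that are still subscribed
        try {
            return subscriber.done().get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(10, 5, 4));     // limit exceeded, subscription cancelled
        System.out.println(demo(100, 5, 4));    // flow exhausted normally
    }
}
```

As the @apiNote points out, with a real HTTP exchange the cancelling path may cause the underlying connection to be closed rather than reused, depending on the protocol version.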
CSR of: JDK-8201186 Add BodyHandler<Publisher<List<ByteBuffer>>> (Resolved)