Summary
Add a BodySubscribers::ofPublisher() method that makes it possible to subscribe
to an HttpResponse body through a Flow.Publisher<List<ByteBuffer>>, and
add a corresponding BodyHandlers::ofPublisher() method.
Problem
This new API was suggested during the implementation review of JEP 321. See http://mail.openjdk.java.net/pipermail/net-dev/2018-March/011311.html and http://mail.openjdk.java.net/pipermail/net-dev/2018-April/011314.html.
Some Reactive Stream implementations offer the possibility to assemble out-of-the-box
chains of Publishers. Making it possible to subscribe to the bytes composing the
body of an HttpResponse through an out-of-the box Flow.Publisher, instead of having
to code an implementation of BodyHandler/BodySubscriber to do so, will make
integration with such Reactive Streams implementations smoother.
Solution
Provide a public static BodyHandlers::ofPublisher() factory method that returns a
BodyHandler<Publisher<List<ByteBuffer>>>, and a corresponding
BodySubscribers::ofPublisher() method that returns a
BodySubscriber<Publisher<List<ByteBuffer>>>.
Specification
The API documentation of the two new methods is detailed below. For the reader's convenience a live webrev link based on sandbox, http-client-branch is also provided: http://cr.openjdk.java.net/~dfuchs/webrev_8201186/webrev.00/
HttpResponse.BodyHandlers:
    /**
     * Returns a {@code BodyHandler<Publisher<List<ByteBuffer>>>} that creates a
     * {@link BodySubscriber BodySubscriber}{@code <Publisher<List<ByteBuffer>>>}
     * obtained from {@link BodySubscribers#ofPublisher()
     * BodySubscribers.ofPublisher()}.
     *
     * <p> When the {@code HttpResponse} object is returned, the response
     * headers will have been completely read, but the body may not have
     * been fully received yet. The {@link #body()} method returns a
     * {@link Publisher Publisher<List<ByteBuffer>>} from which the body
     * response bytes can be obtained as they are received. The publisher
     * can and must be subscribed to only once.
     *
     * @apiNote See {@link BodySubscribers#ofPublisher()} for more
     * information.
     *
     * @return a response body handler
     */
    public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() {
        return (status, headers) -> BodySubscribers.ofPublisher();
    }HttpResponse.BodySubscribers:
    /**
     * Returns a response subscriber which publishes the response body
     * through a {@link Publisher Publisher<List<ByteBuffer>>}.
     *
     * <p> The {@link HttpResponse} using this subscriber is available
     * immediately after the response headers have been read, without
     * requiring to wait for the entire body to be processed. The response
     * body bytes can then be obtained by subscribing to the publisher
     * returned by the {@code HttpResponse} {@link HttpResponse#body() body}
     * method.
     *
     * <p>The publisher returned by the {@link HttpResponse#body() body}
     * method can be subscribed to only once. The first subscriber will
     * receive the body response bytes if successfully subscribed, or will
     * cause the subscription to be cancelled otherwise.
     * If more subscriptions are attempted, the subsequent subscribers will
     * be immediately subscribed with an empty subscription and their
     * {@link Subscriber#onError(Throwable) onError} method
     * will be invoked with an {@code IllegalStateException}.
     *
     * @apiNote To ensure that all resources associated with the
     * corresponding exchange are properly released the caller must
     * ensure that the provided publisher is subscribed once, and either
     * {@linkplain Subscription#request(long) requests} all bytes
     * until {@link Subscriber#onComplete() onComplete} or
     * {@link Subscriber#onError(Throwable) onError} are invoked, or
     * cancel the provided {@linkplain Subscriber#onSubscribe(Subscription)
     * subscription} if it is unable or unwilling to do so.
     * Note that depending on the actual HTTP protocol {@linkplain
     * HttpClient.Version version} used for the exchange, cancelling the
     * subscription instead of exhausting the flow may cause the underlying
     * HTTP connection to be closed and prevent it from being reused for
     * subsequent operations.
     *
     * @return A {@code BodySubscriber} which publishes the response body
     *         through a {@code Publisher<List<ByteBuffer>>}.
     */
    public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
        return ResponseSubscribers.createPublisher();
    }- csr of
- 
                    JDK-8201186 Add BodyHandler<Publisher<List<ByteBuffer>>> -           
- Resolved
 
-