Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8201325

Add BodyHandler<Publisher<List<ByteBuffer>>>

    XMLWordPrintable

Details

    • CSR
    • Resolution: Approved
    • P3
    • 11
    • core-libs
    • source
    • minimal
    • New API in final static classes - there should be no compatibility issue.
    • Java API
    • SE

    Description

      Summary

      Add a BodySubscribers::ofPublisher() method that makes it possible to subscribe to an HttpResponse body through a Flow.Publisher<List<ByteBuffer>>, and add a corresponding BodyHandlers::ofPublisher() method.

      Problem

      This new API was suggested during the implementation review of JEP 321. See http://mail.openjdk.java.net/pipermail/net-dev/2018-March/011311.html and http://mail.openjdk.java.net/pipermail/net-dev/2018-April/011314.html.

      Some Reactive Stream implementations offer the possibility to assemble out-of-the-box chains of Publishers. Making it possible to subscribe to the bytes composing the body of an HttpResponse through an out-of-the box Flow.Publisher, instead of having to code an implementation of BodyHandler/BodySubscriber to do so, will make integration with such Reactive Streams implementations smoother.

      Solution

      Provide a public static BodyHandlers::ofPublisher() factory method that returns a BodyHandler<Publisher<List<ByteBuffer>>>, and a corresponding BodySubscribers::ofPublisher() method that returns a BodySubscriber<Publisher<List<ByteBuffer>>>.

      Specification

      The API documentation of the two new methods is detailed below. For the reader's convenience a live webrev link based on sandbox, http-client-branch is also provided: http://cr.openjdk.java.net/~dfuchs/webrev_8201186/webrev.00/

      HttpResponse.BodyHandlers:

          /**
           * Returns a {@code BodyHandler<Publisher<List<ByteBuffer>>>} that creates a
           * {@link BodySubscriber BodySubscriber}{@code <Publisher<List<ByteBuffer>>>}
           * obtained from {@link BodySubscribers#ofPublisher()
           * BodySubscribers.ofPublisher()}.
           *
           * <p> When the {@code HttpResponse} object is returned, the response
           * headers will have been completely read, but the body may not have
           * been fully received yet. The {@link #body()} method returns a
           * {@link Publisher Publisher<List<ByteBuffer>>} from which the body
           * response bytes can be obtained as they are received. The publisher
           * can and must be subscribed to only once.
           *
           * @apiNote See {@link BodySubscribers#ofPublisher()} for more
           * information.
           *
           * @return a response body handler
           */
          public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() {
              return (status, headers) -> BodySubscribers.ofPublisher();
          }

      HttpResponse.BodySubscribers:

          /**
           * Returns a response subscriber which publishes the response body
           * through a {@link Publisher Publisher<List<ByteBuffer>>}.
           *
           * <p> The {@link HttpResponse} using this subscriber is available
           * immediately after the response headers have been read, without
           * requiring to wait for the entire body to be processed. The response
           * body bytes can then be obtained by subscribing to the publisher
           * returned by the {@code HttpResponse} {@link HttpResponse#body() body}
           * method.
           *
           * <p>The publisher returned by the {@link HttpResponse#body() body}
           * method can be subscribed to only once. The first subscriber will
           * receive the body response bytes if successfully subscribed, or will
           * cause the subscription to be cancelled otherwise.
           * If more subscriptions are attempted, the subsequent subscribers will
           * be immediately subscribed with an empty subscription and their
           * {@link Subscriber#onError(Throwable) onError} method
           * will be invoked with an {@code IllegalStateException}.
           *
           * @apiNote To ensure that all resources associated with the
           * corresponding exchange are properly released the caller must
           * ensure that the provided publisher is subscribed once, and either
           * {@linkplain Subscription#request(long) requests} all bytes
           * until {@link Subscriber#onComplete() onComplete} or
           * {@link Subscriber#onError(Throwable) onError} are invoked, or
           * cancel the provided {@linkplain Subscriber#onSubscribe(Subscription)
           * subscription} if it is unable or unwilling to do so.
           * Note that depending on the actual HTTP protocol {@linkplain
           * HttpClient.Version version} used for the exchange, cancelling the
           * subscription instead of exhausting the flow may cause the underlying
           * HTTP connection to be closed and prevent it from being reused for
           * subsequent operations.
           *
           * @return A {@code BodySubscriber} which publishes the response body
           *         through a {@code Publisher<List<ByteBuffer>>}.
           */
          public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
              return ResponseSubscribers.createPublisher();
          }

      Attachments

        Issue Links

          Activity

            People

              dfuchs Daniel Fuchs
              chegar Chris Hegarty
              Chris Hegarty
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: