JDK-8193366

Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher

Details

    • Compatibility Risk: minimal
    • Compatibility Risk Description: Addition of new API points to an incubating API.
    • Interface Kind: Java API
    • Scope: JDK

    Description

      Summary

      Improve interoperability between HTTP Client's request BodyPublisher and response BodySubscriber, and regular java.util.concurrent.Flow.Publisher and java.util.concurrent.Flow.Subscriber, respectively.

      Problem

      The incubating HTTP Client defines types that support publishing and subscribing to HTTP request and response bodies, respectively. These types, HttpRequest.BodyPublisher and HttpResponse.BodySubscriber, add HTTP protocol specific behavior and API semantics beyond that of their supertypes, Flow.Publisher and Flow.Subscriber, respectively.

      It is desirable for the HTTP Client to operate with "regular" Flow.Publisher and Flow.Subscriber without every developer having to write their own wrappers or adapters, which would be cumbersome and could lead to bugs. Supporting "regular" Flow.Publisher and Flow.Subscriber broadens the possible set of body processors that can be used with the HTTP Client, and improves interoperability with existing technologies that already use reactive-streams Flow.

      Solution

      Add static factory methods, similar to other HTTP Client body publishers and subscribers, that adapt a given Flow.Publisher, or Flow.Subscriber, to an appropriate HTTP Client BodyPublisher, or BodySubscriber. Specifically:

      On the request side.

      To HttpRequest.BodyPublisher add:

      1) fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher)
      
      2) fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher,
                       long contentLength)

      The first variant returns a BodyPublisher with an unknown content length. The second variant returns a BodyPublisher with the given, known content length. Two variants are provided to 1) avoid special-casing a constant to represent an unknown content length, and 2) avoid having to handle the corner case of a known content length of 0.
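
      As a rough illustration of the two request-side variants, the sketch below uses the API as later standardized in java.net.http (Java 11+), where these factories are exposed on HttpRequest.BodyPublishers rather than under the incubating names proposed here. The class FromPublisherSketch and its helper methods are hypothetical, and SubmissionPublisher merely stands in for any existing Flow.Publisher of ByteBuffer.

```java
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; not part of the proposal.
class FromPublisherSketch {

    // Content length reported by the one-argument variant (length not stated).
    static long unknownLength() {
        try (SubmissionPublisher<ByteBuffer> source = new SubmissionPublisher<>()) {
            return HttpRequest.BodyPublishers.fromPublisher(source).contentLength();
        }
    }

    // Content length reported when the caller states the exact byte count.
    static long knownLength(long length) {
        try (SubmissionPublisher<ByteBuffer> source = new SubmissionPublisher<>()) {
            return HttpRequest.BodyPublishers.fromPublisher(source, length).contentLength();
        }
    }

    // True if the two-argument variant rejects the given length.
    static boolean rejects(long length) {
        try {
            knownLength(length);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello world".getBytes(StandardCharsets.UTF_8);
        // A negative value means the content length is unknown.
        System.out.println("unknown: " + unknownLength());
        System.out.println("known:   " + knownLength(payload.length));
        // Zero is not a valid "known" length, so no special case is needed.
        System.out.println("rejects 0: " + rejects(0));
    }
}
```

      Because the two-argument variant only accepts positive lengths, callers with an empty or unsized body have a single, unambiguous choice: the one-argument variant.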

      On the response side.

      To HttpResponse.BodyHandler add:

      1) fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber)
      
      2) <S extends Subscriber<? super List<ByteBuffer>>,T>
         fromSubscriber(S subscriber, Function<S,T> finisher)

      To HttpResponse.BodySubscriber add:

      1) <S extends Subscriber<? super List<ByteBuffer>>>
         fromSubscriber(S subscriber)
      
      2) <S extends Subscriber<? super List<ByteBuffer>>,T>
         fromSubscriber(S subscriber, Function<S,T> finisher)

      The addition of methods with the same names, arities, and parameter types to both BodyHandler and BodySubscriber follows the existing convention. The first variant, accepting just a subscriber, is for subscribers where the response body is not made directly available through the HttpResponse, but instead through some other mechanism, e.g. an entry in a database. The second variant accepts a finisher function that is used to generate the response value, which is then made available through the usual HttpResponse::body.
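
      The finisher variant can be sketched as follows. This is an illustration only: it uses the later standardized java.net.http API (Java 11+), where the factory lives on HttpResponse.BodySubscribers, and it drives the adapted subscriber by hand instead of through a real HTTP exchange. FinisherSketch, TextSubscriber, NO_OP, and utf8 are hypothetical names, and decoding each buffer separately assumes no multi-byte character is split across buffers.

```java
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; not part of the proposal.
class FinisherSketch {

    // A "regular" Flow.Subscriber that knows nothing about HTTP.
    static class TextSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
        private final StringBuilder text = new StringBuilder();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

        @Override public void onNext(List<ByteBuffer> buffers) {
            // Simplification: decodes buffer by buffer.
            for (ByteBuffer b : buffers) {
                text.append(StandardCharsets.UTF_8.decode(b));
            }
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }

        String getTextResult() { return text.toString(); }
    }

    // Stand-in subscription; a real one would be supplied by the HTTP Client.
    static final Flow.Subscription NO_OP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    static ByteBuffer utf8(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    static String demo() {
        TextSubscriber subscriber = new TextSubscriber();
        BodySubscriber<String> adapted = HttpResponse.BodySubscribers
                .fromSubscriber(subscriber, TextSubscriber::getTextResult);

        // Simulate the signals the HTTP Client would normally deliver.
        adapted.onSubscribe(NO_OP);
        adapted.onNext(List.of(utf8("Hello, "), utf8("Flow")));
        adapted.onNext(List.of(utf8("!")));
        adapted.onComplete();

        // The finisher's return value becomes the body.
        return adapted.getBody().toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```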

      Specification

      Discussion has taken place on net-dev, see http://mail.openjdk.java.net/pipermail/net-dev/2017-December/011063.html . Code Review: http://mail.openjdk.java.net/pipermail/net-dev/2017-December/011086.html . Webrev: http://cr.openjdk.java.net/~chegar/8193365/webrev.01/ .

      To BodyPublisher add:

      /**
       * Returns a request body publisher whose body is retrieved from the
       * given {@code Flow.Publisher}. The returned request body publisher
       * has an unknown content length.
       *
       * @apiNote This method can be used as an adapter between {@code
       * BodyPublisher} and {@code Flow.Publisher}, where the amount of
       * request body that the publisher will publish is unknown.
       *
       * @param publisher the publisher responsible for publishing the body
       * @return a BodyPublisher
       */
      static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher) { ... }
      
      /**
       * Returns a request body publisher whose body is retrieved from the
       * given {@code Flow.Publisher}. The returned request body publisher
       * has the given content length.
       *
       * <p> The given {@code contentLength} is a positive number, that
       * represents the exact amount of bytes the {@code publisher} must
       * publish.
       *
       * @apiNote This method can be used as an adapter between {@code
       * BodyPublisher} and {@code Flow.Publisher}, where the amount of
       * request body that the publisher will publish is known.
       *
       * @param publisher the publisher responsible for publishing the body
       * @param contentLength a positive number representing the exact
       *                      amount of bytes the publisher will publish
       * @throws IllegalArgumentException if the content length is
       *                                  non-positive
       * @return a BodyPublisher
       */
      static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher,
                                         long contentLength) { ... }

      To BodyHandler add:

      /**
       * Returns a response body handler that returns a {@link BodySubscriber
       * BodySubscriber}{@code <Void>} obtained from {@link
       * BodySubscriber#fromSubscriber(Subscriber)}, with the given
       * {@code subscriber}.
       *
       * <p> The response body is not available through this, or the {@code
       * HttpResponse} API, but instead all response body is forwarded to the
       * given {@code subscriber}, which should make it available, if
       * appropriate, through some other mechanism, e.g. an entry in a
       * database, etc.
       *
       * @apiNote This method can be used as an adapter between {@code
       * BodySubscriber} and {@code Flow.Subscriber}.
       *
       * <p> For example:
       * <pre> {@code
       *  TextSubscriber subscriber = new TextSubscriber();
       *  HttpResponse<Void> response = client.sendAsync(request,
       *      BodyHandler.fromSubscriber(subscriber)).join();
       *  System.out.println(response.statusCode());
       * }</pre>
       *
       * @param subscriber the subscriber
       * @return a response body handler
       */
      public static BodyHandler<Void>
      fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) { ... }
      
      /**
       * Returns a response body handler that returns a {@link BodySubscriber
       * BodySubscriber}{@code <T>} obtained from {@link
       * BodySubscriber#fromSubscriber(Subscriber, Function)}, with the
       * given {@code subscriber} and {@code finisher} function.
       *
       * <p> The given {@code finisher} function is applied after the given
       * subscriber's {@code onComplete} has been invoked. The {@code finisher}
       * function is invoked with the given subscriber, and returns a value
       * that is set as the response's body.
       *
       * @apiNote This method can be used as an adapter between {@code
       * BodySubscriber} and {@code Flow.Subscriber}.
       *
       * <p> For example:
       * <pre> {@code
       * TextSubscriber subscriber = ...;  // accumulates bytes and transforms them into a String
       * HttpResponse<String> response = client.sendAsync(request,
       *     BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
       * String text = response.body();
       * }</pre>
       *
       * @param <S> the type of the Subscriber
       * @param <T> the type of the response body
       * @param subscriber the subscriber
       * @param finisher a function to be applied after the subscriber has completed
       * @return a response body handler
       */
      public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodyHandler<T>
      fromSubscriber(S subscriber, Function<S,T> finisher) { ... }

      To BodySubscriber add:

      /**
       * Returns a body subscriber that forwards all response body to the
       * given {@code Flow.Subscriber}. The {@linkplain #getBody() completion
       * stage} of the returned body subscriber completes after one of the
       * given subscriber's {@code onComplete} or {@code onError} has been
       * invoked.
       *
       * @apiNote This method can be used as an adapter between {@code
       * BodySubscriber} and {@code Flow.Subscriber}.
       *
       * @param <S> the type of the Subscriber
       * @param subscriber the subscriber
       * @return a body subscriber
       */
      public static <S extends Subscriber<? super List<ByteBuffer>>> BodySubscriber<Void>
      fromSubscriber(S subscriber) { ... }
      
      /**
       * Returns a body subscriber that forwards all response body to the
       * given {@code Flow.Subscriber}. The {@linkplain #getBody() completion
       * stage} of the returned body subscriber completes after one of the
       * given subscriber's {@code onComplete} or {@code onError} has been
       * invoked.
       *
       * <p> The given {@code finisher} function is applied after the given
       * subscriber's {@code onComplete} has been invoked. The {@code finisher}
       * function is invoked with the given subscriber, and returns a value
       * that is set as the response's body.
       *
       * @apiNote This method can be used as an adapter between {@code
       * BodySubscriber} and {@code Flow.Subscriber}.
       *
       * @param <S> the type of the Subscriber
       * @param <T> the type of the response body
       * @param subscriber the subscriber
       * @param finisher a function to be applied after the subscriber has
       *                 completed
       * @return a body subscriber
       */
      public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodySubscriber<T>
      fromSubscriber(S subscriber, Function<S,T> finisher) { ... }
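
      The completion behavior of the subscriber-only variant can be sketched in the same hedged way. The example assumes the later standardized java.net.http API (Java 11+, factory on HttpResponse.BodySubscribers) and delivers the signals by hand. ForwardingSketch, SinkSubscriber, and NO_OP are hypothetical names, and the in-memory sink stands in for an external store such as a database.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; not part of the proposal.
class ForwardingSketch {

    // Plain Flow.Subscriber that copies the body into a side channel.
    static class SinkSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

        @Override public void onNext(List<ByteBuffer> buffers) {
            for (ByteBuffer b : buffers) {
                byte[] chunk = new byte[b.remaining()];
                b.get(chunk);
                sink.write(chunk, 0, chunk.length);
            }
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    // Stand-in subscription; a real one would be supplied by the HTTP Client.
    static final Flow.Subscription NO_OP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Normal completion: the body stage completes, the data is in the sink.
    static int completedBytes() {
        SinkSubscriber subscriber = new SinkSubscriber();
        BodySubscriber<Void> adapted =
                HttpResponse.BodySubscribers.fromSubscriber(subscriber);
        adapted.onSubscribe(NO_OP);
        adapted.onNext(List.of(ByteBuffer.wrap(new byte[] {1, 2, 3})));
        adapted.onComplete();
        adapted.getBody().toCompletableFuture().join(); // Void body, nothing to read
        return subscriber.sink.size();
    }

    // Failure: the body stage completes exceptionally after onError.
    static boolean failsAfterOnError() {
        BodySubscriber<Void> adapted =
                HttpResponse.BodySubscribers.fromSubscriber(new SinkSubscriber());
        adapted.onSubscribe(NO_OP);
        adapted.onError(new IOException("simulated failure"));
        return adapted.getBody().toCompletableFuture().isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("bytes in sink: " + completedBytes());
        System.out.println("failed after onError: " + failsAfterOnError());
    }
}
```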

            People

              chegar Chris Hegarty
              chegar Chris Hegarty
              Daniel Fuchs, Michael McMahon