Details
Description
Summary
Improve interoperability between HTTP Client's request BodyPublisher
and response BodySubscriber
, and regular
java.util.concurrent.Flow.Publisher
and
java.util.concurrent.Flow.Subscriber
, respectively.
Problem
The incubating HTTP Client defines types that support publishing and
subscribing to HTTP request and response bodies, respectively. These
types, HttpRequest.BodyPublisher
and HttpResponse.BodySubscriber
,
add HTTP protocol specific behavior and API semantics beyond that of
their supertypes, Flow.Publisher
and Flow.Subscriber
, respectively.
It is desirable for the HTTP Client to operate with "regular"
Flow.Publisher
and Flow.Subscriber
without every developer having
to write their own wrappers or adapters, which would be cumbersome and
could lead to bugs. Supporting "regular" Flow.Publisher
and
Flow.Subscriber
broadens the possible set of body processors that can
be used with the HTTP Client, and improves interoperability with
existing technologies that already use reactive-streams Flow.
Solution
Add static factory methods, similar to other HTTP Client body
publishers and subscribers, that adapt a given Flow.Publisher
, or
Flow.Subscriber
, to an appropriate HTTP Client BodyPublisher
, or
BodySubscriber
. Specifically:
On the request side.
To HttpRequest.BodyPublisher
add:
1) fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher)
2) fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher,
long contentLength)
The first variant returns a BodyPublisher
with an unknown
content length. The second variant returns a BodyProcessor with a
given known content length. Two variants are provided to 1) avoid
special casing a constant to represent an unknown content length,
and 2) avoid handling of the corner case given known content length of
0
.
On the response side.
To HttpResponse.BodyHandler
add:
1) fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber)
2) <S extends Subscriber<? super List<ByteBuffer>>,T>
fromSubscriber(S subscriber, Function<S,T> finisher)
To HttpResponse.BodySubscriber add:
1) <S extends Subscriber<? super List<ByteBuffer>>>
fromSubscriber(S subscriber)
2) <S extends Subscriber<? super List<ByteBuffer>>,T>
fromSubscriber(S subscriber, Function<S,T> finisher)
The addition of similarly named, arity and type methods in
BodyHandler
and BodySubscriber
follows the current existing
convention. The first variant, accepting just a subscriber, is used for
subscribers where the response body is not made directly available, but
instead made a available through some other mechanism, e.g. an
entry in a database, etc. The second variant, accepts a finisher
function that is used to create / generate the response value,
which is then made available through the usual HttpResponse::body
.
Specification
Discussion has taken place on net-dev, see http://mail.openjdk.java.net/pipermail/net-dev/2017-December/011063.html . Code Review: http://mail.openjdk.java.net/pipermail/net-dev/2017-December/011086.html . Webrev: http://cr.openjdk.java.net/~chegar/8193365/webrev.01/ .
To BodyPublisher
add:
/**
* Returns a request body publisher whose body is retrieved from the
* given {@code Flow.Publisher}. The returned request body publisher
* has an unknown content length.
*
* @apiNote This method can be used as an adapter between {@code
* BodyPublisher} and {@code Flow.Publisher}, where the amount of
* request body that the publisher will publish is unknown.
*
* @param publisher the publisher responsible for publishing the body
* @return a BodyPublisher
*/
static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher) { ... }
/**
* Returns a request body publisher whose body is retrieved from the
* given {@code Flow.Publisher}. The returned request body publisher
* has the given content length.
*
* <p> The given {@code contentLength} is a positive number, that
* represents the exact amount of bytes the {@code publisher} must
* publish.
*
* @apiNote This method can be used as an adapter between {@code
* BodyPublisher} and {@code Flow.Publisher}, where the amount of
* request body that the publisher will publish is known.
*
* @param publisher the publisher responsible for publishing the body
* @param contentLength a positive number representing the exact
* amount of bytes the publisher will publish
* @throws IllegalArgumentException if the content length is
* non-positive
* @return a BodyPublisher
*/
static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher,
long contentLength) { ... }
To BodyHandler
add:
/**
* Returns a response body handler that returns a {@link BodySubscriber
* BodySubscriber}{@code <Void>} obtained from {@linkplain
* BodySubscriber#fromSubscriber(Subscriber)}, with the given
* {@code subscriber}.
*
* <p> The response body is not available through this, or the {@code
* HttpResponse} API, but instead all response body is forwarded to the
* given {@code subscriber}, which should make it available, if
* appropriate, through some other mechanism, e.g. an entry in a
* database, etc.
*
* @apiNote This method can be used as an adapter between {@code
* BodySubscriber} and {@code Flow.Subscriber}.
*
* <p> For example:
* <pre> {@code
* TextSubscriber subscriber = new TextSubscriber();
* HttpResponse<Void> response = client.sendAsync(request,
* BodyHandler.fromSubscriber(subscriber)).join();
* System.out.println(response.statusCode());
* }</pre>
*
* @param subscriber the subscriber
* @return a response body handler
*/
public static BodyHandler<Void>
fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) { ... }
/**
* Returns a response body handler that returns a {@link BodySubscriber
* BodySubscriber}{@code <T>} obtained from {@link
* BodySubscriber#fromSubscriber(Subscriber, Function)}, with the
* given {@code subscriber} and {@code finisher} function.
*
* <p> The given {@code finisher} function is applied after the given
* subscriber's {@code onComplete} has been invoked. The {@code finisher}
* function is invoked with the given subscriber, and returns a value
* that is set as the response's body.
*
* @apiNote This method can be used as an adapter between {@code
* BodySubscriber} and {@code Flow.Subscriber}.
*
* <p> For example:
* <pre> {@code
* TextSubscriber subscriber = ...; // accumulates bytes and transforms them into a String
* HttpResponse<String> response = client.sendAsync(request,
* BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
* String text = response.body();
* }</pre>
*
* @param <S> the type of the Subscriber
* @param <T> the type of the response body
* @param subscriber the subscriber
* @param finisher a function to be applied after the subscriber has completed
* @return a response body handler
*/
public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodyHandler<T>
fromSubscriber(S subscriber, Function<S,T> finisher) { ... }
To BodySubscriber
add:
/**
* Returns a body subscriber that forwards all response body to the
* given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion
* stage} of the returned body subscriber completes after one of the
* given subscribers {@code onComplete} or {@code onError} has been
* invoked.
*
* @apiNote This method can be used as an adapter between {@code
* BodySubscriber} and {@code Flow.Subscriber}.
*
* @param <S> the type of the Subscriber
* @param subscriber the subscriber
* @return a body subscriber
*/
public static <S extends Subscriber<? super List<ByteBuffer>>> BodySubscriber<Void>
fromSubscriber(S subscriber) { ... }
/**
* Returns a body subscriber that forwards all response body to the
* given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion
* stage} of the returned body subscriber completes after one of the
* given subscribers {@code onComplete} or {@code onError} has been
* invoked.
*
* <p> The given {@code finisher} function is applied after the given
* subscriber's {@code onComplete} has been invoked. The {@code finisher}
* function is invoked with the given subscriber, and returns a value
* that is set as the response's body.
*
* @apiNote This method can be used as an adapter between {@code
* BodySubscriber} and {@code Flow.Subscriber}.
*
* @param <S> the type of the Subscriber
* @param <T> the type of the response body
* @param subscriber the subscriber
* @param finisher a function to be applied after the subscriber has
* completed
* @return a body subscriber
*/
public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodySubscriber<T>
fromSubscriber(S subscriber, Function<S,T> finisher) { ... }
Attachments
Issue Links
- csr of
-
JDK-8193365 Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
- Closed