diff --git a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClient.java b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClient.java index d6f92d3..be0ebdc 100644 --- a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClient.java +++ b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClient.java @@ -25,8 +25,10 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.concurrent.DefaultThreadFactory; import io.reactivex.netty.RxNetty; +import io.reactivex.netty.pipeline.ssl.DefaultFactories; import io.reactivex.netty.protocol.http.client.*; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -44,6 +46,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import static com.mesosphere.mesos.rx.java.util.UserAgentEntries.userAgentEntryForGradleArtifact; import static com.mesosphere.mesos.rx.java.util.UserAgentEntries.userAgentEntryForMavenArtifact; @@ -88,6 +91,21 @@ public final class MesosClient { @NotNull private final Observable.Transformer backpressureTransformer; + @Nullable + private final Supplier> headerSupplier; + + MesosClient( + @NotNull final URI mesosUri, + @NotNull final Function, UserAgentEntry> applicationUserAgentEntry, + @NotNull final MessageCodec sendCodec, + @NotNull final MessageCodec receiveCodec, + @NotNull final Send subscribe, + @NotNull final Function, Observable>>> streamProcessor, + @NotNull final Observable.Transformer backpressureTransformer + ) { + this(mesosUri, applicationUserAgentEntry, sendCodec, receiveCodec, subscribe, streamProcessor, backpressureTransformer, null); + } + MesosClient( @NotNull final URI mesosUri, @NotNull final Function, UserAgentEntry> applicationUserAgentEntry, @@ -95,13 +113,15 @@ public final class MesosClient { @NotNull final MessageCodec receiveCodec, @NotNull final Send subscribe, @NotNull final Function, Observable>>> streamProcessor, - @NotNull final Observable.Transformer backpressureTransformer + @NotNull final Observable.Transformer backpressureTransformer, + @Nullable final Supplier> headerSupplier ) { this.mesosUri = mesosUri; this.receiveCodec = receiveCodec; this.subscribe = subscribe; this.streamProcessor = streamProcessor; this.backpressureTransformer = backpressureTransformer; + this.headerSupplier = headerSupplier; userAgent = new UserAgent( applicationUserAgentEntry, @@ -109,7 +129,7 @@ public final class MesosClient { userAgentEntryForGradleArtifact("rxnetty") ); - createPost = curryCreatePost(mesosUri, sendCodec, receiveCodec, userAgent, mesosStreamId); + createPost = curryCreatePost(mesosUri, sendCodec, receiveCodec, userAgent, mesosStreamId, headerSupplier); } /** @@ -127,10 +147,15 @@ public AwaitableSubscription openStream() { final URI uri = resolveMesosUri(mesosUri); - final HttpClient httpClient = RxNetty.newHttpClientBuilder(uri.getHost(), getPort(uri)) - .withName(userAgent.getEntries().get(0).getName()) - .pipelineConfigurator(new HttpClientPipelineConfigurator<>()) - .build(); + HttpClientBuilder builder = RxNetty.newHttpClientBuilder(uri.getHost(), getPort(uri)) + .withName(userAgent.getEntries().get(0).getName()) + .pipelineConfigurator(new HttpClientPipelineConfigurator<>()); + + if (uri.getScheme().equalsIgnoreCase("https")) { + builder = builder.withSslEngineFactory(DefaultFactories.trustAll()); + } + + final HttpClient httpClient = builder.build(); final Observable receives = createPost.call(subscribe) .flatMap(httpClient::submit) @@ -395,15 +420,14 @@ public void onCompleted() { } } - @NotNull // @VisibleForTesting static Func1>> curryCreatePost( - @NotNull final URI mesosUri, - @NotNull final MessageCodec sendCodec, - @NotNull final MessageCodec receiveCodec, - @NotNull final UserAgent userAgent, - @NotNull final AtomicReference mesosStreamId - ) { + @NotNull final URI mesosUri, + @NotNull final MessageCodec sendCodec, + @NotNull final MessageCodec receiveCodec, + @NotNull final UserAgent userAgent, + @NotNull final AtomicReference mesosStreamId, + Supplier> headerSupplier) { return (Send s) -> { final byte[] bytes = sendCodec.encode(s); HttpClientRequest request = HttpClientRequest.createPost(mesosUri.getPath()) @@ -411,6 +435,15 @@ static Func1>> curry .withHeader("Content-Type", sendCodec.mediaType()) .withHeader("Accept", receiveCodec.mediaType()); + if (headerSupplier != null) { + Map headers = headerSupplier.get(); + if (headers != null) { + for (String header : headers.keySet()) { + request = request.withHeader(header, headers.get(header)); + } + } + } + final String streamId = mesosStreamId.get(); if (streamId != null) { request = request.withHeader(MESOS_STREAM_ID, streamId); diff --git a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClientBuilder.java b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClientBuilder.java index 23bc1fc..b34a3f2 100644 --- a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClientBuilder.java +++ b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClientBuilder.java @@ -25,8 +25,10 @@ import rx.functions.Action0; import java.net.URI; +import java.util.Map; import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; import static com.mesosphere.mesos.rx.java.util.Validations.checkNotNull; @@ -46,6 +48,7 @@ public final class MesosClientBuilder { private Send subscribe; private Function, Observable>>> streamProcessor; private Observable.Transformer backpressureTransformer; + private Supplier> headerSupplier; private MesosClientBuilder() { backpressureTransformer = observable -> observable; @@ -208,6 +211,21 @@ public MesosClientBuilder onBackpressureBuffer( return this; } + + /** + * Provides a supplier that will be called to supply arbitrary HTTP headers. + * + * @param headerSupplier supplier callback for arbitrary HTTP request headers + * @return this builder (allowing for further chained calls) + */ + @NotNull + public MesosClientBuilder headerSupplier( + @NotNull final Supplier> headerSupplier + ) { + this.headerSupplier = headerSupplier; + return this; + } + /** * Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that * a burst of incoming Mesos messages is handled by a bounded buffer rather than a @@ -259,7 +277,8 @@ public final MesosClient build() { checkNotNull(receiveCodec), checkNotNull(subscribe), checkNotNull(streamProcessor), - checkNotNull(backpressureTransformer) + checkNotNull(backpressureTransformer), + headerSupplier ); } diff --git a/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientTest.java b/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientTest.java index 939c2c1..31a21ab 100644 --- a/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientTest.java +++ b/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientTest.java @@ -76,8 +76,8 @@ public void testRequestUriFromPassedUri() throws Exception { new UserAgent( literal("testing", "latest") ), - new AtomicReference<>(null) - ); + new AtomicReference<>(null), + null); final HttpClientRequest request = createPost.call("something") .toBlocking() @@ -95,8 +95,8 @@ public void testBasicAuthHeaderAddedToRequestWhenUserInfoPresentInUri() throws E new UserAgent( literal("testing", "latest") ), - new AtomicReference<>(null) - ); + new AtomicReference<>(null), + null); final HttpClientRequest request = createPost.call("something") .toBlocking() @@ -122,8 +122,8 @@ public void testMesosStreamIdAddedToRequestWhenNonNull() throws Exception { new UserAgent( literal("testing", "latest") ), - new AtomicReference<>("streamId") - ); + new AtomicReference<>("streamId"), + null); final HttpClientRequest request = createPost.call("something") .toBlocking() @@ -143,8 +143,8 @@ public void testMesosStreamIdNotPresentWhenNull() throws Exception { new UserAgent( literal("testing", "latest") ), - new AtomicReference<>(null) - ); + new AtomicReference<>(null), + null); final HttpClientRequest request = createPost.call("something") .toBlocking()