Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.ssl.DefaultFactories;
import io.reactivex.netty.protocol.http.client.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
Expand All @@ -44,6 +46,7 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.mesosphere.mesos.rx.java.util.UserAgentEntries.userAgentEntryForGradleArtifact;
import static com.mesosphere.mesos.rx.java.util.UserAgentEntries.userAgentEntryForMavenArtifact;
Expand Down Expand Up @@ -88,28 +91,45 @@ public final class MesosClient<Send, Receive> {
@NotNull
private final Observable.Transformer<byte[], byte[]> backpressureTransformer;

@Nullable
private final Supplier<Map<String, String>> headerSupplier;

MesosClient(
@NotNull final URI mesosUri,
@NotNull final Function<Class<?>, UserAgentEntry> applicationUserAgentEntry,
@NotNull final MessageCodec<Send> sendCodec,
@NotNull final MessageCodec<Receive> receiveCodec,
@NotNull final Send subscribe,
@NotNull final Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor,
@NotNull final Observable.Transformer<byte[], byte[]> backpressureTransformer
) {
this(mesosUri, applicationUserAgentEntry, sendCodec, receiveCodec, subscribe, streamProcessor, backpressureTransformer, null);
}

MesosClient(
@NotNull final URI mesosUri,
@NotNull final Function<Class<?>, UserAgentEntry> applicationUserAgentEntry,
@NotNull final MessageCodec<Send> sendCodec,
@NotNull final MessageCodec<Receive> receiveCodec,
@NotNull final Send subscribe,
@NotNull final Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor,
@NotNull final Observable.Transformer<byte[], byte[]> backpressureTransformer
@NotNull final Observable.Transformer<byte[], byte[]> backpressureTransformer,
@Nullable final Supplier<Map<String, String>> headerSupplier
) {
this.mesosUri = mesosUri;
this.receiveCodec = receiveCodec;
this.subscribe = subscribe;
this.streamProcessor = streamProcessor;
this.backpressureTransformer = backpressureTransformer;
this.headerSupplier = headerSupplier;

userAgent = new UserAgent(
applicationUserAgentEntry,
userAgentEntryForMavenArtifact("com.mesosphere.mesos.rx.java", "mesos-rxjava-client"),
userAgentEntryForGradleArtifact("rxnetty")
);

createPost = curryCreatePost(mesosUri, sendCodec, receiveCodec, userAgent, mesosStreamId);
createPost = curryCreatePost(mesosUri, sendCodec, receiveCodec, userAgent, mesosStreamId, headerSupplier);
}

/**
Expand All @@ -127,10 +147,15 @@ public AwaitableSubscription openStream() {

final URI uri = resolveMesosUri(mesosUri);

final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), getPort(uri))
.withName(userAgent.getEntries().get(0).getName())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.build();
HttpClientBuilder<ByteBuf, ByteBuf> builder = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), getPort(uri))
.withName(userAgent.getEntries().get(0).getName())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>());

if (uri.getScheme().equalsIgnoreCase("https")) {
builder = builder.withSslEngineFactory(DefaultFactories.trustAll());
}

final HttpClient<ByteBuf, ByteBuf> httpClient = builder.build();

final Observable<Receive> receives = createPost.call(subscribe)
.flatMap(httpClient::submit)
Expand Down Expand Up @@ -395,22 +420,30 @@ public void onCompleted() {
}
}

@NotNull
// @VisibleForTesting
static <Send, Receive> Func1<Send, Observable<HttpClientRequest<ByteBuf>>> curryCreatePost(
@NotNull final URI mesosUri,
@NotNull final MessageCodec<Send> sendCodec,
@NotNull final MessageCodec<Receive> receiveCodec,
@NotNull final UserAgent userAgent,
@NotNull final AtomicReference<String> mesosStreamId
) {
@NotNull final URI mesosUri,
@NotNull final MessageCodec<Send> sendCodec,
@NotNull final MessageCodec<Receive> receiveCodec,
@NotNull final UserAgent userAgent,
@NotNull final AtomicReference<String> mesosStreamId,
Supplier<Map<String, String>> headerSupplier) {
return (Send s) -> {
final byte[] bytes = sendCodec.encode(s);
HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(mesosUri.getPath())
.withHeader("User-Agent", userAgent.toString())
.withHeader("Content-Type", sendCodec.mediaType())
.withHeader("Accept", receiveCodec.mediaType());

if (headerSupplier != null) {
Map<String, String> headers = headerSupplier.get();
if (headers != null) {
for (String header : headers.keySet()) {
request = request.withHeader(header, headers.get(header));
}
}
}

final String streamId = mesosStreamId.get();
if (streamId != null) {
request = request.withHeader(MESOS_STREAM_ID, streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import rx.functions.Action0;

import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.mesosphere.mesos.rx.java.util.Validations.checkNotNull;

Expand All @@ -46,6 +48,7 @@ public final class MesosClientBuilder<Send, Receive> {
private Send subscribe;
private Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor;
private Observable.Transformer<byte[], byte[]> backpressureTransformer;
private Supplier<Map<String, String>> headerSupplier;

private MesosClientBuilder() {
backpressureTransformer = observable -> observable;
Expand Down Expand Up @@ -208,6 +211,21 @@ public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
return this;
}


/**
* Provides a supplier that will be called to supply arbitrary HTTP headers.
*
* @param headerSupplier supplier callback for arbitrary HTTP request headers
* @return this builder (allowing for further chained calls)
*/
@NotNull
public MesosClientBuilder<Send, Receive> headerSupplier(
@NotNull final Supplier<Map<String, String>> headerSupplier
) {
this.headerSupplier = headerSupplier;
return this;
}

/**
* Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
* a burst of incoming Mesos messages is handled by a bounded buffer rather than a
Expand Down Expand Up @@ -259,7 +277,8 @@ public final MesosClient<Send, Receive> build() {
checkNotNull(receiveCodec),
checkNotNull(subscribe),
checkNotNull(streamProcessor),
checkNotNull(backpressureTransformer)
checkNotNull(backpressureTransformer),
headerSupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public void testRequestUriFromPassedUri() throws Exception {
new UserAgent(
literal("testing", "latest")
),
new AtomicReference<>(null)
);
new AtomicReference<>(null),
null);

final HttpClientRequest<ByteBuf> request = createPost.call("something")
.toBlocking()
Expand All @@ -95,8 +95,8 @@ public void testBasicAuthHeaderAddedToRequestWhenUserInfoPresentInUri() throws E
new UserAgent(
literal("testing", "latest")
),
new AtomicReference<>(null)
);
new AtomicReference<>(null),
null);

final HttpClientRequest<ByteBuf> request = createPost.call("something")
.toBlocking()
Expand All @@ -122,8 +122,8 @@ public void testMesosStreamIdAddedToRequestWhenNonNull() throws Exception {
new UserAgent(
literal("testing", "latest")
),
new AtomicReference<>("streamId")
);
new AtomicReference<>("streamId"),
null);

final HttpClientRequest<ByteBuf> request = createPost.call("something")
.toBlocking()
Expand All @@ -143,8 +143,8 @@ public void testMesosStreamIdNotPresentWhenNull() throws Exception {
new UserAgent(
literal("testing", "latest")
),
new AtomicReference<>(null)
);
new AtomicReference<>(null),
null);

final HttpClientRequest<ByteBuf> request = createPost.call("something")
.toBlocking()
Expand Down