Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,20 @@

@Singleton
@AutoBind
@SuppressWarnings({"PMD.GodClass", "PMD.TooManyMethods"})
public class ReactiveRx implements Reactive {

@Inject
public ReactiveRx() {}

@Override
public Runner runner(String name) {
return new RunnerRx(name);
return new RunnerRx();
}

@Override
public Runner runner(String name, int capacity, int queueSize) {
return new RunnerRx(name, capacity, queueSize);
return new RunnerRx(capacity, queueSize);
}

static <V, W> Triple<Flowable<V>, SubscriberRx<V>, StreamContext<W>> getGraph(Stream<W> stream) {
Expand All @@ -57,7 +58,6 @@ static <V, W> Triple<Flowable<V>, SubscriberRx<V>, StreamContext<W>> getGraph(St
throw new IllegalStateException();
}

// TODO: might use ConnectableFlowable
static <V, W> Triple<Flowable<V>, SubscriberRx<V>, StreamContext<W>> getGraph(
StreamDefault<V, W> stream) {
Flowable<V> source = assemble(stream.getSource());
Expand Down Expand Up @@ -289,7 +289,6 @@ public void onNext(byte[] bytes) {
throw new IllegalStateException();
}

// TODO: test
static <U, V, W> SubscriberRx<U> assemble(
SinkTransformedImpl<U, V, W> sink, StreamContext<W> stream) {
UnicastProcessor<U> subscriber = UnicastProcessor.create();
Expand Down Expand Up @@ -350,7 +349,7 @@ public void onComplete() {
}
}

// TODO: creating or clearing Iterables hurts performance
// NOTE: creating or clearing Iterables hurts performance
// when switching to RxJava, the correspondent operator would be concatMapIterable
// but the more performant alternative might be partialCollect from extensions
private static class AsymmetricFlow<U, V> {
Expand All @@ -370,7 +369,7 @@ private static class AsymmetricFlow<U, V> {

return items;
})
// TODO: lazy
// NOTE: lazy
.concatWith(
Flowable.fromIterable(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,38 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO: the queue was introduced as a mean to protect the connection pool and prevent deadlocks
// NOTE: the queue was introduced as a mean to protect the connection pool and prevent deadlocks
// because of a bug (running.get() < queueSize instead of running.get() < capacity) it was never
// used
// despite that there were no problems with deadlocks and enabling it slightly decreases performance
// so I guess the options are to either remove it or enable it and use DYNAMIC_CAPACITY for
// FeatureStreams
public class RunnerRx implements Runner {

private static final Logger LOGGER = LoggerFactory.getLogger(RunnerRx.class);

private final Scheduler scheduler;
private final String name;
private final int capacity;
private final int queueSize;
private final Queue<Runnable> queue;
private final AtomicInteger running;

public RunnerRx(String name) {
this(name, Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY);
public RunnerRx() {
this(Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY);
}

public RunnerRx(String name, int capacity, int queueSize) {
this(getConfig(name, capacity), name, capacity, queueSize);
public RunnerRx(int capacity, int queueSize) {
this(getConfig(capacity), capacity, queueSize);
}

RunnerRx(ExecutorService executorService, String name, int capacity, int queueSize) {
RunnerRx(ExecutorService executorService, int capacity, int queueSize) {
if (capacity == 0) {
throw new IllegalArgumentException("invalid capacity: 0");
}

// TODO: thread names
getDispatcherName(name);
// NOPMD - TODO: thread names
this.scheduler = Schedulers.from(executorService);
scheduler.start();

this.name = name;
this.capacity = capacity;
this.queueSize = queueSize;
this.queue = new LinkedList<>();
Expand Down Expand Up @@ -153,26 +146,21 @@ public int getActiveStreams() {
return running.get();
}

private static ExecutorService getConfig(String name, int capacity) {
private static ExecutorService getConfig(int capacity) {
return capacity == Runner.DYNAMIC_CAPACITY
? getDefaultConfig(name)
: getConfig(name, capacity, capacity);
? getDefaultConfig()
: createExecutorService(capacity);
}

private static ExecutorService getDefaultConfig(String name) {
return getConfig(name, 8, 64);
private static ExecutorService getDefaultConfig() {
return createExecutorService(64);
}

// TODO
private static ExecutorService getConfig(String name, int parallelismMin, int parallelismMax) {
private static ExecutorService createExecutorService(int parallelismMax) {

return Executors.newWorkStealingPool(Math.max(1, parallelismMax));
}

private static String getDispatcherName(String name) {
return String.format("stream.%s", name);
}

@Override
public void close() {
if (Objects.nonNull(scheduler)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;

@SuppressWarnings("PMD.DataClass")
public class SinkDefault<U, V> implements SinkReduced<U, V> {

public enum Type {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Function;
import org.reactivestreams.Publisher;

@SuppressWarnings("PMD.DataClass")
public class SourceDefault<T> implements Source<T> {

enum Type {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public <U1, W> Source<W> via(TransformerChained<U, U1, W> transformer) {
return via2;
}

// TODO: fuse
@Override
public <V> BasicStream<U, V> to(SinkReduced<U, V> sink) {
return new StreamDefault<>(this, sink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.function.Function;
import java.util.function.Predicate;

@SuppressWarnings("PMD.DataClass")
public class TransformerDefault<T, U> implements Transformer<T, U> {

public enum Type {
Expand All @@ -41,8 +42,8 @@ public TransformerDefault(Function<T, U> function) {
this(Type.MAP, function, null, null, null, null, null);
}

public TransformerDefault(Function<T, Source<U>> function, boolean flatMap) {
this(Type.FLATMAP, null, null, null, null, null, function);
public static <T, U> TransformerDefault<T, U> flatMap(Function<T, Source<U>> function) {
return new TransformerDefault<>(Type.FLATMAP, null, null, null, null, null, function);
}

public TransformerDefault(Predicate<T> predicate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Comparator;
import java.util.function.Consumer;

@SuppressWarnings("PMD.TooManyMethods")
public class TransformerFused<T, U, V, W> implements TranformerCustomFuseableOut<T, V, W> {

private final TranformerCustomFuseableOut<T, U, W> transformer1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public final class LogContextStream {

private LogContextStream() {}

// TODO: apply to first flowable
// NOPMD - TODO: apply to first flowable
public static <T, U> Source<T> withMdc(Source<T> source) {
Map<String, String> mdc = MDC.getCopyOfContextMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

@SuppressWarnings({"PMD.TooManyMethods", "PMD.ExcessivePublicCount"})
public interface Reactive {

Runner runner(String name);
Expand Down Expand Up @@ -146,7 +147,7 @@ static <T, U> Transformer<T, U> reduce(U zero, BiFunction<U, T, U> reducer) {
}

static <T, U> Transformer<T, U> flatMap(Function<T, Source<U>> flatMap) {
return new TransformerDefault<>(flatMap, true);
return TransformerDefault.flatMap(flatMap);
}
}

Expand Down Expand Up @@ -201,28 +202,6 @@ interface TransformerCustomSource<T, U, V extends Source<U>> extends Transformer
V getCustomSource(Source<U> source);
}

/*interface TransformerCustomSink<T, U, V extends SinkWrapper<T>, W extends SinkWrapperReduced<T, ?>> extends TransformerCustom<T, U> {

//TODO: does not work since SinkReduced<U, X> does not match SinkWrapperReduced<T, ?>
<X> W to(SinkReduced<U, X> sink);

default V to(Sink<U> sink) {
return getCustomSink(TransformerCustom.super.to(sink));
}

V getCustomSink(Sink<T> sink);
}

interface SinkWrapper<T> extends Sink<T> {

Sink<T> getDelegate();
}

interface SinkWrapperReduced<T, V> extends SinkReduced<T, V> {

SinkReduced<T, V> getDelegate();
}*/

interface Sink<U> {
static <T> Sink<T> ignore() {
return new SinkDefault<>(Type.IGNORE);
Expand Down