diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java index 0ff50a1f..f808c601 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java @@ -30,6 +30,7 @@ @Singleton @AutoBind +@SuppressWarnings({"PMD.GodClass", "PMD.TooManyMethods"}) public class ReactiveRx implements Reactive { @Inject @@ -37,12 +38,12 @@ public ReactiveRx() {} @Override public Runner runner(String name) { - return new RunnerRx(name); + return new RunnerRx(); } @Override public Runner runner(String name, int capacity, int queueSize) { - return new RunnerRx(name, capacity, queueSize); + return new RunnerRx(capacity, queueSize); } static Triple, SubscriberRx, StreamContext> getGraph(Stream stream) { @@ -57,7 +58,6 @@ static Triple, SubscriberRx, StreamContext> getGraph(St throw new IllegalStateException(); } - // TODO: might use ConnectableFlowable static Triple, SubscriberRx, StreamContext> getGraph( StreamDefault stream) { Flowable source = assemble(stream.getSource()); @@ -289,7 +289,6 @@ public void onNext(byte[] bytes) { throw new IllegalStateException(); } - // TODO: test static SubscriberRx assemble( SinkTransformedImpl sink, StreamContext stream) { UnicastProcessor subscriber = UnicastProcessor.create(); @@ -350,7 +349,7 @@ public void onComplete() { } } - // TODO: creating or clearing Iterables hurts performance + // NOTE: creating or clearing Iterables hurts performance // when switching to RxJava, the correspondent operator would be concatMapIterable // but the more performant alternative might be partialCollect from extensions private static class AsymmetricFlow { @@ -370,7 +369,7 @@ private static class AsymmetricFlow { return items; }) - // TODO: lazy + // NOTE: lazy .concatWith( Flowable.fromIterable( () -> { diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java index f2c3c888..3036dddc 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java @@ -24,10 +24,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -// TODO: the queue was introduced as a mean to protect the connection pool and prevent deadlocks +// NOTE: the queue was introduced as a mean to protect the connection pool and prevent deadlocks // because of a bug (running.get() < queueSize instead of running.get() < capacity) it was never // used // despite that there were no problems with deadlocks and enabling it slightly decreases performance @@ -35,34 +33,29 @@ // FeatureStreams public class RunnerRx implements Runner { - private static final Logger LOGGER = LoggerFactory.getLogger(RunnerRx.class); - private final Scheduler scheduler; - private final String name; private final int capacity; private final int queueSize; private final Queue queue; private final AtomicInteger running; - public RunnerRx(String name) { - this(name, Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY); + public RunnerRx() { + this(Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY); } - public RunnerRx(String name, int capacity, int queueSize) { - this(getConfig(name, capacity), name, capacity, queueSize); + public RunnerRx(int capacity, int queueSize) { + this(getConfig(capacity), capacity, queueSize); } - RunnerRx(ExecutorService executorService, String name, int capacity, int queueSize) { + RunnerRx(ExecutorService executorService, int capacity, int queueSize) { if (capacity == 0) { throw new IllegalArgumentException("invalid capacity: 0"); } - // TODO: thread names - getDispatcherName(name); + // NOPMD - TODO: thread names this.scheduler = Schedulers.from(executorService); scheduler.start(); - this.name = name; this.capacity = capacity; this.queueSize = queueSize; this.queue = new LinkedList<>(); @@ -153,26 +146,21 @@ public int getActiveStreams() { return running.get(); } - private static ExecutorService getConfig(String name, int capacity) { + private static ExecutorService getConfig(int capacity) { return capacity == Runner.DYNAMIC_CAPACITY - ? getDefaultConfig(name) - : getConfig(name, capacity, capacity); + ? getDefaultConfig() + : createExecutorService(capacity); } - private static ExecutorService getDefaultConfig(String name) { - return getConfig(name, 8, 64); + private static ExecutorService getDefaultConfig() { + return createExecutorService(64); } - // TODO - private static ExecutorService getConfig(String name, int parallelismMin, int parallelismMax) { + private static ExecutorService createExecutorService(int parallelismMax) { return Executors.newWorkStealingPool(Math.max(1, parallelismMax)); } - private static String getDispatcherName(String name) { - return String.format("stream.%s", name); - } - @Override public void close() { if (Objects.nonNull(scheduler)) { diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SinkDefault.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SinkDefault.java index 7964a279..0196e716 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SinkDefault.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SinkDefault.java @@ -15,6 +15,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; +@SuppressWarnings("PMD.DataClass") public class SinkDefault implements SinkReduced { public enum Type { diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceDefault.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceDefault.java index 395d4b9d..7848699e 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceDefault.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceDefault.java @@ -19,6 +19,7 @@ import java.util.function.Function; import org.reactivestreams.Publisher; +@SuppressWarnings("PMD.DataClass") public class SourceDefault implements Source { enum Type { diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceTransformed.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceTransformed.java index b784f80e..c85e8091 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceTransformed.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceTransformed.java @@ -53,7 +53,6 @@ public Source via(TransformerChained transformer) { return via2; } - // TODO: fuse @Override public BasicStream to(SinkReduced sink) { return new StreamDefault<>(this, sink); diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerDefault.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerDefault.java index 1f1d4a33..2968e047 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerDefault.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerDefault.java @@ -16,6 +16,7 @@ import java.util.function.Function; import java.util.function.Predicate; +@SuppressWarnings("PMD.DataClass") public class TransformerDefault implements Transformer { public enum Type { @@ -41,8 +42,8 @@ public TransformerDefault(Function function) { this(Type.MAP, function, null, null, null, null, null); } - public TransformerDefault(Function> function, boolean flatMap) { - this(Type.FLATMAP, null, null, null, null, null, function); + public static TransformerDefault flatMap(Function> function) { + return new TransformerDefault<>(Type.FLATMAP, null, null, null, null, null, function); } public TransformerDefault(Predicate predicate) { diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerFused.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerFused.java index 8d5fb690..f4e5ce12 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerFused.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerFused.java @@ -18,6 +18,7 @@ import java.util.Comparator; import java.util.function.Consumer; +@SuppressWarnings("PMD.TooManyMethods") public class TransformerFused implements TranformerCustomFuseableOut { private final TranformerCustomFuseableOut transformer1; diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/LogContextStream.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/LogContextStream.java index 2967dac1..739e7726 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/LogContextStream.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/LogContextStream.java @@ -17,7 +17,7 @@ public final class LogContextStream { private LogContextStream() {} - // TODO: apply to first flowable + // NOPMD - TODO: apply to first flowable public static Source withMdc(Source source) { Map mdc = MDC.getCopyOfContextMap(); diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/Reactive.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/Reactive.java index 377129fc..b706ba29 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/Reactive.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/Reactive.java @@ -31,6 +31,7 @@ import org.reactivestreams.FlowAdapters; import org.reactivestreams.Publisher; +@SuppressWarnings({"PMD.TooManyMethods", "PMD.ExcessivePublicCount"}) public interface Reactive { Runner runner(String name); @@ -146,7 +147,7 @@ static Transformer reduce(U zero, BiFunction reducer) { } static Transformer flatMap(Function> flatMap) { - return new TransformerDefault<>(flatMap, true); + return TransformerDefault.flatMap(flatMap); } } @@ -201,28 +202,6 @@ interface TransformerCustomSource> extends Transformer V getCustomSource(Source source); } - /*interface TransformerCustomSink, W extends SinkWrapperReduced> extends TransformerCustom { - - //TODO: does not work since SinkReduced does not match SinkWrapperReduced - W to(SinkReduced sink); - - default V to(Sink sink) { - return getCustomSink(TransformerCustom.super.to(sink)); - } - - V getCustomSink(Sink sink); - } - - interface SinkWrapper extends Sink { - - Sink getDelegate(); - } - - interface SinkWrapperReduced extends SinkReduced { - - SinkReduced getDelegate(); - }*/ - interface Sink { static Sink ignore() { return new SinkDefault<>(Type.IGNORE);