diff --git a/.gitignore b/.gitignore
index 294c7185..a834232f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,15 +1,16 @@
-#maven
-target/
-
-#eclipse
-.classpath
-.project
-.settings/
-
-#DS_Store
-.DS_Store
-
-#intellij
-.idea/
-*.iml
-*.iws
+#maven
+target/
+
+#eclipse
+.classpath
+.project
+.settings/
+
+#DS_Store
+.DS_Store
+
+#intellij
+.idea/
+*.iml
+*.iws
+/samoa-api/nbproject/
\ No newline at end of file
diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml
index 9f69e20b..2b7bd22d 100644
--- a/samoa-api/pom.xml
+++ b/samoa-api/pom.xml
@@ -1,129 +1,154 @@
-
-
-
-
- 4.0.0
-
- UTF-8
-
-
- samoa-api
- API and algorithms for SAMOA
-
- samoa-api
-
- org.apache.samoa
- samoa
- 0.5.0-incubating-SNAPSHOT
-
-
-
-
- com.yammer.metrics
- metrics-core
- ${metrics-core.version}
-
-
-
- net.jcip
- jcip-annotations
- ${jcip-annotations.version}
-
-
-
- org.apache.commons
- commons-lang3
- ${commons-lang3.version}
-
-
-
- com.github.javacliparser
- javacliparser
- ${javacliparser.version}
-
-
-
- org.apache.samoa
- samoa-instances
- ${project.version}
-
-
-
- com.google.guava
- guava
- ${guava.version}
-
-
-
- com.esotericsoftware.kryo
- kryo
- ${kryo.version}
-
-
-
- com.dreizak
- miniball
- ${miniball.version}
-
-
-
- org.apache.hadoop
- hadoop-common
- ${hadoop.version}
-
-
- org.apache.hadoop
- hadoop-hdfs
- ${hadoop.version}
-
-
- org.apache.hadoop
- hadoop-minicluster
- ${hadoop.version}
- test
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-dependency-plugin
- ${maven-dependency-plugin.version}
-
-
- copy-dependencies
- package
-
- copy-dependencies
-
-
- ${project.build.directory}/lib
- false
- false
- true
-
-
-
-
-
-
-
+
+
+
+
+ 4.0.0
+
+ UTF-8
+
+
+ samoa-api
+ API and algorithms for SAMOA
+
+ samoa-api
+
+ org.apache.samoa
+ samoa
+ 0.5.0-incubating-SNAPSHOT
+
+
+
+
+ com.yammer.metrics
+ metrics-core
+ ${metrics-core.version}
+
+
+
+ net.jcip
+ jcip-annotations
+ ${jcip-annotations.version}
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
+
+
+
+ com.github.javacliparser
+ javacliparser
+ ${javacliparser.version}
+
+
+
+ org.apache.samoa
+ samoa-instances
+ ${project.version}
+
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+
+ com.esotericsoftware.kryo
+ kryo
+ ${kryo.version}
+
+
+
+ com.dreizak
+ miniball
+ ${miniball.version}
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ ${hadoop.version}
+
+
+ org.apache.hadoop
+ hadoop-minicluster
+ ${hadoop.version}
+ test
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.10.2.0
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.10.2.0
+ test
+ test
+
+
+ org.apache.kafka
+ kafka_2.11
+ 0.10.2.0
+
+
+ org.apache.kafka
+ kafka_2.11
+ 0.10.2.0
+ test
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ ${maven-dependency-plugin.version}
+
+
+ copy-dependencies
+ package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/lib
+ false
+ false
+ true
+
+
+
+
+
+
+
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
new file mode 100644
index 00000000..a93986e5
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+class KafkaConsumerThread extends Thread {
+
+ // Kafka consumer used internally to poll records; created lazily in initializeConsumer()
+ private transient KafkaConsumer consumer;
+
+ private Logger log = Logger.getLogger(KafkaConsumerThread.class.getName());
+
+ private final Properties consumerProperties;
+ private final Collection topics;
+ private final long consumerTimeout;
+ private final List buffer;
+ // guards 'buffer'; getKafkaMessages() waits on it, fillBufferAndNotifyWaits() notifies it
+ private final Object lock;
+ private boolean running; // NOTE(review): cleared by close() from another thread but not volatile — confirm visibility is acceptable
+
+ /**
+ * Class constructor
+ *
+ * @param consumerProperties Properties of Consumer
+ * @param topics Topics to fetch (subscribe)
+ * @param consumerTimeout Timeout for data polling
+ */
+ KafkaConsumerThread(Properties consumerProperties, Collection topics, long consumerTimeout) {
+ this.running = false;
+ this.consumerProperties = consumerProperties;
+ this.topics = topics;
+ this.consumerTimeout = consumerTimeout;
+ this.buffer = new ArrayList<>();
+ lock = new Object();
+ }
+
+ @Override
+ public void run() {
+
+ initializeConsumer();
+
+ while (running) { // loops until close() clears the flag
+ fetchDataFromKafka();
+ }
+
+ cleanUp();
+ }
+
+ /**
+ * Polls Apache Kafka for new records (blocking up to consumerTimeout)
+ * and hands the received payloads to the shared buffer.
+ */
+ private void fetchDataFromKafka() {
+ if (consumer != null) {
+ if (!consumer.subscription().isEmpty()) {
+ try {
+ List kafkaMsg = getMessagesBytes(consumer.poll(consumerTimeout));
+ fillBufferAndNotifyWaits(kafkaMsg);
+ } catch (Throwable t) {
+ Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Copies received messages to class buffer and notifies Processor to grab
+ * the data.
+ *
+ * @param kafkaMsg Messages received from Kafka
+ */
+ private void fillBufferAndNotifyWaits(List kafkaMsg) {
+ synchronized (lock) {
+ buffer.addAll(kafkaMsg);
+ if (buffer.size() > 0) { // only wake readers when there is something to read
+ lock.notifyAll();
+ }
+ }
+ }
+
+ private void cleanUp() {
+ // release Kafka resources when the polling loop exits
+ if (consumer != null) {
+ consumer.unsubscribe();
+ consumer.close();
+ }
+ }
+
+ private void initializeConsumer() {
+ // lazy instantiation
+ log.log(Level.INFO, "Instantiating Kafka consumer");
+ if (consumer == null) {
+ consumer = new KafkaConsumer<>(consumerProperties);
+ running = true; // only set when the consumer was actually created here
+ }
+ consumer.subscribe(topics);
+ }
+
+ private List getMessagesBytes(ConsumerRecords poll) {
+ Iterator> iterator = poll.iterator();
+ List ret = new ArrayList<>();
+ while (iterator.hasNext()) {
+ ret.add(iterator.next().value()); // only the record value is kept; keys/offsets are dropped
+ }
+ return ret;
+ }
+
+ void close() {
+ running = false; // NOTE(review): does not notify 'lock', so a reader blocked in getKafkaMessages() may wait forever — confirm shutdown path
+ }
+
+ List getKafkaMessages() {
+ synchronized (lock) {
+ if (buffer.isEmpty()) { // NOTE(review): 'if', not 'while' — a spurious wakeup can return an empty list
+ try {
+ // block the call until new messages are received
+ lock.wait();
+ } catch (InterruptedException ex) {
+ Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ ArrayList ret = new ArrayList<>();
+ // copy buffer to return list
+ ret.addAll(buffer);
+ // clear message buffer
+ buffer.clear();
+ return ret;
+ }
+ }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
new file mode 100644
index 00000000..7b11cbd4
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ *
+ * @author pwawrzyniak
+ * @param the class that would be deserialized
+ */
+public interface KafkaDeserializer {
+
+ // TODO: consider a key-value schema? (currently only the raw message bytes are handled)
+ /**
+ * Method that provides deserialization algorithm
+ * @param message Message as received from Apache Kafka
+ * @return Deserialized form of message, to be passed to topology
+ */
+ T deserialize(byte[] message);
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
new file mode 100644
index 00000000..420d43ce
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Destination processor that writes data to Apache Kafka
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaDestinationProcessor implements Processor {
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ kafkaUtils.closeProducer(); // best-effort cleanup; finalize() is not guaranteed to run promptly
+ }
+
+ private final KafkaUtils kafkaUtils;
+ private final String topic;
+ private final KafkaSerializer serializer;
+
+ /**
+ * Class constructor
+ * @param props Properties of Kafka Producer
+ * @see Kafka Producer configuration
+ * @param topic Topic this destination processor will write into
+ * @param serializer Implementation of KafkaSerializer that handles arriving data serialization
+ */
+ public KafkaDestinationProcessor(Properties props, String topic, KafkaSerializer serializer) {
+ this.kafkaUtils = new KafkaUtils(null, props, 0); // producer-only: no consumer properties, zero timeout
+ this.topic = topic;
+ this.serializer = serializer;
+ }
+
+ private KafkaDestinationProcessor(KafkaUtils kafkaUtils, String topic, KafkaSerializer serializer){ // copy constructor used by newProcessor()
+ this.kafkaUtils = kafkaUtils;
+ this.topic = topic;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public boolean process(ContentEvent event) {
+ try {
+ kafkaUtils.sendKafkaMessage(topic, serializer.serialize(event));
+ } catch (Exception ex) {
+ Logger.getLogger(KafkaDestinationProcessor.class.getName()).log(Level.SEVERE, null, ex); // fixed: was logging under KafkaEntranceProcessor's logger
+ return false;
+ }
+ return true; // event handed to the producer successfully
+ }
+
+ @Override
+ public void onCreate(int id) {
+ kafkaUtils.initializeProducer();
+ }
+
+ @Override
+ public Processor newProcessor(Processor processor) {
+ KafkaDestinationProcessor kdp = (KafkaDestinationProcessor)processor;
+ return new KafkaDestinationProcessor(new KafkaUtils(kdp.kafkaUtils), kdp.topic, kdp.serializer);
+ }
+
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
new file mode 100644
index 00000000..7079c588
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Entrance processor that reads incoming messages from Apache Kafka
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaEntranceProcessor implements EntranceProcessor {
+
+ transient private final KafkaUtils kafkaUtils;
+ private List buffer;
+ private final KafkaDeserializer deserializer;
+ private final String topic;
+
+ /**
+ * Class constructor
+ * @param props Properties of Kafka consumer
+ * @see Apache Kafka consumer configuration
+ * @param topic Topic from which the messages should be read
+ * @param timeout Timeout used when polling Kafka for new messages
+ * @param deserializer Instance of the implementation of {@link KafkaDeserializer}
+ */
+ public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) {
+ this.kafkaUtils = new KafkaUtils(props, null, timeout); // consumer-only: no producer properties
+ this.deserializer = deserializer;
+ this.topic = topic;
+ }
+
+ private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) { // copy constructor used by newProcessor()
+ this.kafkaUtils = kafkaUtils;
+ this.deserializer = deserializer;
+ this.topic = topic;
+ }
+
+ @Override
+ public void onCreate(int id) {
+ this.buffer = new ArrayList<>(100); // pre-sized; grows as needed
+ this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic));
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false; // the Kafka stream is unbounded; this source never reports completion
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (buffer.isEmpty()) {
+ try {
+ buffer.addAll(kafkaUtils.getKafkaMessages()); // may block until messages arrive
+ } catch (Exception ex) {
+ Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ return buffer.size() > 0;
+ }
+
+ @Override
+ public ContentEvent nextEvent() {
+ // assume this will never be called when buffer is empty!
+ return this.deserializer.deserialize(buffer.remove(buffer.size() - 1)); // NOTE(review): pops from the END of the buffer (newest first) — confirm ordering is irrelevant to consumers
+ }
+
+ @Override
+ public boolean process(ContentEvent event) {
+ return false; // entrance processor: does not consume incoming events
+ }
+
+ @Override
+ public Processor newProcessor(Processor processor) {
+ KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor;
+ return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic);
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ kafkaUtils.closeConsumer();
+ super.finalize(); // best-effort cleanup; finalize() is not guaranteed to run promptly
+ }
+
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
new file mode 100644
index 00000000..c514ac07
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.InstanceCreator;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import org.apache.samoa.instances.DenseInstanceData;
+import org.apache.samoa.instances.InstanceData;
+import org.apache.samoa.learners.InstanceContentEvent;
+
+/**
+ * Sample class for serializing and deserializing {@link InstanceContentEvent}
+ * from/to JSON format
+ *
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaJsonMapper implements KafkaDeserializer, KafkaSerializer {
+
+ private final transient Gson gson;
+ private final Charset charset;
+
+ /**
+ * Class constructor
+ *
+ * @param charset Charset to be used for bytes parsing
+ */
+ public KafkaJsonMapper(Charset charset) {
+ this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, new InstanceDataCustomDeserializer()).create(); // adapter registered so Gson can materialize InstanceData fields — presumably abstract/interface; TODO confirm
+ this.charset = charset;
+ }
+
+ @Override
+ public InstanceContentEvent deserialize(byte[] message) {
+ return gson.fromJson(new String(message, this.charset), InstanceContentEvent.class);
+ }
+
+ @Override
+ public byte[] serialize(InstanceContentEvent message) {
+ return gson.toJson(message).getBytes(this.charset);
+ }
+
+ // Unused: retained for reference; InstanceDataCustomDeserializer below is the adapter actually registered
+ public class InstanceDataCreator implements InstanceCreator {
+
+ @Override
+ public InstanceData createInstance(Type type) {
+ return new DenseInstanceData();
+ }
+ }
+
+ public class InstanceDataCustomDeserializer implements JsonDeserializer { // rebuilds DenseInstanceData from its "attributeValues" JSON array
+
+ @Override
+ public DenseInstanceData deserialize(JsonElement je, Type type, JsonDeserializationContext jdc) throws JsonParseException {
+ double[] attributeValues = null;
+ JsonObject obj = (JsonObject) je;
+ attributeValues = jdc.deserialize(obj.get("attributeValues"), double[].class);
+ DenseInstanceData did = new DenseInstanceData(attributeValues);
+ return did;
+ }
+
+ }
+
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
new file mode 100644
index 00000000..ad6bd8e6
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ *
+ * @author pwawrzyniak
+ * @param the class that would be serialized
+ */
+public interface KafkaSerializer {
+
+ // TODO: consider a key-value schema? (currently only the message value is produced)
+
+ /**
+ * Method that provides serialization algorithm
+ * @param message Message received from topology, to be serialized
+ * @return Serialized form of the message
+ */
+ byte[] serialize(T message);
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
new file mode 100644
index 00000000..26012f2f
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+/**
+ * Kafka task
+ *
+ * @author Jakub Jankowski
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ *
+ */
+
+public class KafkaTask implements Task, Configurable {
+
+ private static final long serialVersionUID = 3984474041982397855L;
+ private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);
+
+ // TODO: should these be identical for the entrance and destination processors?
+ Properties producerProps;
+ Properties consumerProps;
+ int timeout; // poll timeout passed to KafkaEntranceProcessor — presumably milliseconds; TODO confirm unit
+ private final KafkaDeserializer deserializer;
+ private final KafkaSerializer serializer;
+ private final String topic;
+
+ private TopologyBuilder builder;
+ private Topology kafkaTopology;
+
+ public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p',
+ "Number of destination Processors", 1, 1, Integer.MAX_VALUE);
+
+ public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation",
+ "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+ /**
+ * Class constructor
+ * @param producerProps Properties of Kafka Producer (consumerProps — Properties of Kafka Consumer)
+ * @see Kafka Producer configuration
+ * @see Kafka Consumer configuration
+ * @param topic Topic to which destination processor will write into
+ * @param timeout Timeout used when polling Kafka for new messages
+ * @param serializer Implementation of KafkaSerializer that handles arriving data serialization
+ * @param deserializer Implementation of KafkaDeserializer that handles arriving data deserialization
+ */
+ public KafkaTask(Properties producerProps, Properties consumerProps, String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) {
+ this.producerProps = producerProps;
+ this.consumerProps = consumerProps;
+ this.deserializer = deserializer;
+ this.serializer = serializer;
+ this.topic = topic;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public void init() {
+ logger.info("Invoking init");
+ if (builder == null) { // builder may already exist when setFactory() ran first
+ builder = new TopologyBuilder();
+ logger.info("Successfully instantiating TopologyBuilder");
+
+ builder.initTopology(evaluationNameOption.getValue());
+ logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
+ }
+
+ // create entrance processor
+ KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer);
+ builder.addEntranceProcessor(sourceProcessor);
+
+ // create stream
+ Stream stream = builder.createStream(sourceProcessor);
+
+ // create destination processor
+ KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, topic, serializer);
+ builder.addProcessor(destProcessor, kafkaParallelismOption.getValue());
+ builder.connectInputShuffleStream(stream, destProcessor);
+
+ // build topology
+ kafkaTopology = builder.build();
+ logger.info("Successfully built the topology");
+ }
+
+ @Override
+ public Topology getTopology() {
+ return kafkaTopology;
+ }
+
+ @Override
+ public void setFactory(ComponentFactory factory) {
+ logger.info("Invoking setFactory: "+factory.toString());
+ builder = new TopologyBuilder(factory);
+ logger.info("Successfully instantiating TopologyBuilder");
+
+ builder.initTopology(evaluationNameOption.getValue());
+ logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
+
+ }
+
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
new file mode 100644
index 00000000..75b54021
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.logging.Logger;
+
+/**
+ * Internal class responsible for Kafka Stream handling (both consume and
+ * produce)
+ *
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+class KafkaUtils {
+
+ private transient KafkaConsumerThread kafkaConsumerThread;
+
+ private transient KafkaProducer producer;
+
+ // Properties of the consumer, as defined in Kafka documentation
+ private final Properties consumerProperties;
+ private final Properties producerProperties;
+
+ // Timeout for Kafka Consumer
+ private long consumerTimeout;
+
+
+ /**
+ * Class constructor
+ *
+ * @param consumerProperties Properties of consumer
+ * @param producerProperties Properties of producer
+ * @param consumerTimeout Timeout for consumer poll requests
+ */
+ public KafkaUtils(Properties consumerProperties, Properties producerProperties, long consumerTimeout) {
+ this.consumerProperties = consumerProperties;
+ this.producerProperties = producerProperties;
+ this.consumerTimeout = consumerTimeout;
+ }
+
+ /**
+ * Creates new KafkaUtils from existing instance
+ * @param kafkaUtils Instance of KafkaUtils
+ */
+ KafkaUtils(KafkaUtils kafkaUtils) {
+ this.consumerProperties = kafkaUtils.consumerProperties;
+ this.producerProperties = kafkaUtils.producerProperties;
+ this.consumerTimeout = kafkaUtils.consumerTimeout;
+ }
+
+ /**
+ * Method used to initialize Kafka Consumer Thread, i.e. instantiate it and
+ * subscribe to configured topic
+ *
+ * @param topics List of Kafka topics that consumer should subscribe to
+ */
+ public void initializeConsumer(Collection topics) {
+ kafkaConsumerThread = new KafkaConsumerThread(consumerProperties, topics, consumerTimeout);
+ kafkaConsumerThread.start();
+ }
+
+ public void closeConsumer() {
+ kafkaConsumerThread.close();
+ }
+
+ public void initializeProducer() {
+ // lazy instantiation
+ if (producer == null) {
+ producer = new KafkaProducer<>(producerProperties);
+ }
+ }
+
+ public void closeProducer(){
+ if(producer != null){
+ producer.close(1, TimeUnit.MINUTES);
+ }
+ }
+
+ /**
+ * Method for reading new messages from Kafka topics
+ *
+ * @return Collection of read messages
+ * @throws Exception Exception is thrown when consumer was not initialized
+ * or is not subscribed to any topic.
+ */
+ public List getKafkaMessages() throws Exception {
+ return kafkaConsumerThread.getKafkaMessages();
+ }
+
+ public long sendKafkaMessage(String topic, byte[] message) {
+ if (producer != null) {
+ try{
+ ProducerRecord record = new ProducerRecord(topic, message);
+ long offset = producer.send(record).get(10, TimeUnit.SECONDS).offset();
+ producer.flush();
+ return offset;
+ } catch(InterruptedException | ExecutionException | TimeoutException e){
+ Logger.getLogger(KafkaUtils.class.getName()).log(Level.SEVERE, null, e);
+ }
+
+ }
+ return -1;
+ }
+}
diff --git a/samoa-api/src/main/resources/kafka.avsc b/samoa-api/src/main/resources/kafka.avsc
new file mode 100644
index 00000000..f5f12cf2
--- /dev/null
+++ b/samoa-api/src/main/resources/kafka.avsc
@@ -0,0 +1,106 @@
+[
+{
+ "namespace": "org.apache.samoa.streams.kafka.temp",
+ "type": "record",
+ "name": "BurrTest",
+ "fields": [
+ {"name":"name", "type": "string"},
+ {"name":"atrs", "type": {"type": "array", "items": "string"}},
+ {"name":"nums", "type": {"type": "array", "items": "int"}},
+ {"name":"list", "type": {"type": "array", "items": "string"}}
+ ]
+},
+{
+ "namespace": "org.apache.samoa.instances",
+ "type": "record",
+ "name": "Instance",
+ "fields": [
+ ]
+},
+{
+ "namespace": "org.apache.samoa.instances",
+ "type": "record",
+ "name": "InstanceData",
+ "fields": [
+ ]
+},
+{
+ "namespace": "org.apache.samoa.instances",
+ "type": "record",
+ "name": "SingleClassInstanceData",
+ "fields": [
+ {"name":"classValue", "type": "double"}
+ ]
+},
+{
+ "namespace": "org.apache.samoa.instances",
+ "type": "record",
+ "name": "DenseInstanceData",
+ "fields": [
+ {"name":"attributeValues", "type": {"type": "array", "items": "double"}}
+ ]
+},
+{
+ "namespace": "org.apache.samoa.instances",
+ "type": "record",
+ "name": "SparseInstanceData",
+ "fields": [
+ {"name":"attributeValues", "type": {"type": "array", "items": "double"}},
+ {"name":"indexValues", "type": {"type": "array", "items": "int"}},
+ {"name":"numberAttributes", "type": "int"}
+ ]
+},
+{
+ "namespace": "org.apache.samoa.instances",
+ "type": "record",
+ "name": "SingleLabelInstance",
+ "fields": [
+ {"name": "weight", "type": "double"},
+ {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]},
+ {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}
+ ]
+},
+{
+ "namespace": "org.apache.samoa.instances",
+ "type": "record",
+ "name": "DenseInstance",
+ "fields": [
+ {"name": "weight", "type": "double"},
+ {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]},
+ {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}
+ ]
+},
+{
+ "namespace": "org.apache.samoa.core",
+ "type": "record",
+ "name": "SerializableInstance",
+ "fields": [
+ {"name": "weight", "type": "double"},
+ {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]},
+ {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}
+ ]
+},
+{
+ "namespace": "org.apache.samoa.learners",
+ "type": "record",
+ "name": "InstanceContent",
+ "fields": [
+ {"name": "instanceIndex", "type": "long"},
+ {"name": "classifierIndex", "type": "int"},
+ {"name": "evaluationIndex", "type": "int"},
+ {"name":"instance", "type":"org.apache.samoa.core.SerializableInstance"},
+ {"name": "isTraining", "type": "boolean"},
+ {"name": "isTesting", "type": "boolean"},
+ {"name": "isLast", "type": "boolean"}
+ ]
+},
+{
+ "namespace": "org.apache.samoa.learners",
+ "type": "record",
+ "name": "InstanceContentEvent",
+ "fields": [
+ {"name": "instanceContent", "type": "org.apache.samoa.learners.InstanceContent"}
+ ]
+}
+]
+
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
new file mode 100644
index 00000000..2d594569
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaDestinationProcessorTest {
+
+ private static final String ZKHOST = "127.0.0.1";
+ private static final String BROKERHOST = "127.0.0.1";
+ private static final String BROKERPORT = "9092";
+ private static final String TOPIC = "test-kdp";
+ private static final int NUM_INSTANCES = 11111;
+ private static final int CONSUMER_TIMEOUT = 1000;
+
+ private static KafkaServer kafkaServer;
+ private static EmbeddedZookeeper zkServer;
+ private static ZkClient zkClient;
+ private static String zkConnect;
+
+ public KafkaDestinationProcessorTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ // setup Zookeeper
+ zkServer = new EmbeddedZookeeper();
+ zkConnect = ZKHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+
+ // create topic
+ AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ kafkaServer.shutdown();
+ zkClient.close();
+ zkServer.shutdown();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ @Test
+ public void testSendingData() throws InterruptedException, ExecutionException, TimeoutException {
+
+ final Logger logger = Logger.getLogger(KafkaDestinationProcessorTest.class.getName());
+ Properties props = TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT);
+ props.setProperty("auto.offset.reset", "earliest");
+ KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, TOPIC, new OosTestSerializer());
+ kdp.onCreate(1);
+
+ final int[] i = {0};
+
+ // prepare new thread for data receiving
+ Thread th = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ KafkaConsumer consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT));
+ consumer.subscribe(Arrays.asList(TOPIC));
+ while (i[0] < NUM_INSTANCES) {
+ try {
+ ConsumerRecords cr = consumer.poll(CONSUMER_TIMEOUT);
+ Iterator<ConsumerRecord<String, byte[]>> it = cr.iterator();
+ while (it.hasNext()) {
+ ConsumerRecord record = it.next();
+ i[0]++;
+ }
+ } catch (Exception ex) {
+ Logger.getLogger(KafkaDestinationProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ consumer.close();
+ }
+ });
+ th.start();
+
+ int z = 0;
+ Random r = new Random();
+ InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+
+ for (z = 0; z < NUM_INSTANCES; z++) {
+ InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
+ kdp.process(event);
+// logger.log(Level.INFO, "{0} {1}", new Object[]{"Sent item with id: ", z});
+ }
+
+ // wait for all instances to be read
+ Thread.sleep(2 * CONSUMER_TIMEOUT);
+ assertEquals("Number of sent and received instances", z, i[0]);
+ }
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
new file mode 100644
index 00000000..b8b5c72f
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samoa.instances.InstancesHeader;
+
+/**
+ *
+ * @author pwawrzyniak
+ * @author Jakub Jankowski
+ */
+public class KafkaEntranceProcessorTest {
+
+ private static final String ZKHOST = "127.0.0.1";
+ private static final String BROKERHOST = "127.0.0.1";
+ private static final String BROKERPORT = "9092";
+ private static final String TOPIC_AVRO = "samoa_test-avro";
+ private static final String TOPIC_JSON = "samoa_test-json";
+ private static final int NUM_INSTANCES = 11111;
+
+ private static KafkaServer kafkaServer;
+ private static EmbeddedZookeeper zkServer;
+ private static ZkClient zkClient;
+ private static String zkConnect;
+ private static int TIMEOUT = 1000;
+
+ public KafkaEntranceProcessorTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ // setup Zookeeper
+ zkServer = new EmbeddedZookeeper();
+ zkConnect = ZKHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+
+ // create topics
+ AdminUtils.createTopic(zkUtils, TOPIC_AVRO, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ AdminUtils.createTopic(zkUtils, TOPIC_JSON, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ try {
+ kafkaServer.shutdown();
+ zkClient.close();
+ zkServer.shutdown();
+ } catch (Exception ex) {
+ Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ @Test
+ public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException {
+
+ final Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
+ logger.log(Level.INFO, "OOS");
+ logger.log(Level.INFO, "testFetchingNewData");
+ Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
+ props.setProperty("auto.offset.reset", "earliest");
+ KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new OosTestSerializer());
+
+ kep.onCreate(1);
+
+ // prepare new thread for data producing
+ Thread th = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ KafkaProducer producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT));
+
+ Random r = new Random();
+ InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+ OosTestSerializer serializer = new OosTestSerializer();
+ int i = 0;
+ for (i = 0; i < NUM_INSTANCES; i++) {
+ try {
+ InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
+
+ ProducerRecord record = new ProducerRecord(TOPIC_JSON, serializer.serialize(event));
+ long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset();
+ } catch (InterruptedException | ExecutionException | TimeoutException ex) {
+ Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ producer.flush();
+ producer.close();
+ }
+ });
+ th.start();
+
+ int z = 0;
+ while (z < NUM_INSTANCES && kep.hasNext()) {
+ InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
+ z++;
+ }
+
+ assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
+
+ }
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorWithJsonTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorWithJsonTest.java
new file mode 100644
index 00000000..061bbf4e
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorWithJsonTest.java
@@ -0,0 +1,180 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Time;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * @author pwawrzyniak
+ * @author Jakub Jankowski
+ */
+public class KafkaEntranceProcessorWithJsonTest {
+
+ private static final String ZKHOST = "127.0.0.1";
+ private static final String BROKERHOST = "127.0.0.1";
+ private static final String BROKERPORT = "9092";
+ private static final String TOPIC_JSON = "samoa_test-json";
+ private static final int NUM_INSTANCES = 11111;
+
+ private static KafkaServer kafkaServer;
+ private static EmbeddedZookeeper zkServer;
+ private static ZkClient zkClient;
+ private static String zkConnect;
+ private static int TIMEOUT = 1000;
+
+ public KafkaEntranceProcessorWithJsonTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ // setup Zookeeper
+ zkServer = new EmbeddedZookeeper();
+ zkConnect = ZKHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+
+ // create topics
+ AdminUtils.createTopic(zkUtils, TOPIC_JSON, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ try {
+ kafkaServer.shutdown();
+ zkClient.close();
+ zkServer.shutdown();
+ } catch (Exception ex) {
+ Logger.getLogger(KafkaEntranceProcessorWithJsonTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ @Test
+ public void testFetchingNewDataWithJson() throws InterruptedException, ExecutionException, TimeoutException {
+
+ final Logger logger = Logger.getLogger(KafkaEntranceProcessorWithJsonTest.class.getName());
+ logger.log(Level.INFO, "JSON");
+ logger.log(Level.INFO, "testFetchingNewDataWithJson");
+ Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
+ props.setProperty("auto.offset.reset", "earliest");
+ KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new KafkaJsonMapper(Charset.defaultCharset()));
+
+ kep.onCreate(1);
+
+ // prepare new thread for data producing
+ Thread th = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ KafkaProducer producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT));
+
+ Random r = new Random();
+ InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+ Gson gson = new Gson();
+ int i = 0;
+ for (i = 0; i < NUM_INSTANCES; i++) {
+ try {
+ InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
+
+ ProducerRecord record = new ProducerRecord(TOPIC_JSON, gson.toJson(event).getBytes());
+ long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset();
+ } catch (InterruptedException | ExecutionException | TimeoutException ex) {
+ Logger.getLogger(KafkaEntranceProcessorWithJsonTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ producer.flush();
+ producer.close();
+ }
+ });
+ th.start();
+
+ int z = 0;
+ while (z < NUM_INSTANCES && kep.hasNext()) {
+ InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
+ z++;
+ }
+
+ assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
+
+ }
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
new file mode 100644
index 00000000..4215b086
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import kafka.server.KafkaServer;
+import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory;
+import org.apache.samoa.streams.kafka.topology.SimpleEngine;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+/**
+ *
+ * @author Jakub Jankowski
+ */
+@Ignore
+public class KafkaTaskTest {
+
+ private static final String ZKHOST = "127.0.0.1";//10.255.251.202"; //10.255.251.202
+ private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214"; //10.255.251.214
+ private static final String BROKERPORT = "9092"; //6667, local: 9092
+ private static final String TOPIC = "samoa_test"; //samoa_test, local: test
+ private static final int NUM_INSTANCES = 125922;
+
+ private static KafkaServer kafkaServer;
+ private static EmbeddedZookeeper zkServer;
+ private static ZkClient zkClient;
+ private static String zkConnect;
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ // setup Zookeeper
+// zkServer = new EmbeddedZookeeper();
+// zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
+// zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+// ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ /*Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);*/
+ // create topic
+ //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ //kafkaServer.shutdown();
+// zkClient.close();
+// zkServer.shutdown();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ @Test
+ public void testKafkaTask() throws InterruptedException, ExecutionException, TimeoutException {
+ Logger logger = Logger.getLogger(KafkaTaskTest.class.getName());
+ logger.log(Level.INFO, "KafkaTask");
+ Properties producerProps = TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT);
+ Properties consumerProps = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
+
+ KafkaTask task = new KafkaTask(producerProps, consumerProps, "kafkaTaskTest", 10000, new OosTestSerializer(), new OosTestSerializer());
+ task.setFactory(new SimpleComponentFactory());
+ task.init();
+ SimpleEngine.submitTopology(task.getTopology());
+
+ Thread th = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ KafkaProducer producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT));
+
+ Random r = new Random();
+ InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+ OosTestSerializer serializer = new OosTestSerializer();
+ int i = 0;
+ for (i = 0; i < NUM_INSTANCES; i++) {
+ try {
+ ProducerRecord record = new ProducerRecord(TOPIC, serializer.serialize(TestUtilsForKafka.getData(r, 10, header)));
+ long stat = producer.send(record).get(10, TimeUnit.DAYS).offset();
+// Thread.sleep(5);
+ Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
+ } catch (InterruptedException | ExecutionException | TimeoutException ex) {
+ Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+ producer.flush();
+ producer.close();
+ }
+ });
+ th.start();
+
+ }
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
new file mode 100644
index 00000000..8f77504b
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Time;
+import org.apache.samoa.instances.InstancesHeader;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaUtilsTest {
+
+ private static final String ZKHOST = "127.0.0.1";
+ private static final String BROKERHOST = "127.0.0.1";
+ private static final String BROKERPORT = "9092";
+ private static final String TOPIC_R = "test-r";
+ private static final String TOPIC_S = "test-s";
+ private static final int NUM_INSTANCES = 50;
+
+ private static KafkaServer kafkaServer;
+ private static EmbeddedZookeeper zkServer;
+ private static ZkClient zkClient;
+ private static String zkConnect;
+
+ private static final Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName());
+ private final long CONSUMER_TIMEOUT = 1500;
+
+ public KafkaUtilsTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ // setup Zookeeper
+ zkServer = new EmbeddedZookeeper();
+ zkConnect = ZKHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+
+ // create topics
+ AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ kafkaServer.shutdown();
+ zkClient.close();
+ zkServer.shutdown();
+ }
+
+ @Before
+ public void setUp() {
+ }
+
+ @After
+ public void tearDown() {
+ }
+
+ /**
+ * Test of initializeConsumer method, of class KafkaUtils.
+ */
+ @Test
+ public void testInitializeConsumer() throws Exception {
+ logger.log(Level.INFO, "initializeConsumer");
+ Collection topics = Arrays.asList(TOPIC_R);
+ KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT);
+ assertNotNull(instance);
+
+ instance.initializeConsumer(topics);
+ Thread.sleep(1000);
+ instance.closeConsumer();
+
+ Thread.sleep(CONSUMER_TIMEOUT);
+
+ instance.initializeConsumer(topics);
+ Thread.sleep(1000);
+ instance.closeConsumer();
+ assertTrue(true);
+ }
+
+ /**
+ * Test of getKafkaMessages method, of class KafkaUtils.
+ */
+ @Test
+ public void testGetKafkaMessages() throws Exception {
+ logger.log(Level.INFO, "getKafkaMessages");
+ Collection topics = Arrays.asList(TOPIC_R);
+ KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT);
+ assertNotNull(instance);
+
+ logger.log(Level.INFO, "Initialising consumer");
+ instance.initializeConsumer(topics);
+
+ logger.log(Level.INFO, "Produce data");
+ List expResult = sendAndGetMessages(NUM_INSTANCES);
+
+ logger.log(Level.INFO, "Wait a moment");
+ Thread.sleep(CONSUMER_TIMEOUT);
+
+ logger.log(Level.INFO, "Get results from Kafka");
+ List result = instance.getKafkaMessages();
+
+ assertArrayEquals(expResult.toArray(), result.toArray());
+ instance.closeConsumer();
+ }
+
+ private List sendAndGetMessages(int maxNum) throws InterruptedException, ExecutionException, TimeoutException {
+ List ret;
+ try (KafkaProducer producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test",BROKERHOST,BROKERPORT))) {
+ ret = new ArrayList<>();
+ Random r = new Random();
+ InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+ Gson gson = new Gson();
+ int i = 0;
+ for (i = 0; i < maxNum; i++) {
+ ProducerRecord record = new ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes());
+ ret.add(record.value());
+ producer.send(record);
+ }
+ producer.flush();
+ }
+ return ret;
+ }
+
+ /**
+ * Test of sendKafkaMessage method, of class KafkaUtils.
+ *
+ * @throws java.lang.InterruptedException
+ */
+ @Test
+ public void testSendKafkaMessage() throws InterruptedException {
+ logger.log(Level.INFO, "sendKafkaMessage");
+
+ logger.log(Level.INFO, "Initialising producer");
+ KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT);
+ instance.initializeProducer();
+
+ logger.log(Level.INFO, "Initialising consumer");
+ KafkaConsumer consumer;
+ consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT));
+ consumer.subscribe(Arrays.asList(TOPIC_S));
+
+ logger.log(Level.INFO, "Produce data");
+ List sent = new ArrayList<>();
+ Random r = new Random();
+ InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+ Gson gson = new Gson();
+ for (int i = 0; i < NUM_INSTANCES; i++) {
+ byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes();
+ sent.add(val);
+ instance.sendKafkaMessage(TOPIC_S, val);
+ }
+ // wait for Kafka a bit :)
+ Thread.sleep(2*CONSUMER_TIMEOUT);
+
+ logger.log(Level.INFO, "Get results from Kafka");
+ ConsumerRecords records = consumer.poll(CONSUMER_TIMEOUT);
+ Iterator> it = records.iterator();
+ List consumed = new ArrayList<>();
+ while (it.hasNext()) {
+ consumed.add(it.next().value());
+ }
+ consumer.close();
+
+ assertArrayEquals(sent.toArray(), consumed.toArray());
+ }
+
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
new file mode 100644
index 00000000..649d3e01
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.learners.InstanceContentEvent;
+
+/**
+ *
+ * @author Piotr Wawrzyniak
+ */
+public class OosTestSerializer implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
+
+    @Override
+    public InstanceContentEvent deserialize(byte[] message) {
+        // Plain Java serialization round-trip; returns null when the payload is unreadable.
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message))) {
+            InstanceContentEvent ice = (InstanceContentEvent) ois.readObject();
+            return ice;
+        } catch (IOException | ClassNotFoundException ex) {
+            Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
+        }
+        return null;
+    }
+
+    @Override
+    public byte[] serialize(InstanceContentEvent message) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(message);
+            oos.flush();
+            return baos.toByteArray();
+        } catch (IOException ex) {
+            Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
+        }
+        return null;
+    }
+
+    // Implements both directions so one instance can be reused by the tests.
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
new file mode 100644
index 00000000..87ab16c9
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.Properties;
+import java.util.Random;
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.moa.core.FastVector;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+public class TestUtilsForKafka {
+
+    // Shared helpers for the embedded-Kafka tests: synthetic instance data plus
+    // ready-made producer/consumer configurations for a local broker.
+
+    protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) {
+        double[] attVals = new double[numAtts + 1];
+        double sum = 0.0;
+        double sumWeights = numAtts; // unit weight per attribute; NOTE(review): reconstructed — confirm intended weighting
+        for (int i = 0; i < numAtts; i++) {
+            attVals[i] = instanceRandom.nextDouble();
+            sum += attVals[i]; // was never accumulated, which forced every label to 1
+        }
+        int classLabel;
+        if (sum >= sumWeights * 0.5) {
+            classLabel = 1;
+        } else {
+            classLabel = 0;
+        }
+
+        Instance inst = new DenseInstance(1.0, attVals);
+        inst.setDataset(header);
+        inst.setClassValue(classLabel);
+
+        return new InstanceContentEvent(0, inst, true, false);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected static InstancesHeader generateHeader(int numAttributes) {
+        FastVector attributes = new FastVector();
+        for (int i = 0; i < numAttributes; i++) {
+            attributes.addElement(new Attribute("att" + (i + 1)));
+        }
+
+        FastVector classLabels = new FastVector();
+        for (int i = 0; i < numAttributes; i++) { // NOTE(review): class count tracks attribute count — verify intent
+            classLabels.addElement("class" + (i + 1));
+        }
+        attributes.addElement(new Attribute("class", classLabels));
+        InstancesHeader streamHeader = new InstancesHeader(new Instances("test-kafka", attributes, 0));
+        streamHeader.setClassIndex(streamHeader.numAttributes() - 1);
+        return streamHeader;
+    }
+
+    /** Producer configuration with the default client id "test". */
+    protected static Properties getProducerProperties(String brokerHost, String brokerPort) {
+        return getProducerProperties("test", brokerHost, brokerPort);
+    }
+
+    /**
+     * Producer configuration for the embedded broker at brokerHost:brokerPort.
+     * @param clientId value for the Kafka client.id property
+     * @return properties accepted by KafkaProducer
+     */
+    protected static Properties getProducerProperties(String clientId, String brokerHost, String brokerPort) {
+        Properties producerProps = new Properties();
+        producerProps.setProperty("bootstrap.servers", brokerHost + ":" + brokerPort);
+        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.setProperty("group.id", "test");
+        producerProps.setProperty("client.id", clientId);
+        return producerProps;
+    }
+
+    protected static Properties getConsumerProperties(String brokerHost, String brokerPort) {
+        Properties consumerProps = new Properties();
+        consumerProps.setProperty("bootstrap.servers", brokerHost + ":" + brokerPort);
+        consumerProps.put("enable.auto.commit", "true");
+        consumerProps.put("auto.commit.interval.ms", "1000");
+        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.setProperty("group.id", "test");
+        consumerProps.setProperty("auto.offset.reset", "earliest");
+        return consumerProps;
+    }
+
+    protected static Properties getConsumerProducerProperties(String brokerHost, String brokerPort) {
+        Properties props = new Properties();
+        props.setProperty("bootstrap.servers", brokerHost + ":" + brokerPort);
+        props.put("enable.auto.commit", "true");
+        props.put("auto.commit.interval.ms", "1000");
+        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.setProperty("group.id", "burrito");
+        props.setProperty("auto.offset.reset", "earliest");
+        props.setProperty("client.id", "burrito");
+        return props;
+    }
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
new file mode 100644
index 00000000..202833ea
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
@@ -0,0 +1,53 @@
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+public class SimpleComponentFactory implements ComponentFactory {
+
+    @Override public ProcessingItem createPi(Processor processor, int parallelism) {
+        return new SimpleProcessingItem(processor, parallelism);
+    }
+
+    @Override public ProcessingItem createPi(Processor processor) {
+        return this.createPi(processor, 1);
+    }
+
+    @Override public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+        return new SimpleEntranceProcessingItem(processor);
+    }
+
+    @Override public Stream createStream(IProcessingItem sourcePi) {
+        return new SimpleStream(sourcePi);
+    }
+
+    @Override public Topology createTopology(String topoName) {
+        return new SimpleTopology(topoName);
+    }
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
new file mode 100644
index 00000000..338444b7
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
@@ -0,0 +1,37 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.Topology;
+
+public class SimpleEngine {
+
+    /** Runs the given topology synchronously; returns only when it completes. */
+    public static void submitTopology(Topology topology) {
+        // SimpleEngine only understands the local SimpleTopology implementation.
+        ((SimpleTopology) topology).run();
+    }
+
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
new file mode 100644
index 00000000..26ed4710
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
@@ -0,0 +1,33 @@
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.LocalEntranceProcessingItem;
+
+class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
+    public SimpleEntranceProcessingItem(EntranceProcessor processor) {
+        super(processor);
+    }
+
+    // Inherits LocalEntranceProcessingItem's polling loop; the default wait when
+    // no event is available is 100 ms — override waitForNewEvents() to change it.
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
new file mode 100644
index 00000000..bac03981
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
@@ -0,0 +1,87 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ *
+ * @author abifet
+ */
+class SimpleProcessingItem extends AbstractProcessingItem {
+    // Lazily created replicas of the wrapped processor, one per parallelism slot.
+    private IProcessingItem[] arrayProcessingItem;
+
+    SimpleProcessingItem(Processor processor) {
+        super(processor);
+    }
+
+    SimpleProcessingItem(Processor processor, int parallelism) {
+        super(processor);
+        this.setParallelism(parallelism);
+    }
+
+    public IProcessingItem getProcessingItem(int i) {
+        return arrayProcessingItem[i];
+    }
+
+    @Override
+    protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
+        StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme);
+        ((SimpleStream) inputStream).addDestination(destination);
+        return this;
+    }
+
+    public SimpleProcessingItem copy() {
+        Processor processor = this.getProcessor();
+        return new SimpleProcessingItem(processor.newProcessor(processor));
+    }
+
+    public void processEvent(ContentEvent event, int counter) {
+        // Dispatch the event to the replica selected by the stream's partitioning.
+        int parallelism = this.getParallelism();
+        if (this.arrayProcessingItem == null && parallelism > 0) {
+            // Init processing elements, the first time they are needed
+            this.arrayProcessingItem = new IProcessingItem[parallelism];
+            for (int j = 0; j < parallelism; j++) {
+                arrayProcessingItem[j] = this.copy();
+                arrayProcessingItem[j].getProcessor().onCreate(j);
+            }
+        }
+        if (this.arrayProcessingItem != null) {
+            IProcessingItem pi = this.getProcessingItem(counter);
+            Processor p = pi.getProcessor();
+            // reuse the local lookups instead of resolving the chain a second time
+            p.process(event);
+        }
+    }
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
new file mode 100644
index 00000000..8405463c
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
@@ -0,0 +1,95 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ *
+ * @author abifet
+ */
+class SimpleStream extends AbstractStream {
+    private List<StreamDestination> destinations;
+    private int maxCounter;
+    private int eventCounter;
+
+    SimpleStream(IProcessingItem sourcePi) {
+        super(sourcePi);
+        this.destinations = new LinkedList<>();
+        this.eventCounter = 0;
+        this.maxCounter = 1;
+    }
+
+    private int getNextCounter() {
+        if (maxCounter > 0 && eventCounter >= maxCounter)
+            eventCounter = 0;
+        this.eventCounter++;
+        return this.eventCounter;
+    }
+
+    @Override
+    public void put(ContentEvent event) {
+        this.put(event, this.getNextCounter());
+    }
+
+    private void put(ContentEvent event, int counter) {
+        SimpleProcessingItem pi;
+        int parallelism;
+        for (StreamDestination destination : destinations) {
+            pi = (SimpleProcessingItem) destination.getProcessingItem();
+            parallelism = destination.getParallelism();
+            switch (destination.getPartitioningScheme()) {
+                case SHUFFLE:
+                    pi.processEvent(event, counter % parallelism);
+                    break;
+                case GROUP_BY_KEY:
+                    HashCodeBuilder hb = new HashCodeBuilder();
+                    hb.append(event.getKey());
+                    int key = ((hb.build() % parallelism) + parallelism) % parallelism; // hashCode may be negative
+                    pi.processEvent(event, key);
+                    break;
+                case BROADCAST:
+                    for (int p = 0; p < parallelism; p++) {
+                        pi.processEvent(event, p);
+                    }
+                    break;
+            }
+        }
+    }
+
+    public void addDestination(StreamDestination destination) {
+        this.destinations.add(destination);
+        if (maxCounter <= 0)
+            maxCounter = 1;
+        maxCounter *= destination.getParallelism();
+    }
+}
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
new file mode 100644
index 00000000..d298b695
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
@@ -0,0 +1,46 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+
+public class SimpleTopology extends AbstractTopology {
+    SimpleTopology(String name) {
+        super(name);
+    }
+    // Runs the topology's single entrance PI synchronously until it stops emitting.
+    public void run() {
+        if (this.getEntranceProcessingItems() == null)
+            throw new IllegalStateException("You need to set entrance PI before running the topology.");
+        if (this.getEntranceProcessingItems().size() != 1)
+            throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is "
+                + this.getEntranceProcessingItems().size());
+
+        SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems()
+            .toArray()[0];
+        entrancePi.getProcessor().onCreate(0); // id=0; the id is unused in simple (local) mode
+        entrancePi.startSendingEvents();
+    }
+}