diff --git a/.gitignore b/.gitignore index 294c7185..a834232f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,15 +1,16 @@ -#maven -target/ - -#eclipse -.classpath -.project -.settings/ - -#DS_Store -.DS_Store - -#intellij -.idea/ -*.iml -*.iws +#maven +target/ + +#eclipse +.classpath +.project +.settings/ + +#DS_Store +.DS_Store + +#intellij +.idea/ +*.iml +*.iws +/samoa-api/nbproject/ \ No newline at end of file diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml index 9f69e20b..2b7bd22d 100644 --- a/samoa-api/pom.xml +++ b/samoa-api/pom.xml @@ -1,129 +1,154 @@ - - - - - 4.0.0 - - UTF-8 - - - samoa-api - API and algorithms for SAMOA - - samoa-api - - org.apache.samoa - samoa - 0.5.0-incubating-SNAPSHOT - - - - - com.yammer.metrics - metrics-core - ${metrics-core.version} - - - - net.jcip - jcip-annotations - ${jcip-annotations.version} - - - - org.apache.commons - commons-lang3 - ${commons-lang3.version} - - - - com.github.javacliparser - javacliparser - ${javacliparser.version} - - - - org.apache.samoa - samoa-instances - ${project.version} - - - - com.google.guava - guava - ${guava.version} - - - - com.esotericsoftware.kryo - kryo - ${kryo.version} - - - - com.dreizak - miniball - ${miniball.version} - - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - - - org.apache.hadoop - hadoop-hdfs - ${hadoop.version} - - - org.apache.hadoop - hadoop-minicluster - ${hadoop.version} - test - - - - - - - org.apache.maven.plugins - maven-dependency-plugin - ${maven-dependency-plugin.version} - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/lib - false - false - true - - - - - - - + + + + + 4.0.0 + + UTF-8 + + + samoa-api + API and algorithms for SAMOA + + samoa-api + + org.apache.samoa + samoa + 0.5.0-incubating-SNAPSHOT + + + + + com.yammer.metrics + metrics-core + ${metrics-core.version} + + + + net.jcip + jcip-annotations + ${jcip-annotations.version} + + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + + com.github.javacliparser + javacliparser + ${javacliparser.version} + + + + org.apache.samoa + samoa-instances + ${project.version} + + + + com.google.guava + guava + ${guava.version} + + + + com.esotericsoftware.kryo + kryo + ${kryo.version} + + + + com.dreizak + miniball + ${miniball.version} + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + test + + + + org.apache.kafka + kafka-clients + 0.10.2.0 + + + org.apache.kafka + kafka-clients + 0.10.2.0 + test + test + + + org.apache.kafka + kafka_2.11 + 0.10.2.0 + + + org.apache.kafka + kafka_2.11 + 0.10.2.0 + test + test + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + ${maven-dependency-plugin.version} + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/lib + false + false + true + + + + + + + diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java new file mode 100644 index 00000000..a93986e5 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java @@ -0,0 +1,178 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +/** + * + * @author pwawrzyniak + */ +class KafkaConsumerThread extends Thread { + + // Consumer class for internal use to retrieve messages from Kafka + private transient KafkaConsumer consumer; + + private Logger log = Logger.getLogger(KafkaConsumerThread.class.getName()); + + private final Properties consumerProperties; + private final Collection topics; + private final long consumerTimeout; + private final List buffer; + // used to synchronize things + private final Object lock; + private boolean running; + + /** + * Class constructor + * + * @param consumerProperties Properties of Consumer + * @param topics Topics to fetch (subscribe) + * @param consumerTimeout Timeout for data polling + */ + KafkaConsumerThread(Properties consumerProperties, Collection topics, long consumerTimeout) { + this.running = false; + this.consumerProperties = consumerProperties; + this.topics = topics; + this.consumerTimeout = consumerTimeout; + this.buffer = new ArrayList<>(); + lock = new Object(); + } + + @Override + public void run() { + + initializeConsumer(); + + while (running) { + fetchDataFromKafka(); + } + + cleanUp(); + } + + /** + * Method for fetching data from Apache Kafka. It takes care of received + * data + */ + private void fetchDataFromKafka() { + if (consumer != null) { + if (!consumer.subscription().isEmpty()) { + try { + List kafkaMsg = getMessagesBytes(consumer.poll(consumerTimeout)); + fillBufferAndNotifyWaits(kafkaMsg); + } catch (Throwable t) { + Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, t); + } + } + } + } + + /** + * Copies received messages to class buffer and notifies Processor to grab + * the data. + * + * @param kafkaMsg Messages received from Kafka + */ + private void fillBufferAndNotifyWaits(List kafkaMsg) { + synchronized (lock) { + buffer.addAll(kafkaMsg); + if (buffer.size() > 0) { + lock.notifyAll(); + } + } + } + + private void cleanUp() { + // clean resources + if (consumer != null) { + consumer.unsubscribe(); + consumer.close(); + } + } + + private void initializeConsumer() { + // lazy instantiation + log.log(Level.INFO, "Instantiating Kafka consumer"); + if (consumer == null) { + consumer = new KafkaConsumer<>(consumerProperties); + running = true; + } + consumer.subscribe(topics); + } + + private List getMessagesBytes(ConsumerRecords poll) { + Iterator> iterator = poll.iterator(); + List ret = new ArrayList<>(); + while (iterator.hasNext()) { + ret.add(iterator.next().value()); + } + return ret; + } + + void close() { + running = false; + } + + List getKafkaMessages() { + synchronized (lock) { + if (buffer.isEmpty()) { + try { + // block the call until new messages are received + lock.wait(); + } catch (InterruptedException ex) { + Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, ex); + } + } + ArrayList ret = new ArrayList<>(); + // copy buffer to return list + ret.addAll(buffer); + // clear message buffer + buffer.clear(); + return ret; + } + } +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java new file mode 100644 index 00000000..7b11cbd4 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java @@ -0,0 +1,55 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import org.apache.samoa.core.ContentEvent; + +/** + * + * @author pwawrzyniak + * @param the class that would be deserialized + */ +public interface KafkaDeserializer { + + // TODO: Consider key-value schema? + /** + * Method that provides deserialization algorithm + * @param message Message as received from Apache Kafka + * @return Deserialized form of message, to be passed to topology + */ + T deserialize(byte[] message); +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java new file mode 100644 index 00000000..420d43ce --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java @@ -0,0 +1,104 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; + +/** + * Destination processor that writes data to Apache Kafka + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaDestinationProcessor implements Processor { + + @Override + protected void finalize() throws Throwable { + super.finalize(); + kafkaUtils.closeProducer(); + } + + private final KafkaUtils kafkaUtils; + private final String topic; + private final KafkaSerializer serializer; + + /** + * Class constructor + * @param props Properties of Kafka Producer + * @see Kafka Producer configuration + * @param topic Topic this destination processor will write into + * @param serializer Implementation of KafkaSerializer that handles arriving data serialization + */ + public KafkaDestinationProcessor(Properties props, String topic, KafkaSerializer serializer) { + this.kafkaUtils = new KafkaUtils(null, props, 0); + this.topic = topic; + this.serializer = serializer; + } + + private KafkaDestinationProcessor(KafkaUtils kafkaUtils, String topic, KafkaSerializer serializer){ + this.kafkaUtils = kafkaUtils; + this.topic = topic; + this.serializer = serializer; + } + + @Override + public boolean process(ContentEvent event) { + try { + kafkaUtils.sendKafkaMessage(topic, serializer.serialize(event)); + } catch (Exception ex) { + Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex); + return false; + } + return true; + } + + @Override + public void onCreate(int id) { + kafkaUtils.initializeProducer(); + } + + @Override + public Processor newProcessor(Processor processor) { + KafkaDestinationProcessor kdp = (KafkaDestinationProcessor)processor; + return new KafkaDestinationProcessor(new KafkaUtils(kdp.kafkaUtils), kdp.topic, kdp.serializer); + } + +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java new file mode 100644 index 00000000..7079c588 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -0,0 +1,128 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; + +/** + * Entrance processor that reads incoming messages from Apache Kafka + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaEntranceProcessor implements EntranceProcessor { + + transient private final KafkaUtils kafkaUtils; + private List buffer; + private final KafkaDeserializer deserializer; + private final String topic; + + /** + * Class constructor + * @param props Properties of Kafka consumer + * @see Apache Kafka consumer configuration + * @param topic Topic from which the messages should be read + * @param timeout Timeout used when polling Kafka for new messages + * @param deserializer Instance of the implementation of {@link KafkaDeserializer} + */ + public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) { + this.kafkaUtils = new KafkaUtils(props, null, timeout); + this.deserializer = deserializer; + this.topic = topic; + } + + private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) { + this.kafkaUtils = kafkaUtils; + this.deserializer = deserializer; + this.topic = topic; + } + + @Override + public void onCreate(int id) { + this.buffer = new ArrayList<>(100); + this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic)); + } + + @Override + public boolean isFinished() { + return false; + } + + @Override + public boolean hasNext() { + if (buffer.isEmpty()) { + try { + buffer.addAll(kafkaUtils.getKafkaMessages()); + } catch (Exception ex) { + Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex); + } + } + return buffer.size() > 0; + } + + @Override + public ContentEvent nextEvent() { + // assume this will never be called when buffer is empty! + return this.deserializer.deserialize(buffer.remove(buffer.size() - 1)); + } + + @Override + public boolean process(ContentEvent event) { + return false; + } + + @Override + public Processor newProcessor(Processor processor) { + KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor; + return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic); + } + + @Override + protected void finalize() throws Throwable { + kafkaUtils.closeConsumer(); + super.finalize(); //To change body of generated methods, choose Tools | Templates. + } + +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java new file mode 100644 index 00000000..c514ac07 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java @@ -0,0 +1,103 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.InstanceCreator; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import java.lang.reflect.Type; +import java.nio.charset.Charset; +import org.apache.samoa.instances.DenseInstanceData; +import org.apache.samoa.instances.InstanceData; +import org.apache.samoa.learners.InstanceContentEvent; + +/** + * Sample class for serializing and deserializing {@link InstanceContentEvent} + * from/to JSON format + * + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaJsonMapper implements KafkaDeserializer, KafkaSerializer { + + private final transient Gson gson; + private final Charset charset; + + /** + * Class constructor + * + * @param charset Charset to be used for bytes parsing + */ + public KafkaJsonMapper(Charset charset) { + this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, new InstanceDataCustomDeserializer()).create(); + this.charset = charset; + } + + @Override + public InstanceContentEvent deserialize(byte[] message) { + return gson.fromJson(new String(message, this.charset), InstanceContentEvent.class); + } + + @Override + public byte[] serialize(InstanceContentEvent message) { + return gson.toJson(message).getBytes(this.charset); + } + + //Unused + public class InstanceDataCreator implements InstanceCreator { + + @Override + public InstanceData createInstance(Type type) { + return new DenseInstanceData(); + } + } + + public class InstanceDataCustomDeserializer implements JsonDeserializer { + + @Override + public DenseInstanceData deserialize(JsonElement je, Type type, JsonDeserializationContext jdc) throws JsonParseException { + double[] attributeValues = null; + JsonObject obj = (JsonObject) je; + attributeValues = jdc.deserialize(obj.get("attributeValues"), double[].class); + DenseInstanceData did = new DenseInstanceData(attributeValues); + return did; + } + + } + +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java new file mode 100644 index 00000000..ad6bd8e6 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java @@ -0,0 +1,56 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import org.apache.samoa.core.ContentEvent; + +/** + * + * @author pwawrzyniak + * @param the class that would be serialized + */ +public interface KafkaSerializer { + + // TODO: Consider Key-Value schema? + + /** + * Method that provides serialization algorithm + * @param message Message received from topology, to be serialized + * @return Serialized form of the message + */ + byte[] serialize(T message); +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java new file mode 100644 index 00000000..26012f2f --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java @@ -0,0 +1,148 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; + +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.Configurable; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; + +/** + * Kafka task + * + * @author Jakub Jankowski + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + * + */ + +public class KafkaTask implements Task, Configurable { + + private static final long serialVersionUID = 3984474041982397855L; + private static Logger logger = LoggerFactory.getLogger(KafkaTask.class); + + //czy identyczne dla enterance i destination? + Properties producerProps; + Properties consumerProps; + int timeout; + private final KafkaDeserializer deserializer; + private final KafkaSerializer serializer; + private final String topic; + + private TopologyBuilder builder; + private Topology kafkaTopology; + + public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', + "Number of destination Processors", 1, 1, Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", + "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + /** + * Class constructor + * @param props Properties of Kafka Producer and Consumer + * @see Kafka Producer configuration + * @see Kafka Consumer configuration + * @param topic Topic to which destination processor will write into + * @param timeout Timeout used when polling Kafka for new messages + * @param serializer Implementation of KafkaSerializer that handles arriving data serialization + * @param serializer Implementation of KafkaDeserializer that handles arriving data deserialization + */ + public KafkaTask(Properties producerProps, Properties consumerProps, String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) { + this.producerProps = producerProps; + this.consumerProps = consumerProps; + this.deserializer = deserializer; + this.serializer = serializer; + this.topic = topic; + this.timeout = timeout; + } + + @Override + public void init() { + logger.info("Invoking init"); + if (builder == null) { + builder = new TopologyBuilder(); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } + + // create enterance processor + KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer); + builder.addEntranceProcessor(sourceProcessor); + + // create stream + Stream stream = builder.createStream(sourceProcessor); + + // create destination processor + KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, topic, serializer); + builder.addProcessor(destProcessor, kafkaParallelismOption.getValue()); + builder.connectInputShuffleStream(stream, destProcessor); + + // build topology + kafkaTopology = builder.build(); + logger.info("Successfully built the topology"); + } + + @Override + public Topology getTopology() { + return kafkaTopology; + } + + @Override + public void setFactory(ComponentFactory factory) { + logger.info("Invoking setFactory: "+factory.toString()); + builder = new TopologyBuilder(factory); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java new file mode 100644 index 00000000..75b54021 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -0,0 +1,152 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Internal class responsible for Kafka Stream handling (both consume and + * produce) + * + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +class KafkaUtils { + + private transient KafkaConsumerThread kafkaConsumerThread; + + private transient KafkaProducer producer; + + // Properties of the consumer, as defined in Kafka documentation + private final Properties consumerProperties; + private final Properties producerProperties; + + // Timeout for Kafka Consumer + private long consumerTimeout; + + + /** + * Class constructor + * + * @param consumerProperties Properties of consumer + * @param producerProperties Properties of producer + * @param consumerTimeout Timeout for consumer poll requests + */ + public KafkaUtils(Properties consumerProperties, Properties producerProperties, long consumerTimeout) { + this.consumerProperties = consumerProperties; + this.producerProperties = producerProperties; + this.consumerTimeout = consumerTimeout; + } + + /** + * Creates new KafkaUtils from existing instance + * @param kafkaUtils Instance of KafkaUtils + */ + KafkaUtils(KafkaUtils kafkaUtils) { + this.consumerProperties = kafkaUtils.consumerProperties; + this.producerProperties = kafkaUtils.producerProperties; + this.consumerTimeout = kafkaUtils.consumerTimeout; + } + + /** + * Method used to initialize Kafka Consumer Thread, i.e. instantiate it and + * subscribe to configured topic + * + * @param topics List of Kafka topics that consumer should subscribe to + */ + public void initializeConsumer(Collection topics) { + kafkaConsumerThread = new KafkaConsumerThread(consumerProperties, topics, consumerTimeout); + kafkaConsumerThread.start(); + } + + public void closeConsumer() { + kafkaConsumerThread.close(); + } + + public void initializeProducer() { + // lazy instantiation + if (producer == null) { + producer = new KafkaProducer<>(producerProperties); + } + } + + public void closeProducer(){ + if(producer != null){ + producer.close(1, TimeUnit.MINUTES); + } + } + + /** + * Method for reading new messages from Kafka topics + * + * @return Collection of read messages + * @throws Exception Exception is thrown when consumer was not initialized + * or is not subscribed to any topic. + */ + public List getKafkaMessages() throws Exception { + return kafkaConsumerThread.getKafkaMessages(); + } + + public long sendKafkaMessage(String topic, byte[] message) { + if (producer != null) { + try{ + ProducerRecord record = new ProducerRecord(topic, message); + long offset = producer.send(record).get(10, TimeUnit.SECONDS).offset(); + producer.flush(); + return offset; + } catch(InterruptedException | ExecutionException | TimeoutException e){ + Logger.getLogger(KafkaUtils.class.getName()).log(Level.SEVERE, null, e); + } + + } + return -1; + } +} diff --git a/samoa-api/src/main/resources/kafka.avsc b/samoa-api/src/main/resources/kafka.avsc new file mode 100644 index 00000000..f5f12cf2 --- /dev/null +++ b/samoa-api/src/main/resources/kafka.avsc @@ -0,0 +1,106 @@ +[ +{ + "namespace": "org.apache.samoa.streams.kafka.temp", + "type": "record", + "name": "BurrTest", + "fields": [ + {"name":"name", "type": "string"}, + {"name":"atrs", "type": {"type": "array", "items": "string"}}, + {"name":"nums", "type": {"type": "array", "items": "int"}}, + {"name":"list", "type": {"type": "array", "items": "string"}} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "Instance", + "fields": [ + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "InstanceData", + "fields": [ + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "SingleClassInstanceData", + "fields": [ + {"name":"classValue", "type": "double"} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "DenseInstanceData", + "fields": [ + {"name":"attributeValues", "type": {"type": "array", "items": "double"}} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "SparseInstanceData", + "fields": [ + {"name":"attributeValues", "type": {"type": "array", "items": "double"}}, + {"name":"indexValues", "type": {"type": "array", "items": "int"}}, + {"name":"numberAttributes", "type": "int"} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "SingleLabelInstance", + "fields": [ + {"name": "weight", "type": "double"}, + {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}, + {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]} + ] +}, +{ + "namespace": "org.apache.samoa.instances", + "type": "record", + "name": "DenseInstance", + "fields": [ + {"name": "weight", "type": "double"}, + {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}, + {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]} + ] +}, +{ + "namespace": "org.apache.samoa.core", + "type": "record", + "name": "SerializableInstance", + "fields": [ + {"name": "weight", "type": "double"}, + {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}, + {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]} + ] +}, +{ + "namespace": "org.apache.samoa.learners", + "type": "record", + "name": "InstanceContent", + "fields": [ + {"name": "instanceIndex", "type": "long"}, + {"name": "classifierIndex", "type": "int"}, + {"name": "evaluationIndex", "type": "int"}, + {"name":"instance", "type":"org.apache.samoa.core.SerializableInstance"}, + {"name": "isTraining", "type": "boolean"}, + {"name": "isTesting", "type": "boolean"}, + {"name": "isLast", "type": "boolean"} + ] +}, +{ + "namespace": "org.apache.samoa.learners", + "type": "record", + "name": "InstanceContentEvent", + "fields": [ + {"name": "instanceContent", "type": "org.apache.samoa.learners.InstanceContent"} + ] +} +] + diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java new file mode 100644 index 00000000..2d594569 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java @@ -0,0 +1,180 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.utils.Time; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.learners.InstanceContentEvent; +import org.junit.After; +import org.junit.AfterClass; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * + * @author pwawrzyniak + */ +public class KafkaDestinationProcessorTest { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC = "test-kdp"; + private static final int NUM_INSTANCES = 11111; + private static final int CONSUMER_TIMEOUT = 1000; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + public KafkaDestinationProcessorTest() { + } + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // create topic + AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + } + + @AfterClass + public static void tearDownClass() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + @Before + public void setUp() throws IOException { + + } + + @After + public void tearDown() { + + } + + @Test + public void testSendingData() throws InterruptedException, ExecutionException, TimeoutException { + + final Logger logger = Logger.getLogger(KafkaDestinationProcessorTest.class.getName()); + Properties props = TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT); + props.setProperty("auto.offset.reset", "earliest"); + KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, TOPIC, new OosTestSerializer()); + kdp.onCreate(1); + + final int[] i = {0}; + + // prepare new thread for data receiveing + Thread th = new Thread(new Runnable() { + @Override + public void run() { + KafkaConsumer consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT)); + consumer.subscribe(Arrays.asList(TOPIC)); + while (i[0] < NUM_INSTANCES) { + try { + ConsumerRecords cr = consumer.poll(CONSUMER_TIMEOUT); + Iterator> it = cr.iterator(); + while (it.hasNext()) { + ConsumerRecord record = it.next(); + i[0]++; + } + } catch (Exception ex) { + Logger.getLogger(KafkaDestinationProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + consumer.close(); + } + }); + th.start(); + + int z = 0; + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + + for (z = 0; z < NUM_INSTANCES; z++) { + InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header); + kdp.process(event); +// logger.log(Level.INFO, "{0} {1}", new Object[]{"Sent item with id: ", z}); + } + + // wait for all instances to be read + Thread.sleep(2 * CONSUMER_TIMEOUT); + assertEquals("Number of sent and received instances", z, i[0]); + } +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java new file mode 100644 index 00000000..b8b5c72f --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java @@ -0,0 +1,187 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import com.google.gson.Gson; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.learners.InstanceContentEvent; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import org.apache.kafka.common.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.samoa.instances.InstancesHeader; + +/** + * + * @author pwawrzyniak + * @author Jakub Jankowski + */ +public class KafkaEntranceProcessorTest { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC_AVRO = "samoa_test-avro"; + private static final String TOPIC_JSON = "samoa_test-json"; + private static final int NUM_INSTANCES = 11111; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + private static int TIMEOUT = 1000; + + public KafkaEntranceProcessorTest() { + } + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // create topics + AdminUtils.createTopic(zkUtils, TOPIC_AVRO, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TOPIC_JSON, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + } + + @AfterClass + public static void tearDownClass() { + try { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } catch (Exception ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + + @Before + public void setUp() throws IOException { + + } + + @After + public void tearDown() { + + } + + @Test + public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException { + + final Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); + logger.log(Level.INFO, "OOS"); + logger.log(Level.INFO, "testFetchingNewData"); + Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); + props.setProperty("auto.offset.reset", "earliest"); + KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new OosTestSerializer()); + + kep.onCreate(1); + + // prepare new thread for data producing + Thread th = new Thread(new Runnable() { + @Override + public void run() { + KafkaProducer producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT)); + + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + OosTestSerializer serializer = new OosTestSerializer(); + int i = 0; + for (i = 0; i < NUM_INSTANCES; i++) { + try { + InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header); + + ProducerRecord record = new ProducerRecord(TOPIC_JSON, serializer.serialize(event)); + long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset(); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + producer.flush(); + producer.close(); + } + }); + th.start(); + + int z = 0; + while (z < NUM_INSTANCES && kep.hasNext()) { + InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent(); + z++; + } + + assertEquals("Number of sent and received instances", NUM_INSTANCES, z); + + } +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorWithJsonTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorWithJsonTest.java new file mode 100644 index 00000000..061bbf4e --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorWithJsonTest.java @@ -0,0 +1,180 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import com.google.gson.Gson; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.Time; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.learners.InstanceContentEvent; +import org.junit.After; +import org.junit.AfterClass; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * + * @author pwawrzyniak + * @author Jakub Jankowski + */ +public class KafkaEntranceProcessorWithJsonTest { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC_JSON = "samoa_test-json"; + private static final int NUM_INSTANCES = 11111; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + private static int TIMEOUT = 1000; + + public KafkaEntranceProcessorWithJsonTest() { + } + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // create topics + AdminUtils.createTopic(zkUtils, TOPIC_JSON, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + } + + @AfterClass + public static void tearDownClass() { + try { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } catch (Exception ex) { + Logger.getLogger(KafkaEntranceProcessorWithJsonTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + + @Before + public void setUp() throws IOException { + + } + + @After + public void tearDown() { + + } + +@Test + public void testFetchingNewDataWithJson() throws InterruptedException, ExecutionException, TimeoutException { + + final Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); + logger.log(Level.INFO, "JSON"); + logger.log(Level.INFO, "testFetchingNewDataWithJson"); + Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); + props.setProperty("auto.offset.reset", "earliest"); + KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new KafkaJsonMapper(Charset.defaultCharset())); + + kep.onCreate(1); + + // prepare new thread for data producing + Thread th = new Thread(new Runnable() { + @Override + public void run() { + KafkaProducer producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT)); + + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + int i = 0; + for (i = 0; i < NUM_INSTANCES; i++) { + try { + InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header); + + ProducerRecord record = new ProducerRecord(TOPIC_JSON, gson.toJson(event).getBytes()); + long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset(); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + producer.flush(); + producer.close(); + } + }); + th.start(); + + int z = 0; + while (z < NUM_INSTANCES && kep.hasNext()) { + InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent(); + z++; + } + + assertEquals("Number of sent and received instances", NUM_INSTANCES, z); + + } +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java new file mode 100644 index 00000000..4215b086 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java @@ -0,0 +1,157 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.io.IOException; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + + +import org.I0Itec.zkclient.ZkClient; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import kafka.server.KafkaServer; +import kafka.zk.EmbeddedZookeeper; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory; +import org.apache.samoa.streams.kafka.topology.SimpleEngine; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +/** + * + * @author Jakub Jankowski + */ +@Ignore +public class KafkaTaskTest { + + private static final String ZKHOST = "127.0.0.1";//10.255.251.202"; //10.255.251.202 + private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214"; //10.255.251.214 + private static final String BROKERPORT = "9092"; //6667, local: 9092 + private static final String TOPIC = "samoa_test"; //samoa_test, local: test + private static final int NUM_INSTANCES = 125922; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper +// zkServer = new EmbeddedZookeeper(); +// zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); +// zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); +// ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + /*Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock);*/ + // create topic + //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + + @AfterClass + public static void tearDownClass() { + //kafkaServer.shutdown(); +// zkClient.close(); +// zkServer.shutdown(); + } + + @Before + public void setUp() throws IOException { + + } + + @After + public void tearDown() { + + } + + @Test + public void testKafkaTask() throws InterruptedException, ExecutionException, TimeoutException { + Logger logger = Logger.getLogger(KafkaTaskTest.class.getName()); + logger.log(Level.INFO, "KafkaTask"); + Properties producerProps = TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT); + Properties consumerProps = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); + + KafkaTask task = new KafkaTask(producerProps, consumerProps, "kafkaTaskTest", 10000, new OosTestSerializer(), new OosTestSerializer()); + task.setFactory(new SimpleComponentFactory()); + task.init(); + SimpleEngine.submitTopology(task.getTopology()); + + Thread th = new Thread(new Runnable() { + @Override + public void run() { + KafkaProducer producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT)); + + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + OosTestSerializer serializer = new OosTestSerializer(); + int i = 0; + for (i = 0; i < NUM_INSTANCES; i++) { + try { + ProducerRecord record = new ProducerRecord(TOPIC, serializer.serialize(TestUtilsForKafka.getData(r, 10, header))); + long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); +// Thread.sleep(5); + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + producer.flush(); + producer.close(); + } + }); + th.start(); + + } +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java new file mode 100644 index 00000000..8f77504b --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java @@ -0,0 +1,247 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import com.google.gson.Gson; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.Time; +import org.apache.samoa.instances.InstancesHeader; +import org.junit.After; +import org.junit.AfterClass; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * + * @author pwawrzyniak + */ +public class KafkaUtilsTest { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC_R = "test-r"; + private static final String TOPIC_S = "test-s"; + private static final int NUM_INSTANCES = 50; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + private static final Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName()); + private final long CONSUMER_TIMEOUT = 1500; + + public KafkaUtilsTest() { + } + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // create topics + AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + } + + @AfterClass + public static void tearDownClass() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + @Before + public void setUp() { + } + + @After + public void tearDown() { + } + + /** + * Test of initializeConsumer method, of class KafkaUtils. + */ + @Test + public void testInitializeConsumer() throws Exception { + logger.log(Level.INFO, "initializeConsumer"); + Collection topics = Arrays.asList(TOPIC_R); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); + assertNotNull(instance); + + instance.initializeConsumer(topics); + Thread.sleep(1000); + instance.closeConsumer(); + + Thread.sleep(CONSUMER_TIMEOUT); + + instance.initializeConsumer(topics); + Thread.sleep(1000); + instance.closeConsumer(); + assertTrue(true); + } + + /** + * Test of getKafkaMessages method, of class KafkaUtils. + */ + @Test + public void testGetKafkaMessages() throws Exception { + logger.log(Level.INFO, "getKafkaMessages"); + Collection topics = Arrays.asList(TOPIC_R); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); + assertNotNull(instance); + + logger.log(Level.INFO, "Initialising consumer"); + instance.initializeConsumer(topics); + + logger.log(Level.INFO, "Produce data"); + List expResult = sendAndGetMessages(NUM_INSTANCES); + + logger.log(Level.INFO, "Wait a moment"); + Thread.sleep(CONSUMER_TIMEOUT); + + logger.log(Level.INFO, "Get results from Kafka"); + List result = instance.getKafkaMessages(); + + assertArrayEquals(expResult.toArray(), result.toArray()); + instance.closeConsumer(); + } + + private List sendAndGetMessages(int maxNum) throws InterruptedException, ExecutionException, TimeoutException { + List ret; + try (KafkaProducer producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test",BROKERHOST,BROKERPORT))) { + ret = new ArrayList<>(); + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + int i = 0; + for (i = 0; i < maxNum; i++) { + ProducerRecord record = new ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); + ret.add(record.value()); + producer.send(record); + } + producer.flush(); + } + return ret; + } + + /** + * Test of sendKafkaMessage method, of class KafkaUtils. + * + * @throws java.lang.InterruptedException + */ + @Test + public void testSendKafkaMessage() throws InterruptedException { + logger.log(Level.INFO, "sendKafkaMessage"); + + logger.log(Level.INFO, "Initialising producer"); + KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST,BROKERPORT), CONSUMER_TIMEOUT); + instance.initializeProducer(); + + logger.log(Level.INFO, "Initialising consumer"); + KafkaConsumer consumer; + consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT)); + consumer.subscribe(Arrays.asList(TOPIC_S)); + + logger.log(Level.INFO, "Produce data"); + List sent = new ArrayList<>(); + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + Gson gson = new Gson(); + for (int i = 0; i < NUM_INSTANCES; i++) { + byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes(); + sent.add(val); + instance.sendKafkaMessage(TOPIC_S, val); + } + // wait for Kafka a bit :) + Thread.sleep(2*CONSUMER_TIMEOUT); + + logger.log(Level.INFO, "Get results from Kafka"); + ConsumerRecords records = consumer.poll(CONSUMER_TIMEOUT); + Iterator> it = records.iterator(); + List consumed = new ArrayList<>(); + while (it.hasNext()) { + consumed.add(it.next().value()); + } + consumer.close(); + + assertArrayEquals(sent.toArray(), consumed.toArray()); + } + +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java new file mode 100644 index 00000000..649d3e01 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.learners.InstanceContentEvent; + +/** + * + * @author Piotr Wawrzyniak + */ +public class OosTestSerializer implements KafkaDeserializer, KafkaSerializer { + + @Override + public InstanceContentEvent deserialize(byte[] message) { + try { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message)); + InstanceContentEvent ice = (InstanceContentEvent)ois.readObject(); + return ice; + } catch (IOException | ClassNotFoundException ex) { + Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); + } + return null; + } + + @Override + public byte[] serialize(InstanceContentEvent message) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(message); + oos.flush(); + return baos.toByteArray(); + } catch (IOException ex) { + Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); + } + return null; + } + + +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java new file mode 100644 index 00000000..87ab16c9 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java @@ -0,0 +1,143 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.Properties; +import java.util.Random; +import org.apache.samoa.instances.Attribute; +import org.apache.samoa.instances.DenseInstance; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.learners.InstanceContentEvent; +import org.apache.samoa.moa.core.FastVector; + +/** + * + * @author pwawrzyniak + */ +public class TestUtilsForKafka { + +// private static final String BROKERHOST = "127.0.0.1"; +// private static final String BROKERPORT = "9092"; + + protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) { + double[] attVals = new double[numAtts + 1]; + double sum = 0.0; + double sumWeights = 0.0; + for (int i = 0; i < numAtts; i++) { + attVals[i] = instanceRandom.nextDouble(); + + } + int classLabel; + if (sum >= sumWeights * 0.5) { + classLabel = 1; + } else { + classLabel = 0; + } + + Instance inst = new DenseInstance(1.0, attVals); + inst.setDataset(header); + inst.setClassValue(classLabel); + + return new InstanceContentEvent(0, inst, true, false); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + protected static InstancesHeader generateHeader(int numAttributes) { + FastVector attributes = new FastVector(); + for (int i = 0; i < numAttributes; i++) { + attributes.addElement(new Attribute("att" + (i + 1))); + } + + FastVector classLabels = new FastVector(); + for (int i = 0; i < numAttributes; i++) { + classLabels.addElement("class" + (i + 1)); + } + attributes.addElement(new Attribute("class", classLabels)); + InstancesHeader streamHeader = new InstancesHeader(new Instances("test-kafka", attributes, 0)); + streamHeader.setClassIndex(streamHeader.numAttributes() - 1); + return streamHeader; + } + + + protected static Properties getProducerProperties(String BROKERHOST, String BROKERPORT) { + return getProducerProperties("test", BROKERHOST, BROKERPORT); + } + + /** + * + * @param clientId + * @return + */ + protected static Properties getProducerProperties(String clientId, String BROKERHOST, String BROKERPORT) { + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.setProperty("group.id", "test"); + producerProps.setProperty("client.id", clientId); + return producerProps; + } + + protected static Properties getConsumerProperties(String BROKERHOST, String BROKERPORT) { + Properties consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + consumerProps.put("enable.auto.commit", "true"); + consumerProps.put("auto.commit.interval.ms", "1000"); + consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.setProperty("group.id", "test"); + consumerProps.setProperty("auto.offset.reset", "earliest"); + return consumerProps; + } + + protected static Properties getConsumerProducerProperties(String BROKERHOST, String BROKERPORT) { + Properties props = new Properties(); + props.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", "1000"); + props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.setProperty("group.id", "burrito"); + props.setProperty("auto.offset.reset", "earliest"); + props.setProperty("client.id", "burrito"); + return props; + } +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java new file mode 100644 index 00000000..202833ea --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java @@ -0,0 +1,53 @@ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +public class SimpleComponentFactory implements ComponentFactory { + + public ProcessingItem createPi(Processor processor, int paralellism) { + return new SimpleProcessingItem(processor, paralellism); + } + + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + return new SimpleEntranceProcessingItem(processor); + } + + public Stream createStream(IProcessingItem sourcePi) { + return new SimpleStream(sourcePi); + } + + public Topology createTopology(String topoName) { + return new SimpleTopology(topoName); + } +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java new file mode 100644 index 00000000..338444b7 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java @@ -0,0 +1,37 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.Topology; + +public class SimpleEngine { + + public static void submitTopology(Topology topology) { + SimpleTopology simpleTopology = (SimpleTopology) topology; + simpleTopology.run(); + // runs until completion + } + +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java new file mode 100644 index 00000000..26ed4710 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java @@ -0,0 +1,33 @@ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.LocalEntranceProcessingItem; + +class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem { + public SimpleEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // The default waiting time when there is no available events is 100ms + // Override waitForNewEvents() to change it +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java new file mode 100644 index 00000000..bac03981 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java @@ -0,0 +1,87 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.AbstractProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; + +/** + * + * @author abifet + */ +class SimpleProcessingItem extends AbstractProcessingItem { + private IProcessingItem[] arrayProcessingItem; + + SimpleProcessingItem(Processor processor) { + super(processor); + } + + SimpleProcessingItem(Processor processor, int parallelism) { + super(processor); + this.setParallelism(parallelism); + } + + public IProcessingItem getProcessingItem(int i) { + return arrayProcessingItem[i]; + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); + ((SimpleStream) inputStream).addDestination(destination); + return this; + } + + public SimpleProcessingItem copy() { + Processor processor = this.getProcessor(); + return new SimpleProcessingItem(processor.newProcessor(processor)); + } + + public void processEvent(ContentEvent event, int counter) { + + int parallelism = this.getParallelism(); + // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism); + if (this.arrayProcessingItem == null && parallelism > 0) { + // Init processing elements, the first time they are needed + this.arrayProcessingItem = new IProcessingItem[parallelism]; + for (int j = 0; j < parallelism; j++) { + arrayProcessingItem[j] = this.copy(); + arrayProcessingItem[j].getProcessor().onCreate(j); + } + } + if (this.arrayProcessingItem != null) { + IProcessingItem pi = this.getProcessingItem(counter); + Processor p = pi.getProcessor(); + // System.out.println("PI="+pi+", p="+p); + this.getProcessingItem(counter).getProcessor().process(event); + } + } +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java new file mode 100644 index 00000000..8405463c --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java @@ -0,0 +1,95 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.AbstractStream; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.utils.StreamDestination; + +/** + * + * @author abifet + */ +class SimpleStream extends AbstractStream { + private List destinations; + private int maxCounter; + private int eventCounter; + + SimpleStream(IProcessingItem sourcePi) { + super(sourcePi); + this.destinations = new LinkedList<>(); + this.eventCounter = 0; + this.maxCounter = 1; + } + + private int getNextCounter() { + if (maxCounter > 0 && eventCounter >= maxCounter) + eventCounter = 0; + this.eventCounter++; + return this.eventCounter; + } + + @Override + public void put(ContentEvent event) { + this.put(event, this.getNextCounter()); + } + + private void put(ContentEvent event, int counter) { + SimpleProcessingItem pi; + int parallelism; + for (StreamDestination destination : destinations) { + pi = (SimpleProcessingItem) destination.getProcessingItem(); + parallelism = destination.getParallelism(); + switch (destination.getPartitioningScheme()) { + case SHUFFLE: + pi.processEvent(event, counter % parallelism); + break; + case GROUP_BY_KEY: + HashCodeBuilder hb = new HashCodeBuilder(); + hb.append(event.getKey()); + int key = hb.build() % parallelism; + pi.processEvent(event, key); + break; + case BROADCAST: + for (int p = 0; p < parallelism; p++) { + pi.processEvent(event, p); + } + break; + } + } + } + + public void addDestination(StreamDestination destination) { + this.destinations.add(destination); + if (maxCounter <= 0) + maxCounter = 1; + maxCounter *= destination.getParallelism(); + } +} diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java new file mode 100644 index 00000000..d298b695 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java @@ -0,0 +1,46 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.AbstractTopology; + +public class SimpleTopology extends AbstractTopology { + SimpleTopology(String name) { + super(name); + } + + public void run() { + if (this.getEntranceProcessingItems() == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + if (this.getEntranceProcessingItems().size() != 1) + throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is " + + this.getEntranceProcessingItems().size()); + + SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems() + .toArray()[0]; + entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode + entrancePi.startSendingEvents(); + } +}