diff --git a/pom.xml b/pom.xml
index c1d552f3..03f51114 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,10 +108,17 @@ under the License.
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.1</version>
    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
  </dependencies>

  <properties>
+    <scala.binary.version>2.10</scala.binary.version>
+    <scala.version>2.10.4</scala.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <samza.version>0.10.0-SNAPSHOT</samza.version>
@@ -168,9 +175,9 @@ under the License.
-
+    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
@@ -204,7 +225,7 @@ under the License.
          <target>1.7</target>
-
+
        <artifactId>maven-assembly-plugin</artifactId>
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index f57fee2a..17373026 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -36,6 +36,16 @@
+    <file>
+      <source>${basedir}/src/main/config/magnetic-feed.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+    <file>
+      <source>${basedir}/src/main/config/magnetic-join.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
    <file>
      <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
      <outputDirectory>config</outputDirectory>
diff --git a/src/main/config/magnetic-feed.properties b/src/main/config/magnetic-feed.properties
new file mode 100644
index 00000000..ca62a888
--- /dev/null
+++ b/src/main/config/magnetic-feed.properties
@@ -0,0 +1,27 @@
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=magnetic-feed
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=com.magnetic.streaming.samza.MagneticEventsFeedStreamTask
+task.inputs=kafka.imp-raw,kafka.bid-raw
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka Systems
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.key.serde=string
+systems.kafka.samza.msg.serde=json
+systems.kafka.streams.imp-raw.samza.msg.serde=string
+systems.kafka.streams.bid-raw.samza.msg.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/config/magnetic-join.properties b/src/main/config/magnetic-join.properties
new file mode 100644
index 00000000..7a539258
--- /dev/null
+++ b/src/main/config/magnetic-join.properties
@@ -0,0 +1,40 @@
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=magnetic-join
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=com.magnetic.streaming.samza.MagneticJoinStreamsTask
+task.inputs=kafka.imp-meta,kafka.bid-meta
+# Call the window() method every hour (the actual window size is defined in the window method)
+task.window.ms=3600000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.key.serde=string
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# KV Store
+stores.imp-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.imp-store.changelog=kafka.imp-store-changelog
+stores.imp-store.changelog.replication.factor=1
+stores.imp-store.key.serde=string
+stores.imp-store.msg.serde=json
+
+stores.bid-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.bid-store.changelog=kafka.bid-store-changelog
+stores.bid-store.changelog.replication.factor=1
+stores.bid-store.key.serde=string
+stores.bid-store.msg.serde=json
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/scala/README.md b/src/main/scala/README.md
new file mode 100644
index 00000000..8bc63267
--- /dev/null
+++ b/src/main/scala/README.md
@@ -0,0 +1,94 @@
+# Magnetic Imp-Bid Join PoC using Samza
+
+### Intro
+- Repartition impressions and bids by the same key (the auction id) and send them to the corresponding topic/partition.
+- That way the n-th partition of the impressions topic holds the same auctions as the n-th partition of the bids topic.
+- When reading those topics, Samza delivers data from the n-th partition of both topics to the same task,
+i.e. all the information needed to perform the join ends up on the same machine/process.
+- Samza's local state storage (KV store) provides the lookup mechanism for the join.
+- Increasing the number of partitions/concurrent tasks lets the join scale linearly (share-nothing architecture); see the sketch below.
+- Tested on a local Samza grid.
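+
+The partitioning idea as a minimal Scala sketch (`partitionFor` is a hypothetical helper shown
+for illustration; the actual logic lives in `MagneticEventsFeedStreamTask.getPartitionKey`):
+
+```scala
+// Both feeds hash the auction id the same way, so an impression and its
+// matching bid always land on the same partition index of their topics.
+// hashCode can be negative, so normalize it into [0, numPartitions).
+def partitionFor(auctionId: String, numPartitions: Int): Int =
+  ((auctionId.hashCode % numPartitions) + numPartitions) % numPartitions
+
+partitionFor("650e33b95a1449705601", 4) // same index for imp-meta and bid-meta
+```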
+
+### TODO (see https://github.com/staslos/samza-hello-samza/tree/imp_bid_join_cdh)
+- Deploy to a Hadoop cluster and test at scale
+- Measure state-storage performance for lookups and the amount of data we can hold (the KV store works well on SSD,
+but can suffer on a regular HDD)
+- Measure state-storage performance during cleanup, since cleanup pauses main processing,
+i.e. the window() method blocks the process() method
+- Restoring from a checkpoint: can we control read rates so the two streams stay more or less aligned?
+- Replays from Kafka at some time interval: need to maintain timestamp -> offset/topic/partition information
+- Replays from S3? If we can keep a big enough window, it's easier than with Spark,
+because data stream alignment is not so critical
+
+### Current state of the PoC
+Use the following commands to run the PoC locally.
+
+```
+# For the first time run to set up Samza environment
+bin/grid bootstrap
+
+# Create Kafka topics (some of them would be created automatically, but here we create them explicitly)
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-meta --zookeeper localhost:2181 --partitions 4 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-meta --zookeeper localhost:2181 --partitions 4 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-error --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-error --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-bid-joined --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-store-changelog --zookeeper localhost:2181 --partitions 4 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-store-changelog --zookeeper localhost:2181 --partitions 4 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
+
+# Start the grid
+bin/grid start all
+
+# Build and deploy project
+mvn clean package
+rm -rf deploy/samza
+mkdir -p deploy/samza
+tar -xvf ./target/hello-samza-0.10.0-dist.tar.gz -C deploy/samza
+
+# Start raw events repartition
+deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
+ --config-path=file://$PWD/deploy/samza/config/magnetic-feed.properties
+# Start join process
+deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
+ --config-path=file://$PWD/deploy/samza/config/magnetic-join.properties
+
+# Logs can be found under:
+tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXXXXXXX_XXXX_XX_XXXXXX/{logs}
+
+# Start a Kafka console producer to submit some ad logs
+sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw
+
+# Copy-paste an ad log event (impression)
+ V=magnetic.domdex.com t=[10/Dec/2015:00:00:00 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705601&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+
+# If no matching bid log is submitted, adding these impressions will cause the earlier ones to be cleaned out of imp-store
+# the next time the window() function runs (you can watch this happen by tailing the imp-store-changelog topic; it's delayed, so be patient)
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705611&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705621&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705631&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705641&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705651&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705661&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705671&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705681&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705691&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+
+# Start a Kafka console producer to submit some bid logs
+sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bid-raw
+
+# Copy-paste the matching bid log event
+1449705600 45799578e9064ca5b4e87af2aba77092 161.253.120.255 US 511 DC 20016 America/New_York 650e33b95a1449705601 5668c08c0008a5e80a1f1acb6c0f76fa g 1 thegradcafe.com 728x90 1 00-00 1054641 9115 54663 38227 54663,52593,51249,51246,55928,50856,46309,52454,32235,50944 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9 http://thegradcafe.com/survey/index.php 1 875 1000 1000 en iab_tech 85 85 45 6500 45 15 25000 1000 600 1000 1000 1000 1000 1000 magnetic_ctr_variable_price 1.0 2.64151881627e-05 2015120920 -4.05492019653 2015120920 Safari Mac_OS_X 00-00 0 1 70 n_a
+
+# Monitor the propagation of data through the system
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-meta
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-meta
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-error
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-error
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-bid-joined
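+
+# A successful join appears on imp-bid-joined as a JSON map built by buildJoinedEvent;
+# its expected shape (long log-line fields elided here) is roughly:
+# {"event_type": "imp", "auction_id": "650e33b95a1449705601", "log_timestamp": ..., "imp_log_line": "...", "bid_log_line": "..."}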
+
+# Shutdown everything
+bin/grid stop all
+```
diff --git a/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala b/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
new file mode 100644
index 00000000..5e4dd81e
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
@@ -0,0 +1,99 @@
+package com.magnetic.streaming.common
+
+import com.magnetic.streaming.common.ParseHelpers._
+
+object AdEventsParser {
+
+ val MAGNETIC_NETWORK_ID = 1
+
+ val IMPRESSION = "imp"
+ val CLICK = "clk"
+ val CONVERSION = "conv"
+ val CONVCOUNT = "ccount"
+ val RETARGETING = "rpx"
+ val RETARCOUNT = "rcount"
+ val ERROR = "error"
+ val SKIP = "skip"
+ val BID = "bid"
+
+ val ACLK_PATH_QS = "/aclk/(\\w+=.*)/qe=".r
+ val VALID_AUCTION_ID_PATTERN = "[\\-\\.\\w\\d]{6,50}".r
+
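+  /**
+   * Click beacons (/aclk/ requests) embed the querystring in the URL path;
+   * all other events carry it in the "q" field.
+   */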
+ def parse_querystring(fields: Map[String, String]): Map[String, String] = {
+ fields.get("l") match {
+ case Some(ACLK_PATH_QS(aclk)) => aclk.split('/').map(split_one(_)).toMap
+ case _ =>
+ parse_qs(fields.getOrElse("q", "").stripSuffix("/"))
+ }
+ }
+
+ def parse_common(fields: Map[String, String]): (String, Long, String, Map[String, String], String) = {
+
+ val timestamp = parse_timestamp(fields.getOrElse("t", ""))
+ val querystring = parse_querystring(fields)
+ val cookie = {
+ val c = fields.getOrElse("c", "").split('|')(0)
+ if (c.length() == 32) c
+ else null
+ }
+
+ val (event_type, pixel_id) = {
+ if (fields.getOrElse("rpt", "") == "ahtm") (IMPRESSION, null)
+ else if (fields.getOrElse("rpt", "") == "aclk") (CLICK, null)
+ else (SKIP, null)
+ }
+
+ (event_type, timestamp, cookie, querystring, pixel_id)
+ }
+
+  def parse_fields(line: String): Map[String, String] = {
+ line.split("\t").withFilter(_.contains("=")).map(split_one(_)).toMap
+ }
+
+ def should_skip_impression(fields: Map[String, String], querystring: Map[String, String]): Boolean = {
+ false //TODO implement
+ }
+
+ def parse_imp_meta(line: String): Map[String, Any] = {
+ val fields = parse_fields(line)
+ val (event_type, timestamp, cookie, querystring, pixel_id) = parse_common(fields)
+ event_type match {
+ case IMPRESSION =>
+ if (should_skip_impression(fields, querystring))
+ throw new RuntimeException("Should skip impression")
+ else {
+ val network_id = _int(querystring.getOrElse("mp", MAGNETIC_NETWORK_ID.toString))
+
+        val auction_id = {
+          if (network_id == MAGNETIC_NETWORK_ID)
+            querystring("id")
+          else
+            VALID_AUCTION_ID_PATTERN.findFirstIn(querystring.getOrElse("id", "")) match {
+              case Some(_) => querystring("id")
+              case _ => s"fake-$cookie-$timestamp"
+            }
+        }
+ val d = scala.collection.mutable.Map[String, Any]()
+ d("event_type") = IMPRESSION
+ d("auction_id") = auction_id
+ d("log_timestamp") = timestamp
+ d("imp_log_line") = line
+ d.toMap
+ }
+ case _ =>
+        throw new RuntimeException("Not an impression: " + event_type)
+ }
+ }
+
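+  /**
+   * Bid logs are tab-separated: field 0 is the epoch timestamp and
+   * field 8 the auction id (see the sample bid line in the README).
+   */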
+ def parse_bid_meta(line: String): Map[String, Any] = {
+ val fields = line.split("\t", 10)
+ val auction_id = fields(8)
+ val timestamp = fields(0).toLong
+ val d = scala.collection.mutable.Map[String, Any]()
+ d("event_type") = BID
+ d("auction_id") = auction_id
+ d("log_timestamp") = timestamp
+ d("bid_log_line") = line
+ d.toMap
+ }
+}
diff --git a/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala b/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala
new file mode 100644
index 00000000..1e2cee10
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala
@@ -0,0 +1,36 @@
+package com.magnetic.streaming.common
+
+import java.net.URLDecoder
+import java.text.SimpleDateFormat
+
+import scala.util.Try
+
+object ParseHelpers {
+
+ def _int(s: String): java.lang.Integer = {
+ Try {s.toInt}.toOption match {
+ case Some(i) => i
+ case _ => null
+ }
+ }
+
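+  /** Splits "k=v" into ("k", "v"); a bare "k" yields ("k", ""). */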
+ def split_one(s: String, delim: String = "="): (String, String) = {
+ s.split(delim, 2).toList match {
+ case k :: Nil => k -> ""
+ case k :: v :: Nil => k -> v
+ case _ => "" -> ""
+ }
+ }
+
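+  /** Parses timestamps like "[10/Dec/2015:00:00:00 +0000]" into epoch seconds; returns 0 if unparseable. */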
+ def parse_timestamp(raw_timestamp: String): Long = {
+ Try {new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]").parse(raw_timestamp)}.toOption match {
+ case Some(t) => t.getTime/1000
+ case _ => 0
+ }
+ }
+
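+  /** Decodes a querystring like "a=1&b=2;c=3" into Map("a" -> "1", "b" -> "2", "c" -> "3"). */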
+  def parse_qs(q: String): Map[String, String] = {
+    URLDecoder.decode(q, "UTF-8")
+      .split("&").flatMap(_.split(";"))
+      .map(_.trim).withFilter(_.length > 0)
+      .map(split_one(_)).toMap
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
new file mode 100644
index 00000000..4753547f
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
@@ -0,0 +1,93 @@
+package com.magnetic.streaming.samza
+
+import scala.collection.JavaConversions._
+import com.magnetic.streaming.common.AdEventsParser
+import org.apache.samza.Partition
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.TaskCoordinator.RequestScope
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Reads raw impressions and bids from Kafka.
+ * Does minor parsing to extract meta information, such as the auction id and log timestamp,
+ * then sends each event to the appropriate topic/partition.
+ * The partition is chosen by hashing the auction id.
+ */
+class MagneticEventsFeedStreamTask extends StreamTask {
+  val NUM_PARTITIONS = 4 // Must match the partition count of the imp-meta/bid-meta topics
+ val IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-meta")
+ val BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-meta")
+ val IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error")
+ val BID_ERROR_STREAM = new SystemStream("kafka", "bid-error")
+
+  def getPartitionKey(key: String) = {
+    // String.hashCode can be negative; normalize so the partition index is always in [0, NUM_PARTITIONS)
+    ((key.hashCode % NUM_PARTITIONS) + NUM_PARTITIONS) % NUM_PARTITIONS
+  }
+
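+  /**
+   * Sends the parsed event downstream, using getPartitionKey(auctionId) as
+   * Samza's partition key and the auction id itself as the message key.
+   */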
+ def send(event: Map[String, Any], collector: MessageCollector, system: SystemStream) {
+ val auctionId = event("auction_id").asInstanceOf[String]
+ collector.send(
+ new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, mapAsJavaMap(event))
+ )
+ }
+
+ def sendError(rawEvent: String, ex: Throwable, collector: MessageCollector, system: SystemStream) {
+ val error = Map("event_type" -> AdEventsParser.ERROR, "log_line" -> rawEvent, "exception" -> ex.getMessage)
+ collector.send(
+ new OutgoingMessageEnvelope(system, mapAsJavaMap(error))
+ )
+ }
+
+ override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+ val rawEvent = envelope.getMessage.asInstanceOf[String]
+ envelope.getSystemStreamPartition.getSystemStream.getStream match {
+ case "imp-raw" =>
+ Try(AdEventsParser.parse_imp_meta(rawEvent)) match {
+ case Success(metaEvent) => send(metaEvent, collector, IMP_OUTPUT_STREAM)
+ case Failure(exception) => sendError(rawEvent, exception, collector, IMP_ERROR_STREAM)
+ }
+ case "bid-raw" =>
+ Try(AdEventsParser.parse_bid_meta(rawEvent)) match {
+ case Success(metaEvent) => send(metaEvent, collector, BID_OUTPUT_STREAM)
+ case Failure(exception) => sendError(rawEvent, exception, collector, BID_ERROR_STREAM)
+ }
+ case notSupportedStream =>
+ throw new RuntimeException(s"Not supported stream: $notSupportedStream")
+ }
+ }
+}
+
+/**
+ * TODO remove this and create a proper unit test with mock objects
+ */
+object HelloMagneticSamza {
+ def main(args: Array[String]): Unit = {
+ val t = new MagneticEventsFeedStreamTask()
+ val system = new SystemStream("bla", "imp-raw")
+ val partition = new SystemStreamPartition(system, new Partition(0))
+ val envelope = new IncomingMessageEnvelope(partition, "", "key", "msg")
+ val collector = new MessageCollector() {
+ def send(envelope: OutgoingMessageEnvelope) {
+        println("Send method called: " + envelope.getSystemStream.getStream)
+        println("Message: " + envelope.getMessage)
+ }
+ }
+ val coordinator = new TaskCoordinator() {
+ def commit(requestScope: RequestScope) {
+ println("Commit method called")
+ }
+
+ def shutdown(requestScope: RequestScope) {
+        println("Shutdown method called")
+ }
+ }
+ t.process(envelope, collector, coordinator)
+ }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
new file mode 100644
index 00000000..46a32a84
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
@@ -0,0 +1,88 @@
+package com.magnetic.streaming.samza
+
+import scala.collection.JavaConversions._
+import com.magnetic.streaming.common.AdEventsParser.IMPRESSION
+import org.apache.samza.config.Config
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, IncomingMessageEnvelope}
+import org.apache.samza.task._
+
+/**
+ * Reads impressions and bids pre-parsed by the previous step.
+ * Since they were partitioned by auction id, imps and bids with the same key end up in the same task.
+ * Uses the local KV store assigned to the given task to look up matching events.
+ * If a match is found, creates a joined event and sends it downstream.
+ * Otherwise persists the event for later lookups.
+ * Periodically deletes stale events from the local KV store (this blocks main processing, so its performance is critical).
+ */
+class MagneticJoinStreamsTask extends StreamTask with InitableTask with WindowableTask {
+
+ val OUTPUT_STREAM = new SystemStream("kafka", "imp-bid-joined")
+ var impStore: KeyValueStore[String, java.util.Map[String, Any]] = null
+ var bidStore: KeyValueStore[String, java.util.Map[String, Any]] = null
+ var lastImpTimestamp = 0
+ var lastBidTimestamp = 0
+
+ override def init(config: Config, context: TaskContext) {
+ this.impStore = context.getStore("imp-store").asInstanceOf[KeyValueStore[String, java.util.Map[String, Any]]]
+ this.bidStore = context.getStore("bid-store").asInstanceOf[KeyValueStore[String, java.util.Map[String, Any]]]
+ }
+
+  def buildJoinedEvent(metaImp: Map[String, Any], metaBid: Map[String, Any]): Map[String, Any] = {
+ val d = scala.collection.mutable.Map[String, Any]()
+ d("event_type") = IMPRESSION
+ d("auction_id") = metaImp("auction_id")
+ d("log_timestamp") = metaImp("log_timestamp")
+ d("imp_log_line") = metaImp("imp_log_line")
+ d("bid_log_line") = metaBid("bid_log_line")
+ d.toMap
+ }
+
+ override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+ val key = envelope.getKey.asInstanceOf[String]
+ val event = mapAsScalaMap(envelope.getMessage.asInstanceOf[java.util.Map[String,Any]]).toMap
+ envelope.getSystemStreamPartition.getSystemStream.getStream match {
+ case "imp-meta" =>
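+        // Track the newest imp timestamp; window() uses it as the staleness threshold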
+ lastImpTimestamp = event("log_timestamp").asInstanceOf[Integer]
+ Option(bidStore.get(key)) match {
+ case Some(bid) =>
+ collector.send(
+ new OutgoingMessageEnvelope(
+ OUTPUT_STREAM, key, mapAsJavaMap(buildJoinedEvent(event, mapAsScalaMap(bid).toMap))
+ )
+ )
+ bidStore.delete(key)
+ case None => impStore.put(key, mapAsJavaMap(event))
+ }
+ case "bid-meta" =>
+ lastBidTimestamp = event("log_timestamp").asInstanceOf[Integer]
+ Option(impStore.get(key)) match {
+ case Some(imp) =>
+ collector.send(
+ new OutgoingMessageEnvelope(
+ OUTPUT_STREAM, key, mapAsJavaMap(buildJoinedEvent(mapAsScalaMap(imp).toMap, event))
+ )
+ )
+ impStore.delete(key)
+ case None => bidStore.put(key, mapAsJavaMap(event))
+ }
+ case notSupportedStream =>
+ throw new RuntimeException(s"Not supported stream: $notSupportedStream")
+ }
+ }
+
+  def cleanUpEventStore(eventStore: KeyValueStore[String, java.util.Map[String, Any]], thresholdTimestamp: Integer) {
+    val it = eventStore.all()
+    try {
+      while (it.hasNext) {
+        val entry = it.next()
+        if (entry.getValue.get("log_timestamp").asInstanceOf[Integer] < thresholdTimestamp) {
+          eventStore.delete(entry.getKey)
+        }
+      }
+    } finally {
+      it.close() // the store iterator holds underlying resources and must always be closed
+    }
+  }
+
+ override def window(messageCollector: MessageCollector, taskCoordinator: TaskCoordinator) {
+ cleanUpEventStore(impStore, lastImpTimestamp - 3600) //TODO Keep one hour of events. Make it configurable
+ cleanUpEventStore(bidStore, lastBidTimestamp - 3600) //TODO Keep one hour of events. Make it configurable
+ }
+}