diff --git a/pom.xml b/pom.xml
index c1d552f3..03f51114 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,10 +108,17 @@ under the License.
       <artifactId>hadoop-hdfs</artifactId>
       <version>2.6.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
   </dependencies>

   <properties>
+    <scala.binary.version>2.10</scala.binary.version>
+    <scala.version>2.10.4</scala.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <samza.version>0.10.0-SNAPSHOT</samza.version>
@@ -168,9 +175,9 @@ under the License.
-  <build>
+  <build>
+    <sourceDirectory>src/main/scala</sourceDirectory>
     <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
@@ -204,7 +225,7 @@ under the License.
           <target>1.7</target>
         </configuration>
-     </plugin>
+      </plugin>
       <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index f57fee2a..17373026 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -36,6 +36,16 @@
+    <file>
+      <source>${basedir}/src/main/config/magnetic-feed.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
+    <file>
+      <source>${basedir}/src/main/config/magnetic-join.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
     <file>
       <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
       <outputDirectory>config</outputDirectory>
diff --git a/src/main/config/magnetic-feed.properties b/src/main/config/magnetic-feed.properties
new file mode 100644
index 00000000..ca62a888
--- /dev/null
+++ b/src/main/config/magnetic-feed.properties
@@ -0,0 +1,27 @@
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=magnetic-feed
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=com.magnetic.streaming.samza.MagneticEventsFeedStreamTask
+task.inputs=kafka.imp-raw,kafka.bid-raw
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka Systems
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.key.serde=string
+systems.kafka.samza.msg.serde=json
+systems.kafka.streams.imp-raw.samza.msg.serde=string
+systems.kafka.streams.bid-raw.samza.msg.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/config/magnetic-join.properties b/src/main/config/magnetic-join.properties
new file mode 100644
index 00000000..7a539258
--- /dev/null
+++ b/src/main/config/magnetic-join.properties
@@ -0,0 +1,40 @@
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=magnetic-join
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+task.class=com.magnetic.streaming.samza.MagneticJoinStreamsTask
+task.inputs=kafka.imp-meta,kafka.bid-meta
+# Call the window() method every hour (actual window size defined in window method)
+task.window.ms=3600000
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.key.serde=string
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# KV Store
+stores.imp-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.imp-store.changelog=kafka.imp-store-changelog
+stores.imp-store.changelog.replication.factor=1
+stores.imp-store.key.serde=string
+stores.imp-store.msg.serde=json
+
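+# The bid-side store mirrors the imp-side store: bids wait here until the
+# matching impression arrives (see MagneticJoinStreamsTask)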
+stores.bid-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.bid-store.changelog=kafka.bid-store-changelog
+stores.bid-store.changelog.replication.factor=1
+stores.bid-store.key.serde=string
+stores.bid-store.msg.serde=json
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/scala/README.md b/src/main/scala/README.md
new file mode 100644
index 00000000..8bc63267
--- /dev/null
+++ b/src/main/scala/README.md
@@ -0,0 +1,94 @@
+# Magnetic Imp-Bid Join PoC using Samza
+
+### Intro
+- Repartition impressions and bids by the same key (the auction id) and send them to the related topic/partition (sketched below).
+- That way the n-th partition of the impressions topic holds the same auctions as the n-th partition of the bids topic.
+- When reading those topics, Samza delivers the data from the n-th partition of both topics to the same task,
+i.e. all the information necessary to make the join ends up on the same machine/process.
+- Samza's local state storage (KV store) provides the lookup mechanism for the join.
+- Increasing the number of partitions/concurrent tasks lets the join scale linearly (share-nothing architecture).
+- Tested on a local Samza grid.
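+
+A minimal sketch of the partitioning trick (illustrative only; `partitionFor` is a made-up
+name, and the real logic lives in `MagneticEventsFeedStreamTask.getPartitionKey`):
+
+```scala
+// Both raw streams use the same partition function on the auction id, so the
+// impression and the bid for one auction always map to the same partition n.
+val numPartitions = 4
+
+def partitionFor(auctionId: String): Int =
+  ((auctionId.hashCode % numPartitions) + numPartitions) % numPartitions
+
+partitionFor("650e33b95a1449705601") // identical result for the imp and bid streams
+```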
+
+### TODO (See https://github.com/staslos/samza-hello-samza/tree/imp_bid_join_cdh)
+- Deploy to a Hadoop cluster and test at scale
+- Performance of the state storage for lookups, and the size of the data we can hold (KV storage works well on SSD,
+but can suffer on a regular HDD)
+- Performance of state-storage cleanup, since it pauses main processing,
+i.e. the window() method blocks the process() method
+- Restoring from a checkpoint; can we control read rates so the two streams stay more or less aligned?
+- Replays from Kafka at some time interval; we need to maintain timestamp -> offset/topic/partition information
+- Replays from S3? If we can keep a big enough window, this is easier than with Spark,
+because data-stream alignment is not as critical
+
+### Current state of the PoC
+Use the following commands to run the PoC locally.
+
+```
+# Run this the first time to set up the Samza environment
+bin/grid bootstrap
+
+# Create the Kafka topics (some of them are created automatically, but here we do it explicitly)
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-meta --zookeeper localhost:2181 --partitions 4 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-meta --zookeeper localhost:2181 --partitions 4 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-error --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-error --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-bid-joined --zookeeper localhost:2181 --partitions 1 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-store-changelog --zookeeper localhost:2181 --partitions 4 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-store-changelog --zookeeper localhost:2181 --partitions 4 --replication-factor 1
+sh deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
+
+# Start the grid
+bin/grid start all
+
+# Build and deploy the project
+mvn clean package
+rm -rf deploy/samza
+mkdir -p deploy/samza
+tar -xvf ./target/hello-samza-0.10.0-dist.tar.gz -C deploy/samza
+
+# Start the raw-events repartition job
+deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
+  --config-path=file://$PWD/deploy/samza/config/magnetic-feed.properties
+# Start the join job
+deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
+  --config-path=file://$PWD/deploy/samza/config/magnetic-join.properties
+
+# Logs can be found under:
+tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXXXXXXX_XXXX_XX_XXXXXX/{logs}
+
+# Start a Kafka console producer to submit some ad logs
+sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw
+
+# Copy-paste an ad log event (impression)
+ V=magnetic.domdex.com t=[10/Dec/2015:00:00:00 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705601&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+
+# If a matching bid log is not submitted, adding the impressions below will lead to the cleanup of the previous
+# one from imp-store the next time the window() function runs (you can watch this happen by tailing the
+# imp-store-changelog topic; the cleanup is delayed, so be patient)
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705611&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705621&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705631&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705641&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705651&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705661&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705671&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705681&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705691&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+
+# Start a Kafka console producer to submit some bid logs
+sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bid-raw
+
+# Copy-paste the matching bid log event
+1449705600 45799578e9064ca5b4e87af2aba77092 161.253.120.255 US 511 DC 20016 America/New_York 650e33b95a1449705601 5668c08c0008a5e80a1f1acb6c0f76fa g 1 thegradcafe.com 728x90 1 00-00 1054641 9115 54663 38227 54663,52593,51249,51246,55928,50856,46309,52454,32235,50944 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9 http://thegradcafe.com/survey/index.php 1 875 1000 1000 en iab_tech 85 85 45 6500 45 15 25000 1000 600 1000 1000 1000 1000 1000 magnetic_ctr_variable_price 1.0 2.64151881627e-05 2015120920 -4.05492019653 2015120920 Safari Mac_OS_X 00-00 0 1 70 n_a
+
+# Monitor the propagation of data through the system
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-meta
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-meta
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-error
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-error
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-bid-joined
+
+# Shut everything down
+bin/grid stop all
+```
diff --git a/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala b/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
new file mode 100644
index 00000000..5e4dd81e
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
@@ -0,0 +1,99 @@
+package com.magnetic.streaming.common
+
+import com.magnetic.streaming.common.ParseHelpers._
+
+object AdEventsParser {
+
+  val MAGNETIC_NETWORK_ID = 1
+
+  val IMPRESSION = "imp"
+  val CLICK = "clk"
+  val CONVERSION = "conv"
+  val CONVCOUNT = "ccount"
+  val RETARGETING = "rpx"
+  val RETARCOUNT = "rcount"
+  val ERROR = "error"
+  val SKIP = "skip"
+  val BID = "bid"
+
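+  // ACLK_PATH_QS captures the '/'-separated k=v payload embedded in click paths
+  // between '/aclk/' and '/qe='; VALID_AUCTION_ID_PATTERN accepts auction ids of
+  // 6-50 word characters, dots or dashes.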
+  val ACLK_PATH_QS = "/aclk/(\\w+=.*)/qe=".r
+  val VALID_AUCTION_ID_PATTERN = "[\\-\\.\\w\\d]{6,50}".r
+
+  def parse_querystring(fields: Map[String, String]): Map[String, String] = {
+    fields.get("l") match {
+      case Some(ACLK_PATH_QS(aclk)) => aclk.split('/').map(split_one(_)).toMap
+      case _ =>
+        parse_qs(fields.getOrElse("q", "").stripSuffix("/"))
+    }
+  }
+
+  def parse_common(fields: Map[String, String]): (String, Long, String, Map[String, String], String) = {
+
+    val timestamp = parse_timestamp(fields.getOrElse("t", ""))
+    val querystring = parse_querystring(fields)
+    // A valid cookie id is exactly 32 characters; anything else is treated as missing
+    val cookie = {
+      val c = fields.getOrElse("c", "").split('|')(0)
+      if (c.length() == 32) c
+      else null
+    }
+
+    val (event_type, pixel_id) = {
+      if (fields.getOrElse("rpt", "") == "ahtm") (IMPRESSION, null)
+      else if (fields.getOrElse("rpt", "") == "aclk") (CLICK, null)
+      else (SKIP, null)
+    }
+
+    (event_type, timestamp, cookie, querystring, pixel_id)
+  }
+
+  def parse_fields(line: String): Map[String, String] = {
+    line.split("\t").withFilter(_.contains("=")).map(split_one(_)).toMap
+  }
+
+  def should_skip_impression(fields: Map[String, String], querystring: Map[String, String]): Boolean = {
+    false // TODO implement
+  }
+
+  def parse_imp_meta(line: String): Map[String, Any] = {
+    val fields = parse_fields(line)
+    val (event_type, timestamp, cookie, querystring, pixel_id) = parse_common(fields)
+    event_type match {
+      case IMPRESSION =>
+        if (should_skip_impression(fields, querystring))
+          throw new RuntimeException("Should skip impression")
+        else {
+          val network_id = _int(querystring.getOrElse("mp", MAGNETIC_NETWORK_ID.toString))
+
+          // Magnetic's own auctions carry a trusted id; for external networks, fall
+          // back to a synthetic id when the supplied one does not look valid
+          val auction_id = {
+            if (network_id == MAGNETIC_NETWORK_ID)
+              querystring("id")
+            else
+              VALID_AUCTION_ID_PATTERN.findFirstIn(querystring.getOrElse("id", "")) match {
+                case Some(x) => querystring("id")
+                case _ => s"fake-$cookie-$timestamp"
+              }
+          }
+          val d = scala.collection.mutable.Map[String, Any]()
+          d("event_type") = IMPRESSION
+          d("auction_id") = auction_id
+          d("log_timestamp") = timestamp
+          d("imp_log_line") = line
+          d.toMap
+        }
+      case _ =>
+        throw new RuntimeException("Not an impression: " + event_type)
+    }
+  }
+
+  def parse_bid_meta(line: String): Map[String, Any] = {
+    val fields = line.split("\t", 10)
+    val auction_id = fields(8)
+    val timestamp = fields(0).toLong
+    val d = scala.collection.mutable.Map[String, Any]()
+    d("event_type") = BID
+    d("auction_id") = auction_id
+    d("log_timestamp") = timestamp
+    d("bid_log_line") = line
+    d.toMap
+  }
+}
diff --git a/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala b/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala
new file mode 100644
index 00000000..1e2cee10
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala
@@ -0,0 +1,36 @@
+package com.magnetic.streaming.common
+
+import java.net.URLDecoder
+import java.text.SimpleDateFormat
+
+import scala.util.Try
+
+object ParseHelpers {
+
+  // Returns null (rather than throwing) when the string is not a valid integer
+  def _int(s: String): java.lang.Integer = {
+    Try {s.toInt}.toOption match {
+      case Some(i) => i
+      case _ => null
+    }
+  }
+
+  // Splits "k=v" into a pair; a missing delimiter yields an empty value
+  def split_one(s: String, delim: String = "="): (String, String) = {
+    s.split(delim, 2).toList match {
+      case k :: Nil => k -> ""
+      case k :: v :: Nil => k -> v
+      case _ => "" -> ""
+    }
+  }
+
+  // Parses "[10/Dec/2015:00:00:00 +0000]" into epoch seconds; 0 on failure
+  def parse_timestamp(raw_timestamp: String): Long = {
+    Try {new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]").parse(raw_timestamp)}.toOption match {
+      case Some(t) => t.getTime / 1000
+      case _ => 0
+    }
+  }
+
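+  // Splits a query string on '&' and ';' after URL-decoding; a key without '='
+  // maps to the empty string, e.g. parse_qs("a=1&b=2;c") == Map("a" -> "1", "b" -> "2", "c" -> "")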
+  def parse_qs(q: String): Map[String, String] = {
+    URLDecoder.decode(q, "UTF-8").split("&").flatMap(_.split(";")).map(_.trim).withFilter(_.length > 0).map(split_one(_)).toMap
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
new file mode 100644
index 00000000..4753547f
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
@@ -0,0 +1,93 @@
+package com.magnetic.streaming.samza
+
+import scala.collection.JavaConversions._
+import com.magnetic.streaming.common.AdEventsParser
+import org.apache.samza.Partition
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.TaskCoordinator.RequestScope
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Reads raw impressions and bids from Kafka.
+ * Does minor parsing to extract meta information, such as the auction id and the log timestamp,
+ * then sends each event to the appropriate topic/partition,
+ * choosing the partition by hashing the auction id.
+ */
+class MagneticEventsFeedStreamTask extends StreamTask {
+  val NUM_PARTITIONS = 4 // Equal to the number of partitions in the target topics
+  val IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-meta")
+  val BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-meta")
+  val IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error")
+  val BID_ERROR_STREAM = new SystemStream("kafka", "bid-error")
+
+  def getPartitionKey(key: String) = {
+    // hashCode can be negative; normalize so the result always falls in [0, NUM_PARTITIONS)
+    ((key.hashCode % NUM_PARTITIONS) + NUM_PARTITIONS) % NUM_PARTITIONS
+  }
+
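+  // Deterministic: a given auction id always yields the same partition index, so
+  // the impression and the bid for one auction land in partition n of imp-meta
+  // and partition n of bid-meta, where the join task reads them together.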
+  def send(event: Map[String, Any], collector: MessageCollector, system: SystemStream) {
+    val auctionId = event("auction_id").asInstanceOf[String]
+    collector.send(
+      new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, mapAsJavaMap(event))
+    )
+  }
+
+  def sendError(rawEvent: String, ex: Throwable, collector: MessageCollector, system: SystemStream) {
+    val error = Map("event_type" -> AdEventsParser.ERROR, "log_line" -> rawEvent, "exception" -> ex.getMessage)
+    collector.send(
+      new OutgoingMessageEnvelope(system, mapAsJavaMap(error))
+    )
+  }
+
+  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+    val rawEvent = envelope.getMessage.asInstanceOf[String]
+    envelope.getSystemStreamPartition.getSystemStream.getStream match {
+      case "imp-raw" =>
+        Try(AdEventsParser.parse_imp_meta(rawEvent)) match {
+          case Success(metaEvent) => send(metaEvent, collector, IMP_OUTPUT_STREAM)
+          case Failure(exception) => sendError(rawEvent, exception, collector, IMP_ERROR_STREAM)
+        }
+      case "bid-raw" =>
+        Try(AdEventsParser.parse_bid_meta(rawEvent)) match {
+          case Success(metaEvent) => send(metaEvent, collector, BID_OUTPUT_STREAM)
+          case Failure(exception) => sendError(rawEvent, exception, collector, BID_ERROR_STREAM)
+        }
+      case notSupportedStream =>
+        throw new RuntimeException(s"Not supported stream: $notSupportedStream")
+    }
+  }
+}
+
+/**
+ * TODO remove this and create a proper unit test with mock objects
+ */
+object HelloMagneticSamza {
+  def main(args: Array[String]): Unit = {
+    val t = new MagneticEventsFeedStreamTask()
+    val system = new SystemStream("bla", "imp-raw")
+    val partition = new SystemStreamPartition(system, new Partition(0))
+    val envelope = new IncomingMessageEnvelope(partition, "", "key", "msg")
+    val collector = new MessageCollector() {
+      def send(envelope: OutgoingMessageEnvelope) {
+        println("Send method called: " + envelope.getSystemStream.getStream)
+        println("Message: " + envelope.getMessage)
+      }
+    }
+    val coordinator = new TaskCoordinator() {
+      def commit(requestScope: RequestScope) {
+        println("Commit method called")
+      }
+
+      def shutdown(requestScope: RequestScope) {
+        println("Shutdown method called")
+      }
+    }
+    t.process(envelope, collector, coordinator)
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
new file mode 100644
index 00000000..46a32a84
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
@@ -0,0 +1,88 @@
+package com.magnetic.streaming.samza
+
+import scala.collection.JavaConversions._
+import com.magnetic.streaming.common.AdEventsParser.IMPRESSION
+import org.apache.samza.config.Config
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, IncomingMessageEnvelope}
+import org.apache.samza.task._
+
+/**
+ * Reads the impressions and bids pre-parsed by the previous step.
+ * Since they were partitioned by auction id, imps and bids with the same key end up in the same task.
+ * Uses the local KV store assigned to the task to look up matching events.
+ * If a match is found, creates a joined event and sends it downstream;
+ * otherwise persists the event for later lookups.
+ * Periodically deletes stale events from the local KV store (this blocks main processing, so its performance is critical).
+ */
+class MagneticJoinStreamsTask extends StreamTask with InitableTask with WindowableTask {
+
+  val OUTPUT_STREAM = new SystemStream("kafka", "imp-bid-joined")
+  var impStore: KeyValueStore[String, java.util.Map[String, Any]] = null
+  var bidStore: KeyValueStore[String, java.util.Map[String, Any]] = null
+  var lastImpTimestamp = 0
+  var lastBidTimestamp = 0
+
+  override def init(config: Config, context: TaskContext) {
+    this.impStore = context.getStore("imp-store").asInstanceOf[KeyValueStore[String, java.util.Map[String, Any]]]
+    this.bidStore = context.getStore("bid-store").asInstanceOf[KeyValueStore[String, java.util.Map[String, Any]]]
+  }
+
+  def buildJoinedEvent(metaImp: Map[String, Any], metaBid: Map[String, Any]): Map[String, Any] = {
+    val d = scala.collection.mutable.Map[String, Any]()
+    d("event_type") = IMPRESSION
+    d("auction_id") = metaImp("auction_id")
+    d("log_timestamp") = metaImp("log_timestamp")
+    d("imp_log_line") = metaImp("imp_log_line")
+    d("bid_log_line") = metaBid("bid_log_line")
+    d.toMap
+  }
+
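+  // Illustrative shape of a joined event:
+  //   Map("event_type" -> "imp", "auction_id" -> "abc123", "log_timestamp" -> 1449705600,
+  //       "imp_log_line" -> <raw impression log line>, "bid_log_line" -> <raw bid log line>)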
+  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+    val key = envelope.getKey.asInstanceOf[String]
+    val event = mapAsScalaMap(envelope.getMessage.asInstanceOf[java.util.Map[String, Any]]).toMap
+    envelope.getSystemStreamPartition.getSystemStream.getStream match {
+      case "imp-meta" =>
+        lastImpTimestamp = event("log_timestamp").asInstanceOf[Integer]
+        Option(bidStore.get(key)) match {
+          case Some(bid) =>
+            collector.send(
+              new OutgoingMessageEnvelope(
+                OUTPUT_STREAM, key, mapAsJavaMap(buildJoinedEvent(event, mapAsScalaMap(bid).toMap))
+              )
+            )
+            bidStore.delete(key)
+          case None => impStore.put(key, mapAsJavaMap(event))
+        }
+      case "bid-meta" =>
+        lastBidTimestamp = event("log_timestamp").asInstanceOf[Integer]
+        Option(impStore.get(key)) match {
+          case Some(imp) =>
+            collector.send(
+              new OutgoingMessageEnvelope(
+                OUTPUT_STREAM, key, mapAsJavaMap(buildJoinedEvent(mapAsScalaMap(imp).toMap, event))
+              )
+            )
+            impStore.delete(key)
+          case None => bidStore.put(key, mapAsJavaMap(event))
+        }
+      case notSupportedStream =>
+        throw new RuntimeException(s"Not supported stream: $notSupportedStream")
+    }
+  }
+
+  def cleanUpEventStore(eventStore: KeyValueStore[String, java.util.Map[String, Any]], thresholdTimestamp: Integer) {
+    // KV-store iterators hold resources; always close them when done
+    val it = eventStore.all()
+    try {
+      while (it.hasNext) {
+        val entry = it.next()
+        if (entry.getValue.get("log_timestamp").asInstanceOf[Integer] < thresholdTimestamp) {
+          eventStore.delete(entry.getKey)
+        }
+      }
+    } finally {
+      it.close()
+    }
+  }
+
+  override def window(messageCollector: MessageCollector, taskCoordinator: TaskCoordinator) {
+    // Keep one hour of events, relative to the latest timestamp seen on each stream. TODO make the window configurable
+    cleanUpEventStore(impStore, lastImpTimestamp - 3600)
+    cleanUpEventStore(bidStore, lastBidTimestamp - 3600)
+  }
+}