From 6b7c1b58af231ea517f83412b872dbdd3616ac82 Mon Sep 17 00:00:00 2001 From: Stanislav Los Date: Tue, 15 Dec 2015 15:32:36 -0500 Subject: [PATCH 01/11] simple feed works --- pom.xml | 10 +++---- src/main/assembly/src.xml | 5 ++++ src/main/config/magnetic-feed.properties | 26 ++++++++++++++++ .../samza/MagneticEventsFeedStreamTask.java | 30 +++++++++++++++++++ .../com/magnetic/streaming/samza/README.md | 26 ++++++++++++++++ 5 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 src/main/config/magnetic-feed.properties create mode 100644 src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java create mode 100644 src/main/java/com/magnetic/streaming/samza/README.md diff --git a/pom.xml b/pom.xml index c1d552f3..6f97713e 100644 --- a/pom.xml +++ b/pom.xml @@ -168,9 +168,9 @@ under the License. - + org.apache.maven.plugins @@ -204,7 +204,7 @@ under the License. 1.7 - + maven-assembly-plugin diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml index f57fee2a..b659db08 100644 --- a/src/main/assembly/src.xml +++ b/src/main/assembly/src.xml @@ -36,6 +36,11 @@ + + ${basedir}/src/main/config/magnetic-feed.properties + config + true + ${basedir}/src/main/config/wikipedia-feed.properties config diff --git a/src/main/config/magnetic-feed.properties b/src/main/config/magnetic-feed.properties new file mode 100644 index 00000000..fa84e1ad --- /dev/null +++ b/src/main/config/magnetic-feed.properties @@ -0,0 +1,26 @@ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=magnetic-feed + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +task.class=com.magnetic.streaming.samza.MagneticEventsFeedStreamTask +task.inputs=kafka.imp-raw,kafka.bid-raw + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=string +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.bootstrap.servers=localhost:9092 + +# Job Coordinator +job.coordinator.system=kafka +# Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model +# See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details +job.coordinator.replication.factor=1 diff --git a/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java b/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java new file mode 100644 index 00000000..da66e8a0 --- /dev/null +++ b/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java @@ -0,0 +1,30 @@ +package com.magnetic.streaming.samza; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskCoordinator; + +/** + * Created by stanislav on 12/15/15. 
+ */ +public class MagneticEventsFeedStreamTask implements StreamTask { + //private static final SystemStream IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-raw-partitioned"); + //private static final SystemStream BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-raw-partitioned"); + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { + String event = (String)envelope.getMessage(); + if(envelope.getSystemStreamPartition().getSystemStream().getStream().equals("imp-raw")){ + + System.out.println("Imp: " + event); + } + else if (envelope.getSystemStreamPartition().getSystemStream().getStream().equals("bid-raw")){ + System.out.println("Bid: " + event); + } + else { + throw new RuntimeException("Not supported stream: " + envelope.getSystemStreamPartition().getSystemStream().getStream()); + } + } +} diff --git a/src/main/java/com/magnetic/streaming/samza/README.md b/src/main/java/com/magnetic/streaming/samza/README.md new file mode 100644 index 00000000..9a7d398e --- /dev/null +++ b/src/main/java/com/magnetic/streaming/samza/README.md @@ -0,0 +1,26 @@ +``` +for the first time: +bin/grid bootstrap + +after that: +bin/grid start all + +sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw --zookeeper localhost:2181 --partitions 1 --replication 1 +sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw --zookeeper localhost:2181 --partitions 1 --replication 1 +sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw-partitioned --zookeeper localhost:2181 --partitions 4 --replication 1 +sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw-partitioned --zookeeper localhost:2181 --partitions 4 --replication 1 +sh deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 + +mvn clean package +rm -rf deploy/samza +mkdir -p deploy/samza +tar -xvf ./target/hello-samza-0.10.0-dist.tar.gz -C deploy/samza + +deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \ + --config-path=file://$PWD/deploy/samza/config/magnetic-feed.properties +tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXXXXXXX_XXXX_XX_XXXXXX/{logs} + +sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw + +bin/grid stop all +``` \ No newline at end of file From bd0f3080f6e2d46609fe6fb476d3e5aa086a4e54 Mon Sep 17 00:00:00 2001 From: Stanislav Los Date: Wed, 16 Dec 2015 09:22:55 -0500 Subject: [PATCH 02/11] Added scala, but it doesn't work --- pom.xml | 21 +++++ .../samza/MagneticEventsFeedStreamTask.java | 70 +++++++++++++-- .../com/magnetic/streaming/samza/README.md | 14 ++- .../streaming/common/AdEventsParser.scala | 88 +++++++++++++++++++ .../streaming/common/ParseHelpers.scala | 35 ++++++++ 5 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala create mode 100644 src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala diff --git a/pom.xml b/pom.xml index 6f97713e..03f51114 100644 --- a/pom.xml +++ b/pom.xml @@ -108,10 +108,17 @@ under the License. hadoop-hdfs 2.6.1 + + org.scala-lang + scala-library + ${scala.version} + + 2.10 + 2.10.4 UTF-8 0.10.0-SNAPSHOT @@ -194,7 +201,21 @@ under the License. 
--> + src/main/scala + + org.scala-tools + maven-scala-plugin + 2.15.0 + + + + compile + testCompile + + + + org.apache.maven.plugins maven-compiler-plugin diff --git a/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java b/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java index da66e8a0..97d84a45 100644 --- a/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java +++ b/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java @@ -1,30 +1,90 @@ package com.magnetic.streaming.samza; +import com.magnetic.streaming.common.AdEventsParser; +import org.apache.samza.Partition; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; import org.apache.samza.task.TaskCoordinator; +import scala.Option; /** * Created by stanislav on 12/15/15. */ public class MagneticEventsFeedStreamTask implements StreamTask { - //private static final SystemStream IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-raw-partitioned"); - //private static final SystemStream BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-raw-partitioned"); + private static final int NUM_PARTITIONS = 4; + private static final SystemStream IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-raw-partitioned"); + private static final SystemStream BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-raw-partitioned"); + private static final SystemStream IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error"); + private static final SystemStream BID_ERROR_STREAM = new SystemStream("kafka", "bid-error"); + + private int getPartitionKey(String key){ + return key.hashCode() % NUM_PARTITIONS; + } + + private void send(Option key, String event, MessageCollector collector, SystemStream system){ + if(key.isDefined()){ + String auctionId = (String)key.get(); + collector.send(new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, event)); + } + } @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { String event = (String)envelope.getMessage(); if(envelope.getSystemStreamPartition().getSystemStream().getStream().equals("imp-raw")){ - - System.out.println("Imp: " + event); + Option key = AdEventsParser.extract_impression_auction_id(event); + if(key.isDefined()) { + send(key, event, collector, IMP_OUTPUT_STREAM); + } + else { + collector.send(new OutgoingMessageEnvelope(IMP_ERROR_STREAM, event)); + } } else if (envelope.getSystemStreamPartition().getSystemStream().getStream().equals("bid-raw")){ - System.out.println("Bid: " + event); + Option key = AdEventsParser.extract_bid_auction_id(event); + if(key.isDefined()) { + send(key, event, collector, BID_OUTPUT_STREAM); + } + else { + collector.send(new OutgoingMessageEnvelope(BID_ERROR_STREAM, event)); + } } else { throw new RuntimeException("Not supported stream: " + envelope.getSystemStreamPartition().getSystemStream().getStream()); } } + + /** + * Sample run + * @param arg + * @throws Exception + */ + public static void main(String[] arg) throws Exception { + MagneticEventsFeedStreamTask t = new MagneticEventsFeedStreamTask(); + SystemStream system = new SystemStream("bla", "imp-raw"); + SystemStreamPartition partition = new SystemStreamPartition(system, new 
Partition(0));
+        IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(partition, "", "key", "msg");
+        MessageCollector collector = new MessageCollector() {
+            @Override
+            public void send(OutgoingMessageEnvelope envelope) {
+                System.out.println("Send method called:" + envelope.getSystemStream().getStream());
+            }
+        };
+        TaskCoordinator coordinator = new TaskCoordinator() {
+            @Override
+            public void commit(RequestScope requestScope) {
+                System.out.println("Commit method called");
+            }
+
+            @Override
+            public void shutdown(RequestScope requestScope) {
+                System.out.println("Shutdown method called");
+            }
+        };
+        t.process(envelope, collector, coordinator);
+    }
+}
diff --git a/src/main/java/com/magnetic/streaming/samza/README.md b/src/main/java/com/magnetic/streaming/samza/README.md
index 9a7d398e..53f5a318 100644
--- a/src/main/java/com/magnetic/streaming/samza/README.md
+++ b/src/main/java/com/magnetic/streaming/samza/README.md
@@ -2,15 +2,17 @@
 for the first time:
 bin/grid bootstrap
 
-after that:
-bin/grid start all
-
 sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw --zookeeper localhost:2181 --partitions 1 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw --zookeeper localhost:2181 --partitions 1 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw-partitioned --zookeeper localhost:2181 --partitions 4 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw-partitioned --zookeeper localhost:2181 --partitions 4 --replication 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-error --zookeeper localhost:2181 --partitions 1 --replication 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-error --zookeeper localhost:2181 --partitions 1 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
 
+after that:
+bin/grid start all
+
 mvn clean package
 rm -rf deploy/samza
 mkdir -p deploy/samza
@@ -21,6 +23,12 @@ deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.P
   --config-path=file://$PWD/deploy/samza/config/magnetic-feed.properties
 tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXXXXXXX_XXXX_XX_XXXXXX/{logs}
 
 sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw
+sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bid-raw
+
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-raw-partitioned
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-raw-partitioned
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-error
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-error
 
 bin/grid stop all
 ```
\ No newline at end of file
diff --git a/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala b/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
new file mode 100644
index 00000000..8495a215
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
@@ -0,0 +1,88 @@
+package com.magnetic.streaming.common
+
+import com.magnetic.streaming.common.ParseHelpers._
+
+object AdEventsParser {
+
+  val MAGNETIC_NETWORK_ID = 1
+
+  val IMPRESSION = "imp"
+  val CLICK = "clk"
+  val CONVERSION = "conv"
+  val CONVCOUNT = "ccount"
+  val RETARGETING = "rpx"
+  val RETARCOUNT = "rcount"
+  val ERROR = "error"
+  val SKIP = "skip"
+
+  val ACLK_PATH_QS = "/aclk/(\\w+=.*)/qe=".r
+  val VALID_AUCTION_ID_PATTERN = "[\\-\\.\\w\\d]{6,50}".r
+
+  def parse_querystring(fields: Map[String, String]): Map[String, String] = {
+    fields.get("l") match {
+      case Some(ACLK_PATH_QS(aclk)) => aclk.split('/').map(split_one(_)).toMap
+      case _ =>
+        parse_qs(fields.getOrElse("q", "").stripSuffix("/"))
+    }
+  }
+
+  def parse_common(fields: Map[String, String]): (String, String, String, Map[String, String], String) = {
+
+    val timestamp = parse_timestamp(fields.getOrElse("t", ""))
+    val querystring = parse_querystring(fields)
+    val cookie = {
+      val c = fields.getOrElse("c", "").split('|')(0)
+      if (c.length() == 32) c
+      else null
+    }
+
+    val (event_type, pixel_id) = {
+      if (fields.getOrElse("rpt", "") == "ahtm") (IMPRESSION, null)
+      else if (fields.getOrElse("rpt", "") == "aclk") (CLICK, null)
+      else (SKIP, null)
+    }
+
+    (event_type, timestamp, cookie, querystring, pixel_id)
+  }
+
+  def parse_fields(line: String):Map[String, String] = {
+    line.split("\t").withFilter(_.contains("=")).map(split_one(_)).toMap
+  }
+
+  def extract_auction_id(querystring: Map[String, String]): Option[String] = {
+    val network_id = _int(querystring.getOrElse("mp", MAGNETIC_NETWORK_ID.toString))
+
+    if (network_id == MAGNETIC_NETWORK_ID)
+      Some(querystring("id"))
+    else
+      VALID_AUCTION_ID_PATTERN.findFirstIn(querystring.getOrElse("id", ""))
+  }
+
+  def should_skip_impression(fields: Map[String, String], querystring: Map[String, String]): Boolean = {
+    false //TODO implement
+  }
+
+  def extract_impression_auction_id(line: String): Option[String] = {
+    val fields = parse_fields(line)
+    val (event_type, timestamp, cookie, querystring, pixel_id) = parse_common(fields)
+    event_type match {
+      case IMPRESSION =>
+        if (should_skip_impression(fields, querystring))
+          None
+        else
+          extract_auction_id(querystring)
+      case _ =>
+        None
+    }
+  }
+
+  def extract_bid_auction_id(line: String): Option[String] = {
+    try {
+      val auction_id = line.split("\t", 10)(8)
+      Some(auction_id)
+    }
+    catch {
+      case t: Throwable => None
+    }
+  }
+}
diff --git a/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala b/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala
new file mode 100644
index 00000000..aa12c24e
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala
@@ -0,0 +1,35 @@
+package com.magnetic.streaming.common
+
+import java.net.{URL, URLDecoder}
+import java.text.SimpleDateFormat
+
+import scala.util.Try
+
+object ParseHelpers {
+
+  def _int(s: String): java.lang.Integer = {
+    Try {s.toInt}.toOption match {
+      case Some(i) => i
+      case _ => null
+    }
+  }
+
+  def split_one(s: String, delim: String = "="): (String, String) = {
+    s.split(delim, 2).toList match {
+      case k :: Nil => k -> ""
+      case k :: v :: Nil => k -> v
+      case _ => "" -> ""
+    }
+  }
+
+  def parse_timestamp(raw_timestamp: String): String = {
+    Try {new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]").parse(raw_timestamp)}.toOption match {
+      case Some(t) => (t.getTime/1000).toString
+      case _ => "0"
+    }
+  }
+
+  def parse_qs(q:String): Map[String,String] = {
+    URLDecoder.decode(q).split("&").flatMap(_.split(";")).map(_.trim).withFilter(_.length > 0).map(split_one(_)).toMap
+  }
+}
\ No newline at end of file
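A note on the partitioning introduced in this patch: `getPartitionKey` computes `key.hashCode() % NUM_PARTITIONS`, and since `hashCode` can be negative in Java/Scala the result can be negative too. How the envelope's partition key is mapped to a physical partition is up to the system producer, so a negative value is not necessarily fatal here, but if the value were ever used directly as a partition index it would be invalid. A minimal sketch of a non-negative variant (`partitionFor` is an illustrative name, not part of the patch):

```scala
// Illustrative only: a stable, non-negative partition index for a key.
// Math.floorMod keeps the result in [0, numPartitions) even when
// key.hashCode is negative, unlike the % operator.
def partitionFor(key: String, numPartitions: Int): Int =
  Math.floorMod(key.hashCode, numPartitions)
```

Separately, `ParseHelpers.parse_qs` uses the single-argument `URLDecoder.decode`, which is deprecated because it decodes with the platform default charset; the two-argument form, `URLDecoder.decode(q, "UTF-8")`, is the usual replacement.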
deletions(-) delete mode 100644 src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java rename src/main/{java/com/magnetic/streaming/samza => scala}/README.md (100%) create mode 100644 src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala diff --git a/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java b/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java deleted file mode 100644 index 97d84a45..00000000 --- a/src/main/java/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.magnetic.streaming.samza; - -import com.magnetic.streaming.common.AdEventsParser; -import org.apache.samza.Partition; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskCoordinator; -import scala.Option; - -/** - * Created by stanislav on 12/15/15. - */ -public class MagneticEventsFeedStreamTask implements StreamTask { - private static final int NUM_PARTITIONS = 4; - private static final SystemStream IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-raw-partitioned"); - private static final SystemStream BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-raw-partitioned"); - private static final SystemStream IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error"); - private static final SystemStream BID_ERROR_STREAM = new SystemStream("kafka", "bid-error"); - - private int getPartitionKey(String key){ - return key.hashCode() % NUM_PARTITIONS; - } - - private void send(Option key, String event, MessageCollector collector, SystemStream system){ - if(key.isDefined()){ - String auctionId = (String)key.get(); - collector.send(new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, event)); - } - } - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - String event = (String)envelope.getMessage(); - if(envelope.getSystemStreamPartition().getSystemStream().getStream().equals("imp-raw")){ - Option key = AdEventsParser.extract_impression_auction_id(event); - if(key.isDefined()) { - send(key, event, collector, IMP_OUTPUT_STREAM); - } - else { - collector.send(new OutgoingMessageEnvelope(IMP_ERROR_STREAM, event)); - } - } - else if (envelope.getSystemStreamPartition().getSystemStream().getStream().equals("bid-raw")){ - Option key = AdEventsParser.extract_bid_auction_id(event); - if(key.isDefined()) { - send(key, event, collector, BID_OUTPUT_STREAM); - } - else { - collector.send(new OutgoingMessageEnvelope(BID_ERROR_STREAM, event)); - } - } - else { - throw new RuntimeException("Not supported stream: " + envelope.getSystemStreamPartition().getSystemStream().getStream()); - } - } - - /** - * Sample run - * @param arg - * @throws Exception - */ - public static void main(String[] arg) throws Exception { - MagneticEventsFeedStreamTask t = new MagneticEventsFeedStreamTask(); - SystemStream system = new SystemStream("bla", "imp-raw"); - SystemStreamPartition partition = new SystemStreamPartition(system, new Partition(0)); - IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(partition, "", "key", "msg"); - MessageCollector collector = new MessageCollector() { - @Override - public 
void send(OutgoingMessageEnvelope envelope) {
-                System.out.println("Send method called:" + envelope.getSystemStream().getStream());
-            }
-        };
-        TaskCoordinator coordinator = new TaskCoordinator() {
-            @Override
-            public void commit(RequestScope requestScope) {
-                System.out.println("Commit method called");
-            }
-
-            @Override
-            public void shutdown(RequestScope requestScope) {
-                System.out.println("Shutdown method called");
-            }
-        };
-        t.process(envelope, collector, coordinator);
-    }
-}
diff --git a/src/main/java/com/magnetic/streaming/samza/README.md b/src/main/scala/README.md
similarity index 100%
rename from src/main/java/com/magnetic/streaming/samza/README.md
rename to src/main/scala/README.md
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
new file mode 100644
index 00000000..39f5b94f
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
@@ -0,0 +1,72 @@
+package com.magnetic.streaming.samza;
+
+import com.magnetic.streaming.common.AdEventsParser
+import org.apache.samza.Partition
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.TaskCoordinator.RequestScope
+
+class MagneticEventsFeedStreamTask extends StreamTask {
+    val NUM_PARTITIONS = 4
+    val IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-raw-partitioned")
+    val BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-raw-partitioned")
+    val IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error")
+    val BID_ERROR_STREAM = new SystemStream("kafka", "bid-error")
+
+    def getPartitionKey(key: String) = {
+        key.hashCode() % NUM_PARTITIONS
+    }
+
+    def send(auctionId: String, event: String, collector: MessageCollector, system: SystemStream) {
+        collector.send(
+            new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, event)
+        )
+    }
+
+    def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+        val event = envelope.getMessage.asInstanceOf[String]
+        envelope.getSystemStreamPartition.getSystemStream.getStream match {
+            case "imp-raw" =>
+                AdEventsParser.extract_impression_auction_id(event) match {
+                    case Some(auctionId) => send(auctionId, event, collector, IMP_OUTPUT_STREAM)
+                    case None => collector.send(new OutgoingMessageEnvelope(IMP_ERROR_STREAM, event))
+                }
+            case "bid-raw" =>
+                AdEventsParser.extract_bid_auction_id(event) match {
+                    case Some(auctionId) => send(auctionId, event, collector, BID_OUTPUT_STREAM)
+                    case None => collector.send(new OutgoingMessageEnvelope(BID_ERROR_STREAM, event))
+                }
+            case notSupportedStream =>
+                throw new RuntimeException(s"Not supported stream: $notSupportedStream")
+        }
+    }
+}
+
+object HelloMagneticSamza {
+    def main(args: Array[String]): Unit = {
+        val t = new MagneticEventsFeedStreamTask()
+        val system = new SystemStream("bla", "imp-raw")
+        val partition = new SystemStreamPartition(system, new Partition(0))
+        val envelope = new IncomingMessageEnvelope(partition, "", "key", "msg")
+        val collector = new MessageCollector() {
+            def send(envelope: OutgoingMessageEnvelope) {
+                println("Send method called:" + envelope.getSystemStream().getStream())
+            }
+        }
+        val coordinator = new TaskCoordinator() {
+            def commit(requestScope: RequestScope) {
+                println("Commit method called")
+            }
+
+            def shutdown(requestScope: RequestScope) {
+                System.out.println("Shutdown method called")
+            }
+        }
+        t.process(envelope, collector, coordinator)
+    }
+}
\ No newline at end of file

From 3c9e03f3430282855013badcb0e7e36fa0202d79 Mon Sep 17 00:00:00 2001
From: Stanislav Los
Date: Wed, 16 Dec 2015 11:05:49 -0500
Subject: [PATCH 04/11] minor cleanup

---
 .../streaming/samza/MagneticEventsFeedStreamTask.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
index 39f5b94f..d4111eea 100644
--- a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
@@ -1,4 +1,4 @@
-package com.magnetic.streaming.samza;
+package com.magnetic.streaming.samza
 
 import com.magnetic.streaming.common.AdEventsParser
 import org.apache.samza.Partition
@@ -55,7 +55,7 @@ object HelloMagneticSamza {
         val envelope = new IncomingMessageEnvelope(partition, "", "key", "msg")
         val collector = new MessageCollector() {
             def send(envelope: OutgoingMessageEnvelope) {
-                println("Send method called:" + envelope.getSystemStream().getStream())
+                println("Send method called:" + envelope.getSystemStream.getStream)
             }
         }
         val coordinator = new TaskCoordinator() {

From d870779b70b7968d8a67439f8e6032a134fa349a Mon Sep 17 00:00:00 2001
From: Stanislav Los
Date: Wed, 16 Dec 2015 15:08:02 -0500
Subject: [PATCH 05/11] join works

---
 src/main/assembly/src.xml                    |  5 ++++
 src/main/config/magnetic-feed.properties     |  1 +
 src/main/config/magnetic-join.properties     | 40 +++++++++++++
 src/main/scala/README.md                     | 15 ++++-
 .../samza/MagneticEventsFeedStreamTask.scala | 56 +++++++++----------
 .../samza/MagneticJoinStreamsTask.scala      | 45 +++++++++++++++
 6 files changed, 133 insertions(+), 29 deletions(-)
 create mode 100644 src/main/config/magnetic-join.properties
 create mode 100644 src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala

diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index b659db08..17373026 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -41,6 +41,11 @@
       <outputDirectory>config</outputDirectory>
       <filtered>true</filtered>
     </file>
+    <file>
+      <source>${basedir}/src/main/config/magnetic-join.properties</source>
+      <outputDirectory>config</outputDirectory>
+      <filtered>true</filtered>
+    </file>
     <file>
       <source>${basedir}/src/main/config/wikipedia-feed.properties</source>
       <outputDirectory>config</outputDirectory>
diff --git a/src/main/config/magnetic-feed.properties b/src/main/config/magnetic-feed.properties
index fa84e1ad..464f55d3 100644
--- a/src/main/config/magnetic-feed.properties
+++ b/src/main/config/magnetic-feed.properties
@@ -15,6 +15,7 @@ serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactor
 
 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.key.serde=string
 systems.kafka.samza.msg.serde=string
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092
diff --git a/src/main/config/magnetic-join.properties b/src/main/config/magnetic-join.properties
new file mode 100644
index 00000000..81509aca
--- /dev/null
+++ b/src/main/config/magnetic-join.properties
@@ -0,0 +1,40 @@
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=magnetic-join
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +task.class=com.magnetic.streaming.samza.MagneticJoinStreamsTask +task.inputs=kafka.imp-raw-partitioned,kafka.bid-raw-partitioned + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.key.serde=string +systems.kafka.samza.msg.serde=string +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.bootstrap.servers=localhost:9092 + +# KV Store +stores.imp-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory +stores.imp-store.changelog=kafka.imp-store-changelog +stores.imp-store.changelog.replication.factor=1 +stores.imp-store.key.serde=string +stores.imp-store.msg.serde=string + +stores.bid-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory +stores.bid-store.changelog=kafka.bid-store-changelog +stores.bid-store.changelog.replication.factor=1 +stores.bid-store.key.serde=string +stores.bid-store.msg.serde=string + +# Job Coordinator +job.coordinator.system=kafka +# Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model +# See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details +job.coordinator.replication.factor=1 diff --git a/src/main/scala/README.md b/src/main/scala/README.md index 53f5a318..f5b5f420 100644 --- a/src/main/scala/README.md +++ b/src/main/scala/README.md @@ -8,6 +8,9 @@ sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw-partitioned --zooke sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw-partitioned --zookeeper localhost:2181 --partitions 4 --replication 1 sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-error --zookeeper localhost:2181 --partitions 1 --replication 1 sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-error --zookeeper localhost:2181 --partitions 1 --replication 1 +sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-bid-joined --zookeeper localhost:2181 --partitions 1 --replication 1 +sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-store-changelog --zookeeper localhost:2181 --partitions 4 --replication 1 +sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-store-changelog --zookeeper localhost:2181 --partitions 4 --replication 1 sh deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 after that: @@ -20,6 +23,10 @@ tar -xvf ./target/hello-samza-0.10.0-dist.tar.gz -C deploy/samza deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \ --config-path=file://$PWD/deploy/samza/config/magnetic-feed.properties +deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \ + --config-path=file://$PWD/deploy/samza/config/magnetic-join.properties + +# Logs can be found: tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXXXXXXX_XXXX_XX_XXXXXX/{logs} sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw @@ -29,6 +36,12 @@ sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topi sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper 
localhost:2181 --topic bid-raw-partitioned
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-error
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-error
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-bid-joined
 
 bin/grid stop all
-```
\ No newline at end of file
+```
+
+ad log event:
+ V=magnetic.domdex.com t=[10/Dec/2015:00:00:00 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705601&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+bid log event:
+1449705600 45799578e9064ca5b4e87af2aba77092 161.253.120.255 US 511 DC 20016 America/New_York 650e33b95a1449705601 5668c08c0008a5e80a1f1acb6c0f76fa g 1 thegradcafe.com 728x90 1 00-00 1054641 9115 54663 38227 54663,52593,51249,51246,55928,50856,46309,52454,32235,50944 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9 http://thegradcafe.com/survey/index.php 1 875 1000 1000 en iab_tech 85 85 45 6500 45 15 25000 1000 600 1000 1000 1000 1000 1000 magnetic_ctr_variable_price 1.0 2.64151881627e-05 2015120920 -4.05492019653 2015120920 Safari Mac_OS_X 00-00 0 1 70 n_a
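The two sample events above are handy for sanity-checking the parsers outside YARN. A minimal sketch (the inlined events below are synthetic, carrying only the fields the parsers actually read, and are tab-separated as `parse_fields` and `extract_bid_auction_id` expect; the README samples lose their tab separators in this rendering):

```scala
import com.magnetic.streaming.common.AdEventsParser

// Smoke test for the extractors, using minimal synthetic events modeled on
// the README samples. Both sides carry the same auction id, which is what
// the join job downstream matches on.
object ParserSmokeTest extends App {
  val imp = List(
    "t=[10/Dec/2015:00:00:00 +0000]",
    "c=c4706fc6df6f48b683d6aca71863f99f",
    "l=/ahtm",
    "q=id=650e33b95a1449705601&js=t",
    "rpt=ahtm"
  ).mkString("\t")

  val bid = List(
    "1449705600", "45799578e9064ca5b4e87af2aba77092", "161.253.120.255",
    "US", "511", "DC", "20016", "America/New_York",
    "650e33b95a1449705601", "rest-of-line"
  ).mkString("\t")

  println(AdEventsParser.extract_impression_auction_id(imp)) // Some(650e33b95a1449705601)
  println(AdEventsParser.extract_bid_auction_id(bid))        // Some(650e33b95a1449705601)
}
```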
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
index d4111eea..6b0e50fa 100644
--- a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala
@@ -12,39 +12,39 @@ import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskCoordinator.RequestScope
 
 class MagneticEventsFeedStreamTask extends StreamTask {
-    val NUM_PARTITIONS = 4
-    val IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-raw-partitioned")
-    val BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-raw-partitioned")
-    val IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error")
-    val BID_ERROR_STREAM = new SystemStream("kafka", "bid-error")
+  val NUM_PARTITIONS = 4
+  val IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-raw-partitioned")
+  val BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-raw-partitioned")
+  val IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error")
+  val BID_ERROR_STREAM = new SystemStream("kafka", "bid-error")
 
-    def getPartitionKey(key: String) = {
-        key.hashCode() % NUM_PARTITIONS
-    }
+  def getPartitionKey(key: String) = {
+    key.hashCode() % NUM_PARTITIONS
+  }
 
-    def send(auctionId: String, event: String, collector: MessageCollector, system: SystemStream) {
-        collector.send(
-            new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, event)
-        )
-    }
+  def send(auctionId: String, event: String, collector: MessageCollector, system: SystemStream) {
+    collector.send(
+      new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, event)
+    )
+  }
 
-    def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
-        val event = envelope.getMessage.asInstanceOf[String]
-        envelope.getSystemStreamPartition.getSystemStream.getStream match {
-            case "imp-raw" =>
-                AdEventsParser.extract_impression_auction_id(event) match {
-                    case Some(auctionId) => send(auctionId, event, collector, IMP_OUTPUT_STREAM)
-                    case None => collector.send(new OutgoingMessageEnvelope(IMP_ERROR_STREAM, event))
-                }
-            case "bid-raw" =>
-                AdEventsParser.extract_bid_auction_id(event) match {
-                    case Some(auctionId) => send(auctionId, event, collector, BID_OUTPUT_STREAM)
-                    case None => collector.send(new OutgoingMessageEnvelope(BID_ERROR_STREAM, event))
-                }
-            case notSupportedStream =>
-                throw new RuntimeException(s"Not supported stream: $notSupportedStream")
-        }
-    }
+  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+    val event = envelope.getMessage.asInstanceOf[String]
+    envelope.getSystemStreamPartition.getSystemStream.getStream match {
+      case "imp-raw" =>
+        AdEventsParser.extract_impression_auction_id(event) match {
+          case Some(auctionId) => send(auctionId, event, collector, IMP_OUTPUT_STREAM)
+          case None => collector.send(new OutgoingMessageEnvelope(IMP_ERROR_STREAM, event))
+        }
+      case "bid-raw" =>
+        AdEventsParser.extract_bid_auction_id(event) match {
+          case Some(auctionId) => send(auctionId, event, collector, BID_OUTPUT_STREAM)
+          case None => collector.send(new OutgoingMessageEnvelope(BID_ERROR_STREAM, event))
+        }
+      case notSupportedStream =>
+        throw new RuntimeException(s"Not supported stream: $notSupportedStream")
+    }
+  }
 }
 
 object HelloMagneticSamza {
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
new file mode 100644
index 00000000..c2d32bfd
--- /dev/null
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
@@ -0,0 +1,45 @@
+package com.magnetic.streaming.samza
+
+import org.apache.samza.config.Config
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, IncomingMessageEnvelope}
+import org.apache.samza.task._
+
+class MagneticJoinStreamsTask extends StreamTask with InitableTask {
+
+  val OUTPUT_STREAM = new SystemStream("kafka", "imp-bid-joined")
+  var impStore: KeyValueStore[String, String] = null
+  var bidStore: KeyValueStore[String, String] = null
+
+  override def init(config: Config, context: TaskContext) {
+    this.impStore = context.getStore("imp-store").asInstanceOf[KeyValueStore[String, String]]
+    this.bidStore = context.getStore("bid-store").asInstanceOf[KeyValueStore[String, String]]
+  }
+
+  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+    val key = envelope.getKey.asInstanceOf[String]
+    val event = envelope.getMessage.asInstanceOf[String]
+    envelope.getSystemStreamPartition.getSystemStream.getStream match {
+      case "imp-raw-partitioned" =>
+        Option(bidStore.get(key)) match {
+          case Some(bid) =>
+            collector.send(
+              new OutgoingMessageEnvelope(OUTPUT_STREAM, key, s"{ raw_imp:$event, raw_bid:$bid }")
+            )
+            bidStore.delete(key)
+          case None => impStore.put(key, event)
+        }
+      case "bid-raw-partitioned" =>
+        Option(impStore.get(key)) match {
+          case Some(imp) =>
+            collector.send(
+              new OutgoingMessageEnvelope(OUTPUT_STREAM, key, s"{raw_imp:$imp, raw_bid:$event }")
+            )
+            impStore.delete(key)
+          case None => bidStore.put(key, event)
+        }
+      case notSupportedStream =>
+        throw new RuntimeException(s"Not supported stream: $notSupportedStream")
+    }
+  }
+}
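`MagneticJoinStreamsTask` is a textbook symmetric stream-stream join: whichever side of an auction arrives first is parked in its RocksDB store, and when the other side shows up the pair is emitted and the parked entry deleted. The per-event decision, factored out as a pure function (names are illustrative, not from the patch):

```scala
// Sketch of the join invariant: one step of the symmetric join, where
// `mine` just arrived on one side and `partner` is whatever the other
// side's store holds for the same auction id.
object JoinLogic {
  sealed trait Outcome
  case class Emit(joined: String) extends Outcome            // partner found: emit, unpark partner
  case class Park(key: String, event: String) extends Outcome // no partner yet: buffer and wait

  def joinStep(key: String, mine: String, partner: Option[String]): Outcome =
    partner match {
      case Some(other) => Emit(s"{ raw_imp:$mine, raw_bid:$other }")
      case None        => Park(key, mine)
    }
}
```

The flip side of this scheme: an event whose partner never arrives stays parked forever, so `imp-store` and `bid-store` grow without bound, which is what the `WindowableTask` wiring in the next patch and the cleanup in patch 08 address. Note also that the emitted `{ raw_imp:..., raw_bid:... }` string is not valid JSON (keys and values are unquoted); patch 07 replaces it with real maps and a JSON serde.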
From b739882fc5a7dedd74020342d4c18294e244adbf Mon Sep 17 00:00:00 2001
From: Stanislav Los
Date: Thu, 17 Dec 2015 09:29:22 -0500
Subject: [PATCH 06/11] added windowable task

---
 .../magnetic/streaming/samza/MagneticJoinStreamsTask.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
index c2d32bfd..7f7575f5 100644
--- a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
@@ -5,7 +5,7 @@ import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, IncomingMessageEnvelope}
 import org.apache.samza.task._
 
-class MagneticJoinStreamsTask extends StreamTask with InitableTask {
+class MagneticJoinStreamsTask extends StreamTask with InitableTask with WindowableTask {
 
   val OUTPUT_STREAM = new SystemStream("kafka", "imp-bid-joined")
   var impStore: KeyValueStore[String, String] = null
@@ -42,4 +42,8 @@ class MagneticJoinStreamsTask extends StreamTask with InitableTask {
       throw new RuntimeException(s"Not supported stream: $notSupportedStream")
     }
   }
+
+  override def window(messageCollector: MessageCollector, taskCoordinator: TaskCoordinator) {
+
+  }
 }
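Patch 06 only wires in `WindowableTask` with an empty `window()`; the actual store cleanup arrives with patch 08 ("Cleanup works!"), whose task changes are not included in this excerpt. A minimal sketch of what such an expiry pass could look like, assuming parked events are stored as maps carrying an epoch-seconds `log_timestamp` (the shape the next patch introduces); `expire` is a hypothetical helper, not the patch's code:

```scala
import org.apache.samza.storage.kv.KeyValueStore

object StoreExpiry {
  // Hypothetical helper: drop parked events older than a TTL.
  // KeyValueStore.all() returns an iterator over the whole store
  // that must be closed when done.
  def expire(store: KeyValueStore[String, java.util.Map[String, Any]], ttlSeconds: Long): Unit = {
    val cutoff = System.currentTimeMillis / 1000 - ttlSeconds
    val it = store.all()
    try {
      while (it.hasNext) {
        val entry = it.next()
        // JSON-deserialized numbers may come back as Integer or Long,
        // so go through Number rather than casting directly.
        val ts = entry.getValue.get("log_timestamp").asInstanceOf[Number].longValue
        if (ts < cutoff) store.delete(entry.getKey)
      }
    } finally {
      it.close()
    }
  }
}
```

Called from `window()` for both `imp-store` and `bid-store`, a pass like this bounds each store to roughly one TTL's worth of unmatched events.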
From 024b1ed6d63bd66da1cda89942ce5397074e1875 Mon Sep 17 00:00:00 2001
From: Stanislav Los
Date: Thu, 17 Dec 2015 15:04:03 -0500
Subject: [PATCH 07/11] converted to json

---
 src/main/config/magnetic-feed.properties     |  8 +--
 src/main/config/magnetic-join.properties     | 10 ++--
 src/main/scala/README.md                     |  8 +--
 .../streaming/common/AdEventsParser.scala    | 59 +++++++++++--------
 .../streaming/common/ParseHelpers.scala      |  7 ++-
 .../samza/MagneticEventsFeedStreamTask.scala | 34 +++++++----
 .../samza/MagneticJoinStreamsTask.scala      | 39 ++++++++----
 7 files changed, 102 insertions(+), 63 deletions(-)

diff --git a/src/main/config/magnetic-feed.properties b/src/main/config/magnetic-feed.properties
index 464f55d3..ca62a888 100644
--- a/src/main/config/magnetic-feed.properties
+++ b/src/main/config/magnetic-feed.properties
@@ -13,15 +13,15 @@ task.inputs=kafka.imp-raw,kafka.bid-raw
 serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 
-# Kafka System
+# Kafka Systems
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
+systems.kafka.samza.msg.serde=json
+systems.kafka.streams.imp-raw.samza.msg.serde=string
+systems.kafka.streams.bid-raw.samza.msg.serde=string
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092
 
 # Job Coordinator
 job.coordinator.system=kafka
-# Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model
-# See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details
 job.coordinator.replication.factor=1
diff --git a/src/main/config/magnetic-join.properties b/src/main/config/magnetic-join.properties
index 81509aca..3d6fbde3 100644
--- a/src/main/config/magnetic-join.properties
+++ b/src/main/config/magnetic-join.properties
@@ -7,7 +7,7 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
 
 # Task
 task.class=com.magnetic.streaming.samza.MagneticJoinStreamsTask
-task.inputs=kafka.imp-raw-partitioned,kafka.bid-raw-partitioned
+task.inputs=kafka.imp-meta,kafka.bid-meta
 
 # Serializers
 serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 
 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
+systems.kafka.samza.msg.serde=json
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092
@@ -25,16 +25,14 @@ stores.imp-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngin
 stores.imp-store.changelog=kafka.imp-store-changelog
 stores.imp-store.changelog.replication.factor=1
 stores.imp-store.key.serde=string
-stores.imp-store.msg.serde=string
+stores.imp-store.msg.serde=json
 
 stores.bid-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.bid-store.changelog=kafka.bid-store-changelog
 stores.bid-store.changelog.replication.factor=1
 stores.bid-store.key.serde=string
-stores.bid-store.msg.serde=string
+stores.bid-store.msg.serde=json
 
 # Job Coordinator
 job.coordinator.system=kafka
-# Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model
-# See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details
 job.coordinator.replication.factor=1
diff --git a/src/main/scala/README.md b/src/main/scala/README.md
index f5b5f420..5dad2af2 100644
--- a/src/main/scala/README.md
+++ b/src/main/scala/README.md
@@ -4,8 +4,8 @@ bin/grid bootstrap
 
 sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw --zookeeper localhost:2181 --partitions 1 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw --zookeeper localhost:2181 --partitions 1 --replication 1
-sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw-partitioned --zookeeper localhost:2181 --partitions 4 --replication 1
-sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw-partitioned --zookeeper localhost:2181 --partitions 4 --replication 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-meta --zookeeper localhost:2181 --partitions 4 --replication 1
+sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-meta --zookeeper localhost:2181 --partitions 4 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-error --zookeeper localhost:2181 --partitions 1 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-error --zookeeper localhost:2181 --partitions 1 --replication 1
@@ -32,8 +32,8 @@ tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXX
 sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw
 sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bid-raw
 
-sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-raw-partitioned
-sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-raw-partitioned
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-meta
+sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-meta
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-error
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-error
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-bid-joined
diff --git a/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala b/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
index 8495a215..5e4dd81e 100644
--- a/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
+++ b/src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
@@ -13,7 +13,8 @@ object AdEventsParser {
   val RETARGETING = "rpx"
   val RETARCOUNT = "rcount"
   val ERROR = "error"
-  val SKIP = "skip"
+  val SKIP = "skip"
+  val BID = "bid"
 
   val ACLK_PATH_QS = "/aclk/(\\w+=.*)/qe=".r
   val VALID_AUCTION_ID_PATTERN = "[\\-\\.\\w\\d]{6,50}".r
@@ -26,7 +27,7 @@ object AdEventsParser {
     }
   }
 
-  def parse_common(fields: Map[String, String]): (String, String, String, Map[String, String], String) = {
+  def parse_common(fields: Map[String, String]): (String, Long, String, Map[String, String], String) = {
 
     val timestamp = parse_timestamp(fields.getOrElse("t", ""))
     val querystring = parse_querystring(fields)
     val cookie = {
@@ -49,40 +50,50 @@ object AdEventsParser {
     line.split("\t").withFilter(_.contains("=")).map(split_one(_)).toMap
   }
 
-  def extract_auction_id(querystring: Map[String, String]): Option[String] = {
-    val network_id = _int(querystring.getOrElse("mp", MAGNETIC_NETWORK_ID.toString))
-
-    if (network_id == MAGNETIC_NETWORK_ID)
-      Some(querystring("id"))
-    else
-      VALID_AUCTION_ID_PATTERN.findFirstIn(querystring.getOrElse("id", ""))
-  }
-
   def should_skip_impression(fields: Map[String, String], querystring: Map[String, String]): Boolean = {
     false //TODO implement
   }
 
-  def extract_impression_auction_id(line: String): Option[String] = {
+  def parse_imp_meta(line: String): Map[String, Any] = {
     val fields = parse_fields(line)
     val (event_type, timestamp, cookie, querystring, pixel_id) = parse_common(fields)
     event_type match {
       case IMPRESSION =>
         if (should_skip_impression(fields, querystring))
-          None
-        else
-          extract_auction_id(querystring)
+          throw new RuntimeException("Should skip impression")
+        else {
+          val network_id = _int(querystring.getOrElse("mp", MAGNETIC_NETWORK_ID.toString))
+
+          val auction_id = {
+            if (network_id == MAGNETIC_NETWORK_ID)
+              querystring("id")
+            else
+              VALID_AUCTION_ID_PATTERN.findFirstIn(querystring.getOrElse("id", "")) match {
+                case Some(x) => querystring("id")
+                case _ => s"fake-$cookie-$timestamp"
+              }
+          }
+          val d = scala.collection.mutable.Map[String, Any]()
+          d("event_type") = IMPRESSION
+          d("auction_id") = auction_id
+          d("log_timestamp") = timestamp
+          d("imp_log_line") = line
+          d.toMap
+        }
       case _ =>
-        None
+        throw new RuntimeException("Not impression: " + event_type)
     }
   }
 
-  def extract_bid_auction_id(line: String): Option[String] = {
-    try {
-      val auction_id = line.split("\t", 10)(8)
-      Some(auction_id)
-    }
-    catch {
-      case t: Throwable => None
-    }
+  def parse_bid_meta(line: String): Map[String, Any] = {
+    val fields = line.split("\t", 10)
+    val auction_id = fields(8)
+    val timestamp = fields(0).toLong
+    val d = scala.collection.mutable.Map[String, Any]()
+    d("event_type") = BID
+    d("auction_id") = auction_id
+    d("log_timestamp") = timestamp
+    d("bid_log_line") = line
+    d.toMap
   }
 }
diff --git
a/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala b/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala index aa12c24e..1e2cee10 100644 --- a/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala +++ b/src/main/scala/com/magnetic/streaming/common/ParseHelpers.scala @@ -2,6 +2,7 @@ package com.magnetic.streaming.common import java.net.{URL, URLDecoder} import java.text.SimpleDateFormat +import java.util.TimeZone import scala.util.Try @@ -22,10 +23,10 @@ object ParseHelpers { } } - def parse_timestamp(raw_timestamp: String): String = { + def parse_timestamp(raw_timestamp: String): Long = { Try {new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]").parse(raw_timestamp)}.toOption match { - case Some(t) => (t.getTime/1000).toString - case _ => "0" + case Some(t) => t.getTime/1000 + case _ => 0 } } diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala index 6b0e50fa..a90f0fd6 100644 --- a/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala +++ b/src/main/scala/com/magnetic/streaming/samza/MagneticEventsFeedStreamTask.scala @@ -1,5 +1,6 @@ package com.magnetic.streaming.samza +import scala.collection.JavaConversions._ import com.magnetic.streaming.common.AdEventsParser import org.apache.samza.Partition import org.apache.samza.system.IncomingMessageEnvelope @@ -11,10 +12,12 @@ import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskCoordinator import org.apache.samza.task.TaskCoordinator.RequestScope +import scala.util.{Failure, Success, Try} + class MagneticEventsFeedStreamTask extends StreamTask { val NUM_PARTITIONS = 4 - val IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-raw-partitioned") - val BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-raw-partitioned") + val IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-meta") + val BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-meta") val IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error") val BID_ERROR_STREAM = new SystemStream("kafka", "bid-error") @@ -22,24 +25,32 @@ class MagneticEventsFeedStreamTask extends StreamTask { key.hashCode() % NUM_PARTITIONS } - def send(auctionId: String, event: String, collector: MessageCollector, system: SystemStream) { + def send(event: Map[String, Any], collector: MessageCollector, system: SystemStream) { + val auctionId = event("auction_id").asInstanceOf[String] + collector.send( + new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, mapAsJavaMap(event)) + ) + } + + def sendError(rawEvent: String, ex: Throwable, collector: MessageCollector, system: SystemStream) { + val error = Map("event_type" -> AdEventsParser.ERROR, "log_line" -> rawEvent, "exception" -> ex.getMessage) collector.send( - new OutgoingMessageEnvelope(system, getPartitionKey(auctionId), auctionId, event) + new OutgoingMessageEnvelope(system, mapAsJavaMap(error)) ) } override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { - val event = envelope.getMessage.asInstanceOf[String] + val rawEvent = envelope.getMessage.asInstanceOf[String] envelope.getSystemStreamPartition.getSystemStream.getStream match { case "imp-raw" => - AdEventsParser.extract_impression_auction_id(event) match { - case Some(auctionId) => send(auctionId, event, collector, IMP_OUTPUT_STREAM) - case None => collector.send(new OutgoingMessageEnvelope(IMP_ERROR_STREAM, event)) + 
Try(AdEventsParser.parse_imp_meta(rawEvent)) match { + case Success(metaEvent) => send(metaEvent, collector, IMP_OUTPUT_STREAM) + case Failure(exception) => sendError(rawEvent, exception, collector, IMP_ERROR_STREAM) } case "bid-raw" => - AdEventsParser.extract_bid_auction_id(event) match { - case Some(auctionId) => send(auctionId, event, collector, BID_OUTPUT_STREAM) - case None => collector.send(new OutgoingMessageEnvelope(BID_ERROR_STREAM, event)) + Try(AdEventsParser.parse_bid_meta(rawEvent)) match { + case Success(metaEvent) => send(metaEvent, collector, BID_OUTPUT_STREAM) + case Failure(exception) => sendError(rawEvent, exception, collector, BID_ERROR_STREAM) } case notSupportedStream => throw new RuntimeException(s"Not supported stream: $notSupportedStream") @@ -56,6 +67,7 @@ object HelloMagneticSamza { val collector = new MessageCollector() { def send(envelope: OutgoingMessageEnvelope) { println("Send method called:" + envelope.getSystemStream.getStream) + println("Message:" + envelope.getMessage) } } val coordinator = new TaskCoordinator() { diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala index 7f7575f5..5e2c96d7 100644 --- a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala +++ b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala @@ -1,5 +1,7 @@ package com.magnetic.streaming.samza +import scala.collection.JavaConversions._ +import com.magnetic.streaming.common.AdEventsParser.IMPRESSION import org.apache.samza.config.Config import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, IncomingMessageEnvelope} @@ -8,35 +10,50 @@ import org.apache.samza.task._ class MagneticJoinStreamsTask extends StreamTask with InitableTask with WindowableTask { val OUTPUT_STREAM = new SystemStream("kafka", "imp-bid-joined") - var impStore: KeyValueStore[String, String] = null - var bidStore: KeyValueStore[String, String] = null + var impStore: KeyValueStore[String, java.util.Map[String, Any]] = null + var bidStore: KeyValueStore[String, java.util.Map[String, Any]] = null override def init(config: Config, context: TaskContext) { - this.impStore = context.getStore("imp-store").asInstanceOf[KeyValueStore[String, String]] - this.bidStore = context.getStore("bid-store").asInstanceOf[KeyValueStore[String, String]] + this.impStore = context.getStore("imp-store").asInstanceOf[KeyValueStore[String, java.util.Map[String, Any]]] + this.bidStore = context.getStore("bid-store").asInstanceOf[KeyValueStore[String, java.util.Map[String, Any]]] + } + + def buildJoinedEvent(metaImp:Map[String,Any], metaBid:Map[String,Any]):Map[String,Any] = { + val d = scala.collection.mutable.Map[String, Any]() + d("event_type") = IMPRESSION + d("auction_id") = metaImp("auction_id") + d("log_timestamp") = metaImp("log_timestamp") + d("imp_log_line") = metaImp("imp_log_line") + d("bid_log_line") = metaBid("bid_log_line") + d.toMap } override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { val key = envelope.getKey.asInstanceOf[String] - val event = envelope.getMessage.asInstanceOf[String] + println(envelope.getMessage) + val event = mapAsScalaMap(envelope.getMessage.asInstanceOf[java.util.Map[String,Any]]).toMap envelope.getSystemStreamPartition.getSystemStream.getStream match { - case "imp-raw-partitioned" => + case "imp-meta" => 
         Option(bidStore.get(key)) match {
           case Some(bid) =>
             collector.send(
-              new OutgoingMessageEnvelope(OUTPUT_STREAM, key, s"{ raw_imp:$event, raw_bid:$bid }")
+              new OutgoingMessageEnvelope(
+                OUTPUT_STREAM, key, mapAsJavaMap(buildJoinedEvent(event, mapAsScalaMap(bid).toMap))
+              )
             )
             bidStore.delete(key)
-          case None => impStore.put(key, event)
+          case None => impStore.put(key, mapAsJavaMap(event))
         }
-      case "bid-raw-partitioned" =>
+      case "bid-meta" =>
         Option(impStore.get(key)) match {
           case Some(imp) =>
             collector.send(
-              new OutgoingMessageEnvelope(OUTPUT_STREAM, key, s"{raw_imp:$imp, raw_bid:$event }")
+              new OutgoingMessageEnvelope(
+                OUTPUT_STREAM, key, mapAsJavaMap(buildJoinedEvent(mapAsScalaMap(imp).toMap, event))
+              )
             )
             impStore.delete(key)
-          case None => bidStore.put(key, event)
+          case None => bidStore.put(key, mapAsJavaMap(event))
         }
       case notSupportedStream =>
         throw new RuntimeException(s"Not supported stream: $notSupportedStream")

From 98401ecd43c8f6f82a81962ea97d5d05db6d71ea Mon Sep 17 00:00:00 2001
From: Stanislav Los
Date: Thu, 17 Dec 2015 16:08:15 -0500
Subject: [PATCH 08/11] Cleanup works!

---
 src/main/config/magnetic-join.properties          |  2 ++
 src/main/scala/README.md                          | 34 +++++++++++++++----
 .../samza/MagneticJoinStreamsTask.scala           | 18 ++++++++--
 3 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/src/main/config/magnetic-join.properties b/src/main/config/magnetic-join.properties
index 3d6fbde3..7a539258 100644
--- a/src/main/config/magnetic-join.properties
+++ b/src/main/config/magnetic-join.properties
@@ -8,6 +8,8 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
 # Task
 task.class=com.magnetic.streaming.samza.MagneticJoinStreamsTask
 task.inputs=kafka.imp-meta,kafka.bid-meta
+# Call the window() method every hour (the actual retention window is applied inside the window method)
+task.window.ms=3600000
 
 # Serializers
diff --git a/src/main/scala/README.md b/src/main/scala/README.md
index 5dad2af2..0ee55425 100644
--- a/src/main/scala/README.md
+++ b/src/main/scala/README.md
@@ -1,7 +1,8 @@
 ```
-for the first time:
+# For the first time, run this to set up the Samza environment
 bin/grid bootstrap
 
+# Create Kafka topics (some of them are created automatically, but here we do it explicitly)
 sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw --zookeeper localhost:2181 --partitions 1 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw --zookeeper localhost:2181 --partitions 1 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-meta --zookeeper localhost:2181 --partitions 4 --replication 1
@@ -13,9 +14,10 @@ sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-store-changelog --zooke
 sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-store-changelog --zookeeper localhost:2181 --partitions 4 --replication 1
 sh deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
 
-after that:
+# After that, start the grid
 bin/grid start all
 
+# Build and deploy the project
 mvn clean package
 rm -rf deploy/samza
 mkdir -p deploy/samza
@@ -29,19 +31,37 @@ deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.P
 
 # Logs can be found:
 tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXXXXXXX_XXXX_XX_XXXXXX/{logs}
 
+# Submit some ad logs
 sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw
+
+# Copy-paste ad log event (impression)
+ V=magnetic.domdex.com t=[10/Dec/2015:00:00:00 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705601&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+
+# If a matching bid log is not submitted, adding these impressions will trigger cleanup of the previous one from imp-store
+# the next time the window() function runs (you can see this happen by tailing the imp-store-changelog topic; it's delayed, so be patient)
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705611&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705621&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705631&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705641&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705651&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705661&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705671&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705681&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+ V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705691&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
+
+# Submit some bid logs
 sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bid-raw
+
+# Copy-paste matching bid log event
+1449705600 45799578e9064ca5b4e87af2aba77092 161.253.120.255 US 511 DC 20016 America/New_York 650e33b95a1449705601 5668c08c0008a5e80a1f1acb6c0f76fa g 1 thegradcafe.com 728x90 1 00-00 1054641 9115 54663 38227 54663,52593,51249,51246,55928,50856,46309,52454,32235,50944 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9 http://thegradcafe.com/survey/index.php 1 875 1000 1000 en iab_tech 85 85 45 6500 45 15 25000 1000 600 1000 1000 1000 1000 1000 magnetic_ctr_variable_price 1.0 2.64151881627e-05 2015120920 -4.05492019653 2015120920 Safari Mac_OS_X 00-00 0 1 70 n_a
+
+# Monitor data propagation through the system
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-meta
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-meta
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-error
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-error
 sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-bid-join
 
+# Shut down everything
 bin/grid stop all
 ```
-
-ad log event:
- V=magnetic.domdex.com t=[10/Dec/2015:00:00:00 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705601&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
-bid log event:
-1449705600 45799578e9064ca5b4e87af2aba77092 161.253.120.255 US 511 DC 20016 America/New_York 650e33b95a1449705601 5668c08c0008a5e80a1f1acb6c0f76fa g 1 thegradcafe.com 728x90 1 00-00 1054641 9115 54663 38227 54663,52593,51249,51246,55928,50856,46309,52454,32235,50944 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9 http://thegradcafe.com/survey/index.php 1 875 1000 1000 en iab_tech 85 85 45 6500 45 15 25000 1000 600 1000 1000 1000 1000 1000 magnetic_ctr_variable_price 1.0 2.64151881627e-05 2015120920 -4.05492019653 2015120920 Safari Mac_OS_X 00-00 0 1 70 n_a
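One detail in the sample events above is worth calling out: the impression carries its auction id in the id= query parameter (id=650e33b95a1449705601), and the matching bid line contains the same value as a whitespace-separated field; that shared id is the join key. A minimal sketch of pulling it out of the imp's q= blob (the real parsing lives in AdEventsParser.scala and may well differ):

```
object AuctionIdSketch {
  // Finds the "id" parameter in a query string such as
  // "js=t&r=c&b=39634&...&id=650e33b95a1449705601&sz=728x90&..."
  def extractAuctionId(query: String): Option[String] =
    query.split("&").collectFirst {
      case param if param.startsWith("id=") => param.substring(3)
    }
}

// AuctionIdSketch.extractAuctionId("js=t&r=c&id=650e33b95a1449705601&sz=728x90")
// returns Some("650e33b95a1449705601")
```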
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
index 5e2c96d7..15b4629a 100644
--- a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
@@ -12,6 +12,8 @@ class MagneticJoinStreamsTask extends StreamTask with InitableTask with Windowab
   val OUTPUT_STREAM = new SystemStream("kafka", "imp-bid-joined")
   var impStore: KeyValueStore[String, java.util.Map[String, Any]] = null
   var bidStore: KeyValueStore[String, java.util.Map[String, Any]] = null
+  var lastImpTimestamp = 0
+  var lastBidTimestamp = 0
 
   override def init(config: Config, context: TaskContext) {
     this.impStore = context.getStore("imp-store").asInstanceOf[KeyValueStore[String, java.util.Map[String, Any]]]
@@ -30,10 +32,10 @@ class MagneticJoinStreamsTask extends StreamTask with InitableTask with Windowab
 
   override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
     val key = envelope.getKey.asInstanceOf[String]
-    println(envelope.getMessage)
     val event = mapAsScalaMap(envelope.getMessage.asInstanceOf[java.util.Map[String,Any]]).toMap
     envelope.getSystemStreamPartition.getSystemStream.getStream match {
       case "imp-meta" =>
+        lastImpTimestamp = event("log_timestamp").asInstanceOf[Integer]
         Option(bidStore.get(key)) match {
           case Some(bid) =>
             collector.send(
@@ -45,6 +47,7 @@ class MagneticJoinStreamsTask extends StreamTask with InitableTask with Windowab
           case None => impStore.put(key, mapAsJavaMap(event))
         }
       case "bid-meta" =>
+        lastBidTimestamp = event("log_timestamp").asInstanceOf[Integer]
         Option(impStore.get(key)) match {
           case Some(imp) =>
             collector.send(
@@ -60,7 +63,18 @@ class MagneticJoinStreamsTask extends StreamTask with InitableTask with Windowab
     }
   }
 
-  override def window(messageCollector: MessageCollector, taskCoordinator: TaskCoordinator) {
+  def cleanUpEventStore(eventStore: KeyValueStore[String, java.util.Map[String, Any]], thresholdTimestamp: Integer) {
+    val it = eventStore.all()
+    while (it.hasNext) {
+      val entry = it.next()
+      if (entry.getValue.get("log_timestamp").asInstanceOf[Integer] < thresholdTimestamp) {
+        eventStore.delete(entry.getKey)
+      }
+    }
+  }
+
+  override def window(messageCollector: MessageCollector, taskCoordinator: TaskCoordinator) {
+    cleanUpEventStore(impStore, lastImpTimestamp - 3600) //TODO Keep one hour of events. Make it configurable
+    cleanUpEventStore(bidStore, lastBidTimestamp - 3600) //TODO Keep one hour of events. Make it configurable
   }
 }
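Two things in cleanUpEventStore above are worth flagging: the KeyValueIterator returned by eventStore.all() is never closed, and the one-hour threshold is hard-coded (see the TODOs). A minimal sketch of addressing both, assuming a hypothetical magnetic.join.retention.seconds property that is not part of this patch:

```
import org.apache.samza.config.Config
import org.apache.samza.storage.kv.KeyValueStore

object StoreCleanupSketch {

  // Hypothetical property name; it would have to be added to magnetic-join.properties
  def retentionSeconds(config: Config): Long =
    config.getLong("magnetic.join.retention.seconds", 3600L)

  // Same sweep as cleanUpEventStore, but the store iterator is closed in a
  // finally block so the underlying KV store resources are always released
  def cleanUp(store: KeyValueStore[String, java.util.Map[String, Any]], thresholdTimestamp: Int) {
    val it = store.all()
    try {
      while (it.hasNext) {
        val entry = it.next()
        if (entry.getValue.get("log_timestamp").asInstanceOf[Integer] < thresholdTimestamp) {
          store.delete(entry.getKey)
        }
      }
    } finally {
      it.close()
    }
  }
}
```

In this sketch, retentionSeconds would be read once in init() and the result passed to cleanUp from window(), replacing the literal 3600.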
+ */
 class MagneticEventsFeedStreamTask extends StreamTask {
-  val NUM_PARTITIONS = 4
+  val NUM_PARTITIONS = 4 // Must equal the number of partitions in the target topic
   val IMP_OUTPUT_STREAM = new SystemStream("kafka", "imp-meta")
   val BID_OUTPUT_STREAM = new SystemStream("kafka", "bid-meta")
   val IMP_ERROR_STREAM = new SystemStream("kafka", "imp-error")
@@ -58,6 +64,9 @@ class MagneticEventsFeedStreamTask extends StreamTask {
   }
 }
 
+/**
+ * TODO remove this and create a proper unit test with mock objects
+ */
 object HelloMagneticSamza {
   def main(args: Array[String]): Unit = {
     val t = new MagneticEventsFeedStreamTask()
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
index 15b4629a..d89b742d 100644
--- a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
@@ -7,6 +7,14 @@ import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, IncomingMessageEnvelope}
 import org.apache.samza.task._
 
+/**
+ * Reads impressions and bids pre-parsed by the previous step.
+ * Since they were partitioned by auction id, imps and bids with the same key end up in the same task.
+ * Uses the local KV store assigned to the given task to look up matching events.
+ * If a match is found, creates a joined event and sends it downstream.
+ * Otherwise persists the event for further lookups.
+ * Periodically deletes stale events from the local KV store.
+ */
 class MagneticJoinStreamsTask extends StreamTask with InitableTask with WindowableTask {
 
   val OUTPUT_STREAM = new SystemStream("kafka", "imp-bid-joined")
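The new doc comment says the feed task picks the partition by hashing the auction id. A simplified sketch of that scheme (an illustration, not the actual task code; the envelope wiring assumes Samza's default behavior of routing by the hash of the partition key):

```
import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}

object PartitionByAuctionId {
  val NUM_PARTITIONS = 4 // must match the partition count of imp-meta and bid-meta

  // Stable partition in [0, NUM_PARTITIONS); the mask keeps the result
  // non-negative even for hashCode values like Int.MinValue
  def partitionFor(auctionId: String): Int =
    (auctionId.hashCode & Integer.MAX_VALUE) % NUM_PARTITIONS

  // Imps and bids that share an auction id get the same partition number,
  // so the n-th partitions of the two topics line up for the join step
  def envelopeFor(stream: SystemStream, auctionId: String, event: AnyRef): OutgoingMessageEnvelope =
    new OutgoingMessageEnvelope(stream, Integer.valueOf(partitionFor(auctionId)), auctionId, event)
}
```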
From 510a515b4d4892e6a9b7045dc33bf95992c34266 Mon Sep 17 00:00:00 2001
From: Stanislav Los
Date: Fri, 18 Dec 2015 10:07:19 -0500
Subject: [PATCH 10/11] Updated docs

---
 src/main/scala/README.md                          | 29 +++++++++++++++++--
 .../samza/MagneticJoinStreamsTask.scala           |  2 +-
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/README.md b/src/main/scala/README.md
index f17aac1b..da8a959f 100644
--- a/src/main/scala/README.md
+++ b/src/main/scala/README.md
@@ -1,5 +1,28 @@
 # Magnetic Imp-Bid Join PoC using Samza
 
+### Intro
+- Repartition impressions and bids by the same key (auction id) and send them to the related topic/partition.
+- That way the n-th partition of the impressions topic will have the same auctions as the n-th partition of the bids topic.
+- When reading those topics, Samza will provide data from the n-th partition on both topics to the same task,
+i.e. all the necessary information to make a join will end up on the same machine/process.
+- Samza's local state storage (KV store) provides a lookup mechanism to make the join.
+- Increasing the number of partitions/concurrent tasks allows the join to scale linearly (share-nothing architecture).
+- Tested on a local Samza grid
+
+### TODO
+- Deploy to a Hadoop cluster and test at scale
+- Performance of the state storage for lookups, and the size of the data we can hold (KV storage works well on SSD,
+but can suffer on a regular HDD)
+- Performance of the state storage at cleanup, since it pauses main processing,
+i.e. the window() method blocks the process() method
+- Restoring from checkpoint; can we control read rates so the two streams stay more or less aligned?
+- Replays from Kafka at some time interval; need to maintain timestamp->offsets/topic/partition information
+- Replays from S3? If we can keep a big enough window, this is easier than with Spark,
+because data stream alignment is not as critical
+
+### Current state PoC
+Use the following commands to run the PoC locally.
+
 ```
 # For the first time, run this to set up the Samza environment
 bin/grid bootstrap
@@ -25,15 +48,17 @@ rm -rf deploy/samza
 mkdir -p deploy/samza
 tar -xvf ./target/hello-samza-0.10.0-dist.tar.gz -C deploy/samza
 
+# Start the raw events repartition job
 deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
   --config-path=file://$PWD/deploy/samza/config/magnetic-feed.properties
+# Start the join job
 deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
   --config-path=file://$PWD/deploy/samza/config/magnetic-join.properties
 
 # Logs can be found:
 tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXXXXXXX_XXXX_XX_XXXXXX/{logs}
 
-# Submit some ad logs
+# Start a Kafka console producer to submit some ad logs
 sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw
 
 # Copy-paste ad log event (impression)
@@ -51,7 +76,7 @@ sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --top
  V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705681&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
  V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705691&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
 
-# Submit some bid logs
+# Start a Kafka console producer to submit some bid logs
 sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bid-raw
 
 # Copy-paste matching bid log event
diff --git a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
index d89b742d..46a32a84 100644
--- a/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
+++ b/src/main/scala/com/magnetic/streaming/samza/MagneticJoinStreamsTask.scala
@@ -13,7 +13,7 @@ import org.apache.samza.task._
  * Uses the local KV store assigned to the given task to look up matching events.
  * If a match is found, creates a joined event and sends it downstream.
  * Otherwise persists the event for further lookups.
- * Periodically deletes stale events from the local KV store.
+ * Periodically deletes stale events from the local KV store (cleanup blocks main processing, so its performance is critical).
  */
 class MagneticJoinStreamsTask extends StreamTask with InitableTask with WindowableTask {
 
   val OUTPUT_STREAM = new SystemStream("kafka", "imp-bid-joined")
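Stripped of Kafka wiring and serdes, the join that both branches of MagneticJoinStreamsTask.process implement reduces to one symmetric helper. A sketch with illustrative names, not code from these patches:

```
import org.apache.samza.storage.kv.KeyValueStore

object JoinOrBuffer {
  // Probe the opposite store for this auction id: on a hit, emit the joined
  // pair and drop the buffered event; on a miss, buffer this event so the
  // other side can find it later
  def joinOrBuffer[V](key: String,
                      event: V,
                      ownStore: KeyValueStore[String, V],
                      otherStore: KeyValueStore[String, V],
                      emit: (V, V) => Unit) {
    Option(otherStore.get(key)) match {
      case Some(matched) =>
        emit(event, matched)
        otherStore.delete(key)
      case None =>
        ownStore.put(key, event)
    }
  }
}
```

The imp branch calls it with (impStore, bidStore) and the bid branch with (bidStore, impStore), which is why the two case arms in process are mirror images of each other.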
From 1a09ab703f1b3de1d43c02ef97f31ca5ae0938b9 Mon Sep 17 00:00:00 2001
From: Stanislav Los
Date: Thu, 14 Jan 2016 10:06:11 -0500
Subject: [PATCH 11/11] Updated README

---
 src/main/scala/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/README.md b/src/main/scala/README.md
index da8a959f..8bc63267 100644
--- a/src/main/scala/README.md
+++ b/src/main/scala/README.md
@@ -9,7 +9,7 @@ i.e. all the necessary information to make a join will end up on the same machin
 - Increasing the number of partitions/concurrent tasks allows the join to scale linearly (share-nothing architecture).
 - Tested on a local Samza grid
 
-### TODO
+### TODO (See https://github.com/staslos/samza-hello-samza/tree/imp_bid_join_cdh)
 - Deploy to a Hadoop cluster and test at scale
 - Performance of the state storage for lookups, and the size of the data we can hold (KV storage works well on SSD,
 but can suffer on a regular HDD)