31 changes: 26 additions & 5 deletions pom.xml
@@ -108,10 +108,17 @@ under the License.
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>

<properties>
<!-- maven specific properties -->
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<samza.version>0.10.0-SNAPSHOT</samza.version>
</properties>
@@ -168,9 +175,9 @@ under the License.
</pluginRepositories>

<build>
<pluginManagement>
<!--<pluginManagement>
<plugins>
<plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.9</version>
@@ -193,8 +200,22 @@ under the License.
</configuration>
</plugin>
</plugins>
</pluginManagement>
</pluginManagement>-->
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
@@ -204,7 +225,7 @@ under the License.
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<!--<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.9</version>
@@ -216,7 +237,7 @@ under the License.
</goals>
</execution>
</executions>
</plugin>
</plugin>-->
<!-- plugin to build the tar.gz file filled with examples -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
10 changes: 10 additions & 0 deletions src/main/assembly/src.xml
@@ -36,6 +36,16 @@
</file>
<!-- filtered=true, so variables are expanded and the yarn package path
always points to the correct spot on any machine -->
<file>
<source>${basedir}/src/main/config/magnetic-feed.properties</source>
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${basedir}/src/main/config/magnetic-join.properties</source>
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${basedir}/src/main/config/wikipedia-feed.properties</source>
<outputDirectory>config</outputDirectory>
27 changes: 27 additions & 0 deletions src/main/config/magnetic-feed.properties
@@ -0,0 +1,27 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=magnetic-feed

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=com.magnetic.streaming.samza.MagneticEventsFeedStreamTask
task.inputs=kafka.imp-raw,kafka.bid-raw

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Kafka Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=json
systems.kafka.streams.imp-raw.samza.msg.serde=string
systems.kafka.streams.bid-raw.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092

# Job Coordinator
job.coordinator.system=kafka
job.coordinator.replication.factor=1
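
For orientation, here is a minimal sketch of the shape a repartitioning task wired to this config can take. The real MagneticEventsFeedStreamTask ships with this PR; the sketch assumes the imp-raw input and imp-meta output configured above and the AdEventsParser added below, and it glosses over converting the Scala map into a form the json serde (Jackson) accepts.

```scala
import com.magnetic.streaming.common.AdEventsParser
import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator}

// Sketch only: re-key each raw impression line by its auction id so that
// impressions and bids for the same auction land in the same partition index.
class FeedSketchTask extends StreamTask {
  private val ImpMeta = new SystemStream("kafka", "imp-meta")

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector,
                       coordinator: TaskCoordinator): Unit = {
    val line = envelope.getMessage.asInstanceOf[String] // *-raw topics use the string serde
    val meta = AdEventsParser.parse_imp_meta(line)
    // the key picks the output partition; the json serde applies on the way out
    collector.send(new OutgoingMessageEnvelope(ImpMeta, meta("auction_id"), meta))
  }
}
```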
40 changes: 40 additions & 0 deletions src/main/config/magnetic-join.properties
@@ -0,0 +1,40 @@
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=magnetic-join

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=com.magnetic.streaming.samza.MagneticJoinStreamsTask
task.inputs=kafka.imp-meta,kafka.bid-meta
# Call the window() method every hour (the actual window size is defined in the window method)
task.window.ms=3600000

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092

# KV Store
stores.imp-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.imp-store.changelog=kafka.imp-store-changelog
stores.imp-store.changelog.replication.factor=1
stores.imp-store.key.serde=string
stores.imp-store.msg.serde=json

stores.bid-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.bid-store.changelog=kafka.bid-store-changelog
stores.bid-store.changelog.replication.factor=1
stores.bid-store.key.serde=string
stores.bid-store.msg.serde=json

# Job Coordinator
job.coordinator.system=kafka
job.coordinator.replication.factor=1
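
To connect this config to code: a minimal sketch of how a task wired to it might use the two stores and the hourly window(). The real MagneticJoinStreamsTask ships with this PR; store and stream names follow the properties above, and a real task would merge the two sides rather than forward one of them.

```scala
import org.apache.samza.config.Config
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task._

// Sketch only: each side of the join is buffered in its local KV store until
// the matching event arrives; window() periodically evicts unmatched entries.
class JoinSketchTask extends StreamTask with InitableTask with WindowableTask {
  private val Joined = new SystemStream("kafka", "imp-bid-joined")
  private var impStore: KeyValueStore[String, AnyRef] = _
  private var bidStore: KeyValueStore[String, AnyRef] = _

  override def init(config: Config, context: TaskContext): Unit = {
    impStore = context.getStore("imp-store").asInstanceOf[KeyValueStore[String, AnyRef]]
    bidStore = context.getStore("bid-store").asInstanceOf[KeyValueStore[String, AnyRef]]
  }

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector,
                       coordinator: TaskCoordinator): Unit = {
    val auctionId = envelope.getKey.asInstanceOf[String]
    val (mine, other) = envelope.getSystemStreamPartition.getStream match {
      case "imp-meta" => (impStore, bidStore)
      case _          => (bidStore, impStore)
    }
    Option(other.get(auctionId)) match {
      case Some(matched) => // the other side already arrived: emit the joined event
        collector.send(new OutgoingMessageEnvelope(Joined, auctionId, matched))
        other.delete(auctionId)
      case None =>          // buffer until the other side shows up
        mine.put(auctionId, envelope.getMessage)
    }
  }

  override def window(collector: MessageCollector, coordinator: TaskCoordinator): Unit = {
    // called every task.window.ms; the real task scans the stores and evicts
    // entries older than the join window
  }
}
```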
94 changes: 94 additions & 0 deletions src/main/scala/README.md
@@ -0,0 +1,94 @@
# Magnetic Imp-Bid Join PoC using Samza

### Intro
- Repartition impressions and bids by the same key (the auction id) and send them to the corresponding topic/partition.
- That way the n-th partition of the impressions topic holds the same auctions as the n-th partition of the bids topic (see the sketch after this list).
- When reading those topics, Samza delivers the n-th partition of both topics to the same task,
i.e. all the information needed to make the join ends up on the same machine/process.
- Samza's local state storage (KV store) provides the lookup mechanism for the join.
- Increasing the number of partitions/concurrent tasks lets the join scale linearly (share-nothing architecture).
- Tested on a local Samza grid.
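
The co-partitioning invariant the bullets rely on can be sketched in a few lines. This is illustrative only: Kafka's actual partitioner hashes the serialized key, but the principle is the same.

```scala
// Illustrative only: equal keys yield equal partition indexes, so as long as
// imp-meta and bid-meta have the same partition count, an impression and its
// bid always land in partitions with the same index.
def partitionFor(auctionId: String, numPartitions: Int): Int =
  math.abs(auctionId.hashCode) % numPartitions

// partitionFor("650e33b95a1449705601", 4) gives the same index for the
// sample impression and its matching bid from the commands below.
```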

### TODO (see https://github.com/staslos/samza-hello-samza/tree/imp_bid_join_cdh)
- Deploy to a Hadoop cluster and test at scale
- Performance of the state storage for lookups, and the amount of data we can hold (KV storage works well on SSD,
but can suffer on a regular HDD)
- Performance of the state storage during cleanup, since cleanup pauses main processing,
i.e. the window() method blocks the process() method
- Restoring from checkpoint; can we control read rates so the two streams stay more or less aligned?
- Replays from Kafka at some time interval; we need to maintain timestamp -> offset/topic/partition information
- Replays from S3? If we can keep a big enough window, this is easier than with Spark,
because data stream alignment is not as critical

### Current state of the PoC
Use the following commands to run the PoC locally.

```
# For the first time run to set up Samza environment
bin/grid bootstrap

# Create Kafka topics (some of them would be created automatically, but here we create them explicitly)
sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-raw --zookeeper localhost:2181 --partitions 1 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-raw --zookeeper localhost:2181 --partitions 1 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-meta --zookeeper localhost:2181 --partitions 4 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-meta --zookeeper localhost:2181 --partitions 4 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-error --zookeeper localhost:2181 --partitions 1 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-error --zookeeper localhost:2181 --partitions 1 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-bid-joined --zookeeper localhost:2181 --partitions 1 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --create --topic imp-store-changelog --zookeeper localhost:2181 --partitions 4 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --create --topic bid-store-changelog --zookeeper localhost:2181 --partitions 4 --replication 1
sh deploy/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
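
# Note: the join relies on imp-meta and bid-meta being co-partitioned, which is
# why both were created with the same partition count (4); verify with e.g.:
sh deploy/kafka/bin/kafka-topics.sh --describe --topic imp-meta --zookeeper localhost:2181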

# Start the grid
bin/grid start all

# Build and deploy project
mvn clean package
rm -rf deploy/samza
mkdir -p deploy/samza
tar -xvf ./target/hello-samza-0.10.0-dist.tar.gz -C deploy/samza

# Start raw events repartition
deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
--config-path=file://$PWD/deploy/samza/config/magnetic-feed.properties
# Start join process
deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
--config-path=file://$PWD/deploy/samza/config/magnetic-join.properties

# Logs can be found at:
tail -100f deploy/yarn/logs/userlogs/application_XXXXXXXXXX_XXXX/container_XXXXXXXXXX_XXXX_XX_XXXXXX/{logs}

# Start a Kafka console producer to submit some ad logs
sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic imp-raw

# Copy-paste an ad log event (impression)
V=magnetic.domdex.com t=[10/Dec/2015:00:00:00 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705601&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=

# If a matching bid log is not submitted, adding these impressions will lead to cleanup of the previous ones from imp-store
# the next time the window() function runs (you can watch this happen by tailing the imp-store-changelog topic; it's delayed, so be patient)
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705611&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705621&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705631&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705641&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705651&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705661&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705671&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705681&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=
V=magnetic.domdex.com t=[10/Dec/2015:01:00:01 +0000] a= u=- c=c4706fc6df6f48b683d6aca71863f99f m=GET l=/ahtm q=js=t&r=c&b=39634&c=57391&n=9468&id=650e33b95a1449705691&sz=728x90&s=onetravel.com&u=c4706fc6df6f48b683d6aca71863f99f&f=1&cat=00-00&ms=558536&kw=&kwcat=&dp=&a=VmjAfwAOX7AUNL2pBW_4_aHw4x_o6q1Wy3wCYA s=200 b=2849 r=http://www.onetravel.com/ a0=2601:346:404:4e50:b090:77f3:4343:fbc1 ua=Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; InfoPath.3) d=1570 rpt=ahtm x=

# Start a Kafka console producer to submit some bid logs
sh deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bid-raw

# Copy-paste a matching bid log event
1449705600 45799578e9064ca5b4e87af2aba77092 161.253.120.255 US 511 DC 20016 America/New_York 650e33b95a1449705601 5668c08c0008a5e80a1f1acb6c0f76fa g 1 thegradcafe.com 728x90 1 00-00 1054641 9115 54663 38227 54663,52593,51249,51246,55928,50856,46309,52454,32235,50944 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9 http://thegradcafe.com/survey/index.php 1 875 1000 1000 en iab_tech 85 85 45 6500 45 15 25000 1000 600 1000 1000 1000 1000 1000 magnetic_ctr_variable_price 1.0 2.64151881627e-05 2015120920 -4.05492019653 2015120920 Safari Mac_OS_X 00-00 0 1 70 n_a

# Monitor the propagation of data through the system
sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-meta
sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-meta
sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-error
sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bid-error
sh deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imp-bid-joined

# Shutdown everything
bin/grid stop all
```
99 changes: 99 additions & 0 deletions src/main/scala/com/magnetic/streaming/common/AdEventsParser.scala
@@ -0,0 +1,99 @@
package com.magnetic.streaming.common

import com.magnetic.streaming.common.ParseHelpers._

object AdEventsParser {

  val MAGNETIC_NETWORK_ID = 1

  val IMPRESSION = "imp"
  val CLICK = "clk"
  val CONVERSION = "conv"
  val CONVCOUNT = "ccount"
  val RETARGETING = "rpx"
  val RETARCOUNT = "rcount"
  val ERROR = "error"
  val SKIP = "skip"
  val BID = "bid"

  val ACLK_PATH_QS = "/aclk/(\\w+=.*)/qe=".r
  val VALID_AUCTION_ID_PATTERN = "[\\-\\.\\w\\d]{6,50}".r

  /** Extracts the query string: click beacons carry it in the request path ("l"),
    * everything else in the "q" field. */
  def parse_querystring(fields: Map[String, String]): Map[String, String] = {
    fields.get("l") match {
      case Some(ACLK_PATH_QS(aclk)) => aclk.split('/').map(split_one).toMap
      case _ => parse_qs(fields.getOrElse("q", "").stripSuffix("/"))
    }
  }

  /** Pulls out the fields shared by all ad events:
    * (event_type, timestamp, cookie, querystring, pixel_id). */
  def parse_common(fields: Map[String, String]): (String, Long, String, Map[String, String], String) = {

    val timestamp = parse_timestamp(fields.getOrElse("t", ""))
    val querystring = parse_querystring(fields)
    val cookie = {
      val c = fields.getOrElse("c", "").split('|')(0)
      if (c.length == 32) c
      else null // anything but a 32-char cookie is treated as missing
    }

    // pixel_id is not extracted yet; the event type is derived from the report type
    val (event_type, pixel_id) = fields.getOrElse("rpt", "") match {
      case "ahtm" => (IMPRESSION, null)
      case "aclk" => (CLICK, null)
      case _      => (SKIP, null)
    }

    (event_type, timestamp, cookie, querystring, pixel_id)
  }

  /** Splits a raw tab-separated log line into a key=value map. */
  def parse_fields(line: String): Map[String, String] = {
    line.split("\t").withFilter(_.contains("=")).map(split_one).toMap
  }

  def should_skip_impression(fields: Map[String, String], querystring: Map[String, String]): Boolean = {
    false // TODO implement
  }

  /** Parses an impression log line into the metadata needed for the join;
    * throws on anything that is not a joinable impression. */
  def parse_imp_meta(line: String): Map[String, Any] = {
    val fields = parse_fields(line)
    val (event_type, timestamp, cookie, querystring, pixel_id) = parse_common(fields)
    event_type match {
      case IMPRESSION =>
        if (should_skip_impression(fields, querystring))
          throw new RuntimeException("Should skip impression")
        else {
          val network_id = _int(querystring.getOrElse("mp", MAGNETIC_NETWORK_ID.toString))

          // Magnetic's own auction ids are trusted as-is; external ids are validated,
          // and a synthetic id is generated when validation fails
          val auction_id = {
            if (network_id == MAGNETIC_NETWORK_ID)
              querystring("id")
            else
              VALID_AUCTION_ID_PATTERN.findFirstIn(querystring.getOrElse("id", "")) match {
                case Some(_) => querystring("id")
                case _ => s"fake-$cookie-$timestamp"
              }
          }
          Map(
            "event_type" -> IMPRESSION,
            "auction_id" -> auction_id,
            "log_timestamp" -> timestamp,
            "imp_log_line" -> line
          )
        }
      case _ =>
        throw new RuntimeException("Not an impression: " + event_type)
    }
  }

  /** Parses a bid log line: tab-separated, timestamp first, auction id in column 8 (0-based). */
  def parse_bid_meta(line: String): Map[String, Any] = {
    val fields = line.split("\t", 10)
    Map(
      "event_type" -> BID,
      "auction_id" -> fields(8),
      "log_timestamp" -> fields(0).toLong,
      "bid_log_line" -> line
    )
  }
}
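
A quick usage sketch for the parser. Here impLine and bidLine are hypothetical stand-ins for the raw, tab-separated log lines shown in the README (the samples there render tabs as spaces):

```scala
// Hypothetical usage: both parsers key their output on the same auction id.
val imp = AdEventsParser.parse_imp_meta(impLine) // impLine: a raw /ahtm impression line
val bid = AdEventsParser.parse_bid_meta(bidLine) // bidLine: a raw bid line (auction id in column 8)
assert(imp("auction_id") == bid("auction_id"))   // "650e33b95a1449705601" for the README samples
```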