Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions bin/produce-ad-event-data.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/bin/bash -e
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This script will prepare topics and start AdEventProducer which generates impression and click events

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
BASE_DIR=$(dirname ${DIR})
AD_PRODUCER_PATH=${BASE_DIR}/target/classes/samza/examples/join/AdEventProducer
SAMZA_LIB=${BASE_DIR}/deploy/samza/lib/

ZOOKEEPER=localhost:2181
KAFKA_BROKER=localhost:9092

echo Checking if required topics exist in kafka...
TOPICSRAW="ad-impression:1;ad-click:1;ad-imp-metadata:4;ad-clk-metadata:4;ad-event-error:1;ad-join:1;ad-imp-store-changelog:4;ad-clk-store-changelog:4"
IFS=';' read -a TOPICS <<< "$TOPICSRAW"
for i in "${!TOPICS[@]}"
do
IFS=':' read -a TOPIC <<< "${TOPICS[$i]}"
TOPIC_NAME=${TOPIC[0]}
PARTITION_NUMBER=${TOPIC[1]}
EXIST=$(${BASE_DIR}/deploy/kafka/bin/kafka-topics.sh --describe --topic ${TOPIC_NAME} --zookeeper ${ZOOKEEPER})
if [ -z "${EXIST}" ]
then
echo -e topic "${TOPIC_NAME}" doesn\'t exists. Creating topic...
${BASE_DIR}/deploy/kafka/bin/kafka-topics.sh --create --topic ${TOPIC_NAME} --zookeeper ${ZOOKEEPER} --partitions ${PARTITION_NUMBER} --replication 1
else
echo Topic "${TOPIC_NAME}" already exists.
read -a TOPIC_DESCRIPTION <<< "${EXIST}"
for DESC in "${TOPIC_DESCRIPTION[@]}"
do
IFS=':' read -a KV <<< "${DESC}"
if [ ${KV[0]} == 'PartitionCount' ]
then
if [ ${KV[1]} != $PARTITION_NUMBER ]
then
echo Number of partitions for topic "${TOPIC_NAME}" is wrong. It should be ${PARTITION_NUMBER} instead of ${KV[1]}. Exiting.
exit 0
fi
fi
done
fi
done

cd ${SAMZA_LIB}

echo -e "\nStarting AdEventProducer...\n"
java -cp "*" samza.examples.join.AdEventProducer
10 changes: 10 additions & 0 deletions src/main/assembly/src.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${basedir}/src/main/config/ad-event-feed.properties</source>
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${basedir}/src/main/config/ad-event-join.properties</source>
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
</file>
</files>
<dependencySets>
<dependencySet>
Expand Down
45 changes: 45 additions & 0 deletions src/main/config/ad-event-feed.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=ad-event-feed

# Job Coordinator
job.coordinator.system=kafka
job.coordinator.replication.factor=1

# Yarn
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=samza.examples.join.task.AdEventFeedStreamTask
task.inputs=kafka.ad-impression, kafka.ad-click

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=json
systems.kafka.streams.ad-impression.samza.msg.serde=string
systems.kafka.streams.ad-click.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.samza.offset.default=oldest
63 changes: 63 additions & 0 deletions src/main/config/ad-event-join.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=ad-event-join

# Job Coordinator
job.coordinator.system=kafka
job.coordinator.replication.factor=1

# Yarn
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=samza.examples.join.task.AdEventJoinStreamTask
task.inputs=kafka.ad-imp-metadata, kafka.ad-clk-metadata
# Call window method every 5 minutes
task.window.ms=300000

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.samza.offset.default=oldest

# Key/value storages
stores.imp-meta-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.imp-meta-store.changelog=kafka.ad-imp-store-changelog
stores.imp-meta-store.changelog.replication.factor=1
stores.imp-meta-store.key.serde=string
stores.imp-meta-store.msg.serde=json


stores.clk-meta-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.clk-meta-store.changelog=kafka.ad-clk-store-changelog
stores.clk-meta-store.changelog.replication.factor=1
stores.clk-meta-store.key.serde=string
stores.clk-meta-store.msg.serde=json

# Metrics
metrics.reporters=jmx
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
50 changes: 50 additions & 0 deletions src/main/java/samza/examples/join/AdEventParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package samza.examples.join;

import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;

public class AdEventParser {

/**
* Parse raw ad event that should have "key=value" pairs separated with space character into a map.
* Example argument:
* {@code impression-id=1 type=click advertiser-id=1 ip=111.111.111.* agent=Chrome timestamp=2017-01-01T00:00:00.000"}
* Return value is a map that contains given key-value pairs
*
* @param rawAdEvent raw ad event String that should have key=value pairs separated with one space character
* @return event map
* @throws ParseException
*/
public static synchronized Map<String, String> parseAdEvent(String rawAdEvent) throws ParseException{
Map<String, String> adEvent = new HashMap<>();
String[] fields = rawAdEvent.split(" ");
for(String field : fields){
String[] keyValuePair = field.split("=");
if(keyValuePair.length == 2)
adEvent.put(keyValuePair[0], keyValuePair[1]);
else
throw new ParseException("Error while parsing. Messages should have only 'key=value' pairs separated by one space characters with no space and '=' characters in keys and values", -1);
}
return adEvent;
}
}
Loading