diff --git a/bin/grid-elastic b/bin/grid-elastic new file mode 100644 index 00000000..dab76231 --- /dev/null +++ b/bin/grid-elastic @@ -0,0 +1,160 @@ +#!/bin/bash -e +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This script will download, setup, start, and stop servers for Kafka, YARN, and ZooKeeper, +# as well as downloading, building and locally publishing Samza + +if [ -z "$JAVA_HOME" ]; then + if [ -x /usr/libexec/java_home ]; then + export JAVA_HOME="$(/usr/libexec/java_home)" + else + echo "JAVA_HOME not set. Exiting." + exit 1 + fi +fi + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +BASE_DIR=$(dirname $DIR) +DEPLOY_ROOT_DIR=$BASE_DIR/deploy +DOWNLOAD_CACHE_DIR=$HOME/.samza/download +COMMAND=$1 +SYSTEM=$2 + +DOWNLOAD_ELASTICSEARCH=https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.0.tar.gz +DOWNLOAD_KIBANA=https://download.elastic.co/kibana/kibana/kibana-4.1.1-linux-x86.tar.gz +DOWNLOAD_KIBANA_64=https://download.elastic.co/kibana/kibana/kibana-4.1.1-linux-x64.tar.gz + +bootstrap() { + echo "Bootstrapping the system..." + stop_all + rm -rf "$DEPLOY_ROOT_DIR" + mkdir "$DEPLOY_ROOT_DIR" + install_all + start_all + exit 0 +} + +install_all() { + $DIR/grid-elastic install elasticsearch + $DIR/grid-elastic install kibana +} + +install_all_64() { + $DIR/grid-elastic install elasticsearch + $DIR/grid-elastic install kibana_64 +} + +install_elasticsearch() { + mkdir -p "$DEPLOY_ROOT_DIR" + install elasticsearch $DOWNLOAD_ELASTICSEARCH elasticsearch-1.7.0 +} + +install_kibana() { + mkdir -p "$DEPLOY_ROOT_DIR" + install kibana $DOWNLOAD_KIBANA kibana-4.1.1-linux-x86 +} + +install_kibana_64() { + mkdir -p "$DEPLOY_ROOT_DIR" + install kibana $DOWNLOAD_KIBANA_64 kibana-4.1.1-linux-x64 +} + +install() { + DESTINATION_DIR="$DEPLOY_ROOT_DIR/$1" + DOWNLOAD_URL=$2 + PACKAGE_DIR="$DOWNLOAD_CACHE_DIR/$3" + PACKAGE_FILE="$DOWNLOAD_CACHE_DIR/$(basename $DOWNLOAD_URL)" + if [ -f "$PACKAGE_FILE" ]; then + echo "Using previously downloaded file $PACKAGE_FILE" + else + echo "Downloading $(basename $DOWNLOAD_URL)..." + mkdir -p $DOWNLOAD_CACHE_DIR + curl "$DOWNLOAD_URL" > "${PACKAGE_FILE}.tmp" + mv "${PACKAGE_FILE}.tmp" "$PACKAGE_FILE" + fi + rm -rf "$DESTINATION_DIR" "$PACKAGE_DIR" + tar -xf "$PACKAGE_FILE" -C $DOWNLOAD_CACHE_DIR + mv "$PACKAGE_DIR" "$DESTINATION_DIR" +} + +start_all() { + $DIR/grid-elastic start elasticsearch + $DIR/grid-elastic start kibana +} + +start_elasticsearch() { + if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/elasticsearch ]; then + $DEPLOY_ROOT_DIR/$SYSTEM/bin/elasticsearch -d + else + echo 'ElasticSearch is not installed. Run: bin/grid-elastic install elasticsearch' + fi +} + +start_kibana() { + if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/kibana ]; then + mkdir -p $DEPLOY_ROOT_DIR/$SYSTEM/logs + cd $DEPLOY_ROOT_DIR/$SYSTEM + nohup bin/kibana > logs/kibana.log 2>&1 & + cd - > /dev/null + else + echo 'Kibana is not installed. Run: bin/grid-elastic install kibana(32bit)/kibana_64(64bit)' + fi +} + +stop_all() { + $DIR/grid-elastic stop elasticsearch + $DIR/grid-elastic stop kibana +} + +stop_elasticsearch() { + if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/elasticsearch ]; then + cd $DEPLOY_ROOT_DIR/$SYSTEM + ps ax | grep -i 'elasticsearch' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM + cd - > /dev/null + else + echo 'Elasticsearch is not installed. Run: bin/grid-elastic install elasticsearch' + fi +} + +stop_kibana() { + if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/kibana ]; then + cd $DEPLOY_ROOT_DIR/$SYSTEM + ps ax | grep -i 'kibana.js' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM + cd - > /dev/null + else + echo 'Kibana is not installed. Run: bin/grid-elastic install elasticsearch' + fi +} + +# Check arguments +if (test -z "$COMMAND" && test -z "$SYSTEM") \ + || ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then + echo + echo " Usage.." + echo + echo " $ grid-elastic" + echo " $ grid-elastic install [elasticsearch|kibana|all]" + echo " $ grid-elastic start [elasticsearch|kibana|all]" + echo " $ grid-elastic stop [elasticsearch|kibana|all]" + echo + exit 1 +else + echo "EXECUTING: $COMMAND $SYSTEM" + + "$COMMAND"_"$SYSTEM" +fi diff --git a/pom.xml b/pom.xml index f9c4fa9e..f484d579 100644 --- a/pom.xml +++ b/pom.xml @@ -113,7 +113,7 @@ under the License. UTF-8 - 0.9.1 + 0.10.0 diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml index f57fee2a..17242652 100644 --- a/src/main/assembly/src.xml +++ b/src/main/assembly/src.xml @@ -51,6 +51,11 @@ config true + + ${basedir}/src/main/config/wikipedia-elastic.properties + config + true + diff --git a/src/main/config/wikipedia-elastic.properties b/src/main/config/wikipedia-elastic.properties new file mode 100644 index 00000000..a7b18414 --- /dev/null +++ b/src/main/config/wikipedia-elastic.properties @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=wikipedia-elastic + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task +task.class=samza.examples.wikipedia.task.WikipediaElasticStreamTask +task.inputs=kafka.wikipedia-raw +task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory +task.checkpoint.system=kafka +# Normally, this would be 3, but we have only one broker. +task.checkpoint.replication.factor=1 + +# Systems +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=json +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.consumer.auto.offset.reset=largest +systems.kafka.producer.bootstrap.servers=localhost:9092 + +systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory +systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.client.TransportClientFactory +systems.elasticsearch.client.transport.host=localhost +systems.elasticsearch.client.transport.port=9300 +systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory +systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaElasticStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaElasticStreamTask.java new file mode 100644 index 00000000..15081d8e --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/task/WikipediaElasticStreamTask.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.task; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskCoordinator; +import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; + +public class WikipediaElasticStreamTask implements StreamTask { + + // This is the index we intend to submit to. + public static final String ELASTICSEARCH_INDEX = "samza"; + + // This is the type that will be used under _type + public static final String ELASTICSEARCH_TYPE = "wikiedit"; + + @SuppressWarnings("unchecked") + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { + Map jsonObject = (Map) envelope.getMessage(); + WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject); + + try { + Map parsedJsonObject = parse(event.getRawEvent()); + + parsedJsonObject.put("channel", event.getChannel()); + parsedJsonObject.put("source", event.getSource()); + parsedJsonObject.put("time", event.getTime()); + + collector.send(new OutgoingMessageEnvelope(new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE), parsedJsonObject)); + } catch (Exception e) { + System.err.println("Unable to parse line: " + event); + } + } + + public static Map parse(String line) { + Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)"); + Matcher m = p.matcher(line); + + if (m.find() && m.groupCount() == 6) { + int byteDiff = Integer.parseInt(m.group(5)); + Map root = new HashMap(); + + root.put("title", m.group(1)); + root.put("user", m.group(4)); + root.put("unparsed-flags", m.group(2)); + root.put("diff-bytes", byteDiff); + root.put("diff-url", m.group(3)); + root.put("summary", m.group(6)); + + return root; + } else { + throw new IllegalArgumentException(); + } + } + + public static void main(String[] args) { + String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */ Added to note regarding David Shepard's brothers" }; + + for (String line : lines) { + System.out.println(parse(line)); + } + } +}