Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions bin/grid-elastic
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#!/bin/bash -e
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This script will download, setup, start, and stop servers for Kafka, YARN, and ZooKeeper,
# as well as downloading, building and locally publishing Samza

if [ -z "$JAVA_HOME" ]; then
if [ -x /usr/libexec/java_home ]; then
export JAVA_HOME="$(/usr/libexec/java_home)"
else
echo "JAVA_HOME not set. Exiting."
exit 1
fi
fi

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
BASE_DIR=$(dirname $DIR)
DEPLOY_ROOT_DIR=$BASE_DIR/deploy
DOWNLOAD_CACHE_DIR=$HOME/.samza/download
COMMAND=$1
SYSTEM=$2

DOWNLOAD_ELASTICSEARCH=https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.0.tar.gz
DOWNLOAD_KIBANA=https://download.elastic.co/kibana/kibana/kibana-4.1.1-linux-x86.tar.gz
DOWNLOAD_KIBANA_64=https://download.elastic.co/kibana/kibana/kibana-4.1.1-linux-x64.tar.gz

bootstrap() {
echo "Bootstrapping the system..."
stop_all
rm -rf "$DEPLOY_ROOT_DIR"
mkdir "$DEPLOY_ROOT_DIR"
install_all
start_all
exit 0
}

install_all() {
$DIR/grid-elastic install elasticsearch
$DIR/grid-elastic install kibana
}

install_all_64() {
$DIR/grid-elastic install elasticsearch
$DIR/grid-elastic install kibana_64
}

install_elasticsearch() {
mkdir -p "$DEPLOY_ROOT_DIR"
install elasticsearch $DOWNLOAD_ELASTICSEARCH elasticsearch-1.7.0
}

install_kibana() {
mkdir -p "$DEPLOY_ROOT_DIR"
install kibana $DOWNLOAD_KIBANA kibana-4.1.1-linux-x86
}

install_kibana_64() {
mkdir -p "$DEPLOY_ROOT_DIR"
install kibana $DOWNLOAD_KIBANA_64 kibana-4.1.1-linux-x64
}

install() {
DESTINATION_DIR="$DEPLOY_ROOT_DIR/$1"
DOWNLOAD_URL=$2
PACKAGE_DIR="$DOWNLOAD_CACHE_DIR/$3"
PACKAGE_FILE="$DOWNLOAD_CACHE_DIR/$(basename $DOWNLOAD_URL)"
if [ -f "$PACKAGE_FILE" ]; then
echo "Using previously downloaded file $PACKAGE_FILE"
else
echo "Downloading $(basename $DOWNLOAD_URL)..."
mkdir -p $DOWNLOAD_CACHE_DIR
curl "$DOWNLOAD_URL" > "${PACKAGE_FILE}.tmp"
mv "${PACKAGE_FILE}.tmp" "$PACKAGE_FILE"
fi
rm -rf "$DESTINATION_DIR" "$PACKAGE_DIR"
tar -xf "$PACKAGE_FILE" -C $DOWNLOAD_CACHE_DIR
mv "$PACKAGE_DIR" "$DESTINATION_DIR"
}

start_all() {
$DIR/grid-elastic start elasticsearch
$DIR/grid-elastic start kibana
}

start_elasticsearch() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/elasticsearch ]; then
$DEPLOY_ROOT_DIR/$SYSTEM/bin/elasticsearch -d
else
echo 'ElasticSearch is not installed. Run: bin/grid-elastic install elasticsearch'
fi
}

start_kibana() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/kibana ]; then
mkdir -p $DEPLOY_ROOT_DIR/$SYSTEM/logs
cd $DEPLOY_ROOT_DIR/$SYSTEM
nohup bin/kibana > logs/kibana.log 2>&1 &
cd - > /dev/null
else
echo 'Kibana is not installed. Run: bin/grid-elastic install kibana(32bit)/kibana_64(64bit)'
fi
}

stop_all() {
$DIR/grid-elastic stop elasticsearch
$DIR/grid-elastic stop kibana
}

stop_elasticsearch() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/elasticsearch ]; then
cd $DEPLOY_ROOT_DIR/$SYSTEM
ps ax | grep -i 'elasticsearch' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
cd - > /dev/null
else
echo 'Elasticsearch is not installed. Run: bin/grid-elastic install elasticsearch'
fi
}

stop_kibana() {
if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/kibana ]; then
cd $DEPLOY_ROOT_DIR/$SYSTEM
ps ax | grep -i 'kibana.js' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
cd - > /dev/null
else
echo 'Kibana is not installed. Run: bin/grid-elastic install elasticsearch'
fi
}

# Check arguments
if (test -z "$COMMAND" && test -z "$SYSTEM") \
|| ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then
echo
echo " Usage.."
echo
echo " $ grid-elastic"
echo " $ grid-elastic install [elasticsearch|kibana|all]"
echo " $ grid-elastic start [elasticsearch|kibana|all]"
echo " $ grid-elastic stop [elasticsearch|kibana|all]"
echo
exit 1
else
echo "EXECUTING: $COMMAND $SYSTEM"

"$COMMAND"_"$SYSTEM"
fi
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<samza.version>0.9.1</samza.version>
<samza.version>0.10.0</samza.version>
</properties>

<developers>
Expand Down
5 changes: 5 additions & 0 deletions src/main/assembly/src.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${basedir}/src/main/config/wikipedia-elastic.properties</source>
<outputDirectory>config</outputDirectory>
<filtered>true</filtered>
</file>
</files>
<dependencySets>
<dependencySet>
Expand Down
45 changes: 45 additions & 0 deletions src/main/config/wikipedia-elastic.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=wikipedia-elastic

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=samza.examples.wikipedia.task.WikipediaElasticStreamTask
task.inputs=kafka.wikipedia-raw
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092

systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.client.TransportClientFactory
systems.elasticsearch.client.transport.host=localhost
systems.elasticsearch.client.transport.port=9300
systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package samza.examples.wikipedia.task;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;

public class WikipediaElasticStreamTask implements StreamTask {

// This is the index we intend to submit to.
public static final String ELASTICSEARCH_INDEX = "samza";

// This is the type that will be used under _type
public static final String ELASTICSEARCH_TYPE = "wikiedit";

@SuppressWarnings("unchecked")
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

try {
Map<String, Object> parsedJsonObject = parse(event.getRawEvent());

parsedJsonObject.put("channel", event.getChannel());
parsedJsonObject.put("source", event.getSource());
parsedJsonObject.put("time", event.getTime());

collector.send(new OutgoingMessageEnvelope(new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE), parsedJsonObject));
} catch (Exception e) {
System.err.println("Unable to parse line: " + event);
}
}

public static Map<String, Object> parse(String line) {
Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
Matcher m = p.matcher(line);

if (m.find() && m.groupCount() == 6) {
int byteDiff = Integer.parseInt(m.group(5));
Map<String, Object> root = new HashMap<String, Object>();

root.put("title", m.group(1));
root.put("user", m.group(4));
root.put("unparsed-flags", m.group(2));
root.put("diff-bytes", byteDiff);
root.put("diff-url", m.group(3));
root.put("summary", m.group(6));

return root;
} else {
throw new IllegalArgumentException();
}
}

public static void main(String[] args) {
String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */ Added to note regarding David Shepard's brothers" };

for (String line : lines) {
System.out.println(parse(line));
}
}
}