diff --git a/pom.xml b/pom.xml
@@ -294,7 +300,7 @@ under the License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>1.6.0-SNAPSHOT</samza.version>
+ <samza.version>1.7.0-SNAPSHOT</samza.version>
<hadoop.version>2.6.1</hadoop.version>
diff --git a/src/main/config/filter-example.properties b/src/main/config/filter-example.properties
index 02b0fa56..d6ce3ef4 100644
--- a/src/main/config/filter-example.properties
+++ b/src/main/config/filter-example.properties
@@ -18,8 +18,11 @@
# Application/Job
app.class=samza.examples.cookbook.FilterExample
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-job.name=pageview-filter
-job.container.count=2
+job.name=startpoint-test-4
+job.container.count=1
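+# Group all input partitions into a single task, checkpoint to Kafka, and commit every 30 seconds.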
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.commit.ms=30000
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
diff --git a/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java b/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java
deleted file mode 100644
index fdc5bc6c..00000000
--- a/src/main/java/samza/examples/cookbook/CouchbaseTableExample.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package samza.examples.cookbook;
-
-import com.couchbase.client.java.document.json.JsonObject;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import java.text.SimpleDateFormat;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.remote.NoOpTableReadFunction;
-import org.apache.samza.table.remote.RemoteTable;
-import org.apache.samza.table.remote.couchbase.CouchbaseTableWriteFunction;
-import org.apache.samza.table.retry.TableRetryPolicy;
-
-
-/**
- * This is a simple word count example using a remote store.
- *
- * In this example, we use Couchbase to demonstrate how to invoke APIs on a remote store other than get, put or delete
- * as defined in {@link org.apache.samza.table.remote.AsyncRemoteTable}. Input messages are collected from the user through
- * a Kafka console producer and tokenized on spaces. For each word, we increment a per-word counter as well as a
- * counter for all words on Couchbase, and emit the current value of both counters to the Kafka console consumer.
- *
- * A rate limit of 4 requests/second to Couchbase is set for the entire job; internally, Samza uses an embedded
- * rate limiter that evenly distributes the total rate limit among tasks. As we invoke 2 calls on Couchbase
- * for each word, you should see roughly 2 messages per second in the Kafka console consumer
- * window.
- *
- * A retry policy with 1 second fixed backoff time and max 3 retries is attached to the remote table.
- *
- * Concepts covered: remote table, rate limiter, retry, arbitrary operation on remote store.
- *
- * To run the below example:
- *
- * <ol>
- *   <li>
- *     Create a Couchbase instance using docker and log into the admin UI at http://localhost:8091 (Administrator/password).
- *     Create a bucket called "my-bucket". Under the Security tab, create a user with the same name, set 123456 as the
- *     password, and give it the "Data Reader" and "Data Writer" privileges for this bucket.
- *     More information can be found at https://docs.couchbase.com/server/current/getting-started/do-a-quick-install.html
- *   </li>
- *   <li>
- *     Create Kafka topics "word-input" and "count-output":
- *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic word-input --partitions 2 --replication-factor 1
- *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic count-output --partitions 2 --replication-factor 1
- *   </li>
- *   <li>
- *     Run the application using the run-app.sh script:
- *     ./deploy/samza/bin/run-app.sh --config-path=$PWD/deploy/samza/config/couchbase-table-example.properties
- *   </li>
- *   <li>
- *     Consume messages from the output topic:
- *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic count-output
- *   </li>
- *   <li>
- *     Produce some messages to the input topic:
- *     ./deploy/kafka/bin/kafka-console-producer.sh --topic word-input --broker-list localhost:9092
- *
- *     After the console producer is started, type each number on its own line:
- *     1
- *     2
- *     3
- *     4
- *     5
- *     4
- *     3
- *     2
- *     1
- *
- *     You should see messages like below in the console consumer window:
- *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=2, count=1, total-count=1
- *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=1, count=1, total-count=2
- *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=4, count=1, total-count=3
- *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=3, count=1, total-count=4
- *     2019-05-23 21:18:08 2019-05-23 21:18:08 word=4, count=2, total-count=5
- *     2019-05-23 21:18:08 2019-05-23 21:18:08 word=5, count=1, total-count=6
- *     2019-05-23 21:18:09 2019-05-23 21:18:09 word=2, count=2, total-count=7
- *     2019-05-23 21:18:09 2019-05-23 21:18:09 word=3, count=2, total-count=8
- *     2019-05-23 21:18:10 2019-05-23 21:18:10 word=1, count=2, total-count=9
- *
- *     You can examine the result in the Couchbase admin GUI as well.
- *
- *     Note: if you instead enter "1 2 3 4 5 4 3 2 1" on a single line, you should see roughly 1 QPS, as
- *     the input is then processed by only one task.
- *   </li>
- * </ol>
- */
-public class CouchbaseTableExample implements StreamApplication {
-
- private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- private static final String KAFKA_SYSTEM_NAME = "kafka";
- private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
- private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
- private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
-
- private static final String INPUT_STREAM_ID = "word-input";
- private static final String OUTPUT_STREAM_ID = "count-output";
-
- private static final String CLUSTER_NODES = "couchbase://127.0.0.1";
- private static final int COUCHBASE_PORT = 11210;
- private static final String BUCKET_NAME = "my-bucket";
- private static final String BUCKET_PASSWORD = "123456";
- private static final String TOTAL_COUNT_ID = "total-count";
-
- @Override
- public void describe(StreamApplicationDescriptor app) {
-
- KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
- .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
- .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
- .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
-
- KafkaInputDescriptor<String> wordInputDescriptor =
- kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde());
-
- KafkaOutputDescriptor<String> countOutputDescriptor =
- kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new StringSerde());
-
- MyCouchbaseTableWriteFunction writeFn = new MyCouchbaseTableWriteFunction(BUCKET_NAME, CLUSTER_NODES)
- .withBootstrapCarrierDirectPort(COUCHBASE_PORT)
- .withUsernameAndPassword(BUCKET_NAME, BUCKET_PASSWORD)
- .withTimeout(Duration.ofSeconds(5));
-
- TableRetryPolicy retryPolicy = new TableRetryPolicy()
- .withFixedBackoff(Duration.ofSeconds(1))
- .withStopAfterAttempts(3);
-
- RemoteTableDescriptor<String, JsonObject> couchbaseTableDescriptor = new RemoteTableDescriptor<>("couchbase-table")
- .withReadFunction(new NoOpTableReadFunction<>())
- .withReadRateLimiterDisabled()
- .withWriteFunction(writeFn)
- .withWriteRetryPolicy(retryPolicy)
- .withWriteRateLimit(4);
-
- app.withDefaultSystem(kafkaSystemDescriptor);
- MessageStream<String> wordStream = app.getInputStream(wordInputDescriptor);
- OutputStream<String> countStream = app.getOutputStream(countOutputDescriptor);
- app.getTable(couchbaseTableDescriptor);
-
- wordStream
- .flatMap(m -> Arrays.asList(m.split(" ")))
- .filter(word -> word != null && word.length() > 0)
- .map(new MyCountFunction())
- .map(countString -> currentTime() + " " + countString)
- .sendTo(countStream);
- }
-
- static class MyCountFunction implements MapFunction<String, String> {
-
- private MyCouchbaseTableWriteFunction writeFn;
-
- @Override
- public void init(Context context) {
- RemoteTable table = (RemoteTable) context.getTaskContext().getTable("couchbase-table");
- writeFn = (MyCouchbaseTableWriteFunction) table.getWriteFunction();
- }
-
- @Override
- public String apply(String word) {
- CompletableFuture<Long> countFuture = writeFn.incCounter(word);
- CompletableFuture<Long> totalCountFuture = writeFn.incCounter(TOTAL_COUNT_ID);
- return String.format("%s word=%s, count=%d, total-count=%d",
- currentTime(), word, countFuture.join(), totalCountFuture.join());
- }
- }
-
- static class MyCouchbaseTableWriteFunction extends CouchbaseTableWriteFunction<JsonObject> {
-
- private final static int OP_COUNTER = 1;
-
- public MyCouchbaseTableWriteFunction(String bucketName, String... clusterNodes) {
- super(bucketName, JsonObject.class, clusterNodes);
- }
-
- @Override
- public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
- switch (opId) {
- case OP_COUNTER:
- Preconditions.checkArgument(2 == args.length,
- String.format("Two arguments (String and int) are expected for counter operation (opId=%d)", opId));
- String id = (String) args[0];
- int delta = (int) args[1];
- return asyncWriteHelper(
- bucket.async().counter(id, delta, 1, timeout.toMillis(), TimeUnit.MILLISECONDS),
- String.format("Failed to invoke counter with Id %s from bucket %s.", id, bucketName),
- false);
- default:
- throw new SamzaException("Unknown opId: " + opId);
- }
- }
-
- public CompletableFuture<Long> incCounter(String id) {
- return table.writeAsync(OP_COUNTER, id, 1);
- }
-
- }
-
- private static String currentTime() {
- return DATE_FORMAT.format(new Date());
- }
-
-}
diff --git a/src/main/java/samza/examples/cookbook/FilterExample.java b/src/main/java/samza/examples/cookbook/FilterExample.java
index 218502a5..960615bc 100644
--- a/src/main/java/samza/examples/cookbook/FilterExample.java
+++ b/src/main/java/samza/examples/cookbook/FilterExample.java
@@ -47,22 +47,22 @@
*
* <li>
* Ensure that the topic "pageview-filter-input" is created
- * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic pageview-filter-input --partitions 2 --replication-factor 1
+ * ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic startpoint-test-input-4 --partitions 2 --replication-factor 1
*
* <li>
* Run the application using the run-app.sh script
* ./deploy/samza/bin/run-app.sh --config-path=$PWD/deploy/samza/config/filter-example.properties
*
* <li>
- * Produce some messages to the "pageview-filter-input" topic
- * ./deploy/kafka/bin/kafka-console-producer.sh --topic pageview-filter-input --broker-list localhost:9092
- * {"userId": "user1", "country": "india", "pageId":"google.com"}
- * {"userId": "invalidUserId", "country": "france", "pageId":"facebook.com"}
- * {"userId": "user2", "country": "china", "pageId":"yahoo.com"}
+ * Produce some messages to the "startpoint-test-input-4" topic
+ * ./deploy/kafka/bin/kafka-console-producer.sh --topic startpoint-test-input-4 --broker-list localhost:9092 --property "parse.key=true" --property "key.separator=|"
+ * 0|{"userId": "user1", "country": "india", "pageId":"google.com"}
+ * 0|{"userId": "invalidUserId", "country": "france", "pageId":"facebook.com"}
+ * 1|{"userId": "user2", "country": "china", "pageId":"yahoo.com"}
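+ * (The "0|" and "1|" prefixes are record keys: the parse.key and key.separator properties make the console producer split each line into a key and a value.)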
*
* <li>
- * Consume messages from the "pageview-filter-output" topic (e.g. bin/kafka-console-consumer.sh)
- * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageview-filter-output --property print.key=true
+ * Consume messages from the "startpoint-test-output-4" topic (e.g. bin/kafka-console-consumer.sh)
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic startpoint-test-output-4 --property print.key=true
*
*
*/
@@ -72,8 +72,8 @@ public class FilterExample implements StreamApplication {
private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
- private static final String INPUT_STREAM_ID = "pageview-filter-input";
- private static final String OUTPUT_STREAM_ID = "pageview-filter-output";
+ private static final String INPUT_STREAM_ID = "startpoint-test-input-4";
+ private static final String OUTPUT_STREAM_ID = "startpoint-test-output-4";
private static final String INVALID_USER_ID = "invalidUserId";
@Override
diff --git a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
deleted file mode 100644
index 4f5c5f77..00000000
--- a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package samza.examples.cookbook;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import java.io.Serializable;
-import java.net.URL;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.descriptors.CachingTableDescriptor;
-import org.apache.samza.table.remote.BaseTableFunction;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.util.ExponentialSleepStrategy;
-import org.apache.samza.util.HttpUtil;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * In this example, we join a stream of stock symbols with a remote table backed by a RESTful service,
- * which delivers latest stock quotes. The join results contain stock symbol and latest price, and are
- * delivered to an output stream.
- *
- * A rate limit of 10 requests/second is set for the entire job; internally, Samza uses an embedded
- * rate limiter, which evenly distributes the total rate limit among tasks.
- *
- * A caching table is used over the remote table with a read TTL of 5 seconds, so one would
- * receive the same quote within this time span.
- *
- * Concepts covered: remote table, rate limiter, caching table, stream to table joins.
- *
- * To run the below example:
- *
- * <ol>
- *   <li>
- *     Create the Kafka topics "stock-symbol-input" and "stock-price-output":
- *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic stock-symbol-input --partitions 2 --replication-factor 1
- *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic stock-price-output --partitions 2 --replication-factor 1
- *   </li>
- *   <li>
- *     Run the application using the run-app.sh script:
- *     ./deploy/samza/bin/run-app.sh --config-path=$PWD/deploy/samza/config/remote-table-join-example.properties
- *   </li>
- *   <li>
- *     Consume messages from the output topic:
- *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stock-price-output
- *   </li>
- *   <li>
- *     Produce some messages to the input topic:
- *     ./deploy/kafka/bin/kafka-console-producer.sh --topic stock-symbol-input --broker-list localhost:9092
- *
- *     After the console producer is started, type
- *     MSFT
- *
- *     You should see messages like below in the console consumer window:
- *     {"symbol":"MSFT","close":107.64}
- *
- *     Note: you will need a free API key for symbols other than MSFT; see below for more information.
- *   </li>
- * </ol>
- */
-public class RemoteTableJoinExample implements StreamApplication {
- private static final String KAFKA_SYSTEM_NAME = "kafka";
- private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
- private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
- private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
-
- /**
- * Default API key "demo" only works for symbol "MSFT"; however you can get an
- * API key for free at https://www.alphavantage.co/, which will work for other symbols.
- */
- private static final String API_KEY = "demo";
-
- private static final String URL_TEMPLATE =
- "https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=%s&apikey=" + API_KEY;
-
- private static final String INPUT_STREAM_ID = "stock-symbol-input";
- private static final String OUTPUT_STREAM_ID = "stock-price-output";
-
- @Override
- public void describe(StreamApplicationDescriptor appDescriptor) {
- KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
- .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
- .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
- .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
-
- KafkaInputDescriptor<String> stockSymbolInputDescriptor =
- kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde());
- KafkaOutputDescriptor<StockPrice> stockPriceOutputDescriptor =
- kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new JsonSerdeV2<>(StockPrice.class));
- appDescriptor.withDefaultSystem(kafkaSystemDescriptor);
- MessageStream<String> stockSymbolStream = appDescriptor.getInputStream(stockSymbolInputDescriptor);
- OutputStream<StockPrice> stockPriceStream = appDescriptor.getOutputStream(stockPriceOutputDescriptor);
-
- RemoteTableDescriptor<String, Double> remoteTableDescriptor =
- new RemoteTableDescriptor<>("remote-table")
- .withReadRateLimit(10)
- .withReadFunction(new StockPriceReadFunction());
- CachingTableDescriptor<String, Double> cachedRemoteTableDescriptor =
- new CachingTableDescriptor<>("cached-remote-table", remoteTableDescriptor)
- .withReadTtl(Duration.ofSeconds(5));
- Table<KV<String, Double>> cachedRemoteTable = appDescriptor.getTable(cachedRemoteTableDescriptor);
-
- stockSymbolStream
- .map(symbol -> new KV<String, Double>(symbol, null))
- .join(cachedRemoteTable, new JoinFn())
- .sendTo(stockPriceStream);
-
- }
-
- static class JoinFn implements StreamTableJoinFunction<String, KV<String, Double>, KV<String, Double>, StockPrice> {
- @Override
- public StockPrice apply(KV<String, Double> message, KV<String, Double> record) {
- return record == null ? null : new StockPrice(message.getKey(), record.getValue());
- }
- @Override
- public String getMessageKey(KV<String, Double> message) {
- return message.getKey();
- }
- @Override
- public String getRecordKey(KV<String, Double> record) {
- return record.getKey();
- }
- }
-
- static class StockPriceReadFunction extends BaseTableFunction
- implements TableReadFunction<String, Double> {
- @Override
- public CompletableFuture<Double> getAsync(String symbol) {
- return CompletableFuture.supplyAsync(() -> {
- try {
- URL url = new URL(String.format(URL_TEMPLATE, symbol));
- String response = HttpUtil.read(url, 5000, new ExponentialSleepStrategy());
- JsonParser parser = new JsonFactory().createJsonParser(response);
- while (!parser.isClosed()) {
- if (JsonToken.FIELD_NAME.equals(parser.nextToken()) && "4. close".equalsIgnoreCase(parser.getCurrentName())) {
- return Double.valueOf(parser.nextTextValue());
- }
- }
- return -1d;
- } catch (Exception ex) {
- throw new SamzaException(ex);
- }
- });
- }
-
- @Override
- public boolean isRetriable(Throwable throwable) {
- return false;
- }
- }
-
- public static class StockPrice implements Serializable {
-
- public final String symbol;
- public final Double close;
-
- public StockPrice(
- @JsonProperty("symbol") String symbol,
- @JsonProperty("close") Double close) {
- this.symbol = symbol;
- this.close = close;
- }
- }
-
-}
diff --git a/src/main/java/samza/examples/cookbook/StartpointMain.java b/src/main/java/samza/examples/cookbook/StartpointMain.java
new file mode 100644
index 00000000..c9b2a11e
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/StartpointMain.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.Partition;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.startpoint.Startpoint;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointUpcoming;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+
+
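+/**
+ * Manual harness for exercising the Startpoint API of the "startpoint-test-4" job (the FilterExample
+ * job configured in filter-example.properties): it connects to the job's coordinator stream, can
+ * write startpoints for the two input partitions, and prints the stored startpoints and the
+ * per-task fan-out.
+ */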
+public class StartpointMain {
+
+ public static void main(String[] args) throws IOException {
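+ // Minimal config for locating the job's coordinator stream; must match the deployed job's settings.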
+ Map<String, String> configs = ImmutableMap.of(
+ "job.name", "startpoint-test-4",
+ "job.coordinator.system", "kafka",
+ "systems.kafka.consumer.zookeeper.connect", "localhost:2181",
+ "systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory",
+ "systems.kafka.producer.bootstrap.servers", "localhost:9092"
+ );
+
+ TaskName taskName = new TaskName("Task-0");
+
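+ // The two partitions of the job's input topic (FilterExample creates it with --partitions 2).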
+ SystemStreamPartition p0 = new SystemStreamPartition("kafka", "startpoint-test-input-4", new Partition(0));
+ SystemStreamPartition p1 = new SystemStreamPartition("kafka", "startpoint-test-input-4", new Partition(1));
+
+ CoordinatorStreamStore store =
+ new CoordinatorStreamStore(new MapConfig(configs), new NoOpMetricsRegistry());
+ store.init();
+ StartpointManager startpointManager = new StartpointManager(store);
+ startpointManager.start();
+
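+ // Uncomment to point both partitions at the upcoming (newest) offsets on the next deploy;
+ // StartpointOldest would rewind them to the oldest available offsets instead.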
+// startpointManager.writeStartpoint(p0, new StartpointUpcoming());
+// startpointManager.writeStartpoint(p1, new StartpointUpcoming());
+
+ System.out.println(startpointManager.readStartpoint(p0));
+ System.out.println(startpointManager.readStartpoint(p1));
+
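+ // Fan-out entries are keyed by TaskName: "Task-0" under AllSspToSingleTaskGrouper,
+ // "Partition N" under the default per-partition grouper.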
+// System.out.println(startpointManager.getFanOutForTask(new TaskName("Partition 0")));
+// System.out.println(startpointManager.getFanOutForTask(new TaskName("Partition 1")));
+ System.out.println(startpointManager.getFanOutForTask(taskName));
+
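+ // Uncomment to fan the written startpoints out to the task, mirroring what the job
+ // coordinator does when the job starts.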
+// startpointManager.fanOut(ImmutableMap.of(taskName, ImmutableSet.of(p0, p1)));
+
+ // Stop the startpoint manager and close the coordinator stream store so the JVM exits cleanly.
+ startpointManager.stop();
+ store.close();
+ }
+}
diff --git a/src/main/java/samza/examples/cookbook/data/PageView.java b/src/main/java/samza/examples/cookbook/data/PageView.java
index 96406944..7a700f0f 100644
--- a/src/main/java/samza/examples/cookbook/data/PageView.java
+++ b/src/main/java/samza/examples/cookbook/data/PageView.java
@@ -18,7 +18,8 @@
*/
package samza.examples.cookbook.data;
-import org.codehaus.jackson.annotate.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* A page view event
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
index adcdadb1..440354ad 100644
--- a/src/main/resources/log4j2.xml
+++ b/src/main/resources/log4j2.xml
@@ -42,10 +42,10 @@
-
+
-
+
diff --git a/src/test/java/samza/examples/cookbook/test/TestSamzaCookBookExamples.java b/src/test/java/samza/examples/cookbook/test/TestSamzaCookBookExamples.java
deleted file mode 100644
index 1e4b39a6..00000000
--- a/src/test/java/samza/examples/cookbook/test/TestSamzaCookBookExamples.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package samza.examples.cookbook.test;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.test.framework.StreamAssert;
-import org.apache.samza.test.framework.TestRunner;
-import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.junit.Assert;
-import org.junit.Test;
-import samza.examples.cookbook.FilterExample;
-import samza.examples.cookbook.JoinExample;
-import samza.examples.cookbook.SessionWindowExample;
-import samza.examples.cookbook.StreamTableJoinExample;
-import samza.examples.cookbook.TumblingWindowExample;
-import samza.examples.cookbook.data.AdClick;
-import samza.examples.cookbook.data.PageView;
-import samza.examples.cookbook.data.Profile;
-import samza.examples.cookbook.data.UserPageViews;
-import samza.examples.test.utils.TestUtils;
-
-import static samza.examples.cookbook.StreamTableJoinExample.EnrichedPageView;
-
-
-public class TestSamzaCookBookExamples {
- @Test
- public void testFilterExample() {
- List<PageView> rawPageViewEvents = new ArrayList<>();
- rawPageViewEvents.add(new PageView("google.com", "user1", "india"));
- rawPageViewEvents.add(new PageView("facebook.com", "invalidUserId", "france"));
- rawPageViewEvents.add(new PageView("yahoo.com", "user2", "china"));
-
- InMemorySystemDescriptor inMemorySystem = new InMemorySystemDescriptor("kafka");
-
- InMemoryInputDescriptor<PageView> badPageViewEvents =
- inMemorySystem.getInputDescriptor("pageview-filter-input", new NoOpSerde<PageView>());
-
- InMemoryOutputDescriptor<PageView> goodPageViewEvents =
- inMemorySystem.getOutputDescriptor("pageview-filter-output", new NoOpSerde<PageView>());
-
- TestRunner
- .of(new FilterExample())
- .addInputStream(badPageViewEvents, rawPageViewEvents)
- .addOutputStream(goodPageViewEvents, 1)
- .run(Duration.ofMillis(1500));
-
- Assert.assertEquals(TestRunner.consumeStream(goodPageViewEvents, Duration.ofMillis(1000)).get(0).size(), 2);
- }
-
- @Test
- public void testJoinExample() {
- List<PageView> pageViewEvents = new ArrayList<>();
- pageViewEvents.add(new PageView("google.com", "user1", "india"));
- pageViewEvents.add(new PageView("yahoo.com", "user2", "china"));
- List<AdClick> adClickEvents = new ArrayList<>();
- adClickEvents.add(new AdClick("google.com", "adClickId1", "user1"));
- adClickEvents.add(new AdClick("yahoo.com", "adClickId2", "user1"));
-
- InMemorySystemDescriptor inMemorySystem = new InMemorySystemDescriptor("kafka");
-
- InMemoryInputDescriptor<PageView> pageViews =
- inMemorySystem.getInputDescriptor("pageview-join-input", new NoOpSerde<PageView>());
-
- InMemoryInputDescriptor<AdClick> adClicks =
- inMemorySystem.getInputDescriptor("adclick-join-input", new NoOpSerde<AdClick>());
-
- InMemoryOutputDescriptor pageViewAdClickJoin =
- inMemorySystem.getOutputDescriptor("pageview-adclick-join-output", new NoOpSerde<>());
-
- TestRunner
- .of(new JoinExample())
- .addInputStream(pageViews, pageViewEvents)
- .addInputStream(adClicks, adClickEvents)
- .addOutputStream(pageViewAdClickJoin, 1)
- .run(Duration.ofMillis(1500));
-
- Assert.assertEquals(TestRunner.consumeStream(pageViewAdClickJoin, Duration.ofMillis(1000)).get(0).size(), 2);
- }
-
- @Test
- public void testTumblingWindowExample() {
- List<PageView> pageViewEvents = TestUtils.genSamplePageViewData();
-
- InMemorySystemDescriptor inMemorySystem = new InMemorySystemDescriptor("kafka");
-
- InMemoryInputDescriptor<KV<String, PageView>> pageViewInputDescriptor =
- inMemorySystem.getInputDescriptor("pageview-tumbling-input", new NoOpSerde<KV<String, PageView>>());
-
- InMemoryOutputDescriptor<KV<String, UserPageViews>> userPageViewOutputDescriptor =
- inMemorySystem.getOutputDescriptor("pageview-tumbling-output", new NoOpSerde<KV<String, UserPageViews>>());
-
- TestRunner
- .of(new TumblingWindowExample())
- .addInputStream(pageViewInputDescriptor, pageViewEvents)
- .addOutputStream(userPageViewOutputDescriptor, 1)
- .run(Duration.ofMinutes(1));
-
- Assert.assertTrue(TestRunner.consumeStream(userPageViewOutputDescriptor, Duration.ofMillis(1000)).get(0).size() > 1);
- }
-
- @Test
- public void testSessionWindowExample() {
- List<PageView> pageViewEvents = TestUtils.genSamplePageViewData();
-
- InMemorySystemDescriptor inMemorySystem = new InMemorySystemDescriptor("kafka");
-
- InMemoryInputDescriptor<KV<String, PageView>> pageViewInputDescriptor =
- inMemorySystem.getInputDescriptor("pageview-session-input", new NoOpSerde<KV<String, PageView>>());
-
- InMemoryOutputDescriptor<KV<String, UserPageViews>> userPageViewOutputDescriptor =
- inMemorySystem.getOutputDescriptor("pageview-session-output", new NoOpSerde<KV<String, UserPageViews>>());
-
- TestRunner
- .of(new SessionWindowExample())
- .addInputStream(pageViewInputDescriptor, pageViewEvents)
- .addOutputStream(userPageViewOutputDescriptor, 1)
- .run(Duration.ofMinutes(1));
-
- Assert.assertEquals(2, TestRunner.consumeStream(userPageViewOutputDescriptor, Duration.ofMillis(1000)).get(0).size());
- }
-
- @Test
- public void testStreamTableJoinExample() throws InterruptedException {
- List<PageView> pageViewEvents = new ArrayList<>();
- pageViewEvents.add(new PageView("google.com", "user1", "india"));
- pageViewEvents.add(new PageView("yahoo.com", "user2", "china"));
- List<Profile> profiles = new ArrayList<>();
- profiles.add(new Profile("user1", "LNKD"));
- profiles.add(new Profile("user2", "MSFT"));
-
- InMemorySystemDescriptor inMemorySystem = new InMemorySystemDescriptor("kafka");
-
- InMemoryInputDescriptor<PageView> pageViews =
- inMemorySystem.getInputDescriptor("pageview-join-input", new NoOpSerde<PageView>());
-
- InMemoryInputDescriptor<Profile> profileViews =
- inMemorySystem.getInputDescriptor("profile-table-input", new NoOpSerde<Profile>());
-
- InMemoryOutputDescriptor<EnrichedPageView> joinResultOutputDescriptor =
- inMemorySystem.getOutputDescriptor("enriched-pageview-join-output", new NoOpSerde<EnrichedPageView>());
-
- TestRunner
- .of(new StreamTableJoinExample())
- .addInputStream(pageViews, pageViewEvents)
- .addInputStream(profileViews, profiles)
- .addOutputStream(joinResultOutputDescriptor, 1)
- .run(Duration.ofMillis(1500));
-
- List<EnrichedPageView> expectedOutput = new ArrayList<>();
- expectedOutput.add(new EnrichedPageView("user1", "LNKD", "google.com"));
- expectedOutput.add(new EnrichedPageView("user2", "MSFT", "yahoo.com"));
-
- StreamAssert.containsInAnyOrder(expectedOutput, joinResultOutputDescriptor, Duration.ofMillis(200));
-
- }
-
-}
diff --git a/src/test/java/samza/examples/test/utils/TestUtils.java b/src/test/java/samza/examples/test/utils/TestUtils.java
deleted file mode 100644
index d5e957ee..00000000
--- a/src/test/java/samza/examples/test/utils/TestUtils.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.test.utils;
-
-import com.google.common.io.Resources;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.codehaus.jackson.map.ObjectMapper;
-import samza.examples.cookbook.data.PageView;
-import samza.examples.wikipedia.application.WikipediaApplication;
-
-import static samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-
-
-public class TestUtils {
-
- public static List<WikipediaFeedEvent> genWikipediaFeedEvents(String channel) {
- List<String> wikiEvents = null;
- switch (channel) {
- case WikipediaApplication.WIKIPEDIA_CHANNEL:
- wikiEvents = readFile("WikipediaEditEvents.txt");
- break;
-
- case WikipediaApplication.WIKINEWS_CHANNEL:
- wikiEvents = readFile("WikinewsEditEvents.txt");
- break;
-
- case WikipediaApplication.WIKTIONARY_CHANNEL:
- wikiEvents = readFile("WikitionaryEditEvents.txt");
- break;
- }
- ObjectMapper mapper = new ObjectMapper();
- return wikiEvents.stream().map(event -> {
- try {
- return new WikipediaFeedEvent(mapper.readValue(event, HashMap.class));
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }).filter(x -> x != null).collect(Collectors.toList());
- }
-
- public static List<PageView> genSamplePageViewData() {
- List<PageView> pageViewEvents = new ArrayList<>();
- pageViewEvents.add(new PageView("google.com/home", "user1", "india"));
- pageViewEvents.add(new PageView("google.com/search", "user1", "india"));
- pageViewEvents.add(new PageView("yahoo.com/home", "user2", "china"));
- pageViewEvents.add(new PageView("yahoo.com/search", "user2", "china"));
- pageViewEvents.add(new PageView("google.com/news", "user1", "india"));
- pageViewEvents.add(new PageView("yahoo.com/fashion", "user2", "china"));
- return pageViewEvents;
- }
-
- private static List<String> readFile(String path) {
- try {
- InputStream in = Resources.getResource(path).openStream();
- List<String> lines = new ArrayList<>();
- String line = null;
- BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- while ((line = reader.readLine()) != null) {
- lines.add(line);
- }
- reader.close();
- return lines;
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- }
-}
diff --git a/src/test/java/samza/examples/wikipedia/application/test/TestWikipediaApplication.java b/src/test/java/samza/examples/wikipedia/application/test/TestWikipediaApplication.java
deleted file mode 100644
index dc1b5bd2..00000000
--- a/src/test/java/samza/examples/wikipedia/application/test/TestWikipediaApplication.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.application.test;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.test.framework.TestRunner;
-import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.junit.Assert;
-import org.junit.Test;
-import samza.examples.wikipedia.application.WikipediaApplication;
-import samza.examples.test.utils.TestUtils;
-
-
-public class TestWikipediaApplication {
-
- @Test
- public void testWikipediaApplication() throws Exception {
-
- InMemorySystemDescriptor wikipediaSystemDescriptor = new InMemorySystemDescriptor("wikipedia");
-
- // These configs must be removed once the examples are refactored to use the Table API
- Map<String, String> conf = new HashMap<>();
- conf.put("stores.wikipedia-stats.factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
- conf.put("stores.wikipedia-stats.key.serde", "string");
- conf.put("stores.wikipedia-stats.msg.serde", "integer");
- conf.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
- conf.put("serializers.registry.integer.class", "org.apache.samza.serializers.IntegerSerdeFactory");
-
- InMemoryInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
- .getInputDescriptor("en-wikipedia", new NoOpSerde<>())
- .withPhysicalName(WikipediaApplication.WIKIPEDIA_CHANNEL);
-
- InMemoryInputDescriptor wiktionaryInputDescriptor = wikipediaSystemDescriptor
- .getInputDescriptor("en-wiktionary", new NoOpSerde<>())
- .withPhysicalName(WikipediaApplication.WIKTIONARY_CHANNEL);
-
- InMemoryInputDescriptor wikiNewsInputDescriptor = wikipediaSystemDescriptor
- .getInputDescriptor("en-wikinews", new NoOpSerde<>())
- .withPhysicalName(WikipediaApplication.WIKINEWS_CHANNEL);
-
- InMemorySystemDescriptor kafkaSystemDescriptor = new InMemorySystemDescriptor("kafka");
-
- InMemoryOutputDescriptor outputStreamDesc = kafkaSystemDescriptor
- .getOutputDescriptor("wikipedia-stats", new NoOpSerde<>());
-
-
- TestRunner
- .of(new WikipediaApplication())
- .addInputStream(wikipediaInputDescriptor, TestUtils.genWikipediaFeedEvents(WikipediaApplication.WIKIPEDIA_CHANNEL))
- .addInputStream(wiktionaryInputDescriptor, TestUtils.genWikipediaFeedEvents(WikipediaApplication.WIKTIONARY_CHANNEL))
- .addInputStream(wikiNewsInputDescriptor, TestUtils.genWikipediaFeedEvents(WikipediaApplication.WIKINEWS_CHANNEL))
- .addOutputStream(outputStreamDesc, 1)
- .addConfig(conf)
- .addConfig("deploy.test", "true")
- .run(Duration.ofMinutes(1));
-
- Assert.assertTrue(TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(100)).get(0).size() > 0);
- }
-
-}
diff --git a/src/test/java/samza/examples/wikipedia/task/test/TestWikipediaTask.java b/src/test/java/samza/examples/wikipedia/task/test/TestWikipediaTask.java
deleted file mode 100644
index 0fc992ad..00000000
--- a/src/test/java/samza/examples/wikipedia/task/test/TestWikipediaTask.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.task.test;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.test.framework.TestRunner;
-import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-import samza.examples.wikipedia.task.application.WikipediaParserTaskApplication;
-
-public class TestWikipediaTask {
-
- @Test
- public void testWikipediaFeedTask() throws Exception {
- String[] wikipediaFeedSamples = new String[] { "{\"channel\":\"#en.wikipedia\",\"raw\":\"[[Fear Is the Key (song)]] https://en.wikipedia.org/w/index.php?diff=865574761&oldid=861177329 * Sam Sailor * (+46) Redirecting to [[Fear of the Dark (Iron Maiden album)]] ([[User:Sam Sailor/Scripts/Sagittarius+|♐]])\",\"time\":1540408899419,\"source\":\"rc-pmtpa\"}" };
-
- InMemorySystemDescriptor isd = new InMemorySystemDescriptor("kafka");
-
- InMemoryInputDescriptor rawWikiEvents = isd
- .getInputDescriptor("wikipedia-raw", new NoOpSerde<>());
-
- InMemoryOutputDescriptor outputStreamDesc = isd
- .getOutputDescriptor("wikipedia-edits", new NoOpSerde<>());
-
- TestRunner
- .of(new WikipediaParserTaskApplication())
- .addInputStream(rawWikiEvents, parseJSONToMap(wikipediaFeedSamples))
- .addOutputStream(outputStreamDesc, 1)
- .run(Duration.ofSeconds(2));
-
- Assert.assertEquals(1
- , TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size());
- }
-
- public static List