From 74ba47cb6b9c217621befb5e11971d11ae60d6b1 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 8 Dec 2025 18:57:11 +0000 Subject: [PATCH 01/12] first draft --- yaml/src/main/yaml/PubSubToBigTable.yaml | 124 +++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 yaml/src/main/yaml/PubSubToBigTable.yaml diff --git a/yaml/src/main/yaml/PubSubToBigTable.yaml b/yaml/src/main/yaml/PubSubToBigTable.yaml new file mode 100644 index 0000000000..8d9ab66860 --- /dev/null +++ b/yaml/src/main/yaml/PubSubToBigTable.yaml @@ -0,0 +1,124 @@ +template: + + name: "PubSub_To_BigTable_Yaml" + category: "STREAMING" + type: "YAML" + display_name: "PubSub to BigTable (YAML)" + description: > + The PubSub to BigTable template is a streaming pipeline which ingests + data from a PubSub topic, executes a user-defined mapping, and + writes the resulting records to BigTable. Any errors which occur in the + transformation of the data are written to a separate Pub/Sub topic. + flex_container_name: "pubsub-to-bigtable-yaml" + yamlTemplateFile: "PubSubToBigTable.yaml" + filesToCopy: > + {"PubSubToBigTable.yaml", "main.py", "requirements.txt"} + documentation: > + https://cloud.google.com/dataflow/docs/guides/templates/provided-yaml/pubsub-to-bigtable + contactInformation: "https://cloud.google.com/support" + requirements: { + "The output BigTable table must exist.", + "The input Pub/Sub topic must exist.", + "The error Pub/Sub topic must exist." + } + streaming: true + hidden: false + + parameters: + - name: "topic" + description: "Pub/Sub input topic" + help: "Pub/Sub topic to read the input from." + example: "projects/your-project-id/topics/your-topic-name" + required: true + type: text + order: 1 + + - name: "table_id" + description: "BigTable output table" + help: "BigTable table ID to write the output to." + required: true + type: text + order: 2 + + - name: "instance_id" + description: "BigTable instance ID" + help: "The BigTable instance ID." + required: true + type: text + order: 3 + + - name: "project_id" + description: "BigTable project ID" + help: "The Google Cloud project ID of the BigTable instance." + required: true + type: text + order: 4 + + - name: "error_topic" + description: "Pub/Sub error topic" + help: "Pub/Sub topic for failed messages." + example: "projects/your-project-id/topics/your-error-topic-name" + required: true + type: text + order: 5 + + - name: "format" + description: "The message format." + help: "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING." + default: JSON + required: false + type: text + order: 6 + + - name: "schema" + description: "Data schema." + help: > + A schema is required if data format is JSON, AVRO or + PROTO. For JSON, this is a JSON schema. For AVRO and PROTO, this is the full schema definition. + required: false + type: text + order: 7 + + - name: "mapping" + description: "Field mapping configuration" + help: > + A YAML/JSON string that defines the mapping for the MapToFields transform. 
+ e.g., 'language: python\nfields:\n key:\n value: str(uuid.uuid4()).encode()' + required: true + type: text + order: 8 + + + +pipeline: + type: composite + transforms: + - type: ReadFromPubSub + name: ReadMessages + config: + topic: {{ topic }} + format: {{ format }} + schema: | + {{ schema }} + error_handling: + output: errors + - type: MapToFields + name: ConvertStringsToBytes + config: + {{ (mapping + '\nerror_handling:\n output: errors') | indent(8) }} + - type: WriteToBigTable + config: + project: {{ project_id }} + instance: {{ instance_id }} + table: {{ table_id }} + - type: WriteToPubSub + name: WriteReadErrorsToDeadLetterQueue + input: ReadMessages.errors + config: + topic: {{ outputDeadLetterPubSubTopic }} + format: {{ format }} + schema: | + {{ schema }} + +options: + streaming: true From 9459d29f744b8bb2b6856841a841048289113276 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 11 Dec 2025 15:55:07 +0000 Subject: [PATCH 02/12] generated pubsubtoBigTableYaml.java first draft --- .../templates/yaml/PubSubToBigTableYaml.java | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java diff --git a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java new file mode 100644 index 0000000000..1cb8167fa9 --- /dev/null +++ b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.templates.yaml; + +import com.google.cloud.teleport.metadata.Template; +import com.google.cloud.teleport.metadata.TemplateCategory; +import com.google.cloud.teleport.metadata.TemplateParameter; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Validation; + +@Template( + name = "PubSub_To_BigTable_Yaml", + category = TemplateCategory.STREAMING, + type = Template.TemplateType.YAML, + displayName = "PubSub to BigTable (YAML)", + description = + "The PubSub to BigTable template is a streaming pipeline which ingests data from a PubSub topic, executes a user-defined mapping, and writes the resulting records to BigTable. Any errors which occur in the transformation of the data are written to a separate Pub/Sub topic.", + flexContainerName = "pubsub-to-bigtable-yaml", + yamlTemplateFile = "PubSubToBigTable.yaml", + filesToCopy = {"PubSubToBigTable.yaml", "main.py", "requirements.txt"}, + documentation = + "https://cloud.google.com/dataflow/docs/guides/templates/provided-yaml/pubsub-to-bigtable", + contactInformation = "https://cloud.google.com/support", + requirements = { + "The input Pub/Sub topic must exist.", + "The mapToField Pub/Sub error topic must exist.", + "The output BigTable table must exist." 
+ }, + streaming = true, + hidden = false) +public interface PubSubToBigTableYaml { + + @TemplateParameter.Text( + order = 1, + name = "topic", + optional = false, + description = "Pub/Sub input topic", + helpText = "Pub/Sub topic to read the input from.", + example = "projects/your-project-id/topics/your-topic-name") + @Validation.Required + String getTopic(); + + @TemplateParameter.Text( + order = 2, + name = "format", + optional = true, + description = "The message format.", + helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.", + example = "") + @Default.String("JSON") + String getFormat(); + + @TemplateParameter.Text( + order = 3, + name = "schema", + optional = false, + description = "Data schema.", + helpText = + "A schema is required if data format is JSON, AVRO or PROTO. For JSON, this is a JSON schema. For AVRO and PROTO, this is the full schema definition.", + example = "") + @Validation.Required + String getSchema(); + + @TemplateParameter.Text( + order = 4, + name = "language", + optional = false, + description = "Language used to define the expressions.", + helpText = + "The language used to define (and execute) the expressions and/or callables in fields. Defaults to generic.", + example = "") + @Validation.Required + String getLanguage(); + + @TemplateParameter.Text( + order = 5, + name = "fields", + optional = false, + description = "Field mapping configuration", + helpText = + "The output fields to compute, each mapping to the expression or callable that creates them.", + example = "") + @Validation.Required + String getFields(); + + @TemplateParameter.Text( + order = 6, + name = "project_id", + optional = false, + description = "BigTable project ID", + helpText = "The Google Cloud project ID of the BigTable instance.", + example = "") + @Validation.Required + String getProject_id(); + + @TemplateParameter.Text( + order = 7, + name = "instance_id", + optional = false, + description = "BigTable instance ID", + helpText = "The BigTable instance ID.", + example = "") + @Validation.Required + String getInstance_id(); + + @TemplateParameter.Text( + order = 8, + name = "table_id", + optional = false, + description = "BigTable output table", + helpText = "BigTable table ID to write the output to.", + example = "") + @Validation.Required + String getTable_id(); + + @TemplateParameter.Text( + order = 9, + name = "outputDeadLetterPubSubTopic", + optional = false, + description = "Pub/Sub transformation error topic", + helpText = "Pub/Sub error topic for failed transformation messages.", + example = "projects/your-project-id/topics/your-error-topic-name") + @Validation.Required + String getOutputDeadLetterPubSubTopic(); +} From bdc3559973fd3a30aceb501e3285fc8480245b46 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 11 Dec 2025 15:57:01 +0000 Subject: [PATCH 03/12] updataed blueprint with options --- yaml/src/main/yaml/PubSubToBigTable.yaml | 86 ++++++------------------ 1 file changed, 20 insertions(+), 66 deletions(-) diff --git a/yaml/src/main/yaml/PubSubToBigTable.yaml b/yaml/src/main/yaml/PubSubToBigTable.yaml index 8d9ab66860..d8c0cf08d2 100644 --- a/yaml/src/main/yaml/PubSubToBigTable.yaml +++ b/yaml/src/main/yaml/PubSubToBigTable.yaml @@ -17,77 +17,30 @@ template: https://cloud.google.com/dataflow/docs/guides/templates/provided-yaml/pubsub-to-bigtable contactInformation: "https://cloud.google.com/support" requirements: { - "The output BigTable table must exist.", "The input Pub/Sub topic must exist.", - "The error Pub/Sub topic must exist." 
+ "The mapToField Pub/Sub error topic must exist.", + "The output BigTable table must exist." } streaming: true hidden: false - parameters: - - name: "topic" - description: "Pub/Sub input topic" - help: "Pub/Sub topic to read the input from." - example: "projects/your-project-id/topics/your-topic-name" - required: true - type: text - order: 1 + options_file: + - "pubsub_options" + - "maptofields_options" + - "bigtable_options" - - name: "table_id" - description: "BigTable output table" - help: "BigTable table ID to write the output to." - required: true - type: text - order: 2 - - - name: "instance_id" - description: "BigTable instance ID" - help: "The BigTable instance ID." - required: true - type: text - order: 3 - - - name: "project_id" - description: "BigTable project ID" - help: "The Google Cloud project ID of the BigTable instance." - required: true - type: text - order: 4 + parameters: + - pubsub_common_options + - maptofields_common_options + - bigtable_common_options - - name: "error_topic" - description: "Pub/Sub error topic" - help: "Pub/Sub topic for failed messages." + - name: "outputDeadLetterPubSubTopic" + description: "Pub/Sub transformation error topic" + help: "Pub/Sub error topic for failed transformation messages." example: "projects/your-project-id/topics/your-error-topic-name" required: true type: text - order: 5 - - - name: "format" - description: "The message format." - help: "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING." - default: JSON - required: false - type: text - order: 6 - - - name: "schema" - description: "Data schema." - help: > - A schema is required if data format is JSON, AVRO or - PROTO. For JSON, this is a JSON schema. For AVRO and PROTO, this is the full schema definition. - required: false - type: text - order: 7 - - - name: "mapping" - description: "Field mapping configuration" - help: > - A YAML/JSON string that defines the mapping for the MapToFields transform. 
- e.g., 'language: python\nfields:\n key:\n value: str(uuid.uuid4()).encode()' - required: true - type: text - order: 8 - + order: 4 pipeline: @@ -100,20 +53,21 @@ pipeline: format: {{ format }} schema: | {{ schema }} - error_handling: - output: errors - type: MapToFields name: ConvertStringsToBytes config: - {{ (mapping + '\nerror_handling:\n output: errors') | indent(8) }} + language: {{ language }} + fields: {{ fields }} + error_handling: + output: errors - type: WriteToBigTable config: project: {{ project_id }} instance: {{ instance_id }} table: {{ table_id }} - type: WriteToPubSub - name: WriteReadErrorsToDeadLetterQueue - input: ReadMessages.errors + name: WriteTransformationErrorsToDeadLetterQueue + input: ConvertStringsToBytes.errors config: topic: {{ outputDeadLetterPubSubTopic }} format: {{ format }} From 5b6e0616a7136548394b458bffaf9ff492554d0c Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 11 Dec 2025 15:57:44 +0000 Subject: [PATCH 04/12] add option files --- .../main/python/options/bigtable_options.yaml | 27 +++++++++++++++ .../python/options/maptofields_options.yaml | 25 ++++++++++++++ .../main/python/options/pubsub_options.yaml | 33 +++++++++++++++++++ 3 files changed, 85 insertions(+) create mode 100644 yaml/src/main/python/options/bigtable_options.yaml create mode 100644 yaml/src/main/python/options/maptofields_options.yaml create mode 100644 yaml/src/main/python/options/pubsub_options.yaml diff --git a/yaml/src/main/python/options/bigtable_options.yaml b/yaml/src/main/python/options/bigtable_options.yaml new file mode 100644 index 0000000000..f1d94cdc3b --- /dev/null +++ b/yaml/src/main/python/options/bigtable_options.yaml @@ -0,0 +1,27 @@ +options: + - name: "bigtable_common_options" + parameters: + - order: 1 + name: "project_id" + description: "BigTable project ID" + help: "The Google Cloud project ID of the BigTable instance." + required: true + type: text + - order: 2 + name: "instance_id" + description: "BigTable instance ID" + help: "The BigTable instance ID." + required: true + type: text + - order: 3 + name: "table_id" + description: "BigTable output table" + help: "BigTable table ID to write the output to." + required: true + type: text + + - name: "bigtable_read_options" + parameters: + + - name: "bigtable_write_options" + parameters: diff --git a/yaml/src/main/python/options/maptofields_options.yaml b/yaml/src/main/python/options/maptofields_options.yaml new file mode 100644 index 0000000000..f50d050eda --- /dev/null +++ b/yaml/src/main/python/options/maptofields_options.yaml @@ -0,0 +1,25 @@ +options: + - name: "maptofields_common_options" + parameters: + - order: 1 + name: "language" + description: "Language used to define the expressions." + help: > + The language used to define (and execute) the expressions and/or + callables in fields. Defaults to generic. + required: true + type: text + - order: 2 + name: "fields" + description: "Field mapping configuration" + help: > + The output fields to compute, each mapping to the expression or callable + that creates them. 
+ required: true + type: text + + - name: "maptofields_read_options" + parameters: + + - name: "maptofields_write_options" + parameters: diff --git a/yaml/src/main/python/options/pubsub_options.yaml b/yaml/src/main/python/options/pubsub_options.yaml new file mode 100644 index 0000000000..05d94a0267 --- /dev/null +++ b/yaml/src/main/python/options/pubsub_options.yaml @@ -0,0 +1,33 @@ +options: + - name: "pubsub_common_options" + parameters: + - order: 1 + name: "topic" + description: "Pub/Sub input topic" + help: "Pub/Sub topic to read the input from." + example: "projects/your-project-id/topics/your-topic-name" + required: true + type: text + - order: 2 + name: "format" + description: "The message format." + help: "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING." + default: JSON + required: false + type: text + - order: 3 + name: "schema" + description: "Data schema." + help: > + A schema is required if data format is JSON, AVRO or PROTO. For JSON, + this is a JSON schema. For AVRO and PROTO, this is the full schema + definition. + required: true + type: text + + - name: "pubsub_read_options" + parameters: + + - name: "pubsub_write_options" + parameters: + From c3d62d39190e072d32fd6cbb749171a1dedba404 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 11 Dec 2025 15:58:53 +0000 Subject: [PATCH 05/12] first draft of it file --- .../yaml/PubSubToBigTableYamlIT.java | 241 ++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100644 yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java diff --git a/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java new file mode 100644 index 0000000000..5ab2594e73 --- /dev/null +++ b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.templates.yaml; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; + +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.TemplateTestBase; +import org.apache.beam.it.gcp.bigtable.BigtableResourceManager; +import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// import com.google.pubsub.v1.SubscriptionName; +// import org.apache.beam.it.gcp.pubsub.conditions.PubsubMessagesCheck; + +@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) +@TemplateIntegrationTest(PubSubToBigTableYaml.class) +@RunWith(JUnit4.class) +public final class PubSubToBigTableYamlIT extends TemplateTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigTableYamlIT.class); + + private PubsubResourceManager pubsubResourceManager; + private BigtableResourceManager bigtableResourceManager; + + @Before + public void setup() throws IOException { + // Pubsub and BigTable resource managers + pubsubResourceManager = + PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build(); + bigtableResourceManager = + BigtableResourceManager.builder(testName, PROJECT, credentialsProvider).build(); + } + + @After + public void tearDown() { + // Clean up resource managers + ResourceManagerUtils.cleanResources(pubsubResourceManager, bigtableResourceManager); + } + + @Test + public void testPubSubToBigTable() throws IOException { + pubSubToBigTable(Function.identity()); // no extra parameters + } + + public void pubSubToBigTable( + Function + paramsAdder) + throws IOException { + + /******************************* Arrange ********************************/ + + // Create main and dead letter queue topics + TopicName topic = pubsubResourceManager.createTopic("input"); + TopicName dlqTopic = pubsubResourceManager.createTopic("dlq"); + + // Create Bigtable table + String tableId = "test_table"; + bigtableResourceManager.createTable(tableId, ImmutableList.of("cf")); + + // Create launch config with the yaml pipeline parameters + PipelineLauncher.LaunchConfig.Builder options = + paramsAdder.apply( + PipelineLauncher.LaunchConfig.builder(testName, specPath) + .addParameter("topic", topic.toString()) + .addParameter("format", "JSON") + // .addParameter("schema", + // 
"{\"type\":\"object\",\"properties\":{\"key\":{\"type\":\"string\"},\"type\":{\"type\":\"string\"},\"family_name\":{\"type\":\"string\"},\"column_qualifier\":{\"type\":\"string\"},\"value\":{\"type\":\"string\"},\"timestamp_micros\":{\"type\":\"integer\"}},\"required\":[\"key\",\"type\",\"family_name\",\"column_qualifier\",\"value\"]}") + .addParameter( + "schema", + "{\"type\":\"object\",\"properties\":{\"key\":\"string\",\"type\":\"string\",\"family_name\":\"string\",\"column_qualifier\":\"string\",\"value\":\"string\",\"timestamp_micros\":\"integer\"})") + // .addParameter( + // "schema", + // + // "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"}}}") + .addParameter("table_id", tableId) + .addParameter("instance_id", bigtableResourceManager.getInstanceId()) + .addParameter("project_id", PROJECT) + .addParameter("language", "generic") + .addParameter( + "fields", + "[{\"name\": \"rowkey\", \"value\": \"id\"}, {\"name\": \"cf:name\", \"value\": \"name\"}]") + .addParameter("outputDeadLetterPubSubTopic", dlqTopic.toString())); + + /********************************* Act **********************************/ + + // Launch pipeline and assert running + PipelineLauncher.LaunchInfo info = launchTemplate(options); + assertThatPipeline(info).isRunning(); + + // Publish messages to the topic while the pipeline is running + List> expectedSuccessRecords = new ArrayList<>(); + List expectedFailureRecords = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + // Valid schema + long id1 = Long.parseLong(i + "1"); + long id2 = Long.parseLong(i + "2"); + pubsubResourceManager.publish( + topic, + null, + ByteString.copyFromUtf8( + "{\"key\": \"row" + + id1 + + "\", \"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq1\", \"value\": \"value1\", \"timestamp_micros\": 5000}")); + pubsubResourceManager.publish( + topic, + null, + ByteString.copyFromUtf8( + "{\"key\": \"row" + + id2 + + "\", \"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq2\", \"value\": \"value2\", \"timestamp_micros\": 1000}")); + expectedSuccessRecords.add( + Map.of( + "key", + "row" + id1, + "type", + "SetCell", + "family_name", + "cf1", + "column_qualifier", + "cq1", + "value", + "value1", + "timestamp_micros", + 5000)); + expectedSuccessRecords.add( + Map.of( + "key", + "row" + id2, + "type", + "SetCell", + "family_name", + "cf1", + "column_qualifier", + "cq2", + "value", + "value2", + "timestamp_micros", + 1000)); + + // Invalid schema + String invalidRow = + "{\"key\": 123, \"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq-invalid\", \"value\": \"invalid-value\", \"timestamp_micros\": 0}"; + pubsubResourceManager.publish(topic, null, ByteString.copyFromUtf8(invalidRow)); + expectedFailureRecords.add(ByteString.copyFromUtf8(invalidRow)); + + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // Add check that 10 dlq messages were received + // PubsubMessagesCheck dlqCheck = + // PubsubMessagesCheck.builder(pubsubResourceManager, dlqTopic, "dlq-topic") + // .setMinMessages(10) + // .build(); + + // Add check that 20 messages were written to the table + + // PipelineOperator.Result result = + // pipelineOperator().waitForConditionAndFinish(createConfig(info), dlqCheck); + + PipelineOperator.Result result = + pipelineOperator() + .waitForConditionAndFinish( + createConfig(info), + // dlqCheck, + () -> 
bigtableResourceManager.readTable(tableId).size() >= 20); + + /******************************** Assert ********************************/ + assertThatResult(result).meetsConditions(); + + // Verify 20 rows in the table + List tableRows = bigtableResourceManager.readTable(tableId); + assertThat(tableRows).hasSize(20); + + // Verify 20 rows are expected + Map rowMap = + tableRows.stream() + .collect(Collectors.toMap(row -> row.getKey().toStringUtf8(), row -> row)); + + for (int i = 1; i <= 10; i++) { + String key1 = "row" + i + "1"; + assertThat(rowMap).containsKey(key1); + Row row1 = rowMap.get(key1); + assertThat(row1.getCells()).hasSize(1); + assertThat(row1.getCells().get(0).getFamily()).isEqualTo("cf1"); + assertThat(row1.getCells().get(0).getQualifier().toStringUtf8()).isEqualTo("cq1"); + assertThat(row1.getCells().get(0).getValue().toStringUtf8()).isEqualTo("value1"); + assertThat(row1.getCells().get(0).getTimestamp()).isEqualTo(5000L); + + String key2 = "row" + i + "2"; + assertThat(rowMap).containsKey(key2); + Row row2 = rowMap.get(key2); + assertThat(row2.getCells()).hasSize(1); + assertThat(row2.getCells().get(0).getFamily()).isEqualTo("cf1"); + assertThat(row2.getCells().get(0).getQualifier().toStringUtf8()).isEqualTo("cq2"); + assertThat(row2.getCells().get(0).getValue().toStringUtf8()).isEqualTo("value2"); + assertThat(row2.getCells().get(0).getTimestamp()).isEqualTo(1000L); + } + } +} From 003c97cd65b8b7cf220494634ad5d6e58d6e42f9 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 11 Dec 2025 16:14:13 +0000 Subject: [PATCH 06/12] first draft of readme --- yaml/README_PubSub_To_BigTable_Yaml.md | 244 +++++++++++++++++++++++++ 1 file changed, 244 insertions(+) create mode 100644 yaml/README_PubSub_To_BigTable_Yaml.md diff --git a/yaml/README_PubSub_To_BigTable_Yaml.md b/yaml/README_PubSub_To_BigTable_Yaml.md new file mode 100644 index 0000000000..55b5f14e78 --- /dev/null +++ b/yaml/README_PubSub_To_BigTable_Yaml.md @@ -0,0 +1,244 @@ + +PubSub to BigTable (YAML) template +--- +The PubSub to BigTable template is a streaming pipeline which ingests data from a +PubSub topic, executes a user-defined mapping, and writes the resulting records +to BigTable. Any errors which occur in the transformation of the data are written +to a separate Pub/Sub topic. + + +:memo: This is a Google-provided template! Please +check [Provided templates documentation](https://cloud.google.com/dataflow/docs/guides/templates/provided-yaml/pubsub-to-bigtable) +on how to use it without having to build from sources using [Create job from template](https://console.cloud.google.com/dataflow/createjob?template=PubSub_To_BigTable_Yaml). + +:bulb: This is a generated documentation based +on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#metadata-annotations) +. Do not change this file directly. + +## Parameters + +### Required parameters + +* **topic**: Pub/Sub topic to read the input from. For example, `projects/your-project-id/topics/your-topic-name`. +* **schema**: A schema is required if data format is JSON, AVRO or PROTO. For JSON, this is a JSON schema. For AVRO and PROTO, this is the full schema definition. +* **language**: The language used to define (and execute) the expressions and/or callables in fields. Defaults to generic. +* **fields**: The output fields to compute, each mapping to the expression or callable that creates them. +* **project_id**: The Google Cloud project ID of the BigTable instance. 
* **instance_id**: The BigTable instance ID.
* **table_id**: BigTable table ID to write the output to.
* **outputDeadLetterPubSubTopic**: Pub/Sub error topic for failed transformation messages. For example, `projects/your-project-id/topics/your-error-topic-name`.

### Optional parameters

* **format**: The message format. One of: AVRO, JSON, PROTO, RAW, or STRING. Defaults to: JSON.

## Getting Started

### Requirements

* Java 17
* Maven
* [gcloud CLI](https://cloud.google.com/sdk/gcloud), and execution of the
  following commands:
  * `gcloud auth login`
  * `gcloud auth application-default login`

:star2: Those dependencies are pre-installed if you use Google Cloud Shell!

[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https%3A%2F%2Fgithub.com%2FGoogleCloudPlatform%2FDataflowTemplates.git&cloudshell_open_in_editor=yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java)

### Templates Plugin

This README provides instructions using
the [Templates Plugin](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#templates-plugin).

#### Validating the Template

This template has a validation command that is used to check code quality.

```shell
mvn clean install -PtemplatesValidate \
-DskipTests -am \
-pl yaml
```

### Building Template

This template is a Flex Template, meaning that the pipeline code will be
containerized and the container will be executed on Dataflow. Please
check [Use Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
and [Configure Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates)
for more information.

#### Staging the Template

If the plan is to just stage the template (i.e., make it available to use) by
the `gcloud` command or Dataflow "Create job from template" UI,
the `-PtemplatesStage` profile should be used:

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export ARTIFACT_REGISTRY_REPO=<location>-docker.pkg.dev/$PROJECT/<repository>

mvn clean package -PtemplatesStage \
-DskipTests \
-DprojectId="$PROJECT" \
-DbucketName="$BUCKET_NAME" \
-DartifactRegistry="$ARTIFACT_REGISTRY_REPO" \
-DstagePrefix="templates" \
-DtemplateName="PubSub_To_BigTable_Yaml" \
-f yaml
```

The `-DartifactRegistry` parameter can be specified to set the artifact registry repository of the Flex Templates image.
If not provided, it defaults to `gcr.io/<project-id>`.

The command should build and save the template to Google Cloud, and then print
the complete location on Cloud Storage:

```
Flex Template was staged! gs://<bucket-name>/templates/flex/PubSub_To_BigTable_Yaml
```

The specific path should be copied as it will be used in the following steps.

#### Running the Template

**Using the staged template**:

You can use the path above to run the template (or share it with others for execution).

To start a job with the template at any time using `gcloud`, you are going to
need valid resources for the required parameters.
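
As a sketch of that setup (the topic names, instance ID, table ID, and the
`cf1` column family below are placeholders, not values the template mandates),
the Pub/Sub topics and the Bigtable table could be created with `gcloud` and
the `cbt` CLI:

```shell
# Hypothetical resource names -- substitute your own.
gcloud pubsub topics create your-topic-name --project "$PROJECT"
gcloud pubsub topics create your-error-topic-name --project "$PROJECT"

# The output table must exist with the column families the mapping writes to;
# cf1 is only an example. cbt can be installed with:
#   gcloud components install cbt
cbt -project "$PROJECT" -instance your-instance-id createtable your-table-id
cbt -project "$PROJECT" -instance your-instance-id createfamily your-table-id cf1
```
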
With those resources in place, the following command line can be used:

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1
export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/PubSub_To_BigTable_Yaml"

### Required
export TOPIC=<topic>
export SCHEMA=<schema>
export LANGUAGE=<language>
export FIELDS=<fields>
export PROJECT_ID=<project_id>
export INSTANCE_ID=<instance_id>
export TABLE_ID=<table_id>
export OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC=<outputDeadLetterPubSubTopic>

### Optional
export FORMAT=JSON

gcloud dataflow flex-template run "pubsub-to-bigtable-yaml-job" \
  --project "$PROJECT" \
  --region "$REGION" \
  --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \
  --parameters "topic=$TOPIC" \
  --parameters "format=$FORMAT" \
  --parameters "schema=$SCHEMA" \
  --parameters "language=$LANGUAGE" \
  --parameters "fields=$FIELDS" \
  --parameters "project_id=$PROJECT_ID" \
  --parameters "instance_id=$INSTANCE_ID" \
  --parameters "table_id=$TABLE_ID" \
  --parameters "outputDeadLetterPubSubTopic=$OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC"
```

For more information about the command, please check:
https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/run

**Using the plugin**:

Instead of just generating the template in the folder, it is possible to stage
and run the template in a single command. This may be useful for testing when
changing the templates.

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1

### Required
export TOPIC=<topic>
export SCHEMA=<schema>
export LANGUAGE=<language>
export FIELDS=<fields>
export PROJECT_ID=<project_id>
export INSTANCE_ID=<instance_id>
export TABLE_ID=<table_id>
export OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC=<outputDeadLetterPubSubTopic>

### Optional
export FORMAT=JSON

mvn clean package -PtemplatesRun \
-DskipTests \
-DprojectId="$PROJECT" \
-DbucketName="$BUCKET_NAME" \
-Dregion="$REGION" \
-DjobName="pubsub-to-bigtable-yaml-job" \
-DtemplateName="PubSub_To_BigTable_Yaml" \
-Dparameters="topic=$TOPIC,format=$FORMAT,schema=$SCHEMA,language=$LANGUAGE,fields=$FIELDS,project_id=$PROJECT_ID,instance_id=$INSTANCE_ID,table_id=$TABLE_ID,outputDeadLetterPubSubTopic=$OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC" \
-f yaml
```

## Terraform

Dataflow supports using Terraform to manage template jobs;
see [dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job).

Terraform modules have been generated for most templates in this repository. These include the relevant parameters
specific to the template. If available, they may be used instead of
[dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job)
directly.

To use the autogenerated module, execute the standard
[terraform workflow](https://developer.hashicorp.com/terraform/intro/core-workflow):

```shell
cd v2/yaml/terraform/PubSub_To_BigTable_Yaml
terraform init
terraform apply
```

To use
[dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job)
directly:

```terraform
provider "google-beta" {
  project = var.project
}
variable "project" {
  default = "<my-project>"
}
variable "region" {
  default = "us-central1"
}

resource "google_dataflow_flex_template_job" "pubsub_to_bigtable_yaml" {

  provider                = google-beta
  container_spec_gcs_path = "gs://dataflow-templates-${var.region}/latest/flex/PubSub_To_BigTable_Yaml"
  name                    = "pubsub-to-bigtable-yaml"
  region                  = var.region
  parameters              = {
    topic = "<topic>"
    schema = "<schema>"
    language = "<language>"
    fields = "<fields>"
    project_id = "<project_id>"
    instance_id = "<instance_id>"
    table_id = "<table_id>"
    outputDeadLetterPubSubTopic = "<outputDeadLetterPubSubTopic>"
    # format = "JSON"
  }
}
```
From 9ea167084101b8a598c894be72405586813a8863 Mon Sep 17 00:00:00 2001
From: Derrick Williams
Date: Mon, 22 Dec 2025 15:23:19 +0000
Subject: [PATCH 07/12] add bigtable options

---
 yaml/src/main/python/options/bigtable_options.yaml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/yaml/src/main/python/options/bigtable_options.yaml b/yaml/src/main/python/options/bigtable_options.yaml
index f1d94cdc3b..28a6ea7956 100644
--- a/yaml/src/main/python/options/bigtable_options.yaml
+++ b/yaml/src/main/python/options/bigtable_options.yaml
@@ -2,19 +2,19 @@ options:
   - name: "bigtable_common_options"
     parameters:
       - order: 1
-        name: "project_id"
+        name: "projectId"
         description: "BigTable project ID"
         help: "The Google Cloud project ID of the BigTable instance."
         required: true
         type: text
       - order: 2
-        name: "instance_id"
+        name: "instanceId"
         description: "BigTable instance ID"
         help: "The BigTable instance ID."
         required: true
         type: text
       - order: 3
-        name: "table_id"
+        name: "tableId"
         description: "BigTable output table"
         help: "BigTable table ID to write the output to."
required: true From 6c409f5fe654cf4cb542c265c66c2fea761bf496 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 22 Dec 2025 15:25:03 +0000 Subject: [PATCH 08/12] add new builder method --- .../beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java index 6803eec291..e02f6b3479 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java @@ -21,6 +21,7 @@ import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; @@ -86,6 +87,12 @@ public CheckResult check() { String.format("Expected at least %d messages and found %d", minMessages(), totalRows)); } + public static Builder builder( + PubsubResourceManager resourceManager, TopicName topic, String subscription) { + SubscriptionName subscriptionName = resourceManager.createSubscription(topic, subscription); + return builder(resourceManager, subscriptionName); + } + public static Builder builder( PubsubResourceManager resourceManager, SubscriptionName subscription) { return new AutoValue_PubsubMessagesCheck.Builder() From 1ecd0b1ed69e9f2b642934ba77b8198ff636be1e Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 23 Dec 2025 18:32:07 +0000 Subject: [PATCH 09/12] add windowing option file --- yaml/src/main/python/options/windowinto_options.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 yaml/src/main/python/options/windowinto_options.yaml diff --git a/yaml/src/main/python/options/windowinto_options.yaml b/yaml/src/main/python/options/windowinto_options.yaml new file mode 100644 index 0000000000..461405d0c4 --- /dev/null +++ b/yaml/src/main/python/options/windowinto_options.yaml @@ -0,0 +1,10 @@ +options: + - name: "windowinto_common_options" + parameters: + - order: 1 + name: "windowing" + description: "Windowing options" + help: "Windowing options - see https://beam.apache.org/documentation/sdks/yaml/#windowing" + is_optional: false + type: text + From 8c5dd578cbcf6290c7ecc9b78f19c695b33c4045 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 23 Dec 2025 18:32:35 +0000 Subject: [PATCH 10/12] update pipeline based on new windowing parameter --- yaml/src/main/yaml/PubSubToBigTable.yaml | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/yaml/src/main/yaml/PubSubToBigTable.yaml b/yaml/src/main/yaml/PubSubToBigTable.yaml index d8c0cf08d2..76908d4998 100644 --- a/yaml/src/main/yaml/PubSubToBigTable.yaml +++ b/yaml/src/main/yaml/PubSubToBigTable.yaml @@ -27,12 +27,14 @@ template: options_file: - "pubsub_options" - "maptofields_options" + - "windowinto_options" - "bigtable_options" parameters: - pubsub_common_options - maptofields_common_options - bigtable_common_options + - windowinto_common_options - name: "outputDeadLetterPubSubTopic" description: "Pub/Sub transformation error topic" @@ -40,7 +42,7 @@ template: example: "projects/your-project-id/topics/your-error-topic-name" required: true type: text - order: 4 + order: 1 pipeline: @@ -51,20 
+53,27 @@ pipeline: config: topic: {{ topic }} format: {{ format }} - schema: | - {{ schema }} + schema: {{ schema }} - type: MapToFields name: ConvertStringsToBytes + input: ReadMessages config: language: {{ language }} fields: {{ fields }} error_handling: output: errors + - type: WindowInto + name: WindowInto + input: ConvertStringsToBytes + config: + windowing: {{ windowing }} - type: WriteToBigTable + name: WriteGoodMessagesToBigTable + input: WindowInto config: - project: {{ project_id }} - instance: {{ instance_id }} - table: {{ table_id }} + project: {{ projectId }} + instance: {{ instanceId }} + table: {{ tableId }} - type: WriteToPubSub name: WriteTransformationErrorsToDeadLetterQueue input: ConvertStringsToBytes.errors From 0ab8300ace89e0ba5c43e32b7fc4d7aead630fc7 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 23 Dec 2025 18:32:54 +0000 Subject: [PATCH 11/12] generated new java template file --- .../templates/yaml/PubSubToBigTableYaml.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java index 1cb8167fa9..825303183b 100644 --- a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java +++ b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java @@ -98,36 +98,46 @@ public interface PubSubToBigTableYaml { @TemplateParameter.Text( order = 6, - name = "project_id", + name = "projectId", optional = false, description = "BigTable project ID", helpText = "The Google Cloud project ID of the BigTable instance.", example = "") @Validation.Required - String getProject_id(); + String getProjectId(); @TemplateParameter.Text( order = 7, - name = "instance_id", + name = "instanceId", optional = false, description = "BigTable instance ID", helpText = "The BigTable instance ID.", example = "") @Validation.Required - String getInstance_id(); + String getInstanceId(); @TemplateParameter.Text( order = 8, - name = "table_id", + name = "tableId", optional = false, description = "BigTable output table", helpText = "BigTable table ID to write the output to.", example = "") @Validation.Required - String getTable_id(); + String getTableId(); @TemplateParameter.Text( order = 9, + name = "windowing", + optional = true, + description = "Windowing options", + helpText = + "Windowing options - see https://beam.apache.org/documentation/sdks/yaml/#windowing", + example = "") + String getWindowing(); + + @TemplateParameter.Text( + order = 10, name = "outputDeadLetterPubSubTopic", optional = false, description = "Pub/Sub transformation error topic", From 0c8c4375472b771ce0812d55a9f33f3f7559e4f8 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 23 Dec 2025 18:33:15 +0000 Subject: [PATCH 12/12] updated it to handle pub sub messages better etc --- .../yaml/PubSubToBigTableYamlIT.java | 195 +++++++++--------- 1 file changed, 102 insertions(+), 93 deletions(-) diff --git a/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java index 5ab2594e73..f3a6c489f4 100644 --- a/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java +++ b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java @@ -19,17 +19,20 @@ import static 
org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; import com.google.cloud.teleport.metadata.TemplateIntegrationTest; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.beam.it.common.PipelineLauncher; @@ -38,6 +41,7 @@ import org.apache.beam.it.gcp.TemplateTestBase; import org.apache.beam.it.gcp.bigtable.BigtableResourceManager; import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; +import org.apache.beam.it.gcp.pubsub.conditions.PubsubMessagesCheck; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -47,9 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// import com.google.pubsub.v1.SubscriptionName; -// import org.apache.beam.it.gcp.pubsub.conditions.PubsubMessagesCheck; - +/** Integration test for {@link PubSubToBigTableYaml}. */ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(PubSubToBigTableYaml.class) @RunWith(JUnit4.class) @@ -77,7 +79,7 @@ public void tearDown() { @Test public void testPubSubToBigTable() throws IOException { - pubSubToBigTable(Function.identity()); // no extra parameters + pubSubToBigTable(Function.identity()); } public void pubSubToBigTable( @@ -85,135 +87,142 @@ public void pubSubToBigTable( paramsAdder) throws IOException { + LOG.info("Starting pubSubToBigTable test. Test name: {}. 
Spec path: {}", testName, specPath); + /******************************* Arrange ********************************/ - // Create main and dead letter queue topics + LOG.info("Creating main and dead letter queue topics..."); TopicName topic = pubsubResourceManager.createTopic("input"); TopicName dlqTopic = pubsubResourceManager.createTopic("dlq"); - // Create Bigtable table + LOG.info("Creating Bigtable table..."); String tableId = "test_table"; - bigtableResourceManager.createTable(tableId, ImmutableList.of("cf")); + bigtableResourceManager.createTable(tableId, ImmutableList.of("cf1")); - // Create launch config with the yaml pipeline parameters + LOG.info("Creating launch config with yaml pipeline parameters..."); PipelineLauncher.LaunchConfig.Builder options = paramsAdder.apply( PipelineLauncher.LaunchConfig.builder(testName, specPath) .addParameter("topic", topic.toString()) .addParameter("format", "JSON") - // .addParameter("schema", - // "{\"type\":\"object\",\"properties\":{\"key\":{\"type\":\"string\"},\"type\":{\"type\":\"string\"},\"family_name\":{\"type\":\"string\"},\"column_qualifier\":{\"type\":\"string\"},\"value\":{\"type\":\"string\"},\"timestamp_micros\":{\"type\":\"integer\"}},\"required\":[\"key\",\"type\",\"family_name\",\"column_qualifier\",\"value\"]}") .addParameter( "schema", - "{\"type\":\"object\",\"properties\":{\"key\":\"string\",\"type\":\"string\",\"family_name\":\"string\",\"column_qualifier\":\"string\",\"value\":\"string\",\"timestamp_micros\":\"integer\"})") - // .addParameter( - // "schema", - // - // "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"}}}") - .addParameter("table_id", tableId) - .addParameter("instance_id", bigtableResourceManager.getInstanceId()) - .addParameter("project_id", PROJECT) - .addParameter("language", "generic") + "{\"type\":\"object\",\"properties\":{\"key\":{\"type\":\"string\"},\"type\":{\"type\":\"string\"},\"family_name\":{\"type\":\"string\"},\"column_qualifier\":{\"type\":\"string\"},\"value\":{\"type\":\"string\"},\"timestamp_micros\":{\"type\":\"integer\"}}}") + .addParameter("windowing", "{\"type\":\"fixed\",\"size\":\"10s\"}") + .addParameter("tableId", tableId) + .addParameter("instanceId", bigtableResourceManager.getInstanceId()) + .addParameter("projectId", PROJECT) + .addParameter("language", "python") .addParameter( "fields", - "[{\"name\": \"rowkey\", \"value\": \"id\"}, {\"name\": \"cf:name\", \"value\": \"name\"}]") + "{" + + "\"key\": {\"expression\": \"key.encode('utf-8')\", \"output_type\": \"bytes\"}," + + "\"type\": {\"expression\": \"type\", \"output_type\": \"string\"}," + + "\"family_name\": {\"expression\": \"family_name\", \"output_type\": \"string\"}," + + "\"column_qualifier\": {\"expression\": \"column_qualifier.encode('utf-8')\", \"output_type\": \"bytes\"}," + + "\"value\": {\"expression\": \"value.encode('utf-8')\", \"output_type\": \"bytes\"}," + + "\"timestamp_micros\": {\"expression\": \"timestamp_micros\", \"output_type\": \"integer\"}" + + "}") .addParameter("outputDeadLetterPubSubTopic", dlqTopic.toString())); /********************************* Act **********************************/ - // Launch pipeline and assert running + LOG.info("Launching template with options..."); PipelineLauncher.LaunchInfo info = launchTemplate(options); + + LOG.info("Template launched. 
LaunchInfo: {}", info); assertThatPipeline(info).isRunning(); - // Publish messages to the topic while the pipeline is running - List> expectedSuccessRecords = new ArrayList<>(); - List expectedFailureRecords = new ArrayList<>(); + LOG.info("Creating messages to be published into the topic..."); + List successMessages = new ArrayList<>(); + List failureMessages = new ArrayList<>(); for (int i = 1; i <= 10; i++) { - // Valid schema long id1 = Long.parseLong(i + "1"); long id2 = Long.parseLong(i + "2"); - pubsubResourceManager.publish( - topic, - null, + successMessages.add( ByteString.copyFromUtf8( "{\"key\": \"row" + id1 + "\", \"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq1\", \"value\": \"value1\", \"timestamp_micros\": 5000}")); - pubsubResourceManager.publish( - topic, - null, + successMessages.add( ByteString.copyFromUtf8( "{\"key\": \"row" + id2 + "\", \"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq2\", \"value\": \"value2\", \"timestamp_micros\": 1000}")); - expectedSuccessRecords.add( - Map.of( - "key", - "row" + id1, - "type", - "SetCell", - "family_name", - "cf1", - "column_qualifier", - "cq1", - "value", - "value1", - "timestamp_micros", - 5000)); - expectedSuccessRecords.add( - Map.of( - "key", - "row" + id2, - "type", - "SetCell", - "family_name", - "cf1", - "column_qualifier", - "cq2", - "value", - "value2", - "timestamp_micros", - 1000)); - - // Invalid schema + + // Missing a field, which during MapToFields transformation will cause a failure String invalidRow = - "{\"key\": 123, \"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq-invalid\", \"value\": \"invalid-value\", \"timestamp_micros\": 0}"; - pubsubResourceManager.publish(topic, null, ByteString.copyFromUtf8(invalidRow)); - expectedFailureRecords.add(ByteString.copyFromUtf8(invalidRow)); - - try { - TimeUnit.SECONDS.sleep(3); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + "{\"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq-invalid\", \"value\": \"invalid-value\", \"timestamp_micros\": 0}"; + failureMessages.add(ByteString.copyFromUtf8(invalidRow)); } - // Add check that 10 dlq messages were received - // PubsubMessagesCheck dlqCheck = - // PubsubMessagesCheck.builder(pubsubResourceManager, dlqTopic, "dlq-topic") - // .setMinMessages(10) - // .build(); - - // Add check that 20 messages were written to the table - - // PipelineOperator.Result result = - // pipelineOperator().waitForConditionAndFinish(createConfig(info), dlqCheck); - - PipelineOperator.Result result = - pipelineOperator() - .waitForConditionAndFinish( - createConfig(info), - // dlqCheck, - () -> bigtableResourceManager.readTable(tableId).size() >= 20); - - /******************************** Assert ********************************/ - assertThatResult(result).meetsConditions(); + LOG.info("Waiting for pipeline condition..."); + + Publisher publisher = null; + try { + publisher = Publisher.newBuilder(topic).setCredentialsProvider(credentialsProvider).build(); + final Publisher finalPublisher = publisher; + + PipelineOperator.Result result = + pipelineOperator() + .waitForConditionsAndFinish( + createConfig(info), + // Publish messages and check that 20 rows are in the BigTable + () -> { + LOG.info( + "Publishing messages to the topic to ensure pipeline has messages to" + + " process..."); + List> futures = new ArrayList<>(); + for (ByteString successMessage : successMessages) { + futures.add( + finalPublisher.publish( 
+ PubsubMessage.newBuilder().setData(successMessage).build())); + } + for (ByteString failureMessage : failureMessages) { + futures.add( + finalPublisher.publish( + PubsubMessage.newBuilder().setData(failureMessage).build())); + } + try { + ApiFutures.allAsList(futures).get(); + LOG.info("All messages published successfully for this check."); + Thread.sleep(1000); // Sleep for 1 second to avoid crashing the vm + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Action interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Error publishing messages", e); + } + + List rows = bigtableResourceManager.readTable(tableId); + if (rows == null) { + LOG.warn( + "bigtableResourceManager.readTable(tableId) returned null. Retrying."); + return false; + } + int tableSize = rows.size(); + LOG.info("Checking table size. Current size: {}", tableSize); + return tableSize == 20; + }, + // Check that a minimum of 10 messages are in the dead letter queue + PubsubMessagesCheck.builder(pubsubResourceManager, dlqTopic, "dlq-topic") + .setMinMessages(10) + .build()); + + /******************************** Assert ********************************/ + assertThatResult(result).meetsConditions(); + + } finally { + if (publisher != null) { + publisher.shutdown(); + } + } - // Verify 20 rows in the table + LOG.info("Verifying 20 rows in the BigTable still exist..."); List tableRows = bigtableResourceManager.readTable(tableId); assertThat(tableRows).hasSize(20); - // Verify 20 rows are expected + LOG.info("Verifying the exact 20 rows in BigTable..."); Map rowMap = tableRows.stream() .collect(Collectors.toMap(row -> row.getKey().toStringUtf8(), row -> row));