diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java
index 6803eec291..e02f6b3479 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java
@@ -21,6 +21,7 @@ import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.ReceivedMessage;
 import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -86,6 +87,12 @@ public CheckResult check() {
         String.format("Expected at least %d messages and found %d", minMessages(), totalRows));
   }
 
+  public static Builder builder(
+      PubsubResourceManager resourceManager, TopicName topic, String subscription) {
+    SubscriptionName subscriptionName = resourceManager.createSubscription(topic, subscription);
+    return builder(resourceManager, subscriptionName);
+  }
+
   public static Builder builder(
       PubsubResourceManager resourceManager, SubscriptionName subscription) {
     return new AutoValue_PubsubMessagesCheck.Builder()
diff --git a/yaml/README_PubSub_To_BigTable_Yaml.md b/yaml/README_PubSub_To_BigTable_Yaml.md
new file mode 100644
index 0000000000..55b5f14e78
--- /dev/null
+++ b/yaml/README_PubSub_To_BigTable_Yaml.md
@@ -0,0 +1,244 @@
+
+PubSub to BigTable (YAML) template
+---
+The PubSub to BigTable template is a streaming pipeline that ingests data from a
+PubSub topic, executes a user-defined mapping, and writes the resulting records
+to BigTable. Any errors that occur during the transformation of the data are
+written to a separate Pub/Sub topic.
+
+
+:memo: This is a Google-provided template! Please
+check [Provided templates documentation](https://cloud.google.com/dataflow/docs/guides/templates/provided-yaml/pubsub-to-bigtable)
+on how to use it without having to build from sources using [Create job from template](https://console.cloud.google.com/dataflow/createjob?template=PubSub_To_BigTable_Yaml).
+
+:bulb: This documentation is generated from
+the [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#metadata-annotations).
+Do not change this file directly.
+
+## Parameters
+
+### Required parameters
+
+* **topic**: Pub/Sub topic to read the input from. For example, `projects/your-project-id/topics/your-topic-name`.
+* **schema**: A schema is required if the data format is JSON, AVRO, or PROTO. For JSON, this is a JSON schema. For AVRO and PROTO, this is the full schema definition.
+* **language**: The language used to define (and execute) the expressions and/or callables in fields. Defaults to generic.
+* **fields**: The output fields to compute, each mapping to the expression or callable that creates them. See the example values after this list.
+* **projectId**: The Google Cloud project ID of the BigTable instance.
+* **instanceId**: The BigTable instance ID.
+* **tableId**: BigTable table ID to write the output to.
+* **outputDeadLetterPubSubTopic**: Pub/Sub error topic for failed transformation messages. For example, `projects/your-project-id/topics/your-error-topic-name`.
+
+### Optional parameters
+
+* **format**: The message format. One of: AVRO, JSON, PROTO, RAW, or STRING. Defaults to: JSON.
+* **windowing**: Windowing options - see https://beam.apache.org/documentation/sdks/yaml/#windowing.
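+
+As a concrete illustration of the `language`, `schema`, and `fields` parameters,
+the values below are adapted from this PR's integration test, which maps
+SetCell-style JSON messages to BigTable mutations. They show one working shape
+for these parameters, not the only accepted one:
+
+```shell
+# Example values, adapted from the integration test in this PR.
+export LANGUAGE=python
+
+# JSON schema describing the incoming messages: one BigTable mutation per message.
+export SCHEMA='{"type":"object","properties":{"key":{"type":"string"},"type":{"type":"string"},"family_name":{"type":"string"},"column_qualifier":{"type":"string"},"value":{"type":"string"},"timestamp_micros":{"type":"integer"}}}'
+
+# Each output field maps to a Python expression over the parsed message; the row
+# key, column qualifier, and value are encoded to bytes for BigTable. The '\''
+# sequences only escape the embedded single quotes for the shell.
+export FIELDS='{"key": {"expression": "key.encode('\''utf-8'\'')", "output_type": "bytes"}, "type": {"expression": "type", "output_type": "string"}, "family_name": {"expression": "family_name", "output_type": "string"}, "column_qualifier": {"expression": "column_qualifier.encode('\''utf-8'\'')", "output_type": "bytes"}, "value": {"expression": "value.encode('\''utf-8'\'')", "output_type": "bytes"}, "timestamp_micros": {"expression": "timestamp_micros", "output_type": "integer"}}'
+```
+
+Once a job is running (see "Running the Template" below), a matching message can
+be published for a quick manual check (hypothetical smoke test; the payload
+mirrors the rows used in the integration test):
+
+```shell
+# Publish one SetCell-style message to the input topic of a running job.
+gcloud pubsub topics publish "$TOPIC" \
+  --message='{"key": "row1", "type": "SetCell", "family_name": "cf1", "column_qualifier": "cq1", "value": "value1", "timestamp_micros": 5000}'
+```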
+
+
+## Getting Started
+
+### Requirements
+
+* Java 17
+* Maven
+* [gcloud CLI](https://cloud.google.com/sdk/gcloud), and execution of the
+  following commands:
+  * `gcloud auth login`
+  * `gcloud auth application-default login`
+
+:star2: Those dependencies are pre-installed if you use Google Cloud Shell!
+
+[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https%3A%2F%2Fgithub.com%2FGoogleCloudPlatform%2FDataflowTemplates.git&cloudshell_open_in_editor=yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java)
+
+### Templates Plugin
+
+This README provides instructions using
+the [Templates Plugin](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#templates-plugin).
+
+#### Validating the Template
+
+This template has a validation command that is used to check code quality:
+
+```shell
+mvn clean install -PtemplatesValidate \
+-DskipTests -am \
+-pl yaml
+```
+
+### Building Template
+
+This template is a Flex Template, meaning that the pipeline code will be
+containerized and the container will be executed on Dataflow. Please
+check [Use Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
+and [Configure Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates)
+for more information.
+
+#### Staging the Template
+
+If the plan is to just stage the template (i.e., make it available to use) by
+the `gcloud` command or Dataflow "Create job from template" UI,
+the `-PtemplatesStage` profile should be used:
+
+```shell
+export PROJECT=<my-project>
+export BUCKET_NAME=<bucket-name>
+export ARTIFACT_REGISTRY_REPO=<region>-docker.pkg.dev/$PROJECT/<artifact-registry-repo>
+
+mvn clean package -PtemplatesStage \
+-DskipTests \
+-DprojectId="$PROJECT" \
+-DbucketName="$BUCKET_NAME" \
+-DartifactRegistry="$ARTIFACT_REGISTRY_REPO" \
+-DstagePrefix="templates" \
+-DtemplateName="PubSub_To_BigTable_Yaml" \
+-f yaml
+```
+
+The `-DartifactRegistry` parameter can be specified to set the artifact registry repository of the Flex Templates image.
+If not provided, it defaults to `gcr.io/<my-project>`.
+
+The command should build and save the template to Google Cloud, and then print
+the complete location on Cloud Storage:
+
+```
+Flex Template was staged! gs://<bucket-name>/templates/flex/PubSub_To_BigTable_Yaml
+```
+
+The specific path should be copied as it will be used in the following steps.
+
+#### Running the Template
+
+**Using the staged template**:
+
+You can use the path above to run the template (or share with others for execution).
+
+To start a job with the template at any time using `gcloud`, you are going to
+need valid resources for the required parameters.
+
+Provided that, the following command line can be used:
+
+```shell
+export PROJECT=<my-project>
+export BUCKET_NAME=<bucket-name>
+export REGION=us-central1
+export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/PubSub_To_BigTable_Yaml"
+
+### Required
+export TOPIC=<topic>
+export SCHEMA=<schema>
+export LANGUAGE=<language>
+export FIELDS=<fields>
+export PROJECT_ID=<projectId>
+export INSTANCE_ID=<instanceId>
+export TABLE_ID=<tableId>
+export OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC=<outputDeadLetterPubSubTopic>
+
+### Optional
+export FORMAT=JSON
+
+gcloud dataflow flex-template run "pubsub-to-bigtable-yaml-job" \
+  --project "$PROJECT" \
+  --region "$REGION" \
+  --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \
+  --parameters "topic=$TOPIC" \
+  --parameters "format=$FORMAT" \
+  --parameters "schema=$SCHEMA" \
+  --parameters "language=$LANGUAGE" \
+  --parameters "fields=$FIELDS" \
+  --parameters "projectId=$PROJECT_ID" \
+  --parameters "instanceId=$INSTANCE_ID" \
+  --parameters "tableId=$TABLE_ID" \
+  --parameters "outputDeadLetterPubSubTopic=$OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC"
+```
+
+For more information about the command, please check:
+https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/run
+
+
+**Using the plugin**:
+
+Instead of just generating the template in the folder, it is possible to stage
+and run the template in a single command. This may be useful for testing when
+changing the templates.
+
+```shell
+export PROJECT=<my-project>
+export BUCKET_NAME=<bucket-name>
+export REGION=us-central1
+
+### Required
+export TOPIC=<topic>
+export SCHEMA=<schema>
+export LANGUAGE=<language>
+export FIELDS=<fields>
+export PROJECT_ID=<projectId>
+export INSTANCE_ID=<instanceId>
+export TABLE_ID=<tableId>
+export OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC=<outputDeadLetterPubSubTopic>
+
+### Optional
+export FORMAT=JSON
+
+mvn clean package -PtemplatesRun \
+-DskipTests \
+-DprojectId="$PROJECT" \
+-DbucketName="$BUCKET_NAME" \
+-Dregion="$REGION" \
+-DjobName="pubsub-to-bigtable-yaml-job" \
+-DtemplateName="PubSub_To_BigTable_Yaml" \
+-Dparameters="topic=$TOPIC,format=$FORMAT,schema=$SCHEMA,language=$LANGUAGE,fields=$FIELDS,projectId=$PROJECT_ID,instanceId=$INSTANCE_ID,tableId=$TABLE_ID,outputDeadLetterPubSubTopic=$OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC" \
+-f yaml
+```
+
+## Terraform
+
+Dataflow supports the use of Terraform to manage template jobs;
+see [dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job).
+
+Terraform modules have been generated for most templates in this repository. This includes the relevant parameters
+specific to the template. If available, they may be used instead of
+[dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job)
+directly.
+
+To use the autogenerated module, execute the standard
+[terraform workflow](https://developer.hashicorp.com/terraform/intro/core-workflow):
+
+```shell
+cd v2/yaml/terraform/PubSub_To_BigTable_Yaml
+terraform init
+terraform apply
+```
+
+To use
+[dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job)
+directly:
+
+```terraform
+provider "google-beta" {
+  project = var.project
+}
+variable "project" {
+  default = "<my-project>"
+}
+variable "region" {
+  default = "us-central1"
+}
+
+resource "google_dataflow_flex_template_job" "pubsub_to_bigtable_yaml" {
+
+  provider                = google-beta
+  container_spec_gcs_path = "gs://dataflow-templates-${var.region}/latest/flex/PubSub_To_BigTable_Yaml"
+  name                    = "pubsub-to-bigtable-yaml"
+  region                  = var.region
+  parameters = {
+    topic = "<topic>"
+    schema = "<schema>"
+    language = "<language>"
+    fields = "<fields>"
+    projectId = "<projectId>"
+    instanceId = "<instanceId>"
+    tableId = "<tableId>"
+    outputDeadLetterPubSubTopic = "<outputDeadLetterPubSubTopic>"
+    # format = "JSON"
+  }
+}
+```
diff --git a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java
new file mode 100644
index 0000000000..825303183b
--- /dev/null
+++ b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright (C) 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.templates.yaml;
+
+import com.google.cloud.teleport.metadata.Template;
+import com.google.cloud.teleport.metadata.TemplateCategory;
+import com.google.cloud.teleport.metadata.TemplateParameter;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Validation;
+
+@Template(
+    name = "PubSub_To_BigTable_Yaml",
+    category = TemplateCategory.STREAMING,
+    type = Template.TemplateType.YAML,
+    displayName = "PubSub to BigTable (YAML)",
+    description =
+        "The PubSub to BigTable template is a streaming pipeline that ingests data from a PubSub topic, executes a user-defined mapping, and writes the resulting records to BigTable. Any errors that occur during the transformation of the data are written to a separate Pub/Sub topic.",
+    flexContainerName = "pubsub-to-bigtable-yaml",
+    yamlTemplateFile = "PubSubToBigTable.yaml",
+    filesToCopy = {"PubSubToBigTable.yaml", "main.py", "requirements.txt"},
+    documentation =
+        "https://cloud.google.com/dataflow/docs/guides/templates/provided-yaml/pubsub-to-bigtable",
+    contactInformation = "https://cloud.google.com/support",
+    requirements = {
+      "The input Pub/Sub topic must exist.",
+      "The MapToFields Pub/Sub error topic must exist.",
+      "The output BigTable table must exist."
+    },
+    streaming = true,
+    hidden = false)
+public interface PubSubToBigTableYaml {
+
+  @TemplateParameter.Text(
+      order = 1,
+      name = "topic",
+      optional = false,
+      description = "Pub/Sub input topic",
+      helpText = "Pub/Sub topic to read the input from.",
+      example = "projects/your-project-id/topics/your-topic-name")
+  @Validation.Required
+  String getTopic();
+
+  @TemplateParameter.Text(
+      order = 2,
+      name = "format",
+      optional = true,
+      description = "The message format.",
+      helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.",
+      example = "")
+  @Default.String("JSON")
+  String getFormat();
+
+  @TemplateParameter.Text(
+      order = 3,
+      name = "schema",
+      optional = false,
+      description = "Data schema.",
+      helpText =
+          "A schema is required if the data format is JSON, AVRO, or PROTO. For JSON, this is a JSON schema. For AVRO and PROTO, this is the full schema definition.",
+      example = "")
+  @Validation.Required
+  String getSchema();
+
+  @TemplateParameter.Text(
+      order = 4,
+      name = "language",
+      optional = false,
+      description = "Language used to define the expressions.",
+      helpText =
+          "The language used to define (and execute) the expressions and/or callables in fields. Defaults to generic.",
+      example = "")
+  @Validation.Required
+  String getLanguage();
+
+  @TemplateParameter.Text(
+      order = 5,
+      name = "fields",
+      optional = false,
+      description = "Field mapping configuration",
+      helpText =
+          "The output fields to compute, each mapping to the expression or callable that creates them.",
+      example = "")
+  @Validation.Required
+  String getFields();
+
+  @TemplateParameter.Text(
+      order = 6,
+      name = "projectId",
+      optional = false,
+      description = "BigTable project ID",
+      helpText = "The Google Cloud project ID of the BigTable instance.",
+      example = "")
+  @Validation.Required
+  String getProjectId();
+
+  @TemplateParameter.Text(
+      order = 7,
+      name = "instanceId",
+      optional = false,
+      description = "BigTable instance ID",
+      helpText = "The BigTable instance ID.",
+      example = "")
+  @Validation.Required
+  String getInstanceId();
+
+  @TemplateParameter.Text(
+      order = 8,
+      name = "tableId",
+      optional = false,
+      description = "BigTable output table",
+      helpText = "BigTable table ID to write the output to.",
+      example = "")
+  @Validation.Required
+  String getTableId();
+
+  @TemplateParameter.Text(
+      order = 9,
+      name = "windowing",
+      optional = true,
+      description = "Windowing options",
+      helpText =
+          "Windowing options - see https://beam.apache.org/documentation/sdks/yaml/#windowing",
+      example = "")
+  String getWindowing();
+
+  @TemplateParameter.Text(
+      order = 10,
+      name = "outputDeadLetterPubSubTopic",
+      optional = false,
+      description = "Pub/Sub transformation error topic",
+      helpText = "Pub/Sub error topic for failed transformation messages.",
+      example = "projects/your-project-id/topics/your-error-topic-name")
+  @Validation.Required
+  String getOutputDeadLetterPubSubTopic();
+}
diff --git a/yaml/src/main/python/options/bigtable_options.yaml b/yaml/src/main/python/options/bigtable_options.yaml
new file mode 100644
index 0000000000..28a6ea7956
--- /dev/null
+++ b/yaml/src/main/python/options/bigtable_options.yaml
@@ -0,0 +1,27 @@
+options:
+  - name: "bigtable_common_options"
+    parameters:
+      - order: 1
+        name: "projectId"
+        description: "BigTable project ID"
+        help: "The Google Cloud project ID of the BigTable instance."
+        required: true
+        type: text
+      - order: 2
+        name: "instanceId"
+        description: "BigTable instance ID"
+        help: "The BigTable instance ID."
+        required: true
+        type: text
+      - order: 3
+        name: "tableId"
+        description: "BigTable output table"
+        help: "BigTable table ID to write the output to."
+        required: true
+        type: text
+
+  - name: "bigtable_read_options"
+    parameters:
+
+  - name: "bigtable_write_options"
+    parameters:
diff --git a/yaml/src/main/python/options/maptofields_options.yaml b/yaml/src/main/python/options/maptofields_options.yaml
new file mode 100644
index 0000000000..f50d050eda
--- /dev/null
+++ b/yaml/src/main/python/options/maptofields_options.yaml
@@ -0,0 +1,25 @@
+options:
+  - name: "maptofields_common_options"
+    parameters:
+      - order: 1
+        name: "language"
+        description: "Language used to define the expressions."
+        help: >
+          The language used to define (and execute) the expressions and/or
+          callables in fields. Defaults to generic.
+        required: true
+        type: text
+      - order: 2
+        name: "fields"
+        description: "Field mapping configuration"
+        help: >
+          The output fields to compute, each mapping to the expression or callable
+          that creates them.
+        required: true
+        type: text
+
+  - name: "maptofields_read_options"
+    parameters:
+
+  - name: "maptofields_write_options"
+    parameters:
diff --git a/yaml/src/main/python/options/pubsub_options.yaml b/yaml/src/main/python/options/pubsub_options.yaml
new file mode 100644
index 0000000000..05d94a0267
--- /dev/null
+++ b/yaml/src/main/python/options/pubsub_options.yaml
@@ -0,0 +1,33 @@
+options:
+  - name: "pubsub_common_options"
+    parameters:
+      - order: 1
+        name: "topic"
+        description: "Pub/Sub input topic"
+        help: "Pub/Sub topic to read the input from."
+        example: "projects/your-project-id/topics/your-topic-name"
+        required: true
+        type: text
+      - order: 2
+        name: "format"
+        description: "The message format."
+        help: "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING."
+        default: JSON
+        required: false
+        type: text
+      - order: 3
+        name: "schema"
+        description: "Data schema."
+        help: >
+          A schema is required if the data format is JSON, AVRO, or PROTO. For
+          JSON, this is a JSON schema. For AVRO and PROTO, this is the full schema
+          definition.
+        required: true
+        type: text
+
+  - name: "pubsub_read_options"
+    parameters:
+
+  - name: "pubsub_write_options"
+    parameters:
+
diff --git a/yaml/src/main/python/options/windowinto_options.yaml b/yaml/src/main/python/options/windowinto_options.yaml
new file mode 100644
index 0000000000..461405d0c4
--- /dev/null
+++ b/yaml/src/main/python/options/windowinto_options.yaml
@@ -0,0 +1,10 @@
+options:
+  - name: "windowinto_common_options"
+    parameters:
+      - order: 1
+        name: "windowing"
+        description: "Windowing options"
+        help: "Windowing options - see https://beam.apache.org/documentation/sdks/yaml/#windowing"
+        required: false
+        type: text
+
diff --git a/yaml/src/main/yaml/PubSubToBigTable.yaml b/yaml/src/main/yaml/PubSubToBigTable.yaml
new file mode 100644
index 0000000000..76908d4998
--- /dev/null
+++ b/yaml/src/main/yaml/PubSubToBigTable.yaml
@@ -0,0 +1,87 @@
+template:
+
+  name: "PubSub_To_BigTable_Yaml"
+  category: "STREAMING"
+  type: "YAML"
+  display_name: "PubSub to BigTable (YAML)"
+  description: >
+    The PubSub to BigTable template is a streaming pipeline that ingests
+    data from a PubSub topic, executes a user-defined mapping, and
+    writes the resulting records to BigTable. Any errors that occur during the
+    transformation of the data are written to a separate Pub/Sub topic.
+  flex_container_name: "pubsub-to-bigtable-yaml"
+  yamlTemplateFile: "PubSubToBigTable.yaml"
+  filesToCopy:
+    - "PubSubToBigTable.yaml"
+    - "main.py"
+    - "requirements.txt"
+  documentation: >
+    https://cloud.google.com/dataflow/docs/guides/templates/provided-yaml/pubsub-to-bigtable
+  contactInformation: "https://cloud.google.com/support"
+  requirements:
+    - "The input Pub/Sub topic must exist."
+    - "The MapToFields Pub/Sub error topic must exist."
+    - "The output BigTable table must exist."
+  streaming: true
+  hidden: false
+
+  options_file:
+    - "pubsub_options"
+    - "maptofields_options"
+    - "windowinto_options"
+    - "bigtable_options"
+
+  parameters:
+    - pubsub_common_options
+    - maptofields_common_options
+    - bigtable_common_options
+    - windowinto_common_options
+
+    - name: "outputDeadLetterPubSubTopic"
+      description: "Pub/Sub transformation error topic"
+      help: "Pub/Sub error topic for failed transformation messages."
+      example: "projects/your-project-id/topics/your-error-topic-name"
+      required: true
+      type: text
+      order: 1
+
+
+pipeline:
+  type: composite
+  transforms:
+    - type: ReadFromPubSub
+      name: ReadMessages
+      config:
+        topic: {{ topic }}
+        format: {{ format }}
+        schema: {{ schema }}
+    - type: MapToFields
+      name: ConvertStringsToBytes
+      input: ReadMessages
+      config:
+        language: {{ language }}
+        fields: {{ fields }}
+        error_handling:
+          output: errors
+    - type: WindowInto
+      name: WindowInto
+      input: ConvertStringsToBytes
+      config:
+        windowing: {{ windowing }}
+    - type: WriteToBigTable
+      name: WriteGoodMessagesToBigTable
+      input: WindowInto
+      config:
+        project: {{ projectId }}
+        instance: {{ instanceId }}
+        table: {{ tableId }}
+    - type: WriteToPubSub
+      name: WriteTransformationErrorsToDeadLetterQueue
+      input: ConvertStringsToBytes.errors
+      config:
+        topic: {{ outputDeadLetterPubSubTopic }}
+        format: {{ format }}
+        schema: |
+          {{ schema }}
+
+options:
+  streaming: true
diff --git a/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java
new file mode 100644
index 0000000000..f3a6c489f4
--- /dev/null
+++ b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYamlIT.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright (C) 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.templates.yaml;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
+import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
+import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.TemplateTestBase;
+import org.apache.beam.it.gcp.bigtable.BigtableResourceManager;
+import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
+import org.apache.beam.it.gcp.pubsub.conditions.PubsubMessagesCheck;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Integration test for {@link PubSubToBigTableYaml}. */
+@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
+@TemplateIntegrationTest(PubSubToBigTableYaml.class)
+@RunWith(JUnit4.class)
+public final class PubSubToBigTableYamlIT extends TemplateTestBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigTableYamlIT.class);
+
+  private PubsubResourceManager pubsubResourceManager;
+  private BigtableResourceManager bigtableResourceManager;
+
+  @Before
+  public void setup() throws IOException {
+    // Pubsub and BigTable resource managers
+    pubsubResourceManager =
+        PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build();
+    bigtableResourceManager =
+        BigtableResourceManager.builder(testName, PROJECT, credentialsProvider).build();
+  }
+
+  @After
+  public void tearDown() {
+    // Clean up resource managers
+    ResourceManagerUtils.cleanResources(pubsubResourceManager, bigtableResourceManager);
+  }
+
+  @Test
+  public void testPubSubToBigTable() throws IOException {
+    pubSubToBigTable(Function.identity());
+  }
+
+  public void pubSubToBigTable(
+      Function<PipelineLauncher.LaunchConfig.Builder, PipelineLauncher.LaunchConfig.Builder>
+          paramsAdder)
+      throws IOException {
+
+    LOG.info("Starting pubSubToBigTable test. Test name: {}. Spec path: {}", testName, specPath);
+
+    /******************************* Arrange ********************************/
+
+    LOG.info("Creating main and dead letter queue topics...");
+    TopicName topic = pubsubResourceManager.createTopic("input");
+    TopicName dlqTopic = pubsubResourceManager.createTopic("dlq");
+
+    LOG.info("Creating Bigtable table...");
+    String tableId = "test_table";
+    bigtableResourceManager.createTable(tableId, ImmutableList.of("cf1"));
+
+    LOG.info("Creating launch config with yaml pipeline parameters...");
+    PipelineLauncher.LaunchConfig.Builder options =
+        paramsAdder.apply(
+            PipelineLauncher.LaunchConfig.builder(testName, specPath)
+                .addParameter("topic", topic.toString())
+                .addParameter("format", "JSON")
+                .addParameter(
+                    "schema",
+                    "{\"type\":\"object\",\"properties\":{\"key\":{\"type\":\"string\"},\"type\":{\"type\":\"string\"},\"family_name\":{\"type\":\"string\"},\"column_qualifier\":{\"type\":\"string\"},\"value\":{\"type\":\"string\"},\"timestamp_micros\":{\"type\":\"integer\"}}}")
+                .addParameter("windowing", "{\"type\":\"fixed\",\"size\":\"10s\"}")
+                .addParameter("tableId", tableId)
+                .addParameter("instanceId", bigtableResourceManager.getInstanceId())
+                .addParameter("projectId", PROJECT)
+                .addParameter("language", "python")
+                .addParameter(
+                    "fields",
+                    "{"
+                        + "\"key\": {\"expression\": \"key.encode('utf-8')\", \"output_type\": \"bytes\"},"
+                        + "\"type\": {\"expression\": \"type\", \"output_type\": \"string\"},"
+                        + "\"family_name\": {\"expression\": \"family_name\", \"output_type\": \"string\"},"
+                        + "\"column_qualifier\": {\"expression\": \"column_qualifier.encode('utf-8')\", \"output_type\": \"bytes\"},"
+                        + "\"value\": {\"expression\": \"value.encode('utf-8')\", \"output_type\": \"bytes\"},"
+                        + "\"timestamp_micros\": {\"expression\": \"timestamp_micros\", \"output_type\": \"integer\"}"
+                        + "}")
+                .addParameter("outputDeadLetterPubSubTopic", dlqTopic.toString()));
+
+    /********************************* Act **********************************/
+
+    LOG.info("Launching template with options...");
+    PipelineLauncher.LaunchInfo info = launchTemplate(options);
+
+    LOG.info("Template launched. LaunchInfo: {}", info);
+    assertThatPipeline(info).isRunning();
+
+    LOG.info("Creating messages to be published into the topic...");
+    List<ByteString> successMessages = new ArrayList<>();
+    List<ByteString> failureMessages = new ArrayList<>();
+    for (int i = 1; i <= 10; i++) {
+      long id1 = Long.parseLong(i + "1");
+      long id2 = Long.parseLong(i + "2");
+      successMessages.add(
+          ByteString.copyFromUtf8(
+              "{\"key\": \"row"
+                  + id1
+                  + "\", \"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq1\", \"value\": \"value1\", \"timestamp_micros\": 5000}"));
+      successMessages.add(
+          ByteString.copyFromUtf8(
+              "{\"key\": \"row"
+                  + id2
+                  + "\", \"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq2\", \"value\": \"value2\", \"timestamp_micros\": 1000}"));
+
+      // Missing a field, which during MapToFields transformation will cause a failure
+      String invalidRow =
+          "{\"type\": \"SetCell\", \"family_name\": \"cf1\", \"column_qualifier\": \"cq-invalid\", \"value\": \"invalid-value\", \"timestamp_micros\": 0}";
+      failureMessages.add(ByteString.copyFromUtf8(invalidRow));
+    }
+
+    LOG.info("Waiting for pipeline condition...");
+
+    Publisher publisher = null;
+    try {
+      publisher = Publisher.newBuilder(topic).setCredentialsProvider(credentialsProvider).build();
+      final Publisher finalPublisher = publisher;
+
+      PipelineOperator.Result result =
+          pipelineOperator()
+              .waitForConditionsAndFinish(
+                  createConfig(info),
+                  // Publish messages and check that 20 rows are in the BigTable
+                  () -> {
+                    LOG.info(
+                        "Publishing messages to the topic to ensure pipeline has messages to"
+                            + " process...");
+                    List<ApiFuture<String>> futures = new ArrayList<>();
+                    for (ByteString successMessage : successMessages) {
+                      futures.add(
+                          finalPublisher.publish(
+                              PubsubMessage.newBuilder().setData(successMessage).build()));
+                    }
+                    for (ByteString failureMessage : failureMessages) {
+                      futures.add(
+                          finalPublisher.publish(
+                              PubsubMessage.newBuilder().setData(failureMessage).build()));
+                    }
+                    try {
+                      ApiFutures.allAsList(futures).get();
+                      LOG.info("All messages published successfully for this check.");
+                      Thread.sleep(1000); // Sleep for 1 second to avoid crashing the vm
+                    } catch (InterruptedException e) {
+                      Thread.currentThread().interrupt();
+                      throw new RuntimeException("Action interrupted", e);
+                    } catch (java.util.concurrent.ExecutionException e) {
+                      throw new RuntimeException("Error publishing messages", e);
+                    }
+
+                    List<Row> rows = bigtableResourceManager.readTable(tableId);
+                    if (rows == null) {
+                      LOG.warn(
+                          "bigtableResourceManager.readTable(tableId) returned null. Retrying.");
+                      return false;
+                    }
+                    int tableSize = rows.size();
+                    LOG.info("Checking table size. Current size: {}", tableSize);
+                    return tableSize == 20;
+                  },
+                  // Check that a minimum of 10 messages are in the dead letter queue
+                  PubsubMessagesCheck.builder(pubsubResourceManager, dlqTopic, "dlq-topic")
+                      .setMinMessages(10)
+                      .build());
+
+      /******************************** Assert ********************************/
+      assertThatResult(result).meetsConditions();
+
+    } finally {
+      if (publisher != null) {
+        publisher.shutdown();
+      }
+    }
+
+    LOG.info("Verifying 20 rows in the BigTable still exist...");
+    List<Row> tableRows = bigtableResourceManager.readTable(tableId);
+    assertThat(tableRows).hasSize(20);
+
+    LOG.info("Verifying the exact 20 rows in BigTable...");
+    Map<String, Row> rowMap =
+        tableRows.stream()
+            .collect(Collectors.toMap(row -> row.getKey().toStringUtf8(), row -> row));
+
+    for (int i = 1; i <= 10; i++) {
+      String key1 = "row" + i + "1";
+      assertThat(rowMap).containsKey(key1);
+      Row row1 = rowMap.get(key1);
+      assertThat(row1.getCells()).hasSize(1);
+      assertThat(row1.getCells().get(0).getFamily()).isEqualTo("cf1");
+      assertThat(row1.getCells().get(0).getQualifier().toStringUtf8()).isEqualTo("cq1");
+      assertThat(row1.getCells().get(0).getValue().toStringUtf8()).isEqualTo("value1");
+      assertThat(row1.getCells().get(0).getTimestamp()).isEqualTo(5000L);
+
+      String key2 = "row" + i + "2";
+      assertThat(rowMap).containsKey(key2);
+      Row row2 = rowMap.get(key2);
+      assertThat(row2.getCells()).hasSize(1);
+      assertThat(row2.getCells().get(0).getFamily()).isEqualTo("cf1");
+      assertThat(row2.getCells().get(0).getQualifier().toStringUtf8()).isEqualTo("cq2");
+      assertThat(row2.getCells().get(0).getValue().toStringUtf8()).isEqualTo("value2");
+      assertThat(row2.getCells().get(0).getTimestamp()).isEqualTo(1000L);
+    }
+  }
+}