From 4ec98f34b71fac009ed725cb4c5cd5a8083ed75c Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 2 Jan 2026 17:06:35 +0000 Subject: [PATCH 1/6] update options with more parameters --- .../main/python/options/bigtable_options.yaml | 7 ++ .../main/python/options/pubsub_options.yaml | 64 ++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/yaml/src/main/python/options/bigtable_options.yaml b/yaml/src/main/python/options/bigtable_options.yaml index 28a6ea7956..0e298e5748 100644 --- a/yaml/src/main/python/options/bigtable_options.yaml +++ b/yaml/src/main/python/options/bigtable_options.yaml @@ -22,6 +22,13 @@ options: - name: "bigtable_read_options" parameters: + - order: 1 + name: "flatten" + description: "BigTable flatten boolean" + help: "If set to false, output rows are nested; if true or omitted, output rows are flattened." + default: true + required: false + type: boolean - name: "bigtable_write_options" parameters: diff --git a/yaml/src/main/python/options/pubsub_options.yaml b/yaml/src/main/python/options/pubsub_options.yaml index 05d94a0267..1a9948d902 100644 --- a/yaml/src/main/python/options/pubsub_options.yaml +++ b/yaml/src/main/python/options/pubsub_options.yaml @@ -13,7 +13,7 @@ options: description: "The message format." help: "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING." default: JSON - required: false + required: true type: text - order: 3 name: "schema" @@ -24,9 +24,71 @@ options: definition. required: true type: text + - order: 4 + name: "attributes" + description: "List of attribute keys." + help: > + List of attribute keys whose values will be flattened into the + output message as additional fields. For example, if the format is `raw` + and attributes is `[a, b]` then this read will produce elements of + the form `Row(payload=..., a=..., b=...)`. + required: false + type: text + - order: 5 + name: "attributesMap" + description: "Name of a field in which to store the full set of attributes." + help: > + Name of a field in which to store the full set of attributes + associated with this message. For example, if the format is `raw` and + `attribute_map` is set to `attrs` then this read will produce elements + of the form `Row(payload=..., attrs=...)` where `attrs` is a Map type + of string to string. + If both `attributes` and `attribute_map` are set, the overlapping + attribute values will be present in both the flattened structure and the + attribute map. + required: false + type: text + - order: 6 + name: "idAttribute" + description: "The attribute on incoming Pub/Sub messages to use as a unique record identifier." + help: > + The attribute on incoming Pub/Sub messages to use as a unique + record identifier. When specified, the value of this attribute (which + can be any string that uniquely identifies the record) will be used for + deduplication of messages. If not provided, we cannot guarantee + that no duplicate data will be delivered on the Pub/Sub stream. In this + case, deduplication of the stream will be strictly best effort. + required: false + type: text + - order: 7 + name: "timestampAttribute" + description: "Message value to use as element timestamp." + help: > + Message value to use as element timestamp. If None, uses message + publishing time as the timestamp. Timestamp values should be in one of two formats: + 1). A numerical value representing the number of milliseconds since the + Unix epoch. + 2). A string in RFC 3339 format, UTC timezone. Example: + ``2015-10-29T23:41:41.123Z``. The sub-second component of the + timestamp is optional, and digits beyond the first three (i.e., time + units smaller than milliseconds) may be ignored. + - order: 8 + name: "errorHandling" + description: "Error handling configuration" + help: "This option specifies whether and where to output error rows." + required: false + type: text + - name: "pubsub_read_options" parameters: + - order: 1 + name: "subscription" + description: "Pub/Sub subscription" + help: "Pub/Sub subscription to read the input from." + example: "projects/your-project-id/subscriptions/your-subscription-name" + required: false + type: text - name: "pubsub_write_options" parameters: From e125bd4e2e094a16617cccb0825296b10513cc68 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 2 Jan 2026 17:06:55 +0000 Subject: [PATCH 2/6] update pipeline with more parameters --- yaml/src/main/yaml/PubSubToBigTable.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/yaml/src/main/yaml/PubSubToBigTable.yaml b/yaml/src/main/yaml/PubSubToBigTable.yaml index 76908d4998..6845014dae 100644 --- a/yaml/src/main/yaml/PubSubToBigTable.yaml +++ b/yaml/src/main/yaml/PubSubToBigTable.yaml @@ -32,6 +32,7 @@ template: parameters: - pubsub_common_options + - pubsub_read_options - maptofields_common_options - bigtable_common_options - windowinto_common_options @@ -52,8 +53,13 @@ pipeline: name: ReadMessages config: topic: {{ topic }} + subscription: {{ subscription }} format: {{ format }} schema: {{ schema }} + attributes: {{ attributes }} + attribute_map: {{ attributeMap }} + id_attribute: {{ idAttribute }} + timestamp_attribute: {{ timestampAttribute }} - type: MapToFields name: ConvertStringsToBytes input: ReadMessages From 14413727ab066f4cd5909f556a9328c4c035b650 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 2 Jan 2026 17:07:29 +0000 Subject: [PATCH 3/6] update error and parameter names --- yaml/src/main/python/generate_yaml_java_templates.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/yaml/src/main/python/generate_yaml_java_templates.py b/yaml/src/main/python/generate_yaml_java_templates.py index 1e277d8634..da1d21a234 100644 --- a/yaml/src/main/python/generate_yaml_java_templates.py +++ b/yaml/src/main/python/generate_yaml_java_templates.py @@ -16,6 +16,7 @@ import re import subprocess import sys +import traceback from pathlib import Path @@ -68,7 +69,7 @@ def run_mvn_spotless(): print("Error: 'mvn' command not found. Please ensure Maven is installed and in your PATH.") return e except subprocess.CalledProcessError as e: - print(f"Error running mvn spotless:apply: {e}", file=sys.stderr) + print(f"Error running mvn spotless:apply: {e.stdout}") return e def generate_java_interface(yaml_path, java_path): @@ -140,8 +141,8 @@ def generate_java_interface(yaml_path, java_path): java_type = JAVA_TYPE_BY_YAML_TYPE.get(param.get('type', 'text'), 'String') template_param_type = TEMPLATE_TYPE_BY_YAML_TYPE.get(param.get('type', 'text')) getter_name = "get" + param_name[0].upper() + param_name[1:] - wrapped_description = str(param.get('description', '')).strip() - wrapped_help_text = str(param.get('help', '')).strip() + description = str(param.get('description', '')).strip() + help_text = str(param.get('help', '')).strip() example = str(param.get('example', '')).strip().replace('"', '\\"') param_code = f""" @@ -149,8 +150,8 @@ def generate_java_interface(yaml_path, java_path): order = {i + 1}, name = "{param_name}", optional = {str(not param.get('required', False)).lower()}, - description = "{wrapped_description}", - helpText = "{wrapped_help_text}", + description = "{description}", + helpText = "{help_text}", example = "{example}" ) """ From c5cd3a2e9c150cbf17571793d44ff78853245c1a Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 2 Jan 2026 17:07:48 +0000 Subject: [PATCH 4/6] update template based on more parameters --- .../templates/yaml/PubSubToBigTableYaml.java | 74 +++++++++++++++++-- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java index 825303183b..a9024d35e5 100644 --- a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java +++ b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java @@ -56,10 +56,11 @@ public interface PubSubToBigTableYaml { @TemplateParameter.Text( order = 2, name = "format", - optional = true, + optional = false, description = "The message format.", helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.", example = "") + @Validation.Required @Default.String("JSON") String getFormat(); @@ -76,6 +77,65 @@ public interface PubSubToBigTableYaml { @TemplateParameter.Text( order = 4, + name = "attributes", + optional = true, + description = "List of attribute keys.", + helpText = + "List of attribute keys whose values will be flattened into the output message as additional fields. For example, if the format is `raw` and attributes is `[a, b]` then this read will produce elements of the form `Row(payload=..., a=..., b=...)`.", + example = "") + String getAttributes(); + + @TemplateParameter.Text( + order = 5, + name = "attributesMap", + optional = true, + description = "Name of a field in which to store the full set of attributes.", + helpText = + "Name of a field in which to store the full set of attributes associated with this message. For example, if the format is `raw` and `attribute_map` is set to `attrs` then this read will produce elements of the form `Row(payload=..., attrs=...)` where `attrs` is a Map type of string to string. If both `attributes` and `attribute_map` are set, the overlapping attribute values will be present in both the flattened structure and the attribute map.", + example = "") + String getAttributesMap(); + + @TemplateParameter.Text( + order = 6, + name = "idAttribute", + optional = true, + description = + "The attribute on incoming Pub/Sub messages to use as a unique record identifier.", + helpText = + "The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.", + example = "") + String getIdAttribute(); + + @TemplateParameter.Text( + order = 7, + name = "timestampAttribute", + optional = true, + description = "Message value to use as element timestamp.", + helpText = + "Message value to use as element timestamp. If None, uses message publishing time as the timestamp. Timestamp values should be in one of two formats: 1). A numerical value representing the number of milliseconds since the Unix epoch. 2). A string in RFC 3339 format, UTC timezone. Example: ``2015-10-29T23:41:41.123Z``. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored.", + example = "") + String getTimestampAttribute(); + + @TemplateParameter.Text( + order = 8, + name = "errorHandling", + optional = true, + description = "Error handling configuration", + helpText = "This option specifies whether and where to output error rows.", + example = "") + String getErrorHandling(); + + @TemplateParameter.Text( + order = 9, + name = "subscription", + optional = true, + description = "Pub/Sub subscription", + helpText = "Pub/Sub subscription to read the input from.", + example = "projects/your-project-id/subscriptions/your-subscription-name") + String getSubscription(); + + @TemplateParameter.Text( + order = 10, name = "language", optional = false, description = "Language used to define the expressions.", @@ -86,7 +146,7 @@ public interface PubSubToBigTableYaml { String getLanguage(); @TemplateParameter.Text( - order = 5, + order = 11, name = "fields", optional = false, description = "Field mapping configuration", @@ -97,7 +157,7 @@ public interface PubSubToBigTableYaml { String getFields(); @TemplateParameter.Text( - order = 6, + order = 12, name = "projectId", optional = false, description = "BigTable project ID", @@ -107,7 +167,7 @@ public interface PubSubToBigTableYaml { String getProjectId(); @TemplateParameter.Text( - order = 7, + order = 13, name = "instanceId", optional = false, description = "BigTable instance ID", @@ -117,7 +177,7 @@ public interface PubSubToBigTableYaml { String getInstanceId(); @TemplateParameter.Text( - order = 8, + order = 14, name = "tableId", optional = false, description = "BigTable output table", @@ -127,7 +187,7 @@ public interface PubSubToBigTableYaml { String getTableId(); @TemplateParameter.Text( - order = 9, + order = 15, name = "windowing", optional = true, description = "Windowing options", @@ -137,7 +197,7 @@ public interface PubSubToBigTableYaml { String getWindowing(); @TemplateParameter.Text( - order = 10, + order = 16, name = "outputDeadLetterPubSubTopic", optional = false, description = "Pub/Sub transformation error topic", From 91ace5b0ada360bc8cbc72dc0231874a9c7711b0 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 2 Jan 2026 17:12:10 +0000 Subject: [PATCH 5/6] update default on format --- .../cloud/teleport/templates/yaml/PubSubToBigTableYaml.java | 2 -- yaml/src/main/python/options/pubsub_options.yaml | 1 - 2 files changed, 3 deletions(-) diff --git a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java index a9024d35e5..425bcc5f6d 100644 --- a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java +++ b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java @@ -18,7 +18,6 @@ import com.google.cloud.teleport.metadata.Template; import com.google.cloud.teleport.metadata.TemplateCategory; import com.google.cloud.teleport.metadata.TemplateParameter; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Validation; @Template( @@ -61,7 +60,6 @@ public interface PubSubToBigTableYaml { helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.", example = "") @Validation.Required - @Default.String("JSON") String getFormat(); @TemplateParameter.Text( diff --git a/yaml/src/main/python/options/pubsub_options.yaml b/yaml/src/main/python/options/pubsub_options.yaml index 1a9948d902..807b506fdf 100644 --- a/yaml/src/main/python/options/pubsub_options.yaml +++ b/yaml/src/main/python/options/pubsub_options.yaml @@ -12,7 +12,6 @@ options: name: "format" description: "The message format." help: "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING." - default: JSON required: true type: text - order: 3 From e72ca76713874d6f346a05f26fc0e8b4e9d2585e Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 2 Jan 2026 17:13:32 +0000 Subject: [PATCH 6/6] remove unused import --- yaml/src/main/python/generate_yaml_java_templates.py | 1 - 1 file changed, 1 deletion(-) diff --git a/yaml/src/main/python/generate_yaml_java_templates.py b/yaml/src/main/python/generate_yaml_java_templates.py index da1d21a234..71082760be 100644 --- a/yaml/src/main/python/generate_yaml_java_templates.py +++ b/yaml/src/main/python/generate_yaml_java_templates.py @@ -16,7 +16,6 @@ import re import subprocess import sys -import traceback from pathlib import Path