diff --git a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java index 825303183b..425bcc5f6d 100644 --- a/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java +++ b/yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java @@ -18,7 +18,6 @@ import com.google.cloud.teleport.metadata.Template; import com.google.cloud.teleport.metadata.TemplateCategory; import com.google.cloud.teleport.metadata.TemplateParameter; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Validation; @Template( @@ -56,11 +55,11 @@ public interface PubSubToBigTableYaml { @TemplateParameter.Text( order = 2, name = "format", - optional = true, + optional = false, description = "The message format.", helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.", example = "") - @Default.String("JSON") + @Validation.Required String getFormat(); @TemplateParameter.Text( @@ -76,6 +75,65 @@ public interface PubSubToBigTableYaml { @TemplateParameter.Text( order = 4, + name = "attributes", + optional = true, + description = "List of attribute keys.", + helpText = + "List of attribute keys whose values will be flattened into the output message as additional fields. For example, if the format is `raw` and attributes is `[a, b]` then this read will produce elements of the form `Row(payload=..., a=..., b=...)`.", + example = "") + String getAttributes(); + + @TemplateParameter.Text( + order = 5, + name = "attributesMap", + optional = true, + description = "Name of a field in which to store the full set of attributes.", + helpText = + "Name of a field in which to store the full set of attributes associated with this message. For example, if the format is `raw` and `attribute_map` is set to `attrs` then this read will produce elements of the form `Row(payload=..., attrs=...)` where `attrs` is a Map type of string to string. If both `attributes` and `attribute_map` are set, the overlapping attribute values will be present in both the flattened structure and the attribute map.", + example = "") + String getAttributesMap(); + + @TemplateParameter.Text( + order = 6, + name = "idAttribute", + optional = true, + description = + "The attribute on incoming Pub/Sub messages to use as a unique record identifier.", + helpText = + "The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.", + example = "") + String getIdAttribute(); + + @TemplateParameter.Text( + order = 7, + name = "timestampAttribute", + optional = true, + description = "Message value to use as element timestamp.", + helpText = + "Message value to use as element timestamp. If None, uses message publishing time as the timestamp. Timestamp values should be in one of two formats: 1). A numerical value representing the number of milliseconds since the Unix epoch. 2). A string in RFC 3339 format, UTC timezone. Example: ``2015-10-29T23:41:41.123Z``. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored.", + example = "") + String getTimestampAttribute(); + + @TemplateParameter.Text( + order = 8, + name = "errorHandling", + optional = true, + description = "Error handling configuration", + helpText = "This option specifies whether and where to output error rows.", + example = "") + String getErrorHandling(); + + @TemplateParameter.Text( + order = 9, + name = "subscription", + optional = true, + description = "Pub/Sub subscription", + helpText = "Pub/Sub subscription to read the input from.", + example = "projects/your-project-id/subscriptions/your-subscription-name") + String getSubscription(); + + @TemplateParameter.Text( + order = 10, name = "language", optional = false, description = "Language used to define the expressions.", @@ -86,7 +144,7 @@ public interface PubSubToBigTableYaml { String getLanguage(); @TemplateParameter.Text( - order = 5, + order = 11, name = "fields", optional = false, description = "Field mapping configuration", @@ -97,7 +155,7 @@ public interface PubSubToBigTableYaml { String getFields(); @TemplateParameter.Text( - order = 6, + order = 12, name = "projectId", optional = false, description = "BigTable project ID", @@ -107,7 +165,7 @@ public interface PubSubToBigTableYaml { String getProjectId(); @TemplateParameter.Text( - order = 7, + order = 13, name = "instanceId", optional = false, description = "BigTable instance ID", @@ -117,7 +175,7 @@ public interface PubSubToBigTableYaml { String getInstanceId(); @TemplateParameter.Text( - order = 8, + order = 14, name = "tableId", optional = false, description = "BigTable output table", @@ -127,7 +185,7 @@ public interface PubSubToBigTableYaml { String getTableId(); @TemplateParameter.Text( - order = 9, + order = 15, name = "windowing", optional = true, description = "Windowing options", @@ -137,7 +195,7 @@ public interface PubSubToBigTableYaml { String getWindowing(); @TemplateParameter.Text( - order = 10, + order = 16, name = "outputDeadLetterPubSubTopic", optional = false, description = "Pub/Sub transformation error topic", diff --git a/yaml/src/main/python/generate_yaml_java_templates.py b/yaml/src/main/python/generate_yaml_java_templates.py index 1e277d8634..71082760be 100644 --- a/yaml/src/main/python/generate_yaml_java_templates.py +++ b/yaml/src/main/python/generate_yaml_java_templates.py @@ -68,7 +68,7 @@ def run_mvn_spotless(): print("Error: 'mvn' command not found. Please ensure Maven is installed and in your PATH.") return e except subprocess.CalledProcessError as e: - print(f"Error running mvn spotless:apply: {e}", file=sys.stderr) + print(f"Error running mvn spotless:apply: {e.stdout}") return e def generate_java_interface(yaml_path, java_path): @@ -140,8 +140,8 @@ def generate_java_interface(yaml_path, java_path): java_type = JAVA_TYPE_BY_YAML_TYPE.get(param.get('type', 'text'), 'String') template_param_type = TEMPLATE_TYPE_BY_YAML_TYPE.get(param.get('type', 'text')) getter_name = "get" + param_name[0].upper() + param_name[1:] - wrapped_description = str(param.get('description', '')).strip() - wrapped_help_text = str(param.get('help', '')).strip() + description = str(param.get('description', '')).strip() + help_text = str(param.get('help', '')).strip() example = str(param.get('example', '')).strip().replace('"', '\\"') param_code = f""" @@ -149,8 +149,8 @@ def generate_java_interface(yaml_path, java_path): order = {i + 1}, name = "{param_name}", optional = {str(not param.get('required', False)).lower()}, - description = "{wrapped_description}", - helpText = "{wrapped_help_text}", + description = "{description}", + helpText = "{help_text}", example = "{example}" ) """ diff --git a/yaml/src/main/python/options/bigtable_options.yaml b/yaml/src/main/python/options/bigtable_options.yaml index 28a6ea7956..0e298e5748 100644 --- a/yaml/src/main/python/options/bigtable_options.yaml +++ b/yaml/src/main/python/options/bigtable_options.yaml @@ -22,6 +22,13 @@ options: - name: "bigtable_read_options" parameters: + - order: 1 + name: "flatten" + description: "BigTable flatten boolean" + help: "If set to false, output rows are nested; if true or omitted, output rows are flattened." + default: true + required: false + type: boolean - name: "bigtable_write_options" parameters: diff --git a/yaml/src/main/python/options/pubsub_options.yaml b/yaml/src/main/python/options/pubsub_options.yaml index 05d94a0267..807b506fdf 100644 --- a/yaml/src/main/python/options/pubsub_options.yaml +++ b/yaml/src/main/python/options/pubsub_options.yaml @@ -12,8 +12,7 @@ options: name: "format" description: "The message format." help: "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING." - default: JSON - required: false + required: true type: text - order: 3 name: "schema" @@ -24,9 +23,71 @@ options: definition. required: true type: text + - order: 4 + name: "attributes" + description: "List of attribute keys." + help: > + List of attribute keys whose values will be flattened into the + output message as additional fields. For example, if the format is `raw` + and attributes is `[a, b]` then this read will produce elements of + the form `Row(payload=..., a=..., b=...)`. + required: false + type: text + - order: 5 + name: "attributesMap" + description: "Name of a field in which to store the full set of attributes." + help: > + Name of a field in which to store the full set of attributes + associated with this message. For example, if the format is `raw` and + `attribute_map` is set to `attrs` then this read will produce elements + of the form `Row(payload=..., attrs=...)` where `attrs` is a Map type + of string to string. + If both `attributes` and `attribute_map` are set, the overlapping + attribute values will be present in both the flattened structure and the + attribute map. + required: false + type: text + - order: 6 + name: "idAttribute" + description: "The attribute on incoming Pub/Sub messages to use as a unique record identifier." + help: > + The attribute on incoming Pub/Sub messages to use as a unique + record identifier. When specified, the value of this attribute (which + can be any string that uniquely identifies the record) will be used for + deduplication of messages. If not provided, we cannot guarantee + that no duplicate data will be delivered on the Pub/Sub stream. In this + case, deduplication of the stream will be strictly best effort. + required: false + type: text + - order: 7 + name: "timestampAttribute" + description: "Message value to use as element timestamp." + help: > + Message value to use as element timestamp. If None, uses message + publishing time as the timestamp. Timestamp values should be in one of two formats: + 1). A numerical value representing the number of milliseconds since the + Unix epoch. + 2). A string in RFC 3339 format, UTC timezone. Example: + ``2015-10-29T23:41:41.123Z``. The sub-second component of the + timestamp is optional, and digits beyond the first three (i.e., time + units smaller than milliseconds) may be ignored. + - order: 8 + name: "errorHandling" + description: "Error handling configuration" + help: "This option specifies whether and where to output error rows." + required: false + type: text + - name: "pubsub_read_options" parameters: + - order: 1 + name: "subscription" + description: "Pub/Sub subscription" + help: "Pub/Sub subscription to read the input from." + example: "projects/your-project-id/subscriptions/your-subscription-name" + required: false + type: text - name: "pubsub_write_options" parameters: diff --git a/yaml/src/main/yaml/PubSubToBigTable.yaml b/yaml/src/main/yaml/PubSubToBigTable.yaml index 76908d4998..6845014dae 100644 --- a/yaml/src/main/yaml/PubSubToBigTable.yaml +++ b/yaml/src/main/yaml/PubSubToBigTable.yaml @@ -32,6 +32,7 @@ template: parameters: - pubsub_common_options + - pubsub_read_options - maptofields_common_options - bigtable_common_options - windowinto_common_options @@ -52,8 +53,13 @@ pipeline: name: ReadMessages config: topic: {{ topic }} + subscription: {{ subscription }} format: {{ format }} schema: {{ schema }} + attributes: {{ attributes }} + attribute_map: {{ attributeMap }} + id_attribute: {{ idAttribute }} + timestamp_attribute: {{ timestampAttribute }} - type: MapToFields name: ConvertStringsToBytes input: ReadMessages