Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Validation;

@Template(
Expand Down Expand Up @@ -56,11 +55,11 @@ public interface PubSubToBigTableYaml {
@TemplateParameter.Text(
order = 2,
name = "format",
optional = true,
optional = false,
description = "The message format.",
helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.",
example = "")
@Default.String("JSON")
@Validation.Required
String getFormat();

@TemplateParameter.Text(
Expand All @@ -76,6 +75,65 @@ public interface PubSubToBigTableYaml {

@TemplateParameter.Text(
order = 4,
name = "attributes",
optional = true,
description = "List of attribute keys.",
helpText =
"List of attribute keys whose values will be flattened into the output message as additional fields. For example, if the format is `raw` and attributes is `[a, b]` then this read will produce elements of the form `Row(payload=..., a=..., b=...)`.",
example = "")
String getAttributes();

@TemplateParameter.Text(
order = 5,
name = "attributesMap",
optional = true,
description = "Name of a field in which to store the full set of attributes.",
helpText =
"Name of a field in which to store the full set of attributes associated with this message. For example, if the format is `raw` and `attribute_map` is set to `attrs` then this read will produce elements of the form `Row(payload=..., attrs=...)` where `attrs` is a Map type of string to string. If both `attributes` and `attribute_map` are set, the overlapping attribute values will be present in both the flattened structure and the attribute map.",
example = "")
String getAttributesMap();

@TemplateParameter.Text(
order = 6,
name = "idAttribute",
optional = true,
description =
"The attribute on incoming Pub/Sub messages to use as a unique record identifier.",
helpText =
"The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.",
example = "")
String getIdAttribute();

@TemplateParameter.Text(
order = 7,
name = "timestampAttribute",
optional = true,
description = "Message value to use as element timestamp.",
helpText =
"Message value to use as element timestamp. If None, uses message publishing time as the timestamp. Timestamp values should be in one of two formats: 1). A numerical value representing the number of milliseconds since the Unix epoch. 2). A string in RFC 3339 format, UTC timezone. Example: ``2015-10-29T23:41:41.123Z``. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored.",
example = "")
String getTimestampAttribute();

@TemplateParameter.Text(
order = 8,
name = "errorHandling",
optional = true,
description = "Error handling configuration",
helpText = "This option specifies whether and where to output error rows.",
example = "")
String getErrorHandling();

@TemplateParameter.Text(
order = 9,
name = "subscription",
optional = true,
description = "Pub/Sub subscription",
helpText = "Pub/Sub subscription to read the input from.",
example = "projects/your-project-id/subscriptions/your-subscription-name")
String getSubscription();

@TemplateParameter.Text(
order = 10,
name = "language",
optional = false,
description = "Language used to define the expressions.",
Expand All @@ -86,7 +144,7 @@ public interface PubSubToBigTableYaml {
String getLanguage();

@TemplateParameter.Text(
order = 5,
order = 11,
name = "fields",
optional = false,
description = "Field mapping configuration",
Expand All @@ -97,7 +155,7 @@ public interface PubSubToBigTableYaml {
String getFields();

@TemplateParameter.Text(
order = 6,
order = 12,
name = "projectId",
optional = false,
description = "BigTable project ID",
Expand All @@ -107,7 +165,7 @@ public interface PubSubToBigTableYaml {
String getProjectId();

@TemplateParameter.Text(
order = 7,
order = 13,
name = "instanceId",
optional = false,
description = "BigTable instance ID",
Expand All @@ -117,7 +175,7 @@ public interface PubSubToBigTableYaml {
String getInstanceId();

@TemplateParameter.Text(
order = 8,
order = 14,
name = "tableId",
optional = false,
description = "BigTable output table",
Expand All @@ -127,7 +185,7 @@ public interface PubSubToBigTableYaml {
String getTableId();

@TemplateParameter.Text(
order = 9,
order = 15,
name = "windowing",
optional = true,
description = "Windowing options",
Expand All @@ -137,7 +195,7 @@ public interface PubSubToBigTableYaml {
String getWindowing();

@TemplateParameter.Text(
order = 10,
order = 16,
name = "outputDeadLetterPubSubTopic",
optional = false,
description = "Pub/Sub transformation error topic",
Expand Down
10 changes: 5 additions & 5 deletions yaml/src/main/python/generate_yaml_java_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def run_mvn_spotless():
print("Error: 'mvn' command not found. Please ensure Maven is installed and in your PATH.")
return e
except subprocess.CalledProcessError as e:
print(f"Error running mvn spotless:apply: {e}", file=sys.stderr)
print(f"Error running mvn spotless:apply: {e.stdout}")
return e

def generate_java_interface(yaml_path, java_path):
Expand Down Expand Up @@ -140,17 +140,17 @@ def generate_java_interface(yaml_path, java_path):
java_type = JAVA_TYPE_BY_YAML_TYPE.get(param.get('type', 'text'), 'String')
template_param_type = TEMPLATE_TYPE_BY_YAML_TYPE.get(param.get('type', 'text'))
getter_name = "get" + param_name[0].upper() + param_name[1:]
wrapped_description = str(param.get('description', '')).strip()
wrapped_help_text = str(param.get('help', '')).strip()
description = str(param.get('description', '')).strip()
help_text = str(param.get('help', '')).strip()
example = str(param.get('example', '')).strip().replace('"', '\\"')

param_code = f"""
@{template_param_type}(
order = {i + 1},
name = "{param_name}",
optional = {str(not param.get('required', False)).lower()},
description = "{wrapped_description}",
helpText = "{wrapped_help_text}",
description = "{description}",
helpText = "{help_text}",
example = "{example}"
)
"""
Expand Down
7 changes: 7 additions & 0 deletions yaml/src/main/python/options/bigtable_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ options:

- name: "bigtable_read_options"
parameters:
- order: 1
name: "flatten"
description: "BigTable flatten boolean"
help: "If set to false, output rows are nested; if true or omitted, output rows are flattened."
default: true
required: false
type: boolean

- name: "bigtable_write_options"
parameters:
65 changes: 63 additions & 2 deletions yaml/src/main/python/options/pubsub_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ options:
name: "format"
description: "The message format."
help: "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING."
default: JSON
required: false
required: true
type: text
- order: 3
name: "schema"
Expand All @@ -24,9 +23,71 @@ options:
definition.
required: true
type: text
- order: 4
name: "attributes"
description: "List of attribute keys."
help: >
List of attribute keys whose values will be flattened into the
output message as additional fields. For example, if the format is `raw`
and attributes is `[a, b]` then this read will produce elements of
the form `Row(payload=..., a=..., b=...)`.
required: false
type: text
- order: 5
name: "attributesMap"
description: "Name of a field in which to store the full set of attributes."
help: >
Name of a field in which to store the full set of attributes
associated with this message. For example, if the format is `raw` and
`attribute_map` is set to `attrs` then this read will produce elements
of the form `Row(payload=..., attrs=...)` where `attrs` is a Map type
of string to string.
If both `attributes` and `attribute_map` are set, the overlapping
attribute values will be present in both the flattened structure and the
attribute map.
required: false
type: text
- order: 6
name: "idAttribute"
description: "The attribute on incoming Pub/Sub messages to use as a unique record identifier."
help: >
The attribute on incoming Pub/Sub messages to use as a unique
record identifier. When specified, the value of this attribute (which
can be any string that uniquely identifies the record) will be used for
deduplication of messages. If not provided, we cannot guarantee
that no duplicate data will be delivered on the Pub/Sub stream. In this
case, deduplication of the stream will be strictly best effort.
required: false
type: text
- order: 7
name: "timestampAttribute"
description: "Message value to use as element timestamp."
help: >
Message value to use as element timestamp. If None, uses message
publishing time as the timestamp. Timestamp values should be in one of two formats:
1). A numerical value representing the number of milliseconds since the
Unix epoch.
2). A string in RFC 3339 format, UTC timezone. Example:
``2015-10-29T23:41:41.123Z``. The sub-second component of the
timestamp is optional, and digits beyond the first three (i.e., time
units smaller than milliseconds) may be ignored.
- order: 8
name: "errorHandling"
description: "Error handling configuration"
help: "This option specifies whether and where to output error rows."
required: false
type: text


- name: "pubsub_read_options"
parameters:
- order: 1
name: "subscription"
description: "Pub/Sub subscription"
help: "Pub/Sub subscription to read the input from."
example: "projects/your-project-id/subscriptions/your-subscription-name"
required: false
type: text

- name: "pubsub_write_options"
parameters:
Expand Down
6 changes: 6 additions & 0 deletions yaml/src/main/yaml/PubSubToBigTable.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ template:

parameters:
- pubsub_common_options
- pubsub_read_options
- maptofields_common_options
- bigtable_common_options
- windowinto_common_options
Expand All @@ -52,8 +53,13 @@ pipeline:
name: ReadMessages
config:
topic: {{ topic }}
subscription: {{ subscription }}
format: {{ format }}
schema: {{ schema }}
attributes: {{ attributes }}
attribute_map: {{ attributeMap }}
id_attribute: {{ idAttribute }}
timestamp_attribute: {{ timestampAttribute }}
- type: MapToFields
name: ConvertStringsToBytes
input: ReadMessages
Expand Down
Loading