diff --git a/plugins/opentelemetry/plugin.yaml b/plugins/opentelemetry/plugin.yaml index a02df4f..adfa2c0 100644 --- a/plugins/opentelemetry/plugin.yaml +++ b/plugins/opentelemetry/plugin.yaml @@ -3,8 +3,8 @@ id: "open-telemetry" description: "A plugin for exporting Pixie data to any OpenTelemetry collector. Please make sure you fill out the custom export path to point to your OpenTelemetry collector." logo: > -version: "0.0.3" -updated: "2022-09-21" +version: "0.0.4" +updated: "2024-10-08" keywords: - otel - collector diff --git a/plugins/opentelemetry/retention.yaml b/plugins/opentelemetry/retention.yaml index 15a9246..d79d091 100644 --- a/plugins/opentelemetry/retention.yaml +++ b/plugins/opentelemetry/retention.yaml @@ -257,3 +257,966 @@ presetScripts: ), ) defaultFrequencyS: 10 + - name: "HTTP Spans" + description: "This script sends HTTP span events (distributed tracing) to the OTel plugin's endpoint." + script: | + #px:set max_output_rows_per_table=1500 + + import px + + def remove_ns_prefix(column): + return px.replace('[a-z0-9\-]*/', column, '') + + def add_source_dest_columns(df): + df.pod = df.ctx['pod'] + df.node = df.ctx['node'] + df.container = df.ctx['container'] + df.deployment = df.ctx['deployment'] + + # If remote_addr is a pod, get its name. If not, use IP address. + df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr)) + df.ra_name = px.select(df.ra_pod != '', df.ra_pod, px.nslookup(df.remote_addr)) + df.ra_node = px.pod_id_to_node_name(px.ip_to_pod_id(df.remote_addr)) + df.ra_deployment = px.pod_id_to_deployment_name(px.ip_to_pod_id(df.remote_addr)) + + df.is_server_tracing = df.trace_role == 2 + # Set client and server based on trace_role. + df.source_pod = px.select(df.is_server_tracing, df.ra_name, df.pod) + df.destination_pod = px.select(df.is_server_tracing, df.pod, df.ra_name) + df.destination_node = px.select(df.is_server_tracing, df.node, df.ra_node) + df.destination_deployment = px.select(df.is_server_tracing, df.deployment, df.ra_deployment) + + df.source_service = px.pod_name_to_service_name(df.source_pod) + df.destination_service = px.pod_name_to_service_name(df.destination_pod) + + df.destination_namespace = px.pod_name_to_namespace(df.destination_pod) + df.source_namespace = px.pod_name_to_namespace(df.source_pod) + df.source_node = px.pod_id_to_node_name(px.pod_name_to_pod_id(df.source_pod)) + df.source_deployment = px.pod_name_to_deployment_name(df.source_pod) + df.source_deployment = remove_ns_prefix(df.source_deployment) + df.source_service = remove_ns_prefix(df.source_service) + df.destination_service = remove_ns_prefix(df.destination_service) + df.source_container = px.select(df.is_server_tracing, '', df.container) + df.source_pod = remove_ns_prefix(df.source_pod) + df.destination_pod = remove_ns_prefix(df.destination_pod) + return df + + df = px.DataFrame('http_events', start_time=px.plugin.start_time, end_time=px.plugin.end_time) + df = add_source_dest_columns(df) + + df = df.head(15000) + df.start_time = df.time_ - df.latency + df.host = px.pluck(df.req_headers, 'Host') + df.req_url = df.host + df.req_path + df.user_agent = px.pluck(df.req_headers, 'User-Agent') + df.trace_id = px.pluck(df.req_headers, 'X-B3-TraceId') + df.span_id = px.pluck(df.req_headers, 'X-B3-SpanId') + df.parent_span_id = px.pluck(df.req_headers, 'X-B3-ParentSpanId') + + # Strip out all but the actual path value from req_path + df.req_path = px.uri_recompose('', '', '', 0, px.pluck(px.uri_parse(df.req_path), 'path'), '', '') + + # Replace any Hex IDS from the 
path with <id>
+ df.req_path = px.replace('/[a-fA-F0-9\-:]{6,}(/?)', df.req_path, '/<id>\\1')
+
+ df.cluster_name = px.vizier_name()
+ df.cluster_id = px.vizier_id()
+ df.pixie = 'pixie'
+
+ px.export(
+ df, px.otel.Data(
+ resource={
+ # Unlike the other scripts in this file, which identify the resource by the
+ # source (client) service, these spans describe the server side of the
+ # request, so the destination service is used (as `server.service.name`).
+ 'server.service.name': df.destination_service,
+ 'server.k8s.pod.name': df.destination_pod,
+ 'server.k8s.deployment.name': df.destination_deployment,
+ 'server.k8s.namespace.name': df.destination_namespace,
+ 'server.k8s.node.name': df.destination_node,
+ 'px.cluster.id': df.cluster_id,
+ 'k8s.cluster.name': df.cluster_name,
+ 'instrumentation.provider': df.pixie,
+ },
+ data=[
+ px.otel.trace.Span(
+ name=df.req_path,
+ start_time=df.start_time,
+ end_time=df.time_,
+ trace_id=df.trace_id,
+ span_id=df.span_id,
+ parent_span_id=df.parent_span_id,
+ kind=px.otel.trace.SPAN_KIND_SERVER,
+ attributes={
+ # NOTE: the integration handles splitting of services.
+ 'client.service.name': df.source_service,
+ 'client.k8s.pod.name': df.source_pod,
+ 'client.k8s.deployment.name': df.source_deployment,
+ 'client.k8s.namespace.name': df.source_namespace,
+ 'client.k8s.node.name': df.source_node,
+ 'http.method': df.req_method,
+ 'http.url': df.req_url,
+ 'http.target': df.req_path,
+ 'http.host': df.host,
+ 'http.status_code': df.resp_status,
+ 'http.user_agent': df.user_agent,
+ 'http.req_headers': df.req_headers,
+ 'http.resp_headers': df.resp_headers,
+ },
+ ),
+ ],
+ ),
+ )
+ defaultFrequencyS: 10
+ - name: "PostgreSQL Spans"
+ description: "This script sends PostgreSQL span events (distributed tracing) to the OTel plugin's endpoint."
+ script: |
+ #px:set max_output_rows_per_table=500
+ import px
+
+ def remove_ns_prefix(column):
+ return px.replace('[a-z0-9\-]*/', column, '')
+
+ def add_source_dest_columns(df):
+ df.pod = df.ctx['pod']
+ df.namespace = df.ctx['namespace']
+ df.container = df.ctx['container']
+
+ # If remote_addr is a pod, get its name. If not, use IP address.
+ df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr))
+ df.ra_name = px.select(df.ra_pod != '', df.ra_pod, px.nslookup(df.remote_addr))
+
+ df.is_server_tracing = df.trace_role == 2
+ # Set client and server based on trace_role.
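+ # trace_role == 2 means Pixie captured this event on the server side of the
+ # connection: the local pod is then the destination and the remote address
+ # is the source. trace_role == 1 is the mirror image (client-side capture).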
+ df.source_pod = px.select(df.is_server_tracing, df.ra_name, df.pod)
+ df.destination_pod = px.select(df.is_server_tracing, df.pod, df.ra_name)
+
+ df.source_service = px.pod_name_to_service_name(df.source_pod)
+ df.source_service = px.select(df.source_service != '', df.source_service, df.source_pod)
+ df.destination_service = px.pod_name_to_service_name(df.destination_pod)
+ df.destination_service = px.select(df.destination_service != '', df.destination_service, df.destination_pod)
+
+ df.destination_namespace = px.pod_name_to_namespace(df.destination_pod)
+ df.source_namespace = px.pod_name_to_namespace(df.source_pod)
+ df.source_service = px.Service(remove_ns_prefix(df.source_service))
+ df.destination_service = px.Service(remove_ns_prefix(df.destination_service))
+ df.source_container = px.select(df.is_server_tracing, '', df.container)
+ df.source_pod = remove_ns_prefix(df.source_pod)
+ df.destination_pod = remove_ns_prefix(df.destination_pod)
+ return df
+
+ df = px.DataFrame('pgsql_events', start_time=px.plugin.start_time, end_time=px.plugin.end_time)
+ df = add_source_dest_columns(df)
+ df.normed_query_struct = px.normalize_pgsql(df.req, df.req_cmd)
+ df.query = px.pluck(df.normed_query_struct, 'query')
+ df = df[df.query != ""]
+ df.start_time = df.time_ - df.latency
+ df.cluster_name = px.vizier_name()
+ df.cluster_id = px.vizier_id()
+ df.pixie = 'pixie'
+ df.db_system = 'postgres'
+
+ px.export(
+ df, px.otel.Data(
+ resource={
+ 'service.name': df.source_service,
+ },
+ data=[
+ # Export client span format for the client side of the request
+ px.otel.trace.Span(
+ name=df.query,
+ start_time=df.start_time,
+ end_time=df.time_,
+ kind=px.otel.trace.SPAN_KIND_CLIENT,
+ attributes={
+ 'k8s.container.name': df.source_container,
+ 'service.instance.id': df.source_pod,
+ 'k8s.pod.name': df.source_pod,
+ 'k8s.namespace.name': df.source_namespace,
+ 'px.cluster.id': df.cluster_id,
+ 'k8s.cluster.name': df.cluster_name,
+ 'instrumentation.provider': df.pixie,
+ 'db.system': df.db_system,
+ },
+ ),
+ # Export server span format for the server (Postgres) side of the request
+ px.otel.trace.Span(
+ name=df.query,
+ start_time=df.start_time,
+ end_time=df.time_,
+ kind=px.otel.trace.SPAN_KIND_SERVER,
+ attributes={
+ 'service.instance.id': df.source_pod,
+ 'k8s.namespace.name': df.source_namespace,
+ 'postgres.service.name': df.destination_service,
+ 'postgres.pod.name': df.destination_pod,
+ 'postgres.namespace.name': df.destination_namespace,
+ 'k8s.cluster.name': df.cluster_name,
+ 'px.cluster.id': df.cluster_id,
+ 'instrumentation.provider': df.pixie,
+ 'db.system': df.db_system,
+ 'postgres.req_cmd': df.req_cmd,
+ # Sending df.req and df.resp is disabled for now, until illegal characters are handled.
+ # 'postgres.req': df.req,
+ # 'postgres.resp': df.resp,
+ 'postgres.resp_latency': df.latency,
+ },
+ ),
+ ],
+ ),
+ )
+ defaultFrequencyS: 10
+ - name: "MySQL Spans"
+ description: "This script generates span events from queries to MySQL databases and sends them to the OTel plugin's endpoint."
+ script: |
+ #px:set max_output_rows_per_table=500
+
+ import px
+
+ def remove_duplicate_traces(df):
+ ''' Removes duplicate traces.
+ For historical reasons, Pixie traces MySQL requests on both the client AND server side:
+ https://github.com/pixie-io/pixie/blob/5e5598ac46f39219148a36468b5318b1466a92d4/src/stirling/source_connectors/socket_tracer/conn_tracker.cc#L639
+ '''
+ # Keep client-side traces if server is outside the cluster (can't be resolved to pod or svc)
+ df.remote_pod_id = px.ip_to_pod_id(df.remote_addr)
+ df.remote_service_id = px.ip_to_service_id(df.remote_addr)
+ df.remote_outside_cluster = df.remote_pod_id == '' and df.remote_service_id == ''
+ df_client_traces = df[df.trace_role == 1 and df.remote_outside_cluster]
+ df_server_traces = df[df.trace_role == 2]
+ # append() returns a new DataFrame, so the result must be reassigned.
+ df_server_traces = df_server_traces.append(df_client_traces)
+ return df_server_traces
+
+ def remove_ns_prefix(column):
+ return px.replace('[a-z0-9\-]*/', column, '')
+
+ def add_source_dest_columns(df):
+ df.pod = df.ctx['pod']
+ df.namespace = df.ctx['namespace']
+ df.container = df.ctx['container']
+
+ # If remote_addr is a pod, get its name. If not, use IP address.
+ df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr))
+ df.ra_name = px.select(df.ra_pod != '', df.ra_pod, px.nslookup(df.remote_addr))
+
+ df.is_server_tracing = df.trace_role == 2
+ # Set client and server based on trace_role.
+ df.source_pod = px.select(df.is_server_tracing, df.ra_name, df.pod)
+ df.destination_pod = px.select(df.is_server_tracing, df.pod, df.ra_name)
+
+ df.source_service = px.pod_name_to_service_name(df.source_pod)
+ df.source_service = px.select(df.source_service != '', df.source_service, df.source_pod)
+ df.destination_service = px.pod_name_to_service_name(df.destination_pod)
+ df.destination_service = px.select(df.destination_service != '', df.destination_service, df.destination_pod)
+
+ df.destination_namespace = px.pod_name_to_namespace(df.destination_pod)
+ df.source_namespace = px.pod_name_to_namespace(df.source_pod)
+ df.source_service = px.Service(remove_ns_prefix(df.source_service))
+ df.destination_service = px.Service(remove_ns_prefix(df.destination_service))
+ df.source_container = px.select(df.is_server_tracing, '', df.container)
+ df.source_pod = remove_ns_prefix(df.source_pod)
+ df.destination_pod = remove_ns_prefix(df.destination_pod)
+ return df
+
+ df = px.DataFrame('mysql_events', start_time=px.plugin.start_time, end_time=px.plugin.end_time)
+ df = remove_duplicate_traces(df)
+ df = add_source_dest_columns(df)
+ df.normed_query_struct = px.normalize_mysql(df.req_body, df.req_cmd)
+ df.query = px.pluck(df.normed_query_struct, 'query')
+ df = df[df.query != ""]
+ df.start_time = df.time_ - df.latency
+ # Convert latency from ns units to ms units.
+ df.latency = df.latency / (1000 * 1000)
+ df.req_bytes = px.length(df.req_body)
+ df.resp_bytes = px.length(df.resp_body)
+ df.req_cmd = px.mysql_command_name(df.req_cmd)
+ df.cluster_name = px.vizier_name()
+ df.cluster_id = px.vizier_id()
+ df.pixie = 'pixie'
+ df.db_system = 'mysql'
+
+ px.export(
+ df, px.otel.Data(
+ resource={
+ 'service.name': df.source_service,
+ },
+ data=[
+ # Export server span format for the server (MySQL) side of the request
+ px.otel.trace.Span(
+ name=df.query,
+ start_time=df.start_time,
+ end_time=df.time_,
+ kind=px.otel.trace.SPAN_KIND_SERVER,
+ attributes={
+ 'service.instance.id': df.source_pod,
+ 'k8s.namespace.name': df.source_namespace,
+ 'mysql.service.name': df.destination_service,
+ 'mysql.pod.name': df.destination_pod,
+ 'mysql.namespace.name': df.destination_namespace,
+ 'k8s.cluster.name': df.cluster_name,
+ 'px.cluster.id': df.cluster_id,
+ 'instrumentation.provider': df.pixie,
+ 'mysql.req_cmd': df.req_cmd,
+ 'mysql.req_body': df.req_body,
+ 'mysql.req_bytes': df.req_bytes,
+ 'mysql.resp_bytes': df.resp_bytes,
+ 'mysql.resp_latency': df.latency,
+ 'mysql.resp_status': df.resp_status,
+ 'mysql.resp_body': df.resp_body,
+ 'db.system': df.db_system,
+ },
+ ),
+ # Export client span format for the client side of the request
+ px.otel.trace.Span(
+ name=df.query,
+ start_time=df.start_time,
+ end_time=df.time_,
+ kind=px.otel.trace.SPAN_KIND_CLIENT,
+ attributes={
+ 'k8s.container.name': df.source_container,
+ 'service.instance.id': df.source_pod,
+ 'k8s.pod.name': df.source_pod,
+ 'k8s.namespace.name': df.source_namespace,
+ 'px.cluster.id': df.cluster_id,
+ 'k8s.cluster.name': df.cluster_name,
+ 'instrumentation.provider': df.pixie,
+ 'db.system': df.db_system,
+ },
+ ),
+ ],
+ ),
+ )
+ defaultFrequencyS: 10
+ - name: "Kafka Spans"
+ description: "This script samples Kafka span events and sends them to the OTel plugin's endpoint."
+ defaultDisabled: false
+ defaultFrequencyS: 10
+ script: |
+ import px
+
+ def unnest_topics_and_partitions(df, body_field: str):
+ '''
+ Unnest the topics and partitions from a data frame. body_field is the target column to unnest,
+ usually 'req_body' or 'resp'.
+ '''
+ # Get topic_name
+ df.topics = px.pluck(df[body_field], 'topics')
+ df = json_unnest_first5(df, 'topic', 'topics')
+ df = df[df.topic != '']
+ df.topic_name = px.pluck(df.topic, 'name')
+
+ # Get partition_idx
+ df.partitions = px.pluck(df.topic, 'partitions')
+ df = json_unnest_first5(df, 'partition', 'partitions')
+ df = df[df.partition != '']
+ df.partition_idx = px.pluck(df.partition, 'index')
+
+ # Get message_size. Note that this is the total size of the message
+ # batches, not the total number of bytes sent or received.
+ df.message_set = px.pluck(df.partition, 'message_set')
+ df.message_size = px.pluck(df.message_set, 'size')
+ df.message_size = px.atoi(df.message_size, 0)
+ return df
+
+
+ def json_unnest_first5(df, dest_col, src_col):
+ '''Unnest the first 5 values in a JSON array in the src_col, and put them in the
+ dest_col.
+ '''
+ df0 = json_array_index(df, dest_col, src_col, 0)
+ df1 = json_array_index(df, dest_col, src_col, 1)
+ df2 = json_array_index(df, dest_col, src_col, 2)
+ df3 = json_array_index(df, dest_col, src_col, 3)
+ df4 = json_array_index(df, dest_col, src_col, 4)
+ df = df0.append(df1).append(df2).append(df3).append(df4)
+ return df
+
+
+ def json_array_index(df, dest_col, src_col, idx):
+ df[dest_col] = px.pluck_array(df[src_col], idx)
+ return df
+
+
+ def select_columns(df):
+ return df[[
+ 'time_', 'upid', 'req_cmd', 'client_id', 'latency',
+ 'source_pod', 'destination_pod', 'source_service', 'destination_service',
+ 'source_namespace', 'destination_namespace', 'req_body', 'resp',
+ 'topic_name', 'partition_idx', 'partition',
+ 'message_size', 'error_code',
+ ]]
+
+
+ def get_produce_records(df):
+ '''
+ Get all the produce records.
+ '''
+ # Produce requests have command 0.
+ producer_df = df[df.req_cmd == 0]
+
+ producer_df = unnest_topics_and_partitions(producer_df, 'req_body')
+ producer_df.req_partition_idx = producer_df.partition_idx
+ # Error code is always in the response.
+ producer_df = unnest_topics_and_partitions(producer_df, 'resp')
+
+ producer_df.error_code = px.pluck(producer_df.partition, 'error_code')
+ producer_df = producer_df[producer_df.partition_idx == producer_df.req_partition_idx]
+ return select_columns(producer_df)
+
+
+ def get_fetch_records(df):
+ '''
+ Get all the fetch records.
+ '''
+ # Fetch requests have command 1.
+ consumer_df = df[df.req_cmd == 1]
+
+ consumer_df = unnest_topics_and_partitions(consumer_df, 'resp')
+ consumer_df.error_code = px.pluck(consumer_df.partition, 'error_code')
+ return select_columns(consumer_df)
+
+
+ def get_remaining_records(df):
+ '''
+ Get all records other than produce and fetch.
+ '''
+ # Exclude Produce (cmd 0) and Fetch (cmd 1) commands.
+ df = df[df.req_cmd > 1]
+ df.topic_name = ''
+ df.partition_idx = ''
+ df.partition = ''
+ df.message_size = 0
+ df.error_code = '0'
+ return select_columns(df)
+
+ def remove_ns_prefix(column):
+ return px.replace('[a-z0-9\-]*/', column, '')
+
+ def add_source_dest_columns(df):
+ df.pod = df.ctx['pod']
+ df.namespace = df.ctx['namespace']
+
+ # If remote_addr is a pod, get its name. If not, use IP address.
+ df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr))
+ df.ra_name = px.select(df.ra_pod != '', df.ra_pod, px.nslookup(df.remote_addr))
+
+ df.is_server_tracing = df.trace_role == 2
+ # Set client and server based on trace_role.
+ df.source_pod = px.select(df.is_server_tracing, df.ra_name, df.pod)
+ df.destination_pod = px.select(df.is_server_tracing, df.pod, df.ra_name)
+
+ df.source_service = px.pod_name_to_service_name(df.source_pod)
+ df.source_service = px.select(df.source_service != '', df.source_service, df.source_pod)
+ df.destination_service = px.pod_name_to_service_name(df.destination_pod)
+ df.destination_service = px.select(df.destination_service != '', df.destination_service, df.destination_pod)
+
+ df.destination_namespace = px.pod_name_to_namespace(df.destination_pod)
+ df.source_namespace = px.pod_name_to_namespace(df.source_pod)
+ df.source_service = px.Service(remove_ns_prefix(df.source_service))
+ df.destination_service = px.Service(remove_ns_prefix(df.destination_service))
+ df.source_pod = remove_ns_prefix(df.source_pod)
+ df.destination_pod = remove_ns_prefix(df.destination_pod)
+ return df
+
+ df = px.DataFrame(table='kafka_events.beta', start_time=px.plugin.start_time, end_time=px.plugin.end_time)
+ df.namespace = df.ctx['namespace']
+
+ df = add_source_dest_columns(df)
+ producer_df = get_produce_records(df)
+ consumer_df = get_fetch_records(df)
+ remaining_df = get_remaining_records(df)
+ df = producer_df.append([consumer_df, remaining_df])
+
+ # Kafka/NATS/AMQP record time_ at the start of the request, unlike the rest
+ # of the Pixie protocols, so the span runs from time_ to time_ + latency.
+ df.start_time = df.time_
+ df.end_time = df.time_ + df.latency
+ # Convert latency from ns units to ms units.
+ df.latency = df.latency / (1000.0 * 1000.0)
+
+ # Flag responses whose partition-level error code indicates a failure.
+ df.has_error = df.error_code != 'kNone' and df.error_code != '0'
+ df.req_cmd = px.kafka_api_key_name(df.req_cmd)
+ df.pixie = 'pixie'
+ df.cluster_id = px.vizier_id()
+ df.cluster_name = px.vizier_name()
+ df.span_name = df.req_cmd + '/' + df.topic_name
+
+ # Restrict number of results.
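+ # head() bounds the rows exported per run: unnesting up to 5 topics and up to
+ # 5 partitions per event can multiply the original event count many times over.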
+ df = df.head(1500) + px.export( + df, + px.otel.Data( + resource={ + 'service.name': df.source_service, + 'service.instance.id': df.source_pod, + 'k8s.pod.name': df.source_pod, + 'k8s.namespace.name': df.source_namespace, + 'k8s.cluster.name': df.cluster_name, + 'px.cluster.id': df.cluster_id, + 'instrumentation.provider': df.pixie, + 'kafka.service.name': df.destination_service, + 'kafka.broker.pod': df.destination_pod, + 'kafka.namespace.name': df.destination_namespace, + }, + data=[ + px.otel.trace.Span( + name=df.span_name, + start_time=df.start_time, + end_time=df.end_time, + kind=px.otel.trace.SPAN_KIND_SERVER, + attributes={ + "kafka.client_id": df.client_id, + "kafka.has_error": df.has_error, + "kafka.message_size": df.message_size, + "kafka.partition_idx": df.partition_idx, + "kafka.partition": df.partition, + "kafka.req_body": df.req_body, + "kafka.req_cmd": df.req_cmd, + "kafka.resp": df.resp, + "kafka.topic": df.topic_name, + }, + ) + ], + ), + ) + - name: Redis Spans + description: This script generates OpenTelemetry spans for Redis commands. + defaultDisabled: false + defaultFrequencyS: 10 + script: | + #px:set max_output_rows_per_table=500 + import px + + def remove_ns_prefix(column): + return px.replace('[a-z0-9\-]*/', column, '') + + def add_source_dest_columns(df): + df.pod = df.ctx['pod'] + df.node = df.ctx['node'] + df.deployment = df.ctx['deployment'] + df.namespace = df.ctx['namespace'] + + # If remote_addr is a pod, get its name. If not, use IP address. + df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr)) + df.ra_name = px.select(df.ra_pod != '', df.ra_pod, px.nslookup(df.remote_addr)) + df.ra_node = px.pod_id_to_node_name(px.ip_to_pod_id(df.remote_addr)) + df.ra_deployment = px.pod_id_to_deployment_name(px.ip_to_pod_id(df.remote_addr)) + + df.is_server_tracing = df.trace_role == 2 + # Set client and server based on trace_role. 
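+ # For server-side traces (trace_role == 2) the local pod is the Redis server
+ # (destination) and the remote endpoint is the client (source).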
+ df.source_pod = px.select(df.is_server_tracing, df.ra_name, df.pod) + df.destination_pod = px.select(df.is_server_tracing, df.pod, df.ra_name) + df.destination_node = px.select(df.is_server_tracing, df.node, df.ra_node) + df.destination_deployment = px.select(df.is_server_tracing, df.deployment, df.ra_deployment) + + df.source_service = px.pod_name_to_service_name(df.source_pod) + df.source_service = px.select(df.source_service != '', df.source_service, df.source_pod) + df.destination_service = px.pod_name_to_service_name(df.destination_pod) + df.destination_service = px.select(df.destination_service != '', df.destination_service, df.destination_pod) + + df.destination_namespace = px.pod_name_to_namespace(df.destination_pod) + df.source_namespace = px.pod_name_to_namespace(df.source_pod) + df.source_node = px.pod_id_to_node_name(px.pod_name_to_pod_id(df.source_pod)) + df.source_deployment = remove_ns_prefix(px.pod_name_to_deployment_name(df.source_pod)) + df.source_service = px.Service(remove_ns_prefix(df.source_service)) + df.destination_service = px.Service(remove_ns_prefix(df.destination_service)) + df.source_pod = remove_ns_prefix(df.source_pod) + df.destination_pod = remove_ns_prefix(df.destination_pod) + df.destination_deployment = remove_ns_prefix(df.destination_deployment) + return df + + + df = px.DataFrame('redis_events', start_time=px.plugin.start_time, end_time=px.plugin.end_time) + df = add_source_dest_columns(df) + + df.start_time = df.time_ - df.latency + df.latency = df.latency / (1000 * 1000) + df.req_bytes = px.length(df.req_args) + df.resp_bytes = px.length(df.resp) + df.pixie = 'pixie' + df.cluster_id = px.vizier_id() + df.cluster_name = px.vizier_name() + df.db_system = 'redis' + df.has_error = px.substring(df.resp, 0, 1) == '-' + + px.export(df, px.otel.Data( + resource={ + 'service.name': df.source_service, + 'service.instance.id': df.source_pod, + 'k8s.deployment.name': df.source_deployment, + 'k8s.namespace.name': df.source_namespace, + 'k8s.node.name': df.source_node, + 'k8s.cluster.name': df.cluster_name, + 'px.cluster.id': df.cluster_id, + 'instrumentation.provider': df.pixie, + 'db.system': df.db_system, + }, + data=[ + px.otel.trace.Span( + name=df.req_cmd, + start_time=df.start_time, + end_time=df.time_, + kind=px.otel.trace.SPAN_KIND_SERVER, + attributes={ + 'redis.service.name': df.destination_service, + 'redis.pod.name': df.destination_pod, + 'redis.node.name': df.destination_node, + 'redis.deployment.name': df.destination_deployment, + 'redis.namespace.name': df.destination_namespace, + 'redis.req_cmd': df.req_cmd, + 'redis.req_bytes': df.req_bytes, + 'redis.resp_bytes': df.resp_bytes, + 'redis.resp_latency': df.latency, + 'redis.req_args': df.req_args, + 'redis.resp': df.resp, + 'redis.has_error': df.has_error, + }, + ), + px.otel.trace.Span( + name=df.req_cmd, + start_time=df.start_time, + end_time=df.time_, + kind=px.otel.trace.SPAN_KIND_CLIENT, + attributes={ + 'redis.pod.name': df.destination_pod, + 'redis.node.name': df.destination_node, + 'redis.deployment.name': df.destination_deployment, + 'redis.req_cmd': df.req_cmd, + 'redis.req_bytes': df.req_bytes, + 'redis.resp_bytes': df.resp_bytes, + 'redis.resp_latency': df.latency, + 'redis.req_args': df.req_args, + 'redis.resp': df.resp, + 'redis.has_error': df.has_error, + }, + ) + ] + )) + - name: "DNS Spans" + description: "Exports a sample of DNS Spans from Pixie data" + defaultDisabled: false + defaultFrequencyS: 10 + script: | + #px:set max_output_rows_per_table=500 + import px + + df = 
px.DataFrame('dns_events', start_time=px.plugin.start_time, end_time=px.plugin.end_time)
+ # Keep only client-side DNS traces (trace_role == 1).
+ df = df[df.trace_role == 1]
+ df = df.drop(['trace_role'])
+
+ # Add context.
+ df.pod = df.ctx['pod']
+ df.service = df.ctx['service']
+ df.namespace = df.ctx['namespace']
+ df.node = df.ctx['node']
+ df.deployment = df.ctx['deployment']
+ df.container = df.ctx['container']
+ df = df.drop(['upid'])
+
+ # Extract some fields into their own columns for convenience.
+ df.queries = px.pluck(df.req_body, 'queries')
+ df.answers = px.pluck(df.resp_body, 'answers')
+ df.rcode = px.pluck_int64(df.resp_header, 'rcode')
+ df.rcode_name = px.dns_rcode_name(df.rcode)
+ df.resolved = px.contains(df.answers, 'name')
+
+ df.query = px.replace('.*"name":"(.*?)".*', df.queries, '\\1')
+ df.query_type = px.replace('.*"type":"(.*?)".*', df.queries, '\\1')
+
+ # Resolve the DNS server's IP to a service name.
+ df.dns_server = px.service_id_to_service_name(px.ip_to_service_id(df.remote_addr))
+ # If the DNS service is missing, try resolving to a pod name.
+ df.dns_server = px.select(
+ df.dns_server != '',
+ df.dns_server,
+ px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr)),
+ )
+ # If it is still missing, try nslookup.
+ df.dns_server = px.select(
+ df.dns_server != '',
+ df.dns_server,
+ px.nslookup(df.remote_addr),
+ )
+ # If it is still missing, fall back to the remote_addr.
+ df.dns_server = px.select(
+ df.dns_server != '',
+ df.dns_server,
+ df.remote_addr,
+ )
+ df.namespace = px.pod_name_to_namespace(df.pod)
+
+ def remove_ns_prefix(column):
+ return px.replace('[a-z0-9\-]*/', column, '')
+
+ df.pod = remove_ns_prefix(df.pod)
+ df.deployment = remove_ns_prefix(df.deployment)
+ df.service = px.Service(remove_ns_prefix(df.service))
+
+ df.start_time = df.time_ - df.latency
+ # Convert latency from ns units to ms units.
+ df.latency = df.latency / (1000 * 1000)
+
+ df.pixie = 'pixie'
+ df.cluster_id = px.vizier_id()
+ df.cluster_name = px.vizier_name()
+ df.dns_server_name = remove_ns_prefix(df.dns_server)
+ df.dns_server_namespace = px.pod_name_to_namespace(df.dns_server)
+ # Choose the current namespace if the server name is localhost
+ df.dns_server_namespace = px.select(df.dns_server == 'localhost', df.namespace, df.dns_server_namespace)
+ df.dns_cluster_id = px.select(df.dns_server_namespace == '', '', px.vizier_id())
+
+ px.export(df, px.otel.Data(
+ resource={
+ 'service.name': df.service,
+ 'service.instance.id': df.pod,
+ 'k8s.pod.name': df.pod,
+ 'k8s.deployment.name': df.deployment,
+ 'k8s.node.name': df.node,
+ 'k8s.namespace.name': df.namespace,
+ 'k8s.container.name': df.container,
+ 'k8s.cluster.name': df.cluster_name,
+ 'px.cluster.id': df.cluster_id,
+ 'instrumentation.provider': df.pixie,
+ },
+ data=[
+ px.otel.trace.Span(
+ name='dns',
+ start_time=df.start_time,
+ end_time=df.time_,
+ kind=px.otel.trace.SPAN_KIND_SERVER,
+ attributes={
+ 'dns.latency': df.latency,
+ 'dns.req_body': df.req_body,
+ 'dns.resp_body': df.resp_body,
+ 'dns.server.name': df.dns_server_name,
+ 'dns.server.namespace': df.dns_server_namespace,
+ 'dns.server.cluster_id': df.dns_cluster_id,
+ 'dns.query': df.query,
+ 'dns.rcode': df.rcode,
+ 'dns.rcode_name': df.rcode_name,
+ 'dns.resolved': df.resolved,
+ 'dns.query_type': df.query_type,
+ },
+ )
+ ]
+ ))
+ - name: Cassandra Spans
+ description: This script sends Cassandra (CQL) span events to the OTel plugin's endpoint.
+ defaultDisabled: false + defaultFrequencyS: 10 + script: | + #px:set max_output_rows_per_table=500 + import px + + def remove_ns_prefix(column): + return px.replace('[a-z0-9\-]*/', column, '') + + def add_source_dest_columns(df): + df.pod = df.ctx['pod'] + df.namespace = df.ctx['namespace'] + + # If remote_addr is a pod, get its name. If not, use IP address. + df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr)) + df.ra_name = px.select(df.ra_pod != '', df.ra_pod, px.nslookup(df.remote_addr)) + + df.is_server_tracing = df.trace_role == 2 + # Set client and server based on trace_role. + df.source_pod = px.select(df.is_server_tracing, df.ra_name, df.pod) + df.destination_pod = px.select(df.is_server_tracing, df.pod, df.ra_name) + + df.source_service = px.pod_name_to_service_name(df.source_pod) + df.source_service = px.select(df.source_service != '', df.source_service, df.source_pod) + df.destination_service = px.pod_name_to_service_name(df.destination_pod) + df.destination_service = px.select(df.destination_service != '', df.destination_service, df.destination_pod) + + df.destination_namespace = px.pod_name_to_namespace(df.destination_pod) + df.source_namespace = px.pod_name_to_namespace(df.source_pod) + df.source_service = px.Service(remove_ns_prefix(df.source_service)) + df.destination_service = px.Service(remove_ns_prefix(df.destination_service)) + df.source_pod = remove_ns_prefix(df.source_pod) + df.destination_pod = remove_ns_prefix(df.destination_pod) + return df + + df = px.DataFrame('cql_events', start_time=px.plugin.start_time, end_time=px.plugin.end_time) + df = add_source_dest_columns(df) + + df.start_time = df.time_ - df.latency + df.latency = df.latency / (1000 * 1000) + df.req_bytes = px.length(df.req_body) + df.resp_bytes = px.length(df.resp_body) + df.req_cmd = px.cql_opcode_name(df.req_op) + + # Work-around for a missing normalize_cql function. 
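+ # CQL query text is close enough to SQL for the PostgreSQL normalizer to strip
+ # literals from it; when it reports an error, fall back to the opcode name below.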
+ df.query_struct = px.normalize_pgsql(df.req_body, df.req_cmd)
+ df.query = px.select(px.pluck(df.query_struct, 'error') == '', px.pluck(df.query_struct, 'query'), df.req_cmd)
+
+ df.pixie = 'pixie'
+ df.cluster_id = px.vizier_id()
+ df.cluster_name = px.vizier_name()
+ df.db_system = 'cassandra'
+
+ px.export(
+ df, px.otel.Data(
+ resource={
+ 'service.name': df.source_service,
+ 'service.instance.id': df.source_pod,
+ 'k8s.namespace.name': df.source_namespace,
+ 'k8s.cluster.name': df.cluster_name,
+ 'px.cluster.id': df.cluster_id,
+ 'instrumentation.provider': df.pixie,
+ 'db.system': df.db_system,
+ },
+ data=[
+ px.otel.trace.Span(
+ name=df.req_cmd,
+ start_time=df.start_time,
+ end_time=df.time_,
+ kind=px.otel.trace.SPAN_KIND_SERVER,
+ attributes={
+ 'cassandra.service.name': df.destination_service,
+ 'cassandra.pod.name': df.destination_pod,
+ 'cassandra.namespace.name': df.destination_namespace,
+ 'cassandra.req_cmd': df.req_cmd,
+ 'cassandra.query': df.query,
+ 'cassandra.req_body': df.req_body,
+ 'cassandra.req_bytes': df.req_bytes,
+ 'cassandra.resp_cmd': df.resp_op,
+ 'cassandra.resp_body': df.resp_body,
+ 'cassandra.resp_bytes': df.resp_bytes,
+ 'cassandra.resp_latency': df.latency,
+ }
+ ),
+ px.otel.trace.Span(
+ name=df.query,
+ start_time=df.start_time,
+ end_time=df.time_,
+ kind=px.otel.trace.SPAN_KIND_CLIENT,
+ ),
+ ],
+ ),
+ )
+ - name: AMQP Spans
+ description: This script sends AMQP span events to the OTel plugin's endpoint.
+ defaultDisabled: false
+ defaultFrequencyS: 10
+ script: |-
+ #px:set max_output_rows_per_table=500
+ import px
+
+
+ def remove_ns_prefix(column):
+ return px.replace('[a-z0-9\-]*/', column, '')
+
+
+ def add_source_dest_columns(df):
+ df.pod = df.ctx['pod']
+ df.node = df.ctx['node']
+ df.deployment = df.ctx['deployment']
+ df.namespace = df.ctx['namespace']
+
+ # If remote_addr is a pod, get its name. If not, use IP address.
+ df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr))
+ df.is_ra_pod = df.ra_pod != ''
+ df.ra_name = px.select(df.is_ra_pod, df.ra_pod, df.remote_addr)
+ df.ra_node = px.pod_id_to_node_name(px.ip_to_pod_id(df.remote_addr))
+ df.ra_deployment = px.pod_id_to_deployment_name(px.ip_to_pod_id(df.remote_addr))
+
+ df.is_server_tracing = df.trace_role == 2
+ df.is_source_pod_type = px.select(df.is_server_tracing, df.is_ra_pod, True)
+ df.is_dest_pod_type = px.select(df.is_server_tracing, True, df.is_ra_pod)
+
+ # Set client and server based on trace_role.
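+ # Server-side captures (trace_role == 2) treat the local pod, node, and
+ # deployment as the destination and the remote endpoint as the source.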
+ df.source_pod = px.select(df.is_server_tracing, df.ra_name, df.pod)
+ df.destination_pod = px.select(df.is_server_tracing, df.pod, df.ra_name)
+ df.destination_node = px.select(df.is_server_tracing, df.node, df.ra_node)
+ df.destination_deployment = px.select(df.is_server_tracing, df.deployment, df.ra_deployment)
+
+ df = df.drop(['ra_pod', 'is_ra_pod', 'ra_name', 'is_server_tracing'])
+ df.source_service = px.pod_name_to_service_name(df.source_pod)
+ df.destination_service = px.pod_name_to_service_name(df.destination_pod)
+ df.destination_namespace = px.pod_name_to_namespace(df.destination_pod)
+ df.source_namespace = px.pod_name_to_namespace(df.source_pod)
+ df.source_node = px.pod_id_to_node_name(px.pod_name_to_pod_id(df.source_pod))
+ df.source_deployment = px.pod_name_to_deployment_name(df.source_pod)
+
+ df.source_deployment = remove_ns_prefix(df.source_deployment)
+ df.source_service = px.Service(remove_ns_prefix(df.source_service))
+ df.destination_service = px.Service(remove_ns_prefix(df.destination_service))
+ df.source_pod = remove_ns_prefix(df.source_pod)
+ df.destination_pod = remove_ns_prefix(df.destination_pod)
+ df.destination_deployment = remove_ns_prefix(df.destination_deployment)
+
+ # If the destination service is missing, try nslookup.
+ df.destination_service = px.select(
+ df.destination_service != '',
+ df.destination_service,
+ px.nslookup(df.remote_addr),
+ )
+ # If the destination service is still missing, fall back to the remote_addr.
+ df.destination_service = px.select(
+ df.destination_service != '',
+ df.destination_service,
+ df.remote_addr,
+ )
+ return df
+
+
+ df = px.DataFrame(table='amqp_events', start_time=px.plugin.start_time, end_time=px.plugin.end_time)
+ df = add_source_dest_columns(df)
+
+ df.frame_name = px.amqp_frame_type_name(df.frame_type)
+ df.req_name = px.amqp_method_name(df.req_class_id, df.req_method_id)
+ df.resp_name = px.amqp_method_name(df.resp_class_id, df.resp_method_id)
+ df.req_name = px.select(df.req_name != 'Unknown', df.req_name, '')
+ df.resp_name = px.select(df.resp_name != 'Unknown', df.resp_name, '')
+ df.req_bytes = px.pluck_int64(df.req_msg, 'body_size')
+ df.resp_bytes = px.pluck_int64(df.resp_msg, 'body_size')
+ df.req_routing_key = px.pluck(df.req_msg, 'routing_key')
+ df.resp_routing_key = px.pluck(df.resp_msg, 'routing_key')
+ df.routing_key = px.select(df.req_routing_key != '', df.req_routing_key, df.resp_routing_key)
+ df.consumer_tag = px.pluck(df.resp_msg, 'consumer_tag')
+ # We filter out the Content Body and Heartbeat frames from spans because they don't contain any useful information.
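+ # Content Body frames carry opaque message payload chunks and Heartbeat frames
+ # are connection keep-alives; neither represents a request/response exchange.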
+ df = df[df.frame_name != "Content Body" and df.frame_name != "Heartbeat"] + + df.pixie = 'pixie' + df.cluster_id = px.vizier_id() + df.cluster_name = px.vizier_name() + + df.span_name = df.frame_name + px.select(df.req_name != '', '/' + df.req_name, '') + px.select(df.resp_name != '', '/' + df.resp_name, '') + + px.export( + df, px.otel.Data( + resource={ + 'service.name': df.source_service, + 'service.instance.id': df.source_pod, + 'k8s.deployment.name': df.source_deployment, + 'k8s.namespace.name': df.source_namespace, + 'k8s.node.name': df.source_node, + 'amqp.service.name': df.destination_service, + 'amqp.pod.name': df.destination_pod, + 'amqp.node.name': df.destination_node, + 'amqp.deployment.name': df.destination_deployment, + 'amqp.namespace.name': df.destination_namespace, + 'amqp.frame_name': df.frame_name, + 'amqp.req_name': df.req_name, + 'amqp.resp_name': df.resp_name, + 'amqp.req_body': df.req_msg, + 'amqp.resp_body': df.resp_msg, + 'amqp.routing_key': df.routing_key, + 'amqp.consumer_tag': df.consumer_tag, + 'k8s.cluster.name': df.cluster_name, + 'px.cluster.id': df.cluster_id, + 'instrumentation.provider': df.pixie, + }, + data=[ + px.otel.trace.Span( + name=df.span_name, + # We don't record the start of the span, so we use the end time as the start time. + start_time=df.time_, + end_time=df.time_, + kind=px.otel.trace.SPAN_KIND_CLIENT, + ), + ], + ), + )