25 changes: 23 additions & 2 deletions aws_lambda_powertools/utilities/batch/base.py
@@ -25,6 +25,9 @@
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBRecord,
)
from aws_lambda_powertools.utilities.data_classes.kafka_event import (
KafkaEventRecord,
)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
KinesisStreamRecord,
)
@@ -46,12 +49,13 @@ class EventType(Enum):
SQS = "SQS"
KinesisDataStreams = "KinesisDataStreams"
DynamoDBStreams = "DynamoDBStreams"
Kafka = "Kafka"


# When using processor with default arguments, records will carry EventSourceDataClassTypes
# and depending on what EventType it's passed it'll correctly map to the right record
# When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB and Kinesis
EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]
# When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB, Kinesis and Kafka
EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, KafkaEventRecord]
BatchEventTypes = Union[EventSourceDataClassTypes, BatchTypeModels]
SuccessResponse = Tuple[str, Any, BatchEventTypes]
FailureResponse = Tuple[str, str, BatchEventTypes]
@@ -272,11 +276,13 @@ def __init__(
EventType.SQS: self._collect_sqs_failures,
EventType.KinesisDataStreams: self._collect_kinesis_failures,
EventType.DynamoDBStreams: self._collect_dynamodb_failures,
EventType.Kafka: self._collect_kafka_failures,
}
self._DATA_CLASS_MAPPING = {
EventType.SQS: SQSRecord,
EventType.KinesisDataStreams: KinesisStreamRecord,
EventType.DynamoDBStreams: DynamoDBRecord,
EventType.Kafka: KafkaEventRecord,
}

super().__init__()
@@ -365,6 +371,21 @@ def _collect_dynamodb_failures(self):
failures.append({"itemIdentifier": msg_id})
return failures

def _collect_kafka_failures(self):
failures = []
for msg in self.fail_messages:
# Kafka uses a composite identifier with topic-partition and offset
# Both data class and Pydantic model use the same field names
failures.append(
{
"itemIdentifier": {
"topic-partition": f"{msg.topic}-{msg.partition}",
"offset": msg.offset,
},
},
)
return failures

@overload
def _to_batch_type(
self,
55 changes: 43 additions & 12 deletions aws_lambda_powertools/utilities/batch/decorators.py
@@ -22,6 +22,47 @@
from aws_lambda_powertools.utilities.typing import LambdaContext


def _get_records_from_event(
event: dict[str, Any],
processor: BasePartialBatchProcessor,
) -> list[dict]:
"""
Extract records from the event based on the processor's event type.

For SQS, Kinesis, and DynamoDB: Records are in event["Records"] as a list
For Kafka: Records are in event["records"] as a dict with topic-partition keys

Parameters
----------
event: dict
Lambda's original event
processor: BasePartialBatchProcessor
Batch Processor to determine event type

Returns
-------
records: list[dict]
Flattened list of records to process
"""
# Kafka events use lowercase "records" and have a nested dict structure
if processor.event_type == EventType.Kafka:
kafka_records = event.get("records", {})
if not kafka_records or not isinstance(kafka_records, dict):
raise UnexpectedBatchTypeError(
"Invalid Kafka event structure. Expected 'records' to be a non-empty dict with topic-partition keys.",
)
# Flatten the nested dict: {"topic-0": [r1, r2], "topic-1": [r3]} -> [r1, r2, r3]
return [record for topic_records in kafka_records.values() for record in topic_records]

# SQS, Kinesis, DynamoDB use uppercase "Records" as a list
records = event.get("Records", [])
if not records or not isinstance(records, list):
raise UnexpectedBatchTypeError(
"Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams, Kafka",
)
return records


@lambda_handler_decorator
@deprecated(
"`async_batch_processor` decorator is deprecated; use `async_process_partial_response` function instead.",
@@ -206,12 +247,7 @@ def handler(event, context):
* Async batch processors. Use `async_process_partial_response` instead.
"""
try:
records: list[dict] = event.get("Records", [])
if not records or not isinstance(records, list):
raise UnexpectedBatchTypeError(
"Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
)

records = _get_records_from_event(event, processor)
except AttributeError:
event_types = ", ".join(list(EventType.__members__))
docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line
@@ -275,12 +311,7 @@ def handler(event, context):
* Sync batch processors. Use `process_partial_response` instead.
"""
try:
records: list[dict] = event.get("Records", [])
if not records or not isinstance(records, list):
raise UnexpectedBatchTypeError(
"Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
)

records = _get_records_from_event(event, processor)
except AttributeError:
event_types = ", ".join(list(EventType.__members__))
docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line
24 changes: 22 additions & 2 deletions aws_lambda_powertools/utilities/batch/types.py
@@ -12,18 +12,38 @@
from aws_lambda_powertools.utilities.parser.models import (
KinesisDataStreamRecord as KinesisDataStreamRecordModel,
)
from aws_lambda_powertools.utilities.parser.models.kafka import KafkaRecordModel

BatchTypeModels = Optional[
Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]]
Union[
Type[SqsRecordModel],
Type[DynamoDBStreamRecordModel],
Type[KinesisDataStreamRecordModel],
Type[KafkaRecordModel],
]
]
BatchSqsTypeModel = Optional[Type[SqsRecordModel]]
else: # pragma: no cover
BatchTypeModels = "BatchTypeModels" # type: ignore
BatchSqsTypeModel = "BatchSqsTypeModel" # type: ignore


# Kafka uses a composite identifier with topic-partition and offset.
# Functional TypedDict syntax is required because "topic-partition" is not a valid Python identifier,
# so the declared keys match the dict built in `_collect_kafka_failures`.
KafkaItemIdentifier = TypedDict("KafkaItemIdentifier", {"topic-partition": str, "offset": int})


class PartialItemFailures(TypedDict):
itemIdentifier: str
"""
Represents a partial item failure response.

For SQS, Kinesis, and DynamoDB: itemIdentifier is a string (message_id or sequence_number)
For Kafka: itemIdentifier is a KafkaItemIdentifier dict with topic-partition and offset
"""

itemIdentifier: str | KafkaItemIdentifier


class PartialItemFailureResponse(TypedDict):
53 changes: 49 additions & 4 deletions docs/utilities/batch.md
@@ -3,12 +3,12 @@ title: Batch Processing
description: Utility
---

The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, Amazon DynamoDB Streams, and Amazon MSK/self-managed Apache Kafka.

```mermaid
stateDiagram-v2
direction LR
BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/> Amazon MSK / Apache Kafka <br/><br/>
LambdaInit: Lambda invocation
BatchProcessor: Batch Processor
RecordHandler: Record Handler function
@@ -38,7 +38,7 @@ stateDiagram-v2

## Background

When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
When using SQS, Kinesis Data Streams, DynamoDB Streams, or Amazon MSK/Apache Kafka as a Lambda event source, your Lambda functions are triggered with a batch of messages.

If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** records expire.

@@ -55,13 +55,14 @@ This behavior changes when you enable Report Batch Item Failures feature in your
<!-- markdownlint-disable MD013 -->
* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
* [**Kafka (MSK and self-managed)**](#processing-messages-from-kafka). Failed records are identified by topic-partition and offset. Only failed records will be retried.

<!-- HTML tags are required in admonition content thus increasing line length beyond our limits -->
<!-- markdownlint-disable MD013 -->
???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.

You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
You can find more details on how Lambda works with [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"}, or [MSK/Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html){target="_blank"} in the AWS Documentation.
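
For illustration, this is roughly the shape of the partial failure response your handler returns when the feature is enabled. The identifiers below are made-up examples; the composite Kafka identifier mirrors how the processor reports Kafka failures.

```python
# "Report Batch Item Failures" response shapes (example identifiers are illustrative).
# SQS reports the messageId, Kinesis/DynamoDB report the sequence number,
# and Kafka reports a composite identifier built from topic, partition, and offset.
sqs_response = {"batchItemFailures": [{"itemIdentifier": "059f36b4-87a3-44ab-83d2-661975830a7d"}]}

kafka_response = {
    "batchItemFailures": [
        {"itemIdentifier": {"topic-partition": "my-topic-0", "offset": 15}},
    ],
}
```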

## Getting started

@@ -93,6 +94,12 @@ The remaining sections of the documentation will rely on these samples. For comp
--8<-- "examples/batch_processing/sam/dynamodb_batch_processing.yaml"
```

=== "Kafka (MSK)"

```yaml title="template.yaml" hl_lines="74-75"
--8<-- "examples/batch_processing/sam/kafka_batch_processing.yaml"
```

### Processing messages from SQS

Processing batches from SQS works in three stages:
@@ -237,6 +244,44 @@ Processing batches from DynamoDB Streams works in three stages:
--8<-- "examples/batch_processing/src/getting_started_dynamodb_event.json"
```

### Processing messages from Kafka

Processing batches from Amazon MSK or self-managed Apache Kafka works in three stages:

1. Instantiate **`BatchProcessor`** and choose **`EventType.Kafka`** for the event type
2. Define your function to handle each batch record, and use [`KafkaEventRecord`](data_classes.md#kafka){target="_blank"} type annotation for autocompletion
3. Use **`process_partial_response`** to kick off processing

!!! info "This works with both MSK and self-managed Apache Kafka"
The batch processor automatically handles the different event structures from MSK and self-managed Kafka clusters.

=== "Recommended"

```python hl_lines="2-9 12 18 27"
--8<-- "examples/batch_processing/src/getting_started_kafka.py"
```

1. **Step 1**. Creates a partial failure batch processor for Kafka. See [partial failure mechanics for details](#partial-failure-mechanics)

=== "Sample response"

The second record failed to be processed; therefore, the processor added its topic-partition and offset to the response.

```json
--8<-- "examples/batch_processing/src/getting_started_kafka_response.json"
```

=== "Sample event"

```json
--8<-- "examples/batch_processing/src/getting_started_kafka_event.json"
```

!!! tip "Extracting message value"
Use `record.json_value` to get the deserialized JSON body from the Kafka record. For raw bytes access, use `record.decoded_value`.

For advanced deserialization (Avro, Protobuf), see the [Kafka Consumer utility](kafka.md){target="_blank"}, which can be used alongside the batch processor.
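
Put together, a minimal handler looks roughly like the sketch below. The handler body and logging are illustrative; `BatchProcessor`, `EventType.Kafka`, `process_partial_response`, and `KafkaEventRecord` are the pieces this utility provides.

```python
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.Kafka)
logger = Logger()


def record_handler(record: KafkaEventRecord):
    # json_value deserializes the base64-encoded message value as JSON
    payload = record.json_value
    logger.info(f"Processing offset {record.offset} from {record.topic}-{record.partition}")
    # raise an exception here to have this record reported back as a partial failure


def lambda_handler(event: dict, context: LambdaContext):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
```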

### Error handling

By default, we catch any exception raised by your record handler function. This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response correctly without failing your Lambda function execution.
23 changes: 19 additions & 4 deletions docs/utilities/kafka.md
@@ -50,7 +50,7 @@ flowchart LR
Lambda processes Kafka messages as discrete events rather than continuous streams, requiring a different approach to consumer development that Powertools for AWS helps standardize.

| Aspect | Traditional Kafka Consumers | Lambda Kafka Consumer |
|--------|----------------------------|----------------------|
| ------ | --------------------------- | --------------------- |
| **Model** | Pull-based (you poll for messages) | Push-based (Lambda invoked with messages) |
| **Scaling** | Manual scaling configuration | Automatic scaling to partition count |
| **State** | Long-running application with state | Stateless, ephemeral executions |
@@ -241,7 +241,7 @@ Each Kafka record contains important metadata that you can access alongside the
#### Available metadata properties

| Property | Description | Example Use Case |
|----------|-------------|-----------------|
| -------- | ----------- | ---------------- |
| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers |
| `partition` | Kafka partition number | Tracking message distribution |
| `offset` | Position in the partition | De-duplication, exactly-once processing |
@@ -253,7 +253,7 @@ Each Kafka record contains important metadata that you can access alongside the
| `original_value` | Base64-encoded original message value | Debugging or custom deserialization |
| `original_key` | Base64-encoded original message key | Debugging or custom deserialization |
| `value_schema_metadata` | Metadata about the value schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |
| `key_schema_metadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |
| `key_schema_metadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |

### Custom output serializers

@@ -304,7 +304,7 @@ Handle errors gracefully when processing Kafka messages to ensure your applicati
#### Exception types

| Exception | Description | Common Causes |
|-----------|-------------|---------------|
| --------- | ----------- | ------------- |
| `KafkaConsumerDeserializationError` | Raised when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration |
| `KafkaConsumerAvroSchemaParserError` | Raised when parsing Avro schema definition fails | Syntax errors in schema JSON, invalid field types, or malformed schema |
| `KafkaConsumerMissingSchemaError` | Raised when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) |
@@ -325,6 +325,21 @@ The [idempotency utility](idempotency.md){target="_blank"} automatically stores

TIP: By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.

### Handling partial batch failures

When processing Kafka messages, individual records may fail while others succeed. By default, Lambda retries the entire batch when any record fails. To retry only the failed records, use the [Batch Processing utility](batch.md#processing-messages-from-kafka){target="_blank"} with `EventType.Kafka`.

This feature allows Lambda to checkpoint successful records and only retry the failed ones, significantly improving processing efficiency and reducing duplicate processing.

=== "Kafka with Batch Processing"

```python hl_lines="2-6 12 18-19 27"
--8<-- "examples/batch_processing/src/getting_started_kafka.py"
```

!!! note "Using with deserialization"
The Batch Processing utility uses the basic `KafkaEventRecord` data class. For advanced deserialization (Avro, Protobuf), you can use the Kafka Consumer's deserialization utilities inside your record handler function.
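
A rough sketch of that approach (the decoding shown is plain JSON; an Avro or Protobuf decoder would plug into the same spot):

```python
import json

from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventRecord


def record_handler(record: KafkaEventRecord):
    # decoded_value exposes the base64-decoded message bytes; apply whatever
    # deserialization your producers use before running your business logic
    payload = json.loads(record.decoded_value)
    ...
```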

### Best practices

#### Handling large messages