diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py
index ce2f6d52314..fede4c9d24a 100644
--- a/aws_lambda_powertools/utilities/batch/base.py
+++ b/aws_lambda_powertools/utilities/batch/base.py
@@ -25,6 +25,9 @@
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBRecord,
)
+from aws_lambda_powertools.utilities.data_classes.kafka_event import (
+ KafkaEventRecord,
+)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
KinesisStreamRecord,
)
@@ -46,12 +49,13 @@ class EventType(Enum):
SQS = "SQS"
KinesisDataStreams = "KinesisDataStreams"
DynamoDBStreams = "DynamoDBStreams"
+ Kafka = "Kafka"
# When using processor with default arguments, records will carry EventSourceDataClassTypes
# and depending on what EventType it's passed it'll correctly map to the right record
-# When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB and Kinesis
-EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]
+# When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB, Kinesis and Kafka
+EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord, KafkaEventRecord]
BatchEventTypes = Union[EventSourceDataClassTypes, BatchTypeModels]
SuccessResponse = Tuple[str, Any, BatchEventTypes]
FailureResponse = Tuple[str, str, BatchEventTypes]
@@ -272,11 +276,13 @@ def __init__(
EventType.SQS: self._collect_sqs_failures,
EventType.KinesisDataStreams: self._collect_kinesis_failures,
EventType.DynamoDBStreams: self._collect_dynamodb_failures,
+ EventType.Kafka: self._collect_kafka_failures,
}
self._DATA_CLASS_MAPPING = {
EventType.SQS: SQSRecord,
EventType.KinesisDataStreams: KinesisStreamRecord,
EventType.DynamoDBStreams: DynamoDBRecord,
+ EventType.Kafka: KafkaEventRecord,
}
super().__init__()
@@ -365,6 +371,21 @@ def _collect_dynamodb_failures(self):
failures.append({"itemIdentifier": msg_id})
return failures
+ def _collect_kafka_failures(self):
+ failures = []
+ for msg in self.fail_messages:
+ # Kafka uses a composite identifier with topic-partition and offset
+ # Both data class and Pydantic model use the same field names
+ failures.append(
+ {
+ "itemIdentifier": {
+ "topic-partition": f"{msg.topic}-{msg.partition}",
+ "offset": msg.offset,
+ },
+ },
+ )
+ return failures
+
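For context, a minimal sketch of the composite identifier this method emits per failed record (topic/partition/offset values are illustrative only):

```python
# Illustrative only: the shape Lambda expects in batchItemFailures for Kafka event sources.
failed_record = {"topic": "mytopic", "partition": 0, "offset": 1}  # hypothetical values

item_identifier = {
    "topic-partition": f"{failed_record['topic']}-{failed_record['partition']}",
    "offset": failed_record["offset"],
}

assert item_identifier == {"topic-partition": "mytopic-0", "offset": 1}
```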
@overload
def _to_batch_type(
self,
diff --git a/aws_lambda_powertools/utilities/batch/decorators.py b/aws_lambda_powertools/utilities/batch/decorators.py
index e818f7783a0..693c75443b2 100644
--- a/aws_lambda_powertools/utilities/batch/decorators.py
+++ b/aws_lambda_powertools/utilities/batch/decorators.py
@@ -22,6 +22,47 @@
from aws_lambda_powertools.utilities.typing import LambdaContext
+def _get_records_from_event(
+ event: dict[str, Any],
+ processor: BasePartialBatchProcessor,
+) -> list[dict]:
+ """
+ Extract records from the event based on the processor's event type.
+
+ For SQS, Kinesis, and DynamoDB: Records are in event["Records"] as a list
+ For Kafka: Records are in event["records"] as a dict with topic-partition keys
+
+ Parameters
+ ----------
+ event: dict
+ Lambda's original event
+ processor: BasePartialBatchProcessor
+ Batch Processor to determine event type
+
+ Returns
+ -------
+ records: list[dict]
+ Flattened list of records to process
+ """
+ # Kafka events use lowercase "records" and have a nested dict structure
+ if processor.event_type == EventType.Kafka:
+ kafka_records = event.get("records", {})
+ if not kafka_records or not isinstance(kafka_records, dict):
+ raise UnexpectedBatchTypeError(
+ "Invalid Kafka event structure. Expected 'records' to be a non-empty dict with topic-partition keys.",
+ )
+ # Flatten the nested dict: {"topic-0": [r1, r2], "topic-1": [r3]} -> [r1, r2, r3]
+ return [record for topic_records in kafka_records.values() for record in topic_records]
+
+ # SQS, Kinesis, DynamoDB use uppercase "Records" as a list
+ records = event.get("Records", [])
+ if not records or not isinstance(records, list):
+ raise UnexpectedBatchTypeError(
+ "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams, Kafka",
+ )
+ return records
+
+
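A quick illustration of the flattening step above, with made-up topic-partition keys and records:

```python
# Kafka events group records under "topic-partition" keys; the helper flattens them
# into a single list before handing them to the batch processor.
kafka_records = {
    "mytopic-0": [{"offset": 0}, {"offset": 1}],
    "mytopic-1": [{"offset": 5}],
}

flattened = [record for topic_records in kafka_records.values() for record in topic_records]

assert [record["offset"] for record in flattened] == [0, 1, 5]
```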
@lambda_handler_decorator
@deprecated(
"`async_batch_processor` decorator is deprecated; use `async_process_partial_response` function instead.",
@@ -206,12 +247,7 @@ def handler(event, context):
* Async batch processors. Use `async_process_partial_response` instead.
"""
try:
- records: list[dict] = event.get("Records", [])
- if not records or not isinstance(records, list):
- raise UnexpectedBatchTypeError(
- "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
- )
-
+ records = _get_records_from_event(event, processor)
except AttributeError:
event_types = ", ".join(list(EventType.__members__))
docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line
@@ -275,12 +311,7 @@ def handler(event, context):
* Sync batch processors. Use `process_partial_response` instead.
"""
try:
- records: list[dict] = event.get("Records", [])
- if not records or not isinstance(records, list):
- raise UnexpectedBatchTypeError(
- "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
- )
-
+ records = _get_records_from_event(event, processor)
except AttributeError:
event_types = ", ".join(list(EventType.__members__))
docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line
diff --git a/aws_lambda_powertools/utilities/batch/types.py b/aws_lambda_powertools/utilities/batch/types.py
index ac0a7d73efa..a3e13b17f86 100644
--- a/aws_lambda_powertools/utilities/batch/types.py
+++ b/aws_lambda_powertools/utilities/batch/types.py
@@ -12,9 +12,15 @@
from aws_lambda_powertools.utilities.parser.models import (
KinesisDataStreamRecord as KinesisDataStreamRecordModel,
)
+ from aws_lambda_powertools.utilities.parser.models.kafka import KafkaRecordModel
BatchTypeModels = Optional[
- Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]]
+ Union[
+ Type[SqsRecordModel],
+ Type[DynamoDBStreamRecordModel],
+ Type[KinesisDataStreamRecordModel],
+ Type[KafkaRecordModel],
+ ]
]
BatchSqsTypeModel = Optional[Type[SqsRecordModel]]
else: # pragma: no cover
@@ -22,8 +28,22 @@
BatchSqsTypeModel = "BatchSqsTypeModel" # type: ignore
+class KafkaItemIdentifier(TypedDict):
+ """Kafka uses a composite identifier with topic-partition and offset."""
+
+ topic_partition: str  # The actual response key is "topic-partition"; class-based TypedDict syntax cannot declare hyphenated keys
+ offset: int
+
+
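Since class-based TypedDict syntax cannot declare the hyphenated `topic-partition` key the Lambda response actually uses, a functional-syntax alternative (purely illustrative, not part of this change) could express it directly:

```python
from typing import TypedDict

# Functional syntax permits keys that are not valid Python identifiers,
# such as the hyphenated "topic-partition" key in the Kafka partial failure response.
KafkaItemIdentifierAlt = TypedDict(
    "KafkaItemIdentifierAlt",
    {"topic-partition": str, "offset": int},
)

failure: KafkaItemIdentifierAlt = {"topic-partition": "mytopic-0", "offset": 1}
```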
class PartialItemFailures(TypedDict):
- itemIdentifier: str
+ """
+ Represents a partial item failure response.
+
+ For SQS, Kinesis, and DynamoDB: itemIdentifier is a string (message_id or sequence_number)
+ For Kafka: itemIdentifier is a KafkaItemIdentifier dict with topic-partition and offset
+ """
+
+ itemIdentifier: str | KafkaItemIdentifier
class PartialItemFailureResponse(TypedDict):
diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index d20c3cdb0a6..c8b290ce3fd 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -3,12 +3,12 @@ title: Batch Processing
description: Utility
---
-The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
+The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, Amazon DynamoDB Streams, and Amazon MSK/self-managed Apache Kafka.
```mermaid
stateDiagram-v2
direction LR
- BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams
+ BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/> Amazon MSK / Apache Kafka
LambdaInit: Lambda invocation
BatchProcessor: Batch Processor
RecordHandler: Record Handler function
@@ -38,7 +38,7 @@ stateDiagram-v2
## Background
-When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
+When using SQS, Kinesis Data Streams, DynamoDB Streams, or Amazon MSK/Apache Kafka as a Lambda event source, your Lambda functions are triggered with a batch of messages.
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** records expire.
@@ -55,13 +55,14 @@ This behavior changes when you enable Report Batch Item Failures feature in your
* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
+* [**Kafka (MSK and self-managed)**](#processing-messages-from-kafka). Failed records are identified by topic-partition and offset. Only failed records will be retried.
???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.
- You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
+ You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"}, or [MSK/Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html){target="_blank"} in the AWS Documentation.
## Getting started
@@ -93,6 +94,12 @@ The remaining sections of the documentation will rely on these samples. For comp
--8<-- "examples/batch_processing/sam/dynamodb_batch_processing.yaml"
```
+=== "Kafka (MSK)"
+
+ ```yaml title="template.yaml" hl_lines="75-76"
+ --8<-- "examples/batch_processing/sam/kafka_batch_processing.yaml"
+ ```
+
### Processing messages from SQS
Processing batches from SQS works in three stages:
@@ -237,6 +244,44 @@ Processing batches from DynamoDB Streams works in three stages:
--8<-- "examples/batch_processing/src/getting_started_dynamodb_event.json"
```
+### Processing messages from Kafka
+
+Processing batches from Amazon MSK or self-managed Apache Kafka works in three stages:
+
+1. Instantiate **`BatchProcessor`** and choose **`EventType.Kafka`** for the event type
+2. Define your function to handle each batch record, and use [`KafkaEventRecord`](data_classes.md#kafka){target="_blank"} type annotation for autocompletion
+3. Use **`process_partial_response`** to kick off processing
+
+!!! info "This works with both MSK and self-managed Apache Kafka"
+ The batch processor automatically handles the different event structures from MSK and self-managed Kafka clusters.
+
+=== "Recommended"
+
+ ```python hl_lines="2-9 12 18 27"
+ --8<-- "examples/batch_processing/src/getting_started_kafka.py"
+ ```
+
+ 1. **Step 1**. Creates a partial failure batch processor for Kafka. See [partial failure mechanics for details](#partial-failure-mechanics)
+
+=== "Sample response"
+
+ The second record failed to be processed, therefore the processor added its topic-partition and offset in the response.
+
+ ```json
+ --8<-- "examples/batch_processing/src/getting_started_kafka_response.json"
+ ```
+
+=== "Sample event"
+
+ ```json
+ --8<-- "examples/batch_processing/src/getting_started_kafka_event.json"
+ ```
+
+!!! tip "Extracting message value"
+ Use `record.json_value` to get the deserialized JSON body from the Kafka record. For raw bytes access, use `record.decoded_value`.
+
+ For advanced deserialization (Avro, Protobuf), see the [Kafka Consumer utility](kafka.md){target="_blank"} which can be used alongside the batch processor.
+
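A minimal record handler illustrating both accessors from the tip above (assumes JSON-encoded message values; the payload field is hypothetical):

```python
from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventRecord


def record_handler(record: KafkaEventRecord):
    raw_bytes: bytes = record.decoded_value  # base64-decoded message value as bytes
    payload: dict = record.json_value        # same value parsed as JSON
    return payload.get("message")            # hypothetical field, for illustration only
```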
### Error handling
By default, we catch any exception raised by your record handler function. This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response correctly without failing your Lambda function execution.
diff --git a/docs/utilities/kafka.md b/docs/utilities/kafka.md
index a3463e4e950..5bbab7e3062 100644
--- a/docs/utilities/kafka.md
+++ b/docs/utilities/kafka.md
@@ -50,7 +50,7 @@ flowchart LR
Lambda processes Kafka messages as discrete events rather than continuous streams, requiring a different approach to consumer development that Powertools for AWS helps standardize.
| Aspect | Traditional Kafka Consumers | Lambda Kafka Consumer |
-|--------|----------------------------|----------------------|
+| ------ | --------------------------- | --------------------- |
| **Model** | Pull-based (you poll for messages) | Push-based (Lambda invoked with messages) |
| **Scaling** | Manual scaling configuration | Automatic scaling to partition count |
| **State** | Long-running application with state | Stateless, ephemeral executions |
@@ -241,7 +241,7 @@ Each Kafka record contains important metadata that you can access alongside the
#### Available metadata properties
| Property | Description | Example Use Case |
-|----------|-------------|-----------------|
+| -------- | ----------- | ---------------- |
| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers |
| `partition` | Kafka partition number | Tracking message distribution |
| `offset` | Position in the partition | De-duplication, exactly-once processing |
@@ -253,7 +253,7 @@ Each Kafka record contains important metadata that you can access alongside the
| `original_value` | Base64-encoded original message value | Debugging or custom deserialization |
| `original_key` | Base64-encoded original message key | Debugging or custom deserialization |
| `value_schema_metadata` | Metadata about the value schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |
-| `key_schema_metadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |
+| `key_schema_metadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Data format and schemaId propagated when integrating with Schema Registry |
### Custom output serializers
@@ -304,7 +304,7 @@ Handle errors gracefully when processing Kafka messages to ensure your applicati
#### Exception types
| Exception | Description | Common Causes |
-|-----------|-------------|---------------|
+| --------- | ----------- | ------------- |
| `KafkaConsumerDeserializationError` | Raised when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration |
| `KafkaConsumerAvroSchemaParserError` | Raised when parsing Avro schema definition fails | Syntax errors in schema JSON, invalid field types, or malformed schema |
| `KafkaConsumerMissingSchemaError` | Raised when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) |
@@ -325,6 +325,21 @@ The [idempotency utility](idempotency.md){target="_blank"} automatically stores
TIP: By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.
+### Handling partial batch failures
+
+When processing Kafka messages, individual records may fail while others succeed. By default, Lambda retries the entire batch when any record fails. To retry only the failed records, use the [Batch Processing utility](batch.md#processing-messages-from-kafka){target="_blank"} with `EventType.Kafka`.
+
+This feature allows Lambda to checkpoint successful records and only retry the failed ones, significantly improving processing efficiency and reducing duplicate processing.
+
+=== "Kafka with Batch Processing"
+
+ ```python hl_lines="2-6 12 18-19 27"
+ --8<-- "examples/batch_processing/src/getting_started_kafka.py"
+ ```
+
+!!! note "Using with deserialization"
+ The Batch Processing utility uses the basic `KafkaEventRecord` data class. For advanced deserialization (Avro, Protobuf), you can use the Kafka Consumer's deserialization utilities inside your record handler function.
+
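As a minimal sketch of that approach, assuming Avro-encoded message values, `fastavro` available as a dependency, and a hypothetical `Order` schema:

```python
from io import BytesIO

import fastavro  # assumption: bundled with the function's dependencies

from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventRecord

# Hypothetical writer schema, for illustration only
ORDER_SCHEMA = fastavro.parse_schema(
    {
        "type": "record",
        "name": "Order",
        "fields": [{"name": "order_id", "type": "string"}],
    },
)


def record_handler(record: KafkaEventRecord):
    # decoded_value is the base64-decoded message value as bytes
    order = fastavro.schemaless_reader(BytesIO(record.decoded_value), ORDER_SCHEMA)
    return order["order_id"]
```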
### Best practices
#### Handling large messages
diff --git a/examples/batch_processing/sam/kafka_batch_processing.yaml b/examples/batch_processing/sam/kafka_batch_processing.yaml
new file mode 100644
index 00000000000..b6369803d2d
--- /dev/null
+++ b/examples/batch_processing/sam/kafka_batch_processing.yaml
@@ -0,0 +1,87 @@
+AWSTemplateFormatVersion: "2010-09-09"
+Transform: AWS::Serverless-2016-10-31
+Description: Kafka/MSK partial batch response sample
+
+Globals:
+ Function:
+ Timeout: 30
+ MemorySize: 256
+ Runtime: python3.12
+ Tracing: Active
+ Environment:
+ Variables:
+ POWERTOOLS_LOG_LEVEL: INFO
+ POWERTOOLS_SERVICE_NAME: kafka-processor
+
+Parameters:
+ MSKClusterArn:
+ Type: String
+ Description: ARN of the MSK cluster
+ KafkaTopic:
+ Type: String
+ Description: Name of the Kafka topic to consume from
+ Default: mytopic
+
+Resources:
+ KafkaConsumerFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ Handler: app.lambda_handler
+ CodeUri: hello_world
+ Policies:
+ # Permissions for MSK
+ - Version: "2012-10-17"
+ Statement:
+ - Effect: "Allow"
+ Action:
+ - kafka:DescribeCluster
+ - kafka:DescribeClusterV2
+ - kafka:GetBootstrapBrokers
+ Resource: !Ref MSKClusterArn
+ - Effect: "Allow"
+ Action:
+ - kafka-cluster:Connect
+ - kafka-cluster:DescribeGroup
+ - kafka-cluster:AlterGroup
+ - kafka-cluster:DescribeTopic
+ - kafka-cluster:ReadData
+ - kafka-cluster:DescribeClusterDynamicConfiguration
+ Resource:
+ - !Ref MSKClusterArn
+ - !Sub "${MSKClusterArn}/*"
+ # Lambda Destinations require additional permissions
+ # to send failure records to DLQ
+ - Version: "2012-10-17"
+ Statement:
+ Effect: "Allow"
+ Action:
+ - sqs:GetQueueAttributes
+ - sqs:GetQueueUrl
+ - sqs:SendMessage
+ Resource: !GetAtt SampleDLQ.Arn
+ Events:
+ MSKEvent:
+ Type: MSK
+ Properties:
+ Stream: !Ref MSKClusterArn
+ Topics:
+ - !Ref KafkaTopic
+ StartingPosition: LATEST
+ BatchSize: 100
+ MaximumBatchingWindowInSeconds: 5
+ DestinationConfig:
+ OnFailure:
+ Destination: !GetAtt SampleDLQ.Arn
+ FunctionResponseTypes:
+ - ReportBatchItemFailures
+
+ SampleDLQ:
+ Type: AWS::SQS::Queue
+
+Outputs:
+ KafkaConsumerFunctionArn:
+ Description: "Kafka Consumer Lambda Function ARN"
+ Value: !GetAtt KafkaConsumerFunction.Arn
+ SampleDLQUrl:
+ Description: "Dead Letter Queue URL"
+ Value: !Ref SampleDLQ
diff --git a/examples/batch_processing/src/getting_started_kafka.py b/examples/batch_processing/src/getting_started_kafka.py
new file mode 100644
index 00000000000..9327646bdd4
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_kafka.py
@@ -0,0 +1,27 @@
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import (
+ BatchProcessor,
+ EventType,
+ process_partial_response,
+)
+from aws_lambda_powertools.utilities.data_classes.kafka_event import (
+ KafkaEventRecord,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.Kafka) # (1)!
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: KafkaEventRecord):
+ logger.info(f"Processing record from topic: {record.topic}, partition: {record.partition}, offset: {record.offset}")
+ payload: dict = record.json_value
+ logger.info(payload)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+def lambda_handler(event, context: LambdaContext):
+ return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
diff --git a/examples/batch_processing/src/getting_started_kafka_event.json b/examples/batch_processing/src/getting_started_kafka_event.json
new file mode 100644
index 00000000000..74992763b5b
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_kafka_event.json
@@ -0,0 +1,39 @@
+{
+ "eventSource": "aws:kafka",
+ "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/MyCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+ "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+ "records": {
+ "mytopic-0": [
+ {
+ "topic": "mytopic",
+ "partition": 0,
+ "offset": 0,
+ "timestamp": 1545084650987,
+ "timestampType": "CREATE_TIME",
+ "key": "MQ==",
+ "value": "eyJtZXNzYWdlIjogIlN1Y2Nlc3MifQ==",
+ "headers": []
+ },
+ {
+ "topic": "mytopic",
+ "partition": 0,
+ "offset": 1,
+ "timestamp": 1545084650988,
+ "timestampType": "CREATE_TIME",
+ "key": "Mg==",
+ "value": "eyJtZXNzYWdlIjogIkZhaWx1cmUifQ==",
+ "headers": []
+ },
+ {
+ "topic": "mytopic",
+ "partition": 0,
+ "offset": 2,
+ "timestamp": 1545084650989,
+ "timestampType": "CREATE_TIME",
+ "key": "Mw==",
+ "value": "eyJtZXNzYWdlIjogIlN1Y2Nlc3MifQ==",
+ "headers": []
+ }
+ ]
+ }
+}
diff --git a/examples/batch_processing/src/getting_started_kafka_response.json b/examples/batch_processing/src/getting_started_kafka_response.json
new file mode 100644
index 00000000000..a47f52ed9ae
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_kafka_response.json
@@ -0,0 +1,10 @@
+{
+ "batchItemFailures": [
+ {
+ "itemIdentifier": {
+ "topic-partition": "mytopic-0",
+ "offset": 1
+ }
+ }
+ ]
+}
diff --git a/tests/functional/batch/required_dependencies/test_utilities_batch_kafka.py b/tests/functional/batch/required_dependencies/test_utilities_batch_kafka.py
new file mode 100644
index 00000000000..eed8a8951fa
--- /dev/null
+++ b/tests/functional/batch/required_dependencies/test_utilities_batch_kafka.py
@@ -0,0 +1,413 @@
+"""Tests for Kafka batch processing support."""
+
+from __future__ import annotations
+
+import base64
+import json
+from typing import TYPE_CHECKING, Any
+
+import pytest
+
+from aws_lambda_powertools.utilities.batch import (
+ AsyncBatchProcessor,
+ BatchProcessor,
+ EventType,
+ async_process_partial_response,
+ process_partial_response,
+)
+from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError, UnexpectedBatchTypeError
+from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventRecord # noqa: TC001
+
+if TYPE_CHECKING:
+ from collections.abc import Awaitable, Callable
+
+
+def str_to_b64(value: str) -> str:
+ """Convert string to base64 encoded string."""
+ return base64.b64encode(value.encode()).decode()
+
+
+@pytest.fixture(scope="module")
+def kafka_event_factory() -> Callable:
+ """Factory for creating Kafka event records."""
+
+ def factory(body: str, topic: str = "mytopic", partition: int = 0, offset: int = 0):
+ return {
+ "topic": topic,
+ "partition": partition,
+ "offset": offset,
+ "timestamp": 1545084650987,
+ "timestampType": "CREATE_TIME",
+ "key": str_to_b64("recordKey"),
+ "value": str_to_b64(body),
+ "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
+ }
+
+ return factory
+
+
+@pytest.fixture(scope="module")
+def kafka_record_handler() -> Callable:
+ """Handler for Kafka records that fails if body contains 'fail'."""
+
+ def handler(record: KafkaEventRecord):
+ body = record.decoded_value.decode("utf-8")
+ if "fail" in body:
+ raise Exception("Failed to process record.")
+ return body
+
+ return handler
+
+
+@pytest.fixture(scope="module")
+def async_kafka_record_handler() -> Callable[..., Awaitable[Any]]:
+ """Async handler for Kafka records that fails if body contains 'fail'."""
+
+ async def handler(record: KafkaEventRecord):
+ body = record.decoded_value.decode("utf-8")
+ if "fail" in body:
+ raise Exception("Failed to process record.")
+ return body
+
+ return handler
+
+
+def build_kafka_event(records: list[dict], topic_partition: str = "mytopic-0") -> dict:
+ """Build a complete Kafka event from records."""
+ return {
+ "eventSource": "aws:kafka",
+ "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/MyCluster/abc123",
+ "bootstrapServers": "b-1.cluster.kafka.us-east-1.amazonaws.com:9092",
+ "records": {topic_partition: records},
+ }
+
+
+def build_multi_topic_kafka_event(records_by_topic: dict[str, list[dict]]) -> dict:
+ """Build a Kafka event with multiple topic-partitions."""
+ return {
+ "eventSource": "aws:kafka",
+ "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/MyCluster/abc123",
+ "bootstrapServers": "b-1.cluster.kafka.us-east-1.amazonaws.com:9092",
+ "records": records_by_topic,
+ }
+
+
+class TestKafkaBatchProcessing:
+ """Test Kafka batch processing with process_partial_response."""
+
+ def test_kafka_batch_processor_success_only(self, kafka_event_factory, kafka_record_handler):
+ """Test successful processing of all Kafka records."""
+ # GIVEN
+ first_record = kafka_event_factory("success", offset=0)
+ second_record = kafka_event_factory("success", offset=1)
+ event = build_kafka_event([first_record, second_record])
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN
+ result = process_partial_response(
+ event=event,
+ record_handler=kafka_record_handler,
+ processor=processor,
+ )
+
+ # THEN
+ assert result["batchItemFailures"] == []
+
+ def test_kafka_batch_processor_failure_only(self, kafka_event_factory, kafka_record_handler):
+ """Test processing where all Kafka records fail."""
+ # GIVEN
+ first_record = kafka_event_factory("fail", offset=0)
+ second_record = kafka_event_factory("fail", offset=1)
+ event = build_kafka_event([first_record, second_record])
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN/THEN - entire batch failure should raise
+ with pytest.raises(BatchProcessingError):
+ process_partial_response(
+ event=event,
+ record_handler=kafka_record_handler,
+ processor=processor,
+ )
+
+ def test_kafka_batch_processor_partial_failure(self, kafka_event_factory, kafka_record_handler):
+ """Test partial failure processing for Kafka records."""
+ # GIVEN
+ success_record = kafka_event_factory("success", offset=0)
+ fail_record = kafka_event_factory("fail", topic="mytopic", partition=0, offset=1)
+ event = build_kafka_event([success_record, fail_record])
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN
+ result = process_partial_response(
+ event=event,
+ record_handler=kafka_record_handler,
+ processor=processor,
+ )
+
+ # THEN - Kafka uses composite identifier
+ assert len(result["batchItemFailures"]) == 1
+ assert result["batchItemFailures"][0]["itemIdentifier"] == {
+ "topic-partition": "mytopic-0",
+ "offset": 1,
+ }
+
+ def test_kafka_batch_processor_multiple_failures(self, kafka_event_factory, kafka_record_handler):
+ """Test multiple failures in Kafka batch."""
+ # GIVEN
+ success_record = kafka_event_factory("success", offset=0)
+ fail_record_1 = kafka_event_factory("fail", offset=1)
+ fail_record_2 = kafka_event_factory("fail", offset=2)
+ event = build_kafka_event([success_record, fail_record_1, fail_record_2])
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN
+ result = process_partial_response(
+ event=event,
+ record_handler=kafka_record_handler,
+ processor=processor,
+ )
+
+ # THEN
+ assert len(result["batchItemFailures"]) == 2
+ offsets = [f["itemIdentifier"]["offset"] for f in result["batchItemFailures"]]
+ assert 1 in offsets
+ assert 2 in offsets
+
+ def test_kafka_batch_processor_multi_topic_partition(self, kafka_event_factory, kafka_record_handler):
+ """Test processing records from multiple topic-partitions."""
+ # GIVEN
+ topic1_success = kafka_event_factory("success", topic="topic1", partition=0, offset=0)
+ topic1_fail = kafka_event_factory("fail", topic="topic1", partition=0, offset=1)
+ topic2_success = kafka_event_factory("success", topic="topic2", partition=1, offset=0)
+ topic2_fail = kafka_event_factory("fail", topic="topic2", partition=1, offset=1)
+
+ event = build_multi_topic_kafka_event(
+ {
+ "topic1-0": [topic1_success, topic1_fail],
+ "topic2-1": [topic2_success, topic2_fail],
+ },
+ )
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN
+ result = process_partial_response(
+ event=event,
+ record_handler=kafka_record_handler,
+ processor=processor,
+ )
+
+ # THEN
+ assert len(result["batchItemFailures"]) == 2
+ topic_partitions = [f["itemIdentifier"]["topic-partition"] for f in result["batchItemFailures"]]
+ assert "topic1-0" in topic_partitions
+ assert "topic2-1" in topic_partitions
+
+ def test_kafka_batch_processor_with_json_body(self, kafka_event_factory):
+ """Test processing Kafka records with JSON body."""
+ # GIVEN
+ json_body = json.dumps({"message": "hello", "status": "success"})
+ record = kafka_event_factory(json_body, offset=0)
+ event = build_kafka_event([record])
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ def json_record_handler(record: KafkaEventRecord):
+ data = record.json_value
+ return data["message"]
+
+ # WHEN
+ result = process_partial_response(
+ event=event,
+ record_handler=json_record_handler,
+ processor=processor,
+ )
+
+ # THEN
+ assert result["batchItemFailures"] == []
+
+ def test_kafka_batch_processor_disable_raise_on_entire_batch_failure(
+ self,
+ kafka_event_factory,
+ kafka_record_handler,
+ ):
+ """Test that entire batch failure can be suppressed."""
+ # GIVEN
+ first_record = kafka_event_factory("fail", offset=0)
+ second_record = kafka_event_factory("fail", offset=1)
+ event = build_kafka_event([first_record, second_record])
+
+ processor = BatchProcessor(event_type=EventType.Kafka, raise_on_entire_batch_failure=False)
+
+ # WHEN
+ result = process_partial_response(
+ event=event,
+ record_handler=kafka_record_handler,
+ processor=processor,
+ )
+
+ # THEN
+ assert len(result["batchItemFailures"]) == 2
+
+ def test_kafka_batch_processor_invalid_event_structure(self, kafka_record_handler):
+ """Test that invalid Kafka event structure raises appropriate error."""
+ # GIVEN - Invalid event with empty records
+ event = {
+ "eventSource": "aws:kafka",
+ "records": {},
+ }
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN/THEN
+ with pytest.raises(UnexpectedBatchTypeError) as exc_info:
+ process_partial_response(
+ event=event,
+ record_handler=kafka_record_handler,
+ processor=processor,
+ )
+
+ assert "Invalid Kafka event structure" in str(exc_info.value)
+
+ def test_kafka_batch_processor_missing_records_key(self, kafka_record_handler):
+ """Test that missing records key raises appropriate error."""
+ # GIVEN
+ event = {
+ "eventSource": "aws:kafka",
+ }
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN/THEN
+ with pytest.raises(UnexpectedBatchTypeError):
+ process_partial_response(
+ event=event,
+ record_handler=kafka_record_handler,
+ processor=processor,
+ )
+
+
+class TestAsyncKafkaBatchProcessing:
+ """Test async Kafka batch processing with async_process_partial_response."""
+
+ def test_async_kafka_batch_processor_success_only(self, kafka_event_factory, async_kafka_record_handler):
+ """Test successful async processing of all Kafka records."""
+ # GIVEN
+ first_record = kafka_event_factory("success", offset=0)
+ second_record = kafka_event_factory("success", offset=1)
+ event = build_kafka_event([first_record, second_record])
+
+ processor = AsyncBatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN
+ result = async_process_partial_response(
+ event=event,
+ record_handler=async_kafka_record_handler,
+ processor=processor,
+ )
+
+ # THEN
+ assert result["batchItemFailures"] == []
+
+ def test_async_kafka_batch_processor_partial_failure(self, kafka_event_factory, async_kafka_record_handler):
+ """Test async partial failure processing for Kafka records."""
+ # GIVEN
+ success_record = kafka_event_factory("success", offset=0)
+ fail_record = kafka_event_factory("fail", offset=1)
+ event = build_kafka_event([success_record, fail_record])
+
+ processor = AsyncBatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN
+ result = async_process_partial_response(
+ event=event,
+ record_handler=async_kafka_record_handler,
+ processor=processor,
+ )
+
+ # THEN
+ assert len(result["batchItemFailures"]) == 1
+ assert result["batchItemFailures"][0]["itemIdentifier"] == {
+ "topic-partition": "mytopic-0",
+ "offset": 1,
+ }
+
+ def test_async_kafka_batch_processor_failure_only(self, kafka_event_factory, async_kafka_record_handler):
+ """Test async processing where all Kafka records fail."""
+ # GIVEN
+ first_record = kafka_event_factory("fail", offset=0)
+ second_record = kafka_event_factory("fail", offset=1)
+ event = build_kafka_event([first_record, second_record])
+
+ processor = AsyncBatchProcessor(event_type=EventType.Kafka)
+
+ # WHEN/THEN
+ with pytest.raises(BatchProcessingError):
+ async_process_partial_response(
+ event=event,
+ record_handler=async_kafka_record_handler,
+ processor=processor,
+ )
+
+
+class TestKafkaContextManager:
+ """Test Kafka batch processing using context manager pattern."""
+
+ def test_kafka_batch_processor_context_manager(self, kafka_event_factory, kafka_record_handler):
+ """Test Kafka batch processing using context manager."""
+ # GIVEN
+ success_record = kafka_event_factory("success", offset=0)
+ fail_record = kafka_event_factory("fail", offset=1)
+ event = build_kafka_event([success_record, fail_record])
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+
+ # Flatten records manually (mimicking what process_partial_response does)
+ records = [r for topic_records in event["records"].values() for r in topic_records]
+
+ # WHEN
+ with processor(records, kafka_record_handler):
+ processor.process()
+
+ result = processor.response()
+
+ # THEN
+ assert len(result["batchItemFailures"]) == 1
+ assert result["batchItemFailures"][0]["itemIdentifier"]["offset"] == 1
+
+
+class TestKafkaRecordDataClass:
+ """Test KafkaEventRecord data class integration with batch processor."""
+
+ def test_kafka_record_properties_accessible(self, kafka_event_factory):
+ """Test that Kafka record properties are accessible in handler."""
+ # GIVEN
+ record_data = kafka_event_factory("test message", topic="test-topic", partition=5, offset=100)
+ event = build_kafka_event([record_data], topic_partition="test-topic-5")
+
+ processor = BatchProcessor(event_type=EventType.Kafka)
+ captured_record = None
+
+ def capture_handler(record: KafkaEventRecord):
+ nonlocal captured_record
+ captured_record = record
+ return "processed"
+
+ # WHEN
+ process_partial_response(
+ event=event,
+ record_handler=capture_handler,
+ processor=processor,
+ )
+
+ # THEN
+ assert captured_record is not None
+ assert captured_record.topic == "test-topic"
+ assert captured_record.partition == 5
+ assert captured_record.offset == 100
+ assert captured_record.timestamp == 1545084650987
+ assert captured_record.timestamp_type == "CREATE_TIME"