Skip to content

DynamoDB Stream

Handler

Processes DynamoDB Stream records and writes results to a destination table.

Source code in templates/stream/handler.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class Handler:
    """Processes DynamoDB Stream records and writes results to a destination table."""

    def __init__(self, repository: Repository) -> None:
        """Initialize the handler with a destination repository.

        Args:
            repository: The repository used to write processed items.
        """
        self._repository = repository

    @tracer.capture_method
    def _process(self, item: SourceItem) -> DestinationItem | None:
        """Transform a source item into a destination item.

        Args:
            item: The source item to process.

        Returns:
            A `DestinationItem` on success, or `None` if validation fails.
        """
        try:
            # TODO: process here
            return DestinationItem.model_validate(item.model_dump(by_alias=True))
        except ValidationError as exc:
            logger.error("DestinationItem validation failed", exc_info=exc)
            return None

    @tracer.capture_method
    def handle_record(self, record: DynamoDBRecord) -> None:
        """Handle a single DynamoDB Stream record.

        Processes INSERT and MODIFY events by transforming and writing the new image
        to the destination table. Handles REMOVE events by deleting the corresponding
        item from the destination table.

        Args:
            record: The DynamoDB Stream record to process.

        Raises:
            ValueError: If the record cannot be transformed into a `DestinationItem`.
        """
        event_name = record.event_name

        if event_name and event_name.name in ("INSERT", "MODIFY"):
            item = self._process(SourceItem.model_validate(record.dynamodb.new_image))
            if item is None:
                raise ValueError("Failed to process record into DestinationItem")
            self._repository.put_item(item.model_dump())
        elif event_name and event_name.name == "REMOVE":
            plain_keys = SourceItem.model_validate(record.dynamodb.keys)
            self._repository.delete_item(plain_keys.model_dump(exclude_none=True))

__init__(repository)

Initialize the handler with a destination repository.

Parameters:

Name Type Description Default
repository Repository

The repository used to write processed items.

required
Source code in templates/stream/handler.py
25
26
27
28
29
30
31
def __init__(self, repository: Repository) -> None:
    """Initialize the handler with a destination repository.

    Args:
        repository: The repository used to write processed items.
    """
    self._repository = repository

handle_record(record)

Handle a single DynamoDB Stream record.

Processes INSERT and MODIFY events by transforming and writing the new image to the destination table. Handles REMOVE events by deleting the corresponding item from the destination table.

Parameters:

Name Type Description Default
record DynamoDBRecord

The DynamoDB Stream record to process.

required

Raises:

Type Description
ValueError

If the record cannot be transformed into a DestinationItem.

Source code in templates/stream/handler.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@tracer.capture_method
def handle_record(self, record: DynamoDBRecord) -> None:
    """Handle a single DynamoDB Stream record.

    Processes INSERT and MODIFY events by transforming and writing the new image
    to the destination table. Handles REMOVE events by deleting the corresponding
    item from the destination table.

    Args:
        record: The DynamoDB Stream record to process.

    Raises:
        ValueError: If the record cannot be transformed into a `DestinationItem`.
    """
    event_name = record.event_name

    if event_name and event_name.name in ("INSERT", "MODIFY"):
        item = self._process(SourceItem.model_validate(record.dynamodb.new_image))
        if item is None:
            raise ValueError("Failed to process record into DestinationItem")
        self._repository.put_item(item.model_dump())
    elif event_name and event_name.name == "REMOVE":
        plain_keys = SourceItem.model_validate(record.dynamodb.keys)
        self._repository.delete_item(plain_keys.model_dump(exclude_none=True))

main(event, context)

Lambda entry point for the DynamoDB Streams handler.

Parameters:

Name Type Description Default
event dict

The DynamoDB Streams event containing a batch of records.

required
context LambdaContext

The Lambda execution context.

required

Returns:

Type Description
dict

A partial batch response indicating which records failed processing.

Source code in templates/stream/handler.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
def main(event: dict, context: LambdaContext) -> dict:
    """Lambda entry point for the DynamoDB Streams handler.

    Args:
        event: The DynamoDB Streams event containing a batch of records.
        context: The Lambda execution context.

    Returns:
        A partial batch response indicating which records failed processing.
    """
    return process_partial_response(
        event=event,
        record_handler=handler.handle_record,
        processor=processor,
        context=context,
    )