Skip to content

SQS

Handler

Processes SQS messages and stores results in a DynamoDB table.

Source code in templates/sqs/handler.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Handler:
    """Processes SQS messages and stores results in a DynamoDB table."""

    def __init__(self, repository: Repository) -> None:
        """Initialize the handler with a repository.

        Args:
            repository: The repository used to store processed items.
        """
        self._repository = repository

    @tracer.capture_method
    def handle_record(self, record: SQSRecord) -> None:
        """Handle a single SQS record.

        Args:
            record: The SQS record to process.

        Raises:
            ValueError: If the message body cannot be parsed or processed.
        """
        try:
            message = SqsMessage.model_validate(record.json_body)
            processed = ProcessedItem(id=message.id, content=message.content, status="PROCESSED")
            self._repository.put_item(processed.model_dump())
            logger.info("Successfully processed and stored message", extra={"messageId": message.id})
        except Exception as exc:
            logger.error("Failed to process SQS record", exc_info=exc)
            raise

__init__(repository)

Initialize the handler with a repository.

Parameters:

Name Type Description Default
repository Repository

The repository used to store processed items.

required
Source code in templates/sqs/handler.py
25
26
27
28
29
30
31
def __init__(self, repository: Repository) -> None:
    """Initialize the handler with a repository.

    Args:
        repository: The repository used to store processed items.
    """
    self._repository = repository

handle_record(record)

Handle a single SQS record.

Parameters:

Name Type Description Default
record SQSRecord

The SQS record to process.

required

Raises:

Type Description
ValueError

If the message body cannot be parsed or processed.

Source code in templates/sqs/handler.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@tracer.capture_method
def handle_record(self, record: SQSRecord) -> None:
    """Handle a single SQS record.

    Args:
        record: The SQS record to process.

    Raises:
        ValueError: If the message body cannot be parsed or processed.
    """
    try:
        message = SqsMessage.model_validate(record.json_body)
        processed = ProcessedItem(id=message.id, content=message.content, status="PROCESSED")
        self._repository.put_item(processed.model_dump())
        logger.info("Successfully processed and stored message", extra={"messageId": message.id})
    except Exception as exc:
        logger.error("Failed to process SQS record", exc_info=exc)
        raise

main(event, context)

Lambda entry point for the SQS-to-DynamoDB handler.

Parameters:

Name Type Description Default
event dict

The SQS event containing a batch of messages.

required
context LambdaContext

The Lambda execution context.

required

Returns:

Type Description
PartialItemFailureResponse

A partial batch response indicating which records failed processing.

Source code in templates/sqs/handler.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
def main(event: dict, context: LambdaContext) -> PartialItemFailureResponse:
    """Lambda entry point for the SQS-to-DynamoDB handler.

    Args:
        event: The SQS event containing a batch of messages.
        context: The Lambda execution context.

    Returns:
        A partial batch response indicating which records failed processing.
    """

    return process_partial_response(
        event=event,
        record_handler=handler.handle_record,
        processor=processor,
        context=context,
    )