Skip to content

S3

handle_record(record)

Handle a single S3 event record.

Parameters:

Name Type Description Default
record S3EventRecord

The S3 event record to process.

required
Source code in templates/s3/handler.py
19
20
21
22
23
24
25
26
27
28
@tracer.capture_method
def handle_record(record: S3EventRecord) -> None:
    """Publish one S3 event record to the queue and log the outcome.

    The bucket name, object key and event time are taken from the record,
    validated through ``ProcessedMessage`` and published as JSON via the
    module-level ``queue``.

    Args:
        record: The S3 event record to process.
    """
    s3_entity = record.s3
    payload = {
        "bucket": s3_entity.bucket.name,
        "key": s3_entity.get_object.key,
        "event_time": record.event_time,
    }

    # Validate first, then serialise using field aliases and dropping unset (None) fields.
    validated = ProcessedMessage.model_validate(payload)
    body = validated.model_dump_json(by_alias=True, exclude_none=True)
    queue.publish(body)

    # The raw payload fields are attached to the log entry as structured extras.
    logger.info("Processed record", extra=payload)

main(event, context)

Lambda entry point for the S3-to-SQS handler.

Parameters:

Name Type Description Default
event S3Event

The S3 event containing object-creation records.

required
context LambdaContext

The Lambda execution context.

required

Returns:

Type Description
dict

A dictionary containing batchItemFailures (empty if all succeed).

Raises:

Type Description
ValueError

If the event shape is invalid.

Exception

If any record fails; raised so that the invocation is reported as failed and can be retried by the S3 event source.

Source code in templates/s3/handler.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
def main(event: S3Event, context: LambdaContext) -> dict:
    """Lambda entry point for the S3-to-SQS handler.

    Each record in the event is passed to ``handle_record``. A failure on one
    record does not stop the remaining records: the error is logged, counted
    in the ``publish_failure`` metric and collected. After the loop the
    ``records_processed`` metric is emitted, and if anything failed a single
    exception is raised for the whole invocation.

    Args:
        event: The S3 event containing object-creation records.
        context: The Lambda execution context (not used directly in the body).

    Returns:
        ``{"batchItemFailures": []}`` when every record was processed.

    Raises:
        Exception: If at least one record failed, so that the invocation is
            reported as failed and can be retried. The message carries the
            failure count and the first error, which is also chained as
            ``__cause__`` to keep its traceback.

    Note:
        An invalid event shape may surface as ``ValueError``; that is not
        raised by the code in this function -- presumably it comes from event
        parsing outside this block (TODO confirm).
    """
    processed = 0
    errors = []
    for record in event.records:
        try:
            handle_record(record)
            processed += 1
        except Exception as error:
            # Keep going so one bad record does not hide the state of the others.
            logger.error("Failed to process record", exc_info=error)
            metrics.add_metric(name="publish_failure", unit="Count", value=1)
            errors.append(error)

    # Emitted even when some records failed, so partial progress is visible.
    metrics.add_metric(name="records_processed", unit="Count", value=processed)

    if errors:
        # Chain the first failure so its original traceback is preserved.
        raise Exception(
            f"Batch processing failed with {len(errors)} errors. First error: {errors[0]}"
        ) from errors[0]

    return {"batchItemFailures": []}