Skip to content

EventBridge

EventBridge event handler implementation.

Handler

Handles EventBridge events by calling an external API and persisting the response.

Source code in templates/eventbridge/handler.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class Handler:
    """Handles EventBridge events by calling an external API and persisting the response."""

    def __init__(self, secret_manager: SecretManager, repository: Repository) -> None:
        """Initialize the Handler.

        Args:
            secret_manager: Manager for AWS Secrets Manager.
            repository: Repository for DynamoDB access.
        """
        self._secret_manager = secret_manager
        self._repository = repository

    @tracer.capture_method
    def handle(self, event: EventBridgeModel) -> ApiResponse:
        """Process an EventBridge event.

        Args:
            event: The parsed EventBridge event.

        Returns:
            The validated API response.
        """
        try:
            token = self._secret_manager.get(settings.secret_name)
            response = session.get(settings.api_url, headers={"Authorization": f"Bearer {token}"})
            response.raise_for_status()
            api_response = ApiResponse.model_validate_json(response.content)
            self._repository.put_item(api_response.dump())
            metrics.add_metric(name="ApiCallSuccess", unit=MetricUnit.Count, value=1)
            logger.info("API call succeeded", extra={"api_message": api_response.message})
            return api_response
        except Exception as exc:
            metrics.add_metric(name="ApiCallFailure", unit=MetricUnit.Count, value=1)
            logger.error("API call failed", exc_info=exc)
            raise

__init__(secret_manager, repository)

Initialize the Handler.

Parameters:

Name Type Description Default
secret_manager SecretManager

Manager for AWS Secrets Manager.

required
repository Repository

Repository for DynamoDB access.

required
Source code in templates/eventbridge/handler.py
35
36
37
38
39
40
41
42
43
def __init__(self, secret_manager: SecretManager, repository: Repository) -> None:
    """Initialize the Handler.

    Args:
        secret_manager: Manager for AWS Secrets Manager.
        repository: Repository for DynamoDB access.
    """
    self._secret_manager = secret_manager
    self._repository = repository

handle(event)

Process an EventBridge event.

Parameters:

Name Type Description Default
event EventBridgeModel

The parsed EventBridge event.

required

Returns:

Type Description
ApiResponse

The validated API response.

Source code in templates/eventbridge/handler.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@tracer.capture_method
def handle(self, event: EventBridgeModel) -> ApiResponse:
    """Process an EventBridge event.

    Args:
        event: The parsed EventBridge event.

    Returns:
        The validated API response.
    """
    try:
        token = self._secret_manager.get(settings.secret_name)
        response = session.get(settings.api_url, headers={"Authorization": f"Bearer {token}"})
        response.raise_for_status()
        api_response = ApiResponse.model_validate_json(response.content)
        self._repository.put_item(api_response.dump())
        metrics.add_metric(name="ApiCallSuccess", unit=MetricUnit.Count, value=1)
        logger.info("API call succeeded", extra={"api_message": api_response.message})
        return api_response
    except Exception as exc:
        metrics.add_metric(name="ApiCallFailure", unit=MetricUnit.Count, value=1)
        logger.error("API call failed", exc_info=exc)
        raise

main(event, context)

Lambda entry point for the EventBridge handler.

Parameters:

Name Type Description Default
event EventBridgeModel

The EventBridge event.

required
context LambdaContext

The Lambda execution context.

required
Source code in templates/eventbridge/handler.py
73
74
75
76
77
78
79
80
81
82
83
84
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
@event_parser(model=EventBridgeModel)
def main(event: EventBridgeModel, context: LambdaContext) -> None:
    """Lambda entry point for the EventBridge handler.

    Args:
        event: The EventBridge event.
        context: The Lambda execution context.
    """
    handler.handle(event)

Data models for the EventBridge template.

ApiResponse

Bases: Entity

Represents the response from the external API.

Source code in templates/eventbridge/models.py
 8
 9
10
11
class ApiResponse(Entity):
    """Represents the response from the external API."""

    message: str = Field(description="Message returned by the external API", min_length=1, max_length=1000)

Secrets management for the EventBridge template.

SecretManager

Wrapper around SecretsProvider with configurable retries and caching.

Source code in templates/eventbridge/secrets.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class SecretManager:
    """Wrapper around SecretsProvider with configurable retries and caching."""

    def __init__(self, max_retries: int = 3, max_age: int = 60) -> None:
        """Initialize the SecretManager.

        Args:
            max_retries: Maximum number of retry attempts for AWS service calls.
            max_age: Maximum age of the cached secret in seconds.
        """
        self._max_age = max_age
        config = Config(tcp_keepalive=True, retries={"max_attempts": max_retries, "mode": "standard"})
        self._provider = SecretsProvider(boto_config=config)

    def get(self, name: str) -> str:
        """Retrieve a secret by name.

        Args:
            name: The name of the secret to retrieve.

        Returns:
            The secret value as a string.
        """
        return cast(str, self._provider.get(name, max_age=self._max_age))

__init__(max_retries=3, max_age=60)

Initialize the SecretManager.

Parameters:

Name Type Description Default
max_retries int

Maximum number of retry attempts for AWS service calls.

3
max_age int

Maximum age of the cached secret in seconds.

60
Source code in templates/eventbridge/secrets.py
12
13
14
15
16
17
18
19
20
21
def __init__(self, max_retries: int = 3, max_age: int = 60) -> None:
    """Initialize the SecretManager.

    Args:
        max_retries: Maximum number of retry attempts for AWS service calls.
        max_age: Maximum age of the cached secret in seconds.
    """
    self._max_age = max_age
    config = Config(tcp_keepalive=True, retries={"max_attempts": max_retries, "mode": "standard"})
    self._provider = SecretsProvider(boto_config=config)

get(name)

Retrieve a secret by name.

Parameters:

Name Type Description Default
name str

The name of the secret to retrieve.

required

Returns:

Type Description
str

The secret value as a string.

Source code in templates/eventbridge/secrets.py
23
24
25
26
27
28
29
30
31
32
def get(self, name: str) -> str:
    """Retrieve a secret by name.

    Args:
        name: The name of the secret to retrieve.

    Returns:
        The secret value as a string.
    """
    return cast(str, self._provider.get(name, max_age=self._max_age))

HTTP session management for the EventBridge template.

ApiSession

Manages a configured requests Session with retries and connection pooling.

Source code in templates/eventbridge/session.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class ApiSession:
    """Manages a configured requests Session with retries and connection pooling."""

    def __init__(
        self,
        max_retries: int = 3,
        backoff_factor: float = 0.3,
        timeout: int = 10,
        status_forcelist: list[int] | None = None,
        pool_connections: int = 10,
        pool_maxsize: int = 10,
    ) -> None:
        """Initialize the ApiSession.

        Args:
            max_retries: Maximum number of retries.
            backoff_factor: Backoff factor for retries.
            timeout: Preset timeout for requests in seconds.
            status_forcelist: List of HTTP status codes to retry on.
            pool_connections: Number of connection pools to cache.
            pool_maxsize: Maximum number of connections to save in the pool.
        """
        self._timeout = timeout
        self._session = Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist or [429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
        )
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)

    def get(self, url: str, **kwargs: Any) -> Response:
        """Perform a GET request with the preset timeout.

        Args:
            url: The URL to request.
            **kwargs: Additional arguments passed to the session.get call.

        Returns:
            The Response object.
        """
        kwargs.setdefault("timeout", self._timeout)
        return self._session.get(url, **kwargs)

__init__(max_retries=3, backoff_factor=0.3, timeout=10, status_forcelist=None, pool_connections=10, pool_maxsize=10)

Initialize the ApiSession.

Parameters:

Name Type Description Default
max_retries int

Maximum number of retries.

3
backoff_factor float

Backoff factor for retries.

0.3
timeout int

Preset timeout for requests in seconds.

10
status_forcelist list[int] | None

List of HTTP status codes to retry on.

None
pool_connections int

Number of connection pools to cache.

10
pool_maxsize int

Maximum number of connections to save in the pool.

10
Source code in templates/eventbridge/session.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    max_retries: int = 3,
    backoff_factor: float = 0.3,
    timeout: int = 10,
    status_forcelist: list[int] | None = None,
    pool_connections: int = 10,
    pool_maxsize: int = 10,
) -> None:
    """Initialize the ApiSession.

    Args:
        max_retries: Maximum number of retries.
        backoff_factor: Backoff factor for retries.
        timeout: Preset timeout for requests in seconds.
        status_forcelist: List of HTTP status codes to retry on.
        pool_connections: Number of connection pools to cache.
        pool_maxsize: Maximum number of connections to save in the pool.
    """
    self._timeout = timeout
    self._session = Session()
    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist or [429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=pool_connections,
        pool_maxsize=pool_maxsize,
    )
    self._session.mount("http://", adapter)
    self._session.mount("https://", adapter)

get(url, **kwargs)

Perform a GET request with the preset timeout.

Parameters:

Name Type Description Default
url str

The URL to request.

required
**kwargs Any

Additional arguments passed to the session.get call.

{}

Returns:

Type Description
Response

The Response object.

Source code in templates/eventbridge/session.py
47
48
49
50
51
52
53
54
55
56
57
58
def get(self, url: str, **kwargs: Any) -> Response:
    """Perform a GET request with the preset timeout.

    Args:
        url: The URL to request.
        **kwargs: Additional arguments passed to the session.get call.

    Returns:
        The Response object.
    """
    kwargs.setdefault("timeout", self._timeout)
    return self._session.get(url, **kwargs)

Configuration settings for the EventBridge template.

Settings

Bases: CommonSettings

Configuration settings for the EventBridge template.

Source code in templates/eventbridge/settings.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Settings(CommonSettings):
    """Configuration settings for the EventBridge template."""

    api_url: str = Field(description="URL of the external HTTP API to call")
    api_timeout_seconds: int = Field(description="Timeout for the external API call in seconds", default=10)
    api_max_retries: int = Field(description="Maximum number of retries for the external API call", default=3)
    api_backoff_factor: float = Field(description="Backoff factor for API retries", default=0.3)

    secret_name: str = Field(description="AWS Secrets Manager secret name holding the API token")
    secret_cache_max_age: int = Field(description="Maximum age of the cached secret in seconds", default=60)
    secret_manager_max_retries: int = Field(
        description="Maximum number of retry attempts for AWS service calls", default=3
    )

    table_name: str = Field(description="DynamoDB table name for persisting API responses")
    service_name: str = "eventbridge"
    metrics_namespace: str = "EventBridge"