Skip to content

SQS Queue

Queue

Addresses all SQS interactions.

Source code in templates/queue.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Queue:
    """Addresses all SQS interactions."""

    def __init__(self, queue_url: str, region_name: str = "us-east-1") -> None:
        """Initialize the SQS client.

        Args:
            queue_url: The URL of the SQS queue.
            region_name: The region to use.
        """
        self._queue_url = queue_url
        self._client = client("sqs", region_name=region_name)

    def publish(self, message: str) -> None:
        """Publish a processed message to SQS.

        Args:
            message: The message to publish.
        """
        self._client.send_message(QueueUrl=self._queue_url, MessageBody=message)

__init__(queue_url, region_name='us-east-1')

Initialize the SQS client.

Parameters:

Name Type Description Default
queue_url str

The URL of the SQS queue.

required
region_name str

The region to use.

'us-east-1'
Source code in templates/queue.py
 7
 8
 9
10
11
12
13
14
15
def __init__(self, queue_url: str, region_name: str = "us-east-1") -> None:
    """Initialize the SQS client.

    Args:
        queue_url: The URL of the SQS queue.
        region_name: The region to use.
    """
    self._queue_url = queue_url
    self._client = client("sqs", region_name=region_name)

publish(message)

Publish a processed message to SQS.

Parameters:

Name Type Description Default
message str

The message to publish.

required
Source code in templates/queue.py
17
18
19
20
21
22
23
def publish(self, message: str) -> None:
    """Publish a processed message to SQS.

    Args:
        message: The message to publish.
    """
    self._client.send_message(QueueUrl=self._queue_url, MessageBody=message)