Working with Queues in Stelvio

Stelvio supports creating and managing Amazon SQS (Simple Queue Service) queues using the Queue component. This allows you to build decoupled, event-driven architectures with reliable message delivery.

Creating a Queue

Create a queue by instantiating the Queue component in your stlv_app.py:

from stelvio.aws.queue import Queue
from stelvio.aws.function import Function

@app.run
def run() -> None:
    # Create a standard queue
    orders_queue = Queue("orders")

    # Link it to a function
    order_processor = Function(
        "process-orders",
        handler="functions/orders.handler",
        links=[orders_queue],
    )

Queue Configuration

Configure your queue with custom settings:

from stelvio.aws.queue import Queue, QueueConfig

# Using keyword arguments
orders_queue = Queue(
    "orders",
    delay=5,                  # Delay delivery by 5 seconds
    visibility_timeout=60,    # Message hidden for 60 seconds after read
)

# Or using QueueConfig
orders_queue = Queue(
    "orders",
    config=QueueConfig(
        delay=5,
        visibility_timeout=60,
    )
)

Configuration Options

Option              Default  Description
fifo                False    Enable FIFO (First-In-First-Out) queue ordering
delay               0        Default delay (in seconds) before messages become visible
visibility_timeout  30       Time (in seconds) a message is hidden after being read
retention           345600   Message retention period in seconds (default: 4 days)
dlq                 None     Dead-letter queue configuration
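
Because delay, visibility_timeout and retention are all given in seconds, it can help to derive them from readable durations. The following is a minimal sketch using only the standard library; the helper name seconds is hypothetical and not part of Stelvio.

```python
from datetime import timedelta


def seconds(**duration) -> int:
    """Convert a readable duration (days=..., minutes=...) to whole seconds."""
    return int(timedelta(**duration).total_seconds())


# The documented retention default of 345600 seconds corresponds to 4 days.
default_retention = seconds(days=4)

# Values that could be passed as Queue("orders", retention=..., visibility_timeout=...)
week_retention = seconds(days=7)
two_minute_visibility = seconds(minutes=2)
```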

FIFO Queues

FIFO queues preserve the order of messages within a message group and provide exactly-once processing through deduplication:

orders_queue = Queue("orders", fifo=True)

When you create a FIFO queue, Stelvio automatically:

  • Adds the .fifo suffix to the queue name (required by AWS)
  • Enables content-based deduplication

FIFO Queue Naming

AWS requires FIFO queue names to end with .fifo. Stelvio handles this automatically when you set fifo=True.

FIFO Throughput

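FIFO queues have lower throughput than standard queues: by default up to 300 API calls per second per action, which means 300 messages/second without batching or 3,000 with batches of 10. High-throughput mode raises these limits. Use standard queues when message order isn't critical.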
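
Batching raises throughput because SQS accepts up to 10 messages in a single SendMessageBatch request. Below is a minimal sketch that splits a list of payloads into batch entries. The helper to_batches is hypothetical; each returned list could be passed as Entries to boto3's send_message_batch.

```python
import json
from typing import Any

MAX_BATCH = 10  # SQS accepts at most 10 entries per SendMessageBatch call


def to_batches(payloads: list[dict[str, Any]], group_id: str) -> list[list[dict[str, str]]]:
    """Split payloads into lists of at most 10 SendMessageBatch entries."""
    entries = [
        {
            "Id": str(index),  # must be unique within one batch request
            "MessageBody": json.dumps(payload),
            "MessageGroupId": group_id,  # required for FIFO queues
        }
        for index, payload in enumerate(payloads)
    ]
    return [entries[i : i + MAX_BATCH] for i in range(0, len(entries), MAX_BATCH)]


batches = to_batches([{"order_id": n} for n in range(25)], group_id="order-processing")
# Each batch could then be sent with:
#   sqs.send_message_batch(QueueUrl=queue_url, Entries=batch)
```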

Dead-Letter Queues

Configure a dead-letter queue (DLQ) to capture messages that fail processing:

from stelvio.aws.queue import Queue, DlqConfig

# First, create the DLQ
orders_dlq = Queue("orders-dlq")

# Reference DLQ by Queue component
orders_queue = Queue("orders", dlq=DlqConfig(queue=orders_dlq))

# Custom retry count (default is 3)
orders_queue = Queue(
    "orders",
    dlq=DlqConfig(queue=orders_dlq, retry=5)
)

# Using dictionary syntax
orders_queue = Queue(
    "orders",
    dlq={"queue": orders_dlq, "retry": 5}
)

DLQ Best Practices

  • Always configure a DLQ for production queues to capture failed messages
  • Set up alerts on your DLQ to detect processing failures
  • Choose retry counts based on your use case (typically 3-5 retries)
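
On the AWS side, a dead-letter queue is attached through the source queue's RedrivePolicy attribute, which names the DLQ's ARN and a maxReceiveCount. The sketch below builds that JSON by hand to show what the retry setting plausibly corresponds to. The mapping of Stelvio's retry to maxReceiveCount is an assumption, the helper name is hypothetical, and the ARN is a made-up placeholder.

```python
import json


def redrive_policy(dlq_arn: str, retry: int = 3) -> str:
    """Build the JSON string SQS expects in the RedrivePolicy queue attribute."""
    if retry < 1:
        raise ValueError("retry must be at least 1")
    return json.dumps({"deadLetterTargetArn": dlq_arn, "maxReceiveCount": retry})


# Placeholder ARN for illustration only.
policy = redrive_policy("arn:aws:sqs:us-east-1:123456789012:orders-dlq", retry=5)
```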

Queue Subscriptions

Subscribe Lambda functions to process messages from your queue:

orders_queue = Queue("orders")

# Simple subscription
orders_queue.subscribe("processor", "functions/orders.process")

# Multiple subscriptions with different names
orders_queue.subscribe("analytics", "functions/analytics.track_order")

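Each subscription creates a separate Lambda function. Subscriptions on the same queue compete for messages (each message is handled by one of them, not by all), so you can subscribe the same handler multiple times with different configurations: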

orders_queue = Queue("orders")

# Same handler, different batch sizes for different throughput needs
orders_queue.subscribe("fast-processor", "functions/orders.process", batch_size=1)
orders_queue.subscribe("batch-processor", "functions/orders.process", batch_size=100)

Lambda Configuration

Customize the Lambda function for your subscription:

# With direct options
orders_queue.subscribe(
    "processor",
    "functions/orders.process",
    memory=512,
    timeout=60,
)

# With FunctionConfig
from stelvio.aws.function import FunctionConfig

orders_queue.subscribe(
    "processor",
    FunctionConfig(
        handler="functions/orders.process",
        memory=512,
        timeout=60,
    )
)

# With dictionary
orders_queue.subscribe(
    "processor",
    {"handler": "functions/orders.process", "memory": 256}
)

Batch Size

Control how many messages Lambda receives per invocation:

orders_queue.subscribe(
    "batch-processor",
    "functions/orders.process",
    batch_size=5,  # Process 5 messages at a time (default: 10)
)

Choosing Batch Size

  • Smaller batches (1-5): Lower latency, faster processing of individual messages
  • Larger batches (10+): Higher throughput, more efficient for high-volume queues
  • Consider your Lambda timeout when choosing batch size
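
The timeout consideration comes down to simple arithmetic: a batch is handled within one invocation, so batch size times per-message processing time has to fit inside the Lambda timeout. AWS additionally recommends a queue visibility timeout of at least six times the function timeout. A rough sketch with hypothetical helper names and made-up timings:

```python
def batch_fits(batch_size: int, seconds_per_message: float, lambda_timeout: int) -> bool:
    """True if a full batch, processed sequentially, finishes within the timeout."""
    return batch_size * seconds_per_message <= lambda_timeout


def recommended_visibility_timeout(lambda_timeout: int) -> int:
    """AWS guidance: visibility timeout of at least 6x the function timeout."""
    return 6 * lambda_timeout


fits_small = batch_fits(batch_size=5, seconds_per_message=2.0, lambda_timeout=60)
fits_large = batch_fits(batch_size=100, seconds_per_message=2.0, lambda_timeout=60)
visibility = recommended_visibility_timeout(60)
```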

Message Filtering

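Filter messages before they reach your Lambda function to reduce costs and improve efficiency. Lambda event source mappings use the same filter pattern syntax as Amazon EventBridge. Note that for SQS, Lambda removes messages that do not match the filters from the queue instead of leaving them for other consumers.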

# Filter by order type in message body
orders_queue.subscribe(
    "process-refunds",
    "functions/refunds.handler",
    filters=[{"body": {"orderType": ["refund"]}}]
)

# Filter high-priority customer orders (AND logic within a filter)
orders_queue.subscribe(
    "process-vip-orders",
    "functions/vip.handler",
    filters=[{
        "body": {
            "customerTier": ["gold", "platinum"],
            "priority": ["high"]
        }
    }]
)

# Multiple filters with OR logic (process returns OR cancellations)
orders_queue.subscribe(
    "process-exceptions",
    "functions/exceptions.handler",
    filters=[
        {"body": {"orderType": ["return"]}},
        {"body": {"orderType": ["cancellation"]}}
    ]
)

# Filter by message attributes
orders_queue.subscribe(
    "process-regional",
    "functions/regional.handler",
    filters=[{
        "messageAttributes": {
            "region": ["us-west-2", "us-east-1"]
        }
    }]
)

Filter Syntax

Filters can match against different parts of an SQS message:

Field              Description                                            Example
body               Match fields in the JSON message body                  {"body": {"orderType": ["refund"]}}
attributes         Match SQS system attributes (e.g., SentTimestamp)      {"attributes": {"SentTimestamp": [...]}}
messageAttributes  Match custom message attributes you set when sending   {"messageAttributes": {"priority": ["high"]}}

Filter Logic

  • Within a filter (multiple fields): All conditions must match (AND logic)
  • Multiple filters (list items): Any filter can match (OR logic)
  • Within a field (array values): Any value can match (OR logic)
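
The three rules can be illustrated with a small local matcher. This is only a sketch of the AND/OR logic for exact-value matches on an already parsed message; it is not how AWS evaluates filters and it ignores pattern operators such as prefix or numeric matching. The function names are hypothetical.

```python
from typing import Any


def matches_pattern(pattern: dict[str, Any], data: dict[str, Any]) -> bool:
    """AND across keys; OR across the values listed for each key."""
    for key, expected in pattern.items():
        if key not in data:
            return False
        actual = data[key]
        if isinstance(expected, dict):
            # Nested object: recurse into the matching part of the message.
            if not isinstance(actual, dict) or not matches_pattern(expected, actual):
                return False
        elif actual not in expected:
            # Leaf: the message value must equal one of the listed values.
            return False
    return True


def matches_any(filters: list[dict[str, Any]], message: dict[str, Any]) -> bool:
    """OR across filters: one matching filter is enough."""
    return any(matches_pattern(f, message) for f in filters)


vip_filter = [{"body": {"customerTier": ["gold", "platinum"], "priority": ["high"]}}]
exception_filters = [
    {"body": {"orderType": ["return"]}},
    {"body": {"orderType": ["cancellation"]}},
]

gold_high = {"body": {"customerTier": "gold", "priority": "high"}}
gold_low = {"body": {"customerTier": "gold", "priority": "low"}}
cancellation = {"body": {"orderType": "cancellation"}}
```

Here gold_high satisfies vip_filter because both fields match, gold_low does not because priority fails the AND, and cancellation satisfies exception_filters through its second filter.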

Filter Limits

AWS limits EventSourceMapping filters to a maximum of 5 filters per subscription. Stelvio validates this at runtime.

For detailed filter pattern syntax and advanced matching rules, see the AWS EventBridge filtering documentation.

Subscription Permissions

Stelvio automatically sets up everything a queue subscription needs:

  • EventSourceMapping: Connects the SQS queue to your Lambda function
  • SQS IAM permissions: Grants read access (sqs:ReceiveMessage, sqs:DeleteMessage, sqs:GetQueueAttributes)

Sending Messages

Use the linking mechanism to send messages to your queue from Lambda functions:

import boto3
import json
from stlv_resources import Resources

def handler(event, context):
    sqs = boto3.client('sqs')

    # Access the linked queue URL
    queue_url = Resources.orders.queue_url

    # Send a message
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({
            "order_id": "12345",
            "customer": "john@example.com",
            "items": [{"sku": "WIDGET-001", "qty": 2}]
        })
    )

    return {"statusCode": 200, "body": "Message sent!"}
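
If a subscription filters on messageAttributes, the sender has to set those attributes explicitly. Below is a minimal sketch that builds the keyword arguments for boto3's send_message. The helper build_send_kwargs is hypothetical, and the queue URL is a placeholder standing in for Resources.orders.queue_url.

```python
import json
from typing import Any


def build_send_kwargs(
    queue_url: str, body: dict[str, Any], attributes: dict[str, str]
) -> dict[str, Any]:
    """Assemble send_message arguments with string-typed message attributes."""
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(body),
        "MessageAttributes": {
            name: {"DataType": "String", "StringValue": value}
            for name, value in attributes.items()
        },
    }


kwargs = build_send_kwargs(
    "https://sqs.us-east-1.amazonaws.com/123456789012/orders",  # placeholder URL
    {"order_id": "12345"},
    {"region": "us-west-2"},
)
# In a handler: boto3.client("sqs").send_message(**kwargs)
```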

Sending to FIFO Queues

FIFO queues require additional parameters when sending messages:

import boto3
import json
from stlv_resources import Resources

def handler(event, context):
    sqs = boto3.client('sqs')

    queue_url = Resources.orders.queue_url

    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"order_id": "12345"}),
        # Required for FIFO queues - messages with same group ID are processed in order
        MessageGroupId="order-processing",
        # Optional if content-based deduplication is enabled (Stelvio enables this by default)
        # MessageDeduplicationId="unique-id-12345",
    )

    return {"statusCode": 200, "body": "Message sent!"}

FIFO Message Parameters

  • MessageGroupId (required): Messages with the same group ID are processed in order. Use different group IDs for messages that can be processed in parallel.
  • MessageDeduplicationId (optional): When content-based deduplication is enabled (default in Stelvio), SQS uses a hash of the message body. Provide this explicitly if you need custom deduplication logic.
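
One way to choose these values is to group by the entity whose events must stay ordered and to derive an explicit deduplication ID from a stable business key. The sketch below uses a hypothetical helper and illustrative field names (customer_id, order_id, event); the queue URL is a placeholder.

```python
import hashlib
import json
from typing import Any


def fifo_send_kwargs(queue_url: str, order: dict[str, Any]) -> dict[str, Any]:
    """Build send_message arguments for a FIFO queue."""
    # The same order_id + event always yields the same deduplication ID, so a
    # retried send is treated as a duplicate even if other body fields differ.
    dedup_source = f"{order['order_id']}:{order['event']}"
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(order),
        # One customer's orders stay in sequence; different customers can run in parallel.
        "MessageGroupId": f"customer-{order['customer_id']}",
        "MessageDeduplicationId": hashlib.sha256(dedup_source.encode()).hexdigest(),
    }


url = "https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo"  # placeholder
first = fifo_send_kwargs(url, {"order_id": "12345", "customer_id": 42, "event": "created"})
retry = fifo_send_kwargs(
    url, {"order_id": "12345", "customer_id": 42, "event": "created", "attempt": 2}
)
```

Both calls produce the same MessageDeduplicationId even though the bodies differ, which content-based deduplication alone would not do.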

When you link a queue to a Lambda function, these properties are available:

Property    Description
queue_url   The queue URL for sending messages
queue_arn   The queue ARN
queue_name  The queue name

Linked Lambda functions receive these SQS permissions for sending messages:

  • sqs:SendMessage - Send messages to the queue
  • sqs:GetQueueAttributes - Read queue metadata

Receiving Messages

For processing messages from a queue, use queue.subscribe() instead of linking. Subscriptions automatically configure the necessary permissions (sqs:ReceiveMessage, sqs:DeleteMessage, sqs:GetQueueAttributes) for the Lambda event source mapping.

Processing Messages

Your Lambda function receives SQS events with batched messages:

import json

def process(event, context):
    """Process SQS messages."""
    for record in event.get('Records', []):
        # Parse the message body
        body = json.loads(record['body'])

        order_id = body.get('order_id')
        customer = body.get('customer')

        print(f"Processing order {order_id} for {customer}")

        # Process the order...

    # Return normally - Lambda then deletes the batch's messages from the queue
    return {"statusCode": 200}

Error Handling

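  • If your Lambda raises an exception, the whole batch is treated as failed: its messages become visible again after the visibility timeout and are retried
  • When the function returns successfully, Lambda deletes the messages in the batch from the queue
  • Messages that keep failing move to the DLQ (if configured) once the retry count is exceeded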
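
By default a single exception fails the whole batch. Lambda's SQS integration also supports partial batch responses, where the handler returns the IDs of only the messages that failed. The sketch below assumes the event source mapping has ReportBatchItemFailures enabled, which this page does not confirm Stelvio configures. The function process_order is a hypothetical stand-in for real processing.

```python
import json
from typing import Any


def process_order(body: dict[str, Any]) -> None:
    """Hypothetical processing step; rejects orders without an order_id."""
    if "order_id" not in body:
        raise ValueError("missing order_id")


def process(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Process each record and report only the failed ones back to Lambda."""
    failures = []
    for record in event.get("Records", []):
        try:
            process_order(json.loads(record["body"]))
        except Exception as exc:  # keep going so one bad message does not fail the batch
            print(f"Failed to process {record['messageId']}: {exc}")
            failures.append({"itemIdentifier": record["messageId"]})
    # Only the listed messages become visible again for retry.
    return {"batchItemFailures": failures}
```

An empty batchItemFailures list signals that the entire batch succeeded.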

Linking vs Subscriptions

Use Case                 Approach
Process queue messages   Use queue.subscribe() - creates Lambda triggered by queue
Send messages to queue   Use links=[queue] - grants permissions to send messages
Pipeline (read → write)  Subscribe to one queue, link to another for forwarding

A common pattern is to combine subscriptions and linking to create multi-stage processing pipelines. With this pattern, one Lambda function subscribes to a queue to process incoming messages, then forwards transformed data to a second queue. Another Lambda function subscribes to that second queue to handle the next stage of processing.

Here is an example:

# Example: Multi-stage pipeline with two Lambda functions
orders_queue = Queue("orders")
fulfillment_queue = Queue("fulfillment")

# Lambda #1: Process incoming orders and forward to fulfillment
orders_queue.subscribe(
    "process-orders",
    "functions/orders.process",
    links=[fulfillment_queue],  # Grant permission to send to fulfillment queue
)

# Lambda #2: Process fulfillment tasks
fulfillment_queue.subscribe(
    "fulfill-orders",
    "functions/fulfillment.fulfill",
)

This creates a two-stage processing pipeline:

  1. Order Processing Lambda (functions/orders.process):
       • Triggered by messages in orders_queue
       • Validates and processes incoming orders
       • Sends fulfillment tasks to fulfillment_queue

  2. Fulfillment Lambda (functions/fulfillment.fulfill):
       • Triggered by messages in fulfillment_queue
       • Handles order fulfillment (shipping, inventory, etc.)
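
The first stage's handler could look roughly like the following. To keep the sketch self-contained, the SQS client and queue URL are passed in as parameters; in a deployed function they would come from boto3.client("sqs") and, presumably, Resources.fulfillment.queue_url (an assumption based on the naming used for the orders queue earlier). The helper names and task fields are illustrative.

```python
import json
from typing import Any


def to_fulfillment_task(order: dict[str, Any]) -> dict[str, Any]:
    """Transform a validated order into a fulfillment task (illustrative fields)."""
    return {"order_id": order["order_id"], "items": order.get("items", [])}


def forward_orders(event: dict[str, Any], sqs: Any, fulfillment_queue_url: str) -> int:
    """Forward every order in the batch to the fulfillment queue; return the count."""
    forwarded = 0
    for record in event.get("Records", []):
        order = json.loads(record["body"])
        sqs.send_message(
            QueueUrl=fulfillment_queue_url,
            MessageBody=json.dumps(to_fulfillment_task(order)),
        )
        forwarded += 1
    return forwarded
```

A real handler would wrap this in a process(event, context) function that supplies the client and the linked queue URL.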

Customization

The Queue component supports the customize parameter to override underlying Pulumi resource properties. For an overview of how customization works, see the Customization guide.

Resource Keys

Resource Key  Pulumi Args Type  Description
queue         QueueArgs         The SQS queue

Example

queue = Queue(
    "my-queue",
    customize={
        "queue": {
            "kms_master_key_id": "alias/my-key",
        }
    }
)

Next Steps

Now that you understand SQS queues, you might want to explore: