Stelvio supports creating and managing Amazon SQS (Simple Queue Service) queues using the Queue component. This allows you to build decoupled, event-driven architectures with reliable message delivery.
Creating a Queue
Create a queue by instantiating the Queue component in your stlv_app.py:
    from stelvio.aws.queue import Queue
    from stelvio.aws.function import Function

    @app.run
    def run() -> None:
        # Create a standard queue
        orders_queue = Queue("orders")

        # Link it to a function
        order_processor = Function(
            "process-orders",
            handler="functions/orders.handler",
            links=[orders_queue],
        )
Queue Configuration
Configure your queue with custom settings:
    from stelvio.aws.queue import Queue, QueueConfig

    # Using keyword arguments
    orders_queue = Queue(
        "orders",
        delay=5,                # Delay delivery by 5 seconds
        visibility_timeout=60,  # Message hidden for 60 seconds after read
    )

    # Or using QueueConfig
    orders_queue = Queue(
        "orders",
        config=QueueConfig(
            delay=5,
            visibility_timeout=60,
        ),
    )
Configuration Options
| Option | Default | Description |
| --- | --- | --- |
| fifo | False | Enable FIFO (First-In-First-Out) queue ordering |
| delay | 0 | Default delay (in seconds) before messages become visible |
| visibility_timeout | 30 | Time (in seconds) a message is hidden after being read |
| retention | 345600 | Message retention period in seconds (default: 4 days) |
| dlq | None | Dead-letter queue configuration |
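Because delay, visibility_timeout, and retention are all given in seconds, larger values are easy to mistype. The following is a minimal sketch, using only the Python standard library, that derives the values from readable durations and checks them against the SQS limits. The helper names seconds and check_queue_settings are hypothetical and not part of Stelvio.

```python
from datetime import timedelta

# SQS limits in seconds, taken from the AWS SQS documentation.
MAX_DELAY = 900                  # 15 minutes
MAX_VISIBILITY_TIMEOUT = 43_200  # 12 hours
MIN_RETENTION = 60               # 1 minute
MAX_RETENTION = 1_209_600        # 14 days


def seconds(**duration) -> int:
    """Convert a duration such as days=4 or minutes=1 to whole seconds."""
    return int(timedelta(**duration).total_seconds())


def check_queue_settings(delay: int, visibility_timeout: int, retention: int) -> None:
    """Raise ValueError if any value falls outside the SQS limits."""
    limits = {
        "delay": (delay, 0, MAX_DELAY),
        "visibility_timeout": (visibility_timeout, 0, MAX_VISIBILITY_TIMEOUT),
        "retention": (retention, MIN_RETENTION, MAX_RETENTION),
    }
    for name, (value, low, high) in limits.items():
        if not low <= value <= high:
            raise ValueError(f"{name}={value} is outside the SQS range {low}..{high}")


# Hypothetical settings matching the examples and defaults above.
settings = {
    "delay": seconds(seconds=5),
    "visibility_timeout": seconds(minutes=1),
    "retention": seconds(days=4),
}
check_queue_settings(**settings)
print(settings)
```

The resulting integers can then be used as the delay, visibility_timeout, and retention options.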
FIFO Queues
FIFO queues guarantee exactly-once processing and preserve message order:
orders_queue=Queue("orders",fifo=True)
When you create a FIFO queue, Stelvio automatically appends the .fifo suffix that AWS requires on FIFO queue names and enables content-based deduplication, so setting fifo=True is all you need to do.
FIFO Throughput
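FIFO queues have lower throughput than standard queues: by default, up to 300 send, receive, or delete calls per second per API action, which is 300 messages per second without batching or 3,000 with batches of 10. High-throughput mode raises these limits. Use standard queues when message order isn't critical.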
Dead-Letter Queues
Configure a dead-letter queue (DLQ) to capture messages that fail processing:
    from stelvio.aws.queue import Queue, DlqConfig

    # First, create the DLQ
    orders_dlq = Queue("orders-dlq")

    # Reference DLQ by Queue component
    orders_queue = Queue("orders", dlq=DlqConfig(queue=orders_dlq))

    # Custom retry count (default is 3)
    orders_queue = Queue("orders", dlq=DlqConfig(queue=orders_dlq, retry=5))

    # Using dictionary syntax
    orders_queue = Queue("orders", dlq={"queue": orders_dlq, "retry": 5})
DLQ Best Practices
Always configure a DLQ for production queues to capture failed messages
Set up alerts on your DLQ to detect processing failures
Choose retry counts based on your use case (typically 3-5 retries)
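To build intuition for how a retry count interacts with a DLQ, here is a toy in-memory simulation of SQS redrive behaviour. It assumes that Stelvio's retry option corresponds to the SQS maxReceiveCount setting, which this page does not state explicitly; SimulatedQueue is a hypothetical class for illustration, not a Stelvio or AWS API.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class SimulatedQueue:
    """Toy model of a source queue with a redrive policy (not a real SQS client)."""

    max_receive_count: int
    dead_letters: list = field(default_factory=list)

    def deliver(self, message: str, handler: Callable[[str], None]) -> int:
        """Deliver a message until the handler succeeds or receives run out.

        Returns the number of times the handler was invoked.
        """
        receive_count = 0
        while receive_count < self.max_receive_count:
            receive_count += 1
            try:
                handler(message)
                return receive_count  # success: the message is deleted
            except Exception:
                continue  # failure: the message becomes visible again
        # All allowed receives failed, so the message moves to the DLQ.
        self.dead_letters.append(message)
        return receive_count


def always_fail(message: str) -> None:
    raise RuntimeError(f"cannot process {message}")


queue = SimulatedQueue(max_receive_count=3)
invocations = queue.deliver("order-1", always_fail)
print(invocations, queue.dead_letters)
```

A handler that fails on every attempt is invoked max_receive_count times before the message lands in the dead-letter list, which is why a higher retry count delays detection of permanently broken messages.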
Queue Subscriptions
Subscribe Lambda functions to process messages from your queue:
    orders_queue = Queue("orders")

    # Simple subscription
    orders_queue.subscribe("processor", "functions/orders.process")

    # Multiple subscriptions with different names
    orders_queue.subscribe("analytics", "functions/analytics.track_order")
Each subscription creates a separate Lambda function, so you can subscribe the same handler multiple times with different configurations:
    orders_queue = Queue("orders")

    # Same handler, different batch sizes for different throughput needs
    orders_queue.subscribe("fast-processor", "functions/orders.process", batch_size=1)
    orders_queue.subscribe("batch-processor", "functions/orders.process", batch_size=100)
Lambda Configuration
Customize the Lambda function for your subscription:
    from stelvio.aws.function import FunctionConfig

    # With direct options
    orders_queue.subscribe(
        "processor",
        "functions/orders.process",
        memory=512,
        timeout=60,
    )

    # With FunctionConfig
    orders_queue.subscribe(
        "processor",
        FunctionConfig(
            handler="functions/orders.process",
            memory=512,
            timeout=60,
        ),
    )

    # With dictionary
    orders_queue.subscribe(
        "processor",
        {"handler": "functions/orders.process", "memory": 256},
    )
Batch Size
Control how many messages Lambda receives per invocation:
    orders_queue.subscribe(
        "batch-processor",
        "functions/orders.process",
        batch_size=5,  # Process 5 messages at a time (default: 10)
    )
Choosing Batch Size
Smaller batches (1-5): Lower latency, faster processing of individual messages
Larger batches (10+): Higher throughput, more efficient for high-volume queues
Consider your Lambda timeout when choosing batch size
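One way to relate batch size to the Lambda timeout is to estimate how many messages fit into a single invocation. The sketch below assumes messages are processed one after another at a roughly constant cost; max_batch_size and its headroom parameter are hypothetical names introduced for illustration.

```python
import math


def max_batch_size(
    timeout_seconds: float,
    seconds_per_message: float,
    headroom: float = 0.8,
) -> int:
    """Estimate the largest batch that finishes within the Lambda timeout.

    Only a fraction of the timeout (headroom) is budgeted, leaving slack for
    cold starts and slow messages. At least 1 is always returned.
    """
    if seconds_per_message <= 0:
        raise ValueError("seconds_per_message must be positive")
    budget = timeout_seconds * headroom
    return max(1, math.floor(budget / seconds_per_message))


# A 60-second timeout with about 2.5 seconds per message.
print(max_batch_size(60, 2.5))  # 19
# A 10-second timeout with the same per-message cost.
print(max_batch_size(10, 2.5))  # 3
```

If the estimate comes out lower than the batch_size you wanted, either raise the function timeout or lower batch_size so a full batch cannot time out and be retried as a whole.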
Message Filtering
Filter messages before they reach your Lambda function to reduce costs and improve efficiency. Filtering is applied by the Lambda event source mapping, which uses the same pattern syntax as Amazon EventBridge event patterns.
    # Filter by order type in message body
    orders_queue.subscribe(
        "process-refunds",
        "functions/refunds.handler",
        filters=[{"body": {"orderType": ["refund"]}}],
    )

    # Filter high-priority customer orders (AND logic within a filter)
    orders_queue.subscribe(
        "process-vip-orders",
        "functions/vip.handler",
        filters=[
            {"body": {"customerTier": ["gold", "platinum"], "priority": ["high"]}}
        ],
    )

    # Multiple filters with OR logic (process returns OR cancellations)
    orders_queue.subscribe(
        "process-exceptions",
        "functions/exceptions.handler",
        filters=[
            {"body": {"orderType": ["return"]}},
            {"body": {"orderType": ["cancellation"]}},
        ],
    )

    # Filter by message attributes
    orders_queue.subscribe(
        "process-regional",
        "functions/regional.handler",
        filters=[{"messageAttributes": {"region": ["us-west-2", "us-east-1"]}}],
    )
Filter Syntax
Filters can match against different parts of an SQS message:
| Field | Description | Example |
| --- | --- | --- |
| body | Match fields in the JSON message body | {"body": {"orderType": ["refund"]}} |
| attributes | Match SQS system attributes (e.g., SentTimestamp) | {"attributes": {"SentTimestamp": [...]}} |
| messageAttributes | Match custom message attributes you set when sending | {"messageAttributes": {"priority": ["high"]}} |
Filter Logic
Within a filter (multiple fields): All conditions must match (AND logic)
Multiple filters (list items): Any filter can match (OR logic)
Within a field (array values): Any value can match (OR logic)
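The three rules above can be illustrated with a small local matcher. This is a deliberately simplified sketch that supports only exact-value matching on nested dictionaries; the real filter syntax also offers operators such as prefix and numeric comparisons, which are not modelled here. The function names are hypothetical and not part of Stelvio or AWS.

```python
def matches_filter(message: dict, pattern: dict) -> bool:
    """Return True if every field in the pattern matches (AND logic)."""
    for key, expected in pattern.items():
        actual = message.get(key)
        if isinstance(expected, dict):
            # Nested pattern: descend into the matching message section.
            if not isinstance(actual, dict) or not matches_filter(actual, expected):
                return False
        elif actual not in expected:
            # Leaf: expected is a list of allowed values (OR logic).
            return False
    return True


def matches_any(message: dict, filters: list) -> bool:
    """Return True if at least one filter matches (OR logic across filters)."""
    return any(matches_filter(message, pattern) for pattern in filters)


message = {"body": {"orderType": "refund", "customerTier": "gold", "priority": "high"}}

# AND within a filter: both customerTier and priority must match.
vip = [{"body": {"customerTier": ["gold", "platinum"], "priority": ["high"]}}]
# OR across filters: either pattern may match.
exceptions = [
    {"body": {"orderType": ["return"]}},
    {"body": {"orderType": ["cancellation"]}},
]

print(matches_any(message, vip))         # True
print(matches_any(message, exceptions))  # False
```

Running candidate filters against sample messages like this can help catch a pattern that is accidentally too strict before deploying it.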
Filter Limits
AWS limits EventSourceMapping filters to a maximum of 5 filters per subscription. Stelvio validates this at runtime.
Stelvio automatically creates the resources each queue subscription needs:
EventSourceMapping: Connects the SQS queue to your Lambda function
SQS IAM permissions: Grants read access (sqs:ReceiveMessage, sqs:DeleteMessage, sqs:GetQueueAttributes)
Sending Messages
Use the linking mechanism to send messages to your queue from Lambda functions:
    import boto3
    import json
    from stlv_resources import Resources

    def handler(event, context):
        sqs = boto3.client('sqs')

        # Access the linked queue URL
        queue_url = Resources.orders.queue_url

        # Send a message
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps({
                "order_id": "12345",
                "customer": "john@example.com",
                "items": [{"sku": "WIDGET-001", "qty": 2}],
            }),
        )

        return {"statusCode": 200, "body": "Message sent!"}
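When a function needs to send many messages, the SQS SendMessageBatch API accepts up to 10 entries per call. The sketch below only prepares the entries and does not call AWS, so it runs without boto3; batch_entries is a hypothetical helper, and the commented-out call at the end assumes the same sqs client and queue_url as in the handler above.

```python
import json
from itertools import islice
from typing import Iterable, Iterator

MAX_BATCH_ENTRIES = 10  # SQS limit per SendMessageBatch call


def batch_entries(messages: Iterable[dict]) -> Iterator[list]:
    """Yield lists of at most 10 entries in the shape SendMessageBatch expects."""
    iterator = iter(messages)
    while chunk := list(islice(iterator, MAX_BATCH_ENTRIES)):
        # Id only needs to be unique within a single batch request.
        yield [
            {"Id": str(index), "MessageBody": json.dumps(message)}
            for index, message in enumerate(chunk)
        ]


orders = [{"order_id": str(n)} for n in range(25)]
batches = list(batch_entries(orders))
print([len(batch) for batch in batches])  # [10, 10, 5]

# Inside a Lambda handler, each batch could then be sent with:
# for entries in batch_entries(orders):
#     sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
```

The response of send_message_batch lists failed entries separately, so production code should inspect it rather than assume every entry was accepted.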
Sending to FIFO Queues
FIFO queues require additional parameters when sending messages:
    import boto3
    import json
    from stlv_resources import Resources

    def handler(event, context):
        sqs = boto3.client('sqs')
        queue_url = Resources.orders.queue_url

        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps({"order_id": "12345"}),
            # Required for FIFO queues - messages with same group ID are processed in order
            MessageGroupId="order-processing",
            # Optional if content-based deduplication is enabled (Stelvio enables this by default)
            # MessageDeduplicationId="unique-id-12345",
        )

        return {"statusCode": 200, "body": "Message sent!"}
FIFO Message Parameters
MessageGroupId (required): Messages with the same group ID are processed in order. Use different group IDs for messages that can be processed in parallel.
MessageDeduplicationId (optional): When content-based deduplication is enabled (default in Stelvio), SQS uses a hash of the message body. Provide this explicitly if you need custom deduplication logic.
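As an illustration of custom deduplication logic, the sketch below builds the FIFO parameters for an order. It assumes a hypothetical message shape with order_id and customer_id fields: orders are grouped per customer so different customers can be processed in parallel, and the deduplication ID is derived from order_id alone, so a resend of the same order is treated as a duplicate even if other fields (such as a timestamp) differ. fifo_params is not part of Stelvio or boto3.

```python
import hashlib
import json


def fifo_params(order: dict) -> dict:
    """Build send_message keyword arguments for a FIFO queue."""
    return {
        "MessageBody": json.dumps(order, sort_keys=True),
        # Ordering is preserved per customer, not across all orders.
        "MessageGroupId": f"customer-{order['customer_id']}",
        # A SHA-256 hex digest is 64 characters, within the 128-character limit.
        "MessageDeduplicationId": hashlib.sha256(
            order["order_id"].encode("utf-8")
        ).hexdigest(),
    }


first = fifo_params({"order_id": "12345", "customer_id": "c1", "sent_at": "10:00"})
retry = fifo_params({"order_id": "12345", "customer_id": "c1", "sent_at": "10:01"})

# The bodies differ, but the deduplication ID is the same.
print(first["MessageBody"] != retry["MessageBody"])
print(first["MessageDeduplicationId"] == retry["MessageDeduplicationId"])
```

The returned dictionary could be passed as sqs.send_message(QueueUrl=queue_url, **fifo_params(order)). With content-based deduplication alone, the two messages above would not be recognised as duplicates because their bodies differ.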
Link Properties
When you link a queue to a Lambda function, these properties are available:
| Property | Description |
| --- | --- |
| queue_url | The queue URL for sending messages |
| queue_arn | The queue ARN |
| queue_name | The queue name |
Link Permissions
Linked Lambda functions receive these SQS permissions for sending messages:
sqs:SendMessage - Send messages to the queue
sqs:GetQueueAttributes - Read queue metadata
Receiving Messages
For processing messages from a queue, use queue.subscribe() instead of linking.
Subscriptions automatically configure the necessary permissions (sqs:ReceiveMessage,
sqs:DeleteMessage, sqs:GetQueueAttributes) for the Lambda event source mapping.
Processing Messages
Your Lambda function receives SQS events with batched messages:
    import json

    def process(event, context):
        """Process SQS messages."""
        for record in event.get('Records', []):
            # Parse the message body
            body = json.loads(record['body'])

            order_id = body.get('order_id')
            customer = body.get('customer')

            print(f"Processing order {order_id} for {customer}")

            # Process the order...

        # Returning without raising marks the batch as processed,
        # and the messages are then deleted from the queue
        return {"statusCode": 200}
Error Handling
If your Lambda raises an exception, the messages in the batch become visible again after the visibility timeout and are retried
Successfully processed messages are automatically deleted
Failed messages eventually move to the DLQ (if configured)
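Because an unhandled exception causes the whole batch to be retried, one failing message can force reprocessing of messages that already succeeded. AWS Lambda supports partial batch responses for SQS, where the handler reports only the failed message IDs. The sketch below shows that response shape. It takes effect only if ReportBatchItemFailures is enabled on the event source mapping; this page does not say whether Stelvio enables it, so treat that as an assumption to verify. Without that setting the return value is ignored and failed messages would be deleted along with the rest. handle_order is hypothetical business logic.

```python
import json


def handle_order(body: dict) -> None:
    """Hypothetical business logic that rejects orders without an ID."""
    if "order_id" not in body:
        raise ValueError("missing order_id")


def process(event, context):
    """Process each record and report only the failed ones."""
    failures = []
    for record in event.get("Records", []):
        try:
            handle_order(json.loads(record["body"]))
        except Exception as exc:
            print(f"Failed to process {record['messageId']}: {exc}")
            failures.append({"itemIdentifier": record["messageId"]})
    # An empty list tells Lambda that the whole batch succeeded.
    return {"batchItemFailures": failures}


# Local check with a hand-built event containing one good and one bad record.
sample_event = {
    "Records": [
        {"messageId": "m-1", "body": json.dumps({"order_id": "12345"})},
        {"messageId": "m-2", "body": json.dumps({"customer": "john@example.com"})},
    ]
}
result = process(sample_event, None)
print(result)
```

Only the message with ID m-2 is reported as failed, so only that message would be retried and, eventually, moved to the DLQ.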
Linking vs Subscriptions
| Use Case | Approach |
| --- | --- |
| Process queue messages | Use queue.subscribe() - creates Lambda triggered by queue |
| Send messages to queue | Use links=[queue] - grants permissions to send messages |
| Pipeline (read → write) | Subscribe to one queue, link to another for forwarding |
A common pattern is to combine subscriptions and linking to create multi-stage processing pipelines.
With this pattern, one Lambda function subscribes to a queue to process incoming messages, then forwards transformed data to a second queue.
Another Lambda function subscribes to that second queue to handle the next stage of processing.
Here is an example:
    # Example: Multi-stage pipeline with two Lambda functions
    orders_queue = Queue("orders")
    fulfillment_queue = Queue("fulfillment")

    # Lambda #1: Process incoming orders and forward to fulfillment
    orders_queue.subscribe(
        "process-orders",
        "functions/orders.process",
        links=[fulfillment_queue],  # Grant permission to send to fulfillment queue
    )

    # Lambda #2: Process fulfillment tasks
    fulfillment_queue.subscribe(
        "fulfill-orders",
        "functions/fulfillment.fulfill",
    )
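This creates a two-stage processing pipeline:
Order Processing Lambda (functions/orders.process): Subscribes to the orders queue, processes incoming orders, and forwards the results to the fulfillment queue
Fulfillment Lambda (functions/fulfillment.fulfill): Subscribes to the fulfillment queue and handles order fulfillment (shipping, inventory, etc.)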
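The first stage of such a pipeline might look like the sketch below. To keep it runnable without AWS, the sending function is passed in as a parameter; to_fulfillment_task, forward_orders, and the task fields are hypothetical. The commented wiring at the end assumes the linked queue is exposed as Resources.fulfillment, by analogy with Resources.orders in the sending example.

```python
import json
from typing import Callable


def to_fulfillment_task(order: dict) -> dict:
    """Transform an incoming order into a task for the next stage."""
    return {
        "order_id": order["order_id"],
        "items": order.get("items", []),
        "status": "ready_to_ship",
    }


def forward_orders(event: dict, send: Callable[[str], None]) -> int:
    """Process each SQS record and forward the result via send().

    Returns the number of forwarded messages.
    """
    forwarded = 0
    for record in event.get("Records", []):
        order = json.loads(record["body"])
        send(json.dumps(to_fulfillment_task(order)))
        forwarded += 1
    return forwarded


# Local check: collect outgoing messages in a list instead of sending them.
outbox = []
event = {"Records": [{"body": json.dumps({"order_id": "12345", "items": ["WIDGET-001"]})}]}
count = forward_orders(event, outbox.append)
print(count, outbox)

# In the deployed Lambda, send could wrap boto3 (assumed wiring):
# def process(event, context):
#     sqs = boto3.client("sqs")
#     url = Resources.fulfillment.queue_url
#     forward_orders(event, lambda body: sqs.send_message(QueueUrl=url, MessageBody=body))
```

Keeping the transformation separate from the AWS call makes the stage easy to unit test, and the second stage can follow the same shape without a send step.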
Customization
The Queue component supports the customize parameter to override underlying Pulumi resource properties. For an overview of how customization works, see the Customization guide.