Stelvio supports creating and managing Amazon SNS (Simple Notification Service) topics using the Topic component. This allows you to build pub/sub messaging patterns where messages are published to a topic and delivered to multiple subscribers.
Creating a Topic
Create a topic by instantiating the Topic component in your stlv_app.py:
Each subscription creates a separate Lambda function that receives messages published to the topic.
Lambda Configuration
Customize the Lambda function for your subscription:
# With direct optionsnotifications.subscribe("processor","functions/notify.process",memory=512,timeout=60,)# With FunctionConfigfromstelvio.aws.functionimportFunctionConfignotifications.subscribe("processor",FunctionConfig(handler="functions/notify.process",memory=512,timeout=60,))# With dictionarynotifications.subscribe("processor",{"handler":"functions/notify.process","memory":256})
Filter Policies
Use filter policies to route specific messages to specific subscribers:
notifications=Topic("notifications")# Only receive high-priority messagesnotifications.subscribe("urgent-handler","functions/urgent.process",filter_={"priority":["high","critical"]},)# Only receive order-related messagesnotifications.subscribe("order-handler","functions/orders.process",filter_={"type":["order_created","order_updated"]},)
Your subscribed Lambda receives SNS events. Here's how to process them:
importjsondefprocess(event,context):forrecordinevent.get('Records',[]):# Get the SNS messagesns_message=record['Sns']# Parse the message bodybody=json.loads(sns_message['Message'])# Access message attributes (used for filtering)attributes=sns_message.get('MessageAttributes',{})message_type=attributes.get('type',{}).get('Value')print(f"Processing {message_type}: {body}")return{"statusCode":200}
SNS → SQS → Lambda
For more control over message processing (retries, batching, dead-letter queues), use subscribe_queue() to send messages to an SQS queue, then process them with queue.subscribe(). This pattern also works with FIFO topics where direct Lambda subscription isn't supported. See Working with Queues for details.
Subscribing SQS Queues
Subscribe SQS queues to receive messages from your topic:
fromstelvio.aws.topicimportTopicfromstelvio.aws.queueimportQueuenotifications=Topic("notifications")analytics_queue=Queue("analytics")# Subscribe a Queue componentnotifications.subscribe_queue("analytics",analytics_queue)# Subscribe using a queue ARNnotifications.subscribe_queue("external","arn:aws:sqs:us-east-1:123456789012:external-queue")
When you subscribe a Queue component, Stelvio automatically creates the necessary SQS policy to allow SNS to send messages to the queue.
Queue Subscription Options
notifications.subscribe_queue("analytics",analytics_queue,filter_={"type":["analytics_event"]},# Filter policyraw_message_delivery=True,# Send raw message without SNS envelope)
Option
Default
Description
filter_
None
SNS filter policy for message filtering
raw_message_delivery
False
Send raw message body without SNS metadata wrapper
Raw Message Delivery
When raw_message_delivery=True, SNS sends just the message body to the queue without wrapping it in SNS metadata (MessageId, Timestamp, etc.). This is useful when your queue processor expects a specific message format. Note that raw message delivery only works with SQS subscriptions, not Lambda subscriptions.
FIFO Topics with SQS
FIFO topics can deliver to both FIFO and standard SQS queues, but message ordering is only preserved with FIFO queues:
Use the linking mechanism to publish messages to a topic from Lambda functions.
First, link the topic to your function in stlv_app.py:
fromstelvio.aws.topicimportTopicfromstelvio.aws.functionimportFunctionnotifications=Topic("notifications")# Link the topic so this function can publish to itpublisher=Function("publisher",handler="functions/publish.handler",links=[notifications],)
Then in your handler, use the linked topic ARN to publish:
importboto3importjsonfromstlv_resourcesimportResourcesdefhandler(event,context):sns=boto3.client('sns')# Access the linked topic ARNtopic_arn=Resources.notifications.topic_arn# Publish a messageresponse=sns.publish(TopicArn=topic_arn,Message=json.dumps({"user_id":"12345","action":"signup_completed",}),MessageAttributes={"type":{"DataType":"String","StringValue":"user_event"},"priority":{"DataType":"String","StringValue":"high"}})return{"statusCode":200,"body":"Message published!"}
Publishing to FIFO Topics
FIFO topics require a MessageGroupId:
importboto3importjsonfromstlv_resourcesimportResourcesdefhandler(event,context):sns=boto3.client('sns')topic_arn=Resources.orders.topic_arnresponse=sns.publish(TopicArn=topic_arn,Message=json.dumps({"order_id":"12345","status":"created"}),# Required for FIFO - messages with same group ID are delivered in orderMessageGroupId="order-12345",# Optional if content-based deduplication is enabled (default in Stelvio)# MessageDeduplicationId="unique-id",)return{"statusCode":200,"body":"Order event published!"}
FIFO Message Parameters
MessageGroupId (required): Messages with the same group ID are delivered in order. Use different group IDs for messages that can be processed in parallel.
MessageDeduplicationId (optional): When content-based deduplication is enabled (default in Stelvio), SNS uses a hash of the message body.
Link Properties
When you link a topic to a Lambda function, these properties are available:
Property
Description
topic_arn
The topic ARN
topic_name
The topic name
Link Permissions
Linked Lambda functions receive this SNS permission:
sns:Publish - Publish messages to the topic
Fanout Pattern
A common pattern is to use SNS topics to fan out messages to multiple queues:
fromstelvio.aws.topicimportTopicfromstelvio.aws.queueimportQueue# Create topic and queuesorders=Topic("orders")email_queue=Queue("order-emails")analytics_queue=Queue("order-analytics")inventory_queue=Queue("inventory-updates")# Fan out to multiple queuesorders.subscribe_queue("email",email_queue)orders.subscribe_queue("analytics",analytics_queue)orders.subscribe_queue("inventory",inventory_queue)# Each queue has its own processoremail_queue.subscribe("sender","functions/email.send_order_confirmation")analytics_queue.subscribe("tracker","functions/analytics.track_order")inventory_queue.subscribe("updater","functions/inventory.update_stock")
With this pattern, a single order event triggers three independent processors. Each queue can scale separately, fail independently, and be updated without affecting the others.
Customization
The Topic component supports the customize parameter to override underlying Pulumi resource properties. For an overview of how customization works, see the Customization guide.