SQS and Lambda
We will create a lambda function that is activated whenever new messages arrive in a queue.
The lambda function will receive a batch of messages and, after processing them, will send them to a new queue.
A Lambda that Sends Messages
Let's start by creating a lambda function that can write to a queue (send messages):
Important!
The DESTINATION_SQS_URL environment variable will be set when creating the Lambda function.
You do not need to worry about it for now.
import boto3
import os
def lambda_handler(event, context):
    # Create an SQS client
    sqs = boto3.client("sqs")
    # Define the queue URL
    queue_url = os.environ.get("DESTINATION_SQS_URL")
    # Define the message to send
    message_body = "I can send messages to SQS!"
    # Send message to the queue
    response = sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)
    return response
Notice that the handler sends a fixed message, "I can send messages to SQS!". Furthermore, the destination queue URL is read from an environment variable.
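Because os.environ.get returns None when the variable is missing, a misconfigured function would only fail later, inside send_message. A minimal sketch of reading the variable with an explicit check is shown below. The helper name get_destination_queue_url is hypothetical and is not part of the original handler.

```python
import os


def get_destination_queue_url(env=None):
    """Return the destination queue URL or raise a clear error.

    `env` defaults to os.environ; passing a plain dict makes the helper
    easy to test. The helper name is hypothetical (an assumption).
    """
    env = os.environ if env is None else env
    queue_url = env.get("DESTINATION_SQS_URL")
    if not queue_url:
        # Fail early with a readable message instead of failing in send_message
        raise RuntimeError("DESTINATION_SQS_URL is not set for this function")
    return queue_url
```

Inside the handler, queue_url = get_destination_queue_url() could then replace the direct os.environ.get call.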
Question 1
Question 2
To create the lambda function, use:
Attention!
Change the function_name variable and set DESTINATION_SQS_URL to the SQS URL of the destination queue ("lambda_destination_queue_INSPER_USERNAME").
Attention!
Notice that DESTINATION_SQS_URL must point to a new queue. If you use the same queue that triggers the function, your function will end up in INFINITE RECURSION!
This could get very expensive!
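One way to protect against this mistake is to compare the destination URL with the queue that delivered the message. The sketch below is only an illustration under stated assumptions: it assumes queue URLs whose path is /<account-id>/<queue-name> and ARNs of the form arn:aws:sqs:<region>:<account-id>:<queue-name>, and it compares account and queue name only. All function names are hypothetical.

```python
from urllib.parse import urlparse


def queue_identity_from_url(queue_url):
    """Extract (account_id, queue_name) from an SQS queue URL path."""
    parts = urlparse(queue_url).path.strip("/").split("/")
    if len(parts) != 2:
        raise ValueError(f"Unexpected SQS queue URL: {queue_url}")
    return parts[0], parts[1]


def queue_identity_from_arn(queue_arn):
    """Extract (account_id, queue_name) from an SQS queue ARN."""
    # Expected layout: arn:aws:sqs:<region>:<account-id>:<queue-name>
    parts = queue_arn.split(":")
    if len(parts) != 6 or parts[2] != "sqs":
        raise ValueError(f"Unexpected SQS queue ARN: {queue_arn}")
    return parts[4], parts[5]


def is_same_queue(destination_url, source_arn):
    """Return True when both values point to the same account and queue name."""
    return queue_identity_from_url(destination_url) == queue_identity_from_arn(source_arn)
```

In a function triggered by SQS, each record carries the ARN of its source queue in record["eventSourceARN"], so the handler could refuse to forward a message when is_same_queue returns True.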
import os
import boto3
from dotenv import load_dotenv
load_dotenv()
# Lambda function name: send_sqs_<INSPER_USERNAME>
function_name = ""
# Change the DESTINATION_SQS_URL to the SQS URL of the destination queue
environment_variables = {
    "DESTINATION_SQS_URL": "xxxxxxxxxxxxx",  # Update HERE
}
# Timeout in seconds. Default is 3.
timeout = 15
# Lambda basic execution role
lambda_role_arn = os.getenv("AWS_LAMBDA_ROLE_ARN")
# Create a Boto3 client for AWS Lambda
lambda_client = boto3.client(
    "lambda",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region_name=os.getenv("AWS_REGION"),
)
# Read the contents of the zip file that you want to deploy
with open("lambda_send_sqs.zip", "rb") as f:
    zip_to_deploy = f.read()
lambda_response = lambda_client.create_function(
    FunctionName=function_name,
    Runtime="python3.10",
    Role=lambda_role_arn,
    Handler="lambda_send_sqs.lambda_handler",
    Code={"ZipFile": zip_to_deploy},
    Timeout=timeout,
    Environment={"Variables": environment_variables},  # This is New!
)
print("Function ARN:", lambda_response["FunctionArn"])
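Once the function exists, one way to check it is to invoke it from Python and read the result. The sketch below is a hedged example and not part of the original walkthrough: the helper name invoke_and_decode is hypothetical, and it assumes you pass in a Boto3 Lambda client and the function name you chose. A newly created function may need a few moments before it can be invoked.

```python
import json


def invoke_and_decode(lambda_client, function_name, event=None):
    """Invoke a Lambda function synchronously and return (status_code, result).

    `lambda_client` is assumed to be a Boto3 Lambda client (or any object
    with a compatible `invoke` method). The helper name is hypothetical.
    """
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # wait for the handler's return value
        Payload=json.dumps(event or {}).encode("utf-8"),
    )
    # Payload is a streaming body: read it once and decode the JSON result
    result = json.loads(response["Payload"].read())
    return response["StatusCode"], result
```

For the handler above, result would be whatever send_message returned, so a MessageId key in it suggests the message reached the destination queue.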
Tip 1
If you prefer, you can also use the AWS CLI to create the Lambda function.
Question 3
A Lambda that is Triggered
Let's create a lambda function that is triggered whenever messages arrive on an origin queue. The function must process and forward these messages to the destination queue.
The handler will be:
import boto3
import os
def send_sqs_message(message):
    sqs = boto3.client("sqs")
    queue_url = os.environ.get("DESTINATION_SQS_URL")
    response = sqs.send_message(QueueUrl=queue_url, MessageBody=message)
    return response
def lambda_handler(event, context):
    # Read batch of messages
    for record in event["Records"]:
        payload = record["body"]
        # Send each message to destination queue
        send_sqs_message(f"process: {payload}")
    return event["Records"]
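To see what the handler receives, it can help to run the same loop locally against a hand-built event. The sketch below is an assumption-laden illustration: build_sqs_event keeps only a few of the fields that a real SQS event record contains, the ARN and message IDs are made up, and process_records is a hypothetical variant of the handler loop with the send function passed in so that no AWS call is made.

```python
def build_sqs_event(bodies, queue_arn="arn:aws:sqs:us-east-1:123456789012:example-queue"):
    """Build a trimmed-down SQS event with one record per message body."""
    return {
        "Records": [
            {
                "messageId": f"id-{index}",  # made-up identifier
                "body": body,
                "eventSource": "aws:sqs",
                "eventSourceARN": queue_arn,  # made-up ARN for illustration
            }
            for index, body in enumerate(bodies)
        ]
    }


def process_records(event, send):
    """Apply the handler's loop, using `send` instead of a real SQS call."""
    for record in event["Records"]:
        payload = record["body"]
        send(f"process: {payload}")
    return len(event["Records"])


# Collect the forwarded messages in a list instead of sending them to SQS
outbox = []
event = build_sqs_event(["hello", "world"])
forwarded = process_records(event, outbox.append)
print(forwarded, outbox)
```

With a batch of two messages, outbox ends up holding both bodies, each prefixed with "process: ".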
Question 4
Question 5
To create the lambda function, use:
Attention!
Change the function_name variable, set queue_name to "lambda_origin_queue_YOUR_INSPER_USERNAME", and set DESTINATION_SQS_URL to the SQS URL of the destination queue ("lambda_destination_queue_INSPER_USERNAME").
import os
import boto3
from dotenv import load_dotenv
load_dotenv()
# Lambda function name: read_write_sqs_<INSPER_USERNAME>
function_name = ""
# Origin queue: lambda_origin_queue_INSPER_USERNAME
queue_name = ""
# Destination queue URL
environment_variables = {
    "DESTINATION_SQS_URL": "",
}
# Timeout in seconds. Default is 3.
timeout = 15
# Lambda basic execution role
lambda_role_arn = os.getenv("AWS_LAMBDA_ROLE_ARN")
# Create a Boto3 client for AWS Lambda
lambda_client = boto3.client(
    "lambda",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region_name=os.getenv("AWS_REGION"),
)
# Read the contents of the zip file that you want to deploy
with open("read_write.zip", "rb") as f:
    zip_to_deploy = f.read()
lambda_response = lambda_client.create_function(
    FunctionName=function_name,
    Runtime="python3.10",
    Role=lambda_role_arn,
    Handler="read_write.lambda_handler",
    Code={"ZipFile": zip_to_deploy},
    Timeout=timeout,
    Environment={"Variables": environment_variables},
)
function_arn = lambda_response["FunctionArn"]
print(f"Function ARN: {function_arn}")
event_source_arn = (
    f'arn:aws:sqs:{os.getenv("AWS_REGION")}:{os.getenv("AWS_ACCOUNT_ID")}:{queue_name}'
)
# Configure the function's event source mapping with the SQS queue
response = lambda_client.create_event_source_mapping(
    EventSourceArn=event_source_arn,
    FunctionName=function_arn,
    BatchSize=2,  # Number of messages to retrieve per batch (optional)
)
print("Lambda function created and configured with SQS event source mapping.")
Important!
Notice that, at the end of the code, an event source mapping is created. It associates the arrival of messages in the origin queue with the triggering of the lambda function.
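The code above composes the queue ARN by hand from AWS_REGION, AWS_ACCOUNT_ID and the queue name. As an alternative, the ARN can be looked up from SQS itself. The sketch below is a hedged example: the helper name get_queue_arn is hypothetical, and it assumes your credentials are allowed to call get_queue_url and get_queue_attributes on the queue.

```python
def get_queue_arn(sqs_client, queue_name):
    """Look up the ARN of an existing SQS queue by its name.

    `sqs_client` is assumed to be a Boto3 SQS client, for example
    boto3.client("sqs", region_name=...). The helper name is hypothetical.
    """
    # Resolve the queue name to its URL
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    # Ask SQS for the QueueArn attribute of that queue
    attributes = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["QueueArn"],
    )
    return attributes["Attributes"]["QueueArn"]
```

The returned value could then be passed as EventSourceArn, which avoids a typo in the region or account ID silently pointing the mapping at a queue that does not exist.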
Question 6
Question 7
Done! We now have a lambda function that receives messages from one queue and sends them to another!
Task
Suppose that a queue receives JSON messages containing product reviews:
Question 8