Skip to content

Commit

Permalink
CMR-10164: Adding subscription worker to process granule subscription…
Browse files Browse the repository at this point in the history
…s. (#2193)

* CMR-10164: Adding subscription worker to process granule subscriptions.

* CMR-10164: bamboo trial

* CMR-10164: bamboo trial

* CMR-10164: bamboo trial

* CMR-10164: bamboo trial

* CMR-10164: Finishing Subscription Worker

* CMR-10164: Fixing Dockerfile

* CMR-10164: splitting out class to its own file.

* CMR-10164: Fixing PR comments.

* CMR-10164: Fixing PR comments.

* CMR-10164: Fixing PR comments.
  • Loading branch information
eereiter authored Nov 14, 2024
1 parent c794f37 commit 34103b8
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 0 deletions.
29 changes: 29 additions & 0 deletions subscription/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use python image
FROM python:3.13-slim
ARG AWS_REGION
ARG QUEUE_URL
ARG DEAD_LETTER_QUEUE_URL
ARG SNS_NAME
ARG SUB_DEAD_LETTER_QUEUE_URL

#Set environment variables
ENV AWS_REGION=$AWS_REGION
ENV QUEUE_URL=$QUEUE_URL
ENV DEAD_LETTER_QUEUE_URL=$DEAD_LETTER_QUEUE_URL
ENV LONG_POLL_TIME 10
ENV SNS_NAME=$SNS_NAME
ENV SUB_DEAD_LETTER_QUEUE_URL=$SUB_DEAD_LETTER_QUEUE_URL

#Set working directory
WORKDIR /app

#Copy the application files
COPY *.py .

#Install the required packages
RUN pip3 install boto3 Flask

#EXPOSE 8089
# Command to run the application
CMD ["python", "subscription_worker.py"]

12 changes: 12 additions & 0 deletions subscription/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# subscription

This project contains the subscription-worker python tool. This tool reads messages from an AWS SQS queue, checks to see if the user has permission to view these records, and then publishes them to the cmr-subscription-{env} topic, so that external users can receive messages to their subscriptions.

## To build and deploy this project manually for AWS - this is not for local development.

Cd into the subscription project.
You will need to make sure that Docker is up and running.
Buid the project: docker build -t {AWS Repository}/cmr-subscription-worker-{env}:latest .
Log in docker to the AWS repository: aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin {AWS Repository}
Using docker to push the deployment artifact: docker push {AWS Repository}/cmr-subscription-worker-{env}:latest
For the ECS to update the service: aws ecs update-service --force-new-deployment --service subscription-worker-sit --cluster cmr-service-sit
1 change: 1 addition & 0 deletions subscription/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
zip deployment_package.zip subscription_worker.py sns.py part1_docker part_docker
3 changes: 3 additions & 0 deletions subscription/part1_docker
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Use python image
FROM python:3.13-slim

13 changes: 13 additions & 0 deletions subscription/part_docker
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

#Set working directory
WORKDIR /app

#Copy the application files
COPY *.py .

#Install the required packages
RUN pip3 install boto3 Flask

# Command to run the application
CMD ["python", "subscription_worker.py"]

4 changes: 4 additions & 0 deletions subscription/run-tests-cicd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

pip3 install boto3 Flask
python3 -m unittest -v
47 changes: 47 additions & 0 deletions subscription/sns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import boto3
import json
from sys import stdout
from botocore.exceptions import ClientError

class Sns:
"""Encapsulates AWS SNS topics."""

def __init__(self, sns_resource):
""":param sns_resource: A Boto3 AWS SNS resource."""
self.sns_resource = sns_resource

def create_topic(self, topic_name):
"""The topic is already created, but this is best practice to get the topic client
from AWS."""
try:
topic = self.sns_resource.create_topic(Name=topic_name)
except ClientError as error:
print("Could not get the topic ARN: {error}.")
stdout.flush()
raise error
else:
return topic

@staticmethod
def publish_message(topic, message):
""" Publishes a message with attributes to the CMR external topic. Subscriptions
can be filtered based on the message attributes. """
message_body_str = message["Body"]
message_body = json.loads(message_body_str)
message_subject = message_body["Subject"]
message_attributes = message_body["MessageAttributes"]
message_message = message_body["Message"]
try:
if message_attributes:
att_dict = {}
for key in message_attributes.keys():
att_dict[key] = {"DataType": "String", "StringValue": message_attributes[key]["Value"]}
response = topic.publish(Subject=message_subject, Message=message_message, MessageAttributes=att_dict)
else:
response = topic.publish(Subject=message_subject, Message=message_message)
except ClientError as error:
print(f"Could not publish message to topic {topic}. {error}")
stdout.flush()
raise error
else:
return response
89 changes: 89 additions & 0 deletions subscription/subscription_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import boto3
import multiprocessing
import os
from flask import Flask, jsonify
from sns import Sns
from sys import stdout
from botocore.exceptions import ClientError

AWS_REGION = os.getenv("AWS_REGION")
QUEUE_URL = os.getenv("QUEUE_URL")
DEAD_LETTER_QUEUE_URL = os.getenv("DEAD_LETTER_QUEUE_URL")
SUB_DEAD_LETTER_QUEUE_URL = os.getenv("SUB_DEAD_LETTER_QUEUE_URL")
LONG_POLL_TIME = os.getenv("LONG_POLL_TIME")
SNS_NAME = os.getenv("SNS_NAME")

def receive_message(sqs_client, queue_url):
response = sqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
# Long Polling
WaitTimeSeconds=(int (LONG_POLL_TIME)))

if len(response.get('Messages', [])) > 0:
print(f"Number of messages received: {len(response.get('Messages', []))}")
stdout.flush()
return response

def delete_message(sqs_client, queue_url, receipt_handle):
sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)

def delete_messages(sqs_client, queue_url, messages):
for message in messages.get("Messages", []):
receipt_handle = message['ReceiptHandle']
delete_message(sqs_client=sqs_client, queue_url=queue_url, receipt_handle=receipt_handle)

def process_messages(topic, messages):
for message in messages.get("Messages", []):
sns_client.publish_message(topic, message)

def poll_queue(running):
""" Poll the SQS queue and process messages. """
while running.value:
try:
# Poll the SQS
messages = receive_message(sqs_client=sqs_client, queue_url=QUEUE_URL)

if messages:
process_messages(topic=topic, messages=messages)
delete_messages(sqs_client=sqs_client, queue_url=QUEUE_URL, messages=messages)

dl_messages = receive_message(sqs_client=sqs_client, queue_url=DEAD_LETTER_QUEUE_URL)
if dl_messages:
process_messages(topic=topic, messages=dl_messages)
delete_messages(sqs_client=sqs_client, queue_url=DEAD_LETTER_QUEUE_URL, messages=dl_messages)

except Exception as e:
print(f"An error occurred receiving or deleting messages: {e}")
stdout.flush()

app = Flask(__name__)
@app.route('/shutdown', methods=['POST'])
def shutdown():
""" Gracefully shutdown the polling process."""

#Set the shared variable to False, shutting down the process
running.value = False
return jsonify({'status': 'shutting down'})

sqs_client = boto3.client("sqs", region_name=AWS_REGION)
sns_resource = boto3.resource("sns")
sns_client = Sns(sns_resource)
topic = sns_client.create_topic(SNS_NAME)

#Shared boolean value for process communication
running = multiprocessing.Value('b',True)

if __name__ == "__main__":
print("Starting to poll the SQS queue...")
# Start the polling process
poll_process = multiprocessing.Process(target=poll_queue, args=(running,))
poll_process.start()

# Start the Flask app in the main process
# Expose the app on all interfaces
app.run(host='0.0.0.0', port=5000)

# Wait for the polling process to finish before exiting
poll_process.join()
print("Exited polling loop.")

0 comments on commit 34103b8

Please sign in to comment.