diff --git a/ANSWERS.md b/ANSWERS.md new file mode 100644 index 0000000..c9b6313 --- /dev/null +++ b/ANSWERS.md @@ -0,0 +1,18 @@ +Q1: Please explain what is the advantage of using SQS in this solution. +A1: +- Dont need to invest on setup, maintence & reliability of delivery of messages. +- Easy integration with other AWS services else need to maintain a library. +- Scalable in different peaks. + +Q2: Compare SQS to a message broker you have used before. What are the differences? [RabbitMQ/Kafka] +A2: +- RabbitMQ/Kafka is more appropiate where Organisation has product distributed across multiClouds like banking or multi teants apps. +- Kafka is alot faster then SQS/Pubsub. +- For very big payloads, SQS uses s3 bucket which will another cost expense. + +Q3: If we run multiple instances of this tool, what prevents a message from processed twice? +A3: +- message_id column in mysql db is a primary key and hence if we try to add same message_id again it will raise and concern. +- From code also, we are first checking if message_id is present or not and if not present then only insert. + +Q4: In very rough terms, can you suggest an alternative solution aside from using SQS from your previous experience using different technologies? diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md new file mode 100644 index 0000000..4948dfe --- /dev/null +++ b/DOCUMENTATION.md @@ -0,0 +1,23 @@ +Tool introduction/explanation +- This a easy tool to produce & consume messages and persist consumed messge to database. +- MYSQL 5.7 DB is configured. Default db name - sqs & table is sqs_messages +How to build the tool and build requirements +- cd solution && docker-compose up --build + +How to configure the environment (if necessary) +- NA + +How to run the tool +- All 3 containers are up & running (check_python_container_1, check_localstack_1, check_db_1) +- To produce new message + + docker exec -it aws_sqs_app python3 /opt/generator.py +- To consume existing messages + + docker exec -it aws_sqs_app python3 /opt/msg_consumer_app.py consume --count 40 +- To fetch all messages from DB + + docker exec -it aws_sqs_app python3 /opt/msg_consumer_app.py show +- To clean all messages from DB + + docker exec -it aws_sqs_app python3 /opt/msg_consumer_app.py clear + +Challenges while solving the problem +- I have elementry knowledge of AWS cloud (Mostly worked on GCP/Azure),thereforeunderstanding of Boto3 and SQS is very limited. So it was kind of challenges to implement solution while learning. +- localStack - Its a really nice tool to play around with AWS APIs and intially it took some time for me to understand what is it and how to use it. While trying It was asking AWS credentails and hence need to read article to solve that issue by adding summy AWS credentails. \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index d4f485f..59aa697 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,20 @@ version: '2.2' services: - + db: + image: mysql:5.7 + restart: always + environment: + MYSQL_DATABASE: 'db' + MYSQL_USER: 'root' + MYSQL_PASSWORD: 'root123' + MYSQL_ROOT_PASSWORD: 'root123' + ports: + - '3306:3306' + expose: + - '3306' + volumes: + - my-db:/var/lib/mysql interview-localstack: image: localstack/localstack ports: @@ -14,4 +27,5 @@ services: - DOCKER_HOST=unix:///var/run/docker.sock volumes: - "/var/run/docker.sock:/var/run/docker.sock" - +volumes: + my-db: diff --git a/solution/docker-compose.yml b/solution/docker-compose.yml new file mode 100644 index 0000000..94aee34 --- /dev/null +++ b/solution/docker-compose.yml @@ -0,0 +1,41 @@ +version: '2.2' + +services: + db: + container_name: mysql + image: mysql:5.7 + restart: always + environment: + MYSQL_ROOT_PASSWORD: 'BAoMDUdhcmFudGlh' + ports: + - '3306:3306' + expose: + - '3306' + volumes: + - "./schema.sql:/docker-entrypoint-initdb.d/1.sql" + - my-db:/var/lib/mysql + localstack: + container_name: localstack + image: localstack/localstack + environment: + - SERVICES=sqs + - DEFAULT_REGION=ap-southeast-1 + ports: + - "4567-4582:4567-4582" + - "8081:8081" + python_container: + container_name: aws_sqs_app + build: + context: . + dockerfile: runnerDockerfile + environment: + - AWS_ACCESS_KEY_ID=foobar + - AWS_DEFAULT_REGION=foobar + - AWS_SECRET_ACCESS_KEY=foobar + links: + - localstack + - db + stdin_open: true # docker run -i + tty: true # docker run -t +volumes: + my-db: diff --git a/solution/generator.py b/solution/generator.py new file mode 100644 index 0000000..c01366d --- /dev/null +++ b/solution/generator.py @@ -0,0 +1,12 @@ +import boto3 +import random +sqs = boto3.client('sqs',region_name="ap-southeast-1",endpoint_url="http://localstack:4566") +sqs.create_queue(QueueName='test-queue') +count = random.randint(10, 100) +print(count) +while count > 0 : + message_body="Test message {}".format(count) + sqs.send_message(QueueUrl='http://localstack:4576/000000000000/test-queue',MessageBody=message_body) + count-=1 + +print("completed") diff --git a/solution/msg_consumer_app.py b/solution/msg_consumer_app.py new file mode 100644 index 0000000..b01723d --- /dev/null +++ b/solution/msg_consumer_app.py @@ -0,0 +1,90 @@ +import boto3 +import sys +import mysql.connector + +def conn_db(): + try: + mydb = mysql.connector.connect( + host="db", + user="root", + password="BAoMDUdhcmFudGlh", + database="sqs" + ) + return mydb + + except Exception as err: + print(Exception, err) + print("Can't connect to DB") + sys.exit() + + +def consume_message(db,db_cursor,q_name,maxNumberOfMessages): + insert_sql = "INSERT INTO sqs_messages (sqs_message_id, sqs_message) VALUES (%s, %s)" + sqs = boto3.resource('sqs',region_name="ap-southeast-1",endpoint_url="http://localstack:4566") + queue = sqs.get_queue_by_name(QueueName=q_name) + messages = queue.receive_messages(MaxNumberOfMessages=int(maxNumberOfMessages), WaitTimeSeconds=1) + print("lenght => " + str(len(messages))) + for message in messages: + if check_msg_id(db_cursor,message.message_id): + print(message.message_id + "is already present in DB, hence removing from Queue") + else: + print(message.message_id + "-->" + message.body) + val = (message.message_id,message.body) + db_cursor.execute(insert_sql, val) + db.commit() + message.delete() + +def check_msg_id(db_cursor,msg_id): + check_sql = "select * from sqs_messages where sqs_message_id = " + msg_id + db_cursor.execute(check_sql) + results = db_cursor.fetchall() + if(len(results) > 0): + return True + else: + return False + +def show_messages(db_cursor): + show_sql = "select * from sqs_messages" + try: + db_cursor.execute(show_sql) + results = db_cursor.fetchall() + if(len(results) > 0): + for row in results: + print(row) + else: + print("No messages are consumed yet") + except Exception as err: + print(Exception, err) + print("Error in displaying messages") + +def clear_messages(db_cursor): + clear_message = "truncate table sqs_messages" + try: + db_cursor.execute(clear_message) + print("All messages have been cleared from DB") + return True + except: + print("Error in deleting messages") + return False + + +if len(sys.argv) == 1: + print("Action is required") +else: + db_conn = conn_db() + db_cursor = db_conn.cursor() + action = sys.argv[1] + if action == "consume": + if len(sys.argv) == 4 and sys.argv[2] == "--count": + maxNumberOfMessages = sys.argv[3] + else: + print("Selected action is consume & its requires queue_name & maxNumberOfMessages") + consume_message(db_conn,db_cursor,"test-queue",maxNumberOfMessages) + + elif action == "clear": + clear_messages(db_cursor) + + elif action == "show": + show_messages(db_cursor) + else: + print("Invalid choice") diff --git a/solution/runnerDockerfile b/solution/runnerDockerfile new file mode 100644 index 0000000..62c21fb --- /dev/null +++ b/solution/runnerDockerfile @@ -0,0 +1,6 @@ +FROM python +RUN pip3 install boto3 mysql.connector && mkdir -p /opt +WORKDIR /opt +COPY generator.py /opt/generator.py +COPY msg_consumer_app.py /opt/msg_consumer_app.py +CMD /bin/bash diff --git a/solution/schema.sql b/solution/schema.sql new file mode 100644 index 0000000..42ebe58 --- /dev/null +++ b/solution/schema.sql @@ -0,0 +1,6 @@ +create database sqs; +use sqs; +CREATE TABLE IF NOT EXISTS sqs_messages ( + `sqs_message_id` VARCHAR(255), + `sqs_message` VARCHAR(255), + PRIMARY KEY (sqs_message_id));