Building a Stream Processing Pipeline in AWS
In my earlier article on Medium, I took a Kaggle credit card transaction dataset and modeled it for OLTP and OLAP databases:
https://arockianirmal26.medium.com/oltp-olap-database-modelling-on-a-kaggle-dataset-3d52c2e02ac7
In this article, I describe how I created a simple stream processing pipeline in AWS for the OLTP data model from that earlier article.
Overview
Credit Card Transaction Test Data
For simplicity and testing, I took around 17 rows from the Kaggle dataset. The sample also contains some invalid rows (for example, rows in which the credit card number or the transaction number is null, even though those fields are mandatory for every valid transaction). This dataset is stored on the local PC.
Local Python Script
A Python script has been created locally to read the CSV file and send the transaction data to AWS API Gateway as PUT requests.
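The script itself is short; the sketch below only illustrates the idea. The invoke URL, CSV file name, and column names (e.g. trans_num) are placeholders, not the exact values from my setup.

```python
# Minimal sketch of the local sender script. The API Gateway invoke URL and the
# CSV path/columns are placeholders -- replace them with your own values.
import csv
import json
import requests

API_URL = "https://<api-id>.execute-api.<region>.amazonaws.com/dev/transactions"  # hypothetical

def send_transactions(csv_path: str) -> None:
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            # Each CSV row is sent as one JSON document via a PUT request.
            response = requests.put(
                API_URL,
                data=json.dumps(row),
                headers={"Content-Type": "application/json"},
            )
            print(row.get("trans_num"), response.status_code)

if __name__ == "__main__":
    send_transactions("credit_card_transactions_sample.csv")
```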
AWS API Gateway
The API Gateway acts as a bridge between the local CSV data and the Kinesis data stream.
Lambda Function — Write to Kinesis Data Streams
Once a transaction reaches the API Gateway, this Lambda function is triggered. The incoming JSON data is validated against a JSON schema defined in the Lambda function. If the validation succeeds, the transaction is sent to the respective Kinesis data stream; if not, the transaction is diverted to an S3 bucket. For this function, a role needs to be created with the attached policies S3 full access, put records to the Kinesis stream, and the Lambda basic execution role. The Lambda basic execution role allows the creation of a log group in CloudWatch so that the Lambda function executions can be monitored for debugging purposes.
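A minimal sketch of this validation Lambda is shown below, assuming a Python runtime; the stream name, bucket name, and schema fields are placeholders, and the real schema lists every mandatory transaction field. Note that the jsonschema package is not part of the default Lambda runtime and has to be bundled with the deployment.

```python
# Sketch of the validation Lambda. Stream name, bucket name and the JSON schema
# are assumptions, not the exact ones from this project.
import json
from datetime import datetime

import boto3
from jsonschema import ValidationError, validate  # package with the deployment

kinesis = boto3.client("kinesis")
s3 = boto3.client("s3")

STREAM_NAME = "credit-card-transactions"        # hypothetical
INVALID_BUCKET = "invalid-transactions-bucket"  # hypothetical

# Simplified schema: the real one lists every mandatory transaction field.
SCHEMA = {
    "type": "object",
    "properties": {
        "cc_num": {"type": "string"},
        "trans_num": {"type": "string"},
    },
    "required": ["cc_num", "trans_num"],
}

def lambda_handler(event, context):
    # With a proxy integration the payload arrives as a JSON string in "body".
    record = json.loads(event["body"]) if "body" in event else event
    try:
        validate(instance=record, schema=SCHEMA)
    except ValidationError:
        # Invalid transaction: store it in S3 under a timestamp key.
        key = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S-%f") + ".json"
        s3.put_object(Bucket=INVALID_BUCKET, Key=key, Body=json.dumps(record))
        return {"statusCode": 400, "body": "invalid transaction"}

    # Valid transaction: forward it to the Kinesis data stream.
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(record),
        PartitionKey=record["cc_num"],
    )
    return {"statusCode": 200, "body": "ok"}
```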
Amazon S3
An S3 bucket has been created to store all the invalid transactions. Each transaction is stored as a separate object, and the object name is the current timestamp.
AWS Kinesis Data Stream
A basic Kinesis data stream has been created with one open shard and a data retention period of one day. Once an event reaches Kinesis, the write-to-Aurora-database Lambda function is triggered.
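The stream can be created in the console or, as sketched below, with boto3 (the stream name is a placeholder).

```python
# Sketch of creating the data stream with boto3; the same can be done in the console.
import boto3

kinesis = boto3.client("kinesis")

# One shard is enough for this low-volume test pipeline.
# The default retention period of a new stream is 24 hours.
kinesis.create_stream(StreamName="credit-card-transactions", ShardCount=1)
```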
Lambda Function — Write to RDS Aurora Serverless
This function takes an event received from Kinesis Data Streams and inserts the data into the different OLTP tables in the Aurora Serverless database. Since there are many tables in the OLTP database, the database transaction is committed only if all the inserts/updates for a particular event succeed; otherwise it is rolled back. Therefore all the SQL DML statements are enclosed within a transaction, as in the code below. For this function, a role needs to be created with the attached policies write access to Aurora, Kinesis stream read-only access, and the Lambda basic execution role.
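The sketch below shows the idea, assuming the function talks to Aurora Serverless through the RDS Data API (boto3 rds-data client); the cluster and secret ARNs, database name, and table and column names are placeholders, not the exact ones from my setup.

```python
# Sketch of the write-to-Aurora Lambda. ARNs, database, table and column names
# are placeholders; the real function inserts into all OLTP tables.
import base64
import json

import boto3

rds = boto3.client("rds-data")

CLUSTER_ARN = "arn:aws:rds:<region>:<account>:cluster:<cluster-name>"   # hypothetical
SECRET_ARN = "arn:aws:secretsmanager:<region>:<account>:secret:<name>"  # hypothetical
DATABASE = "oltp"                                                       # hypothetical

def lambda_handler(event, context):
    for record in event["Records"]:
        # Kinesis delivers the payload base64 encoded.
        txn = json.loads(base64.b64decode(record["kinesis"]["data"]))

        # All DML statements for one event run inside a single transaction:
        # either every insert succeeds, or the whole event is rolled back.
        tx_id = rds.begin_transaction(
            resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN, database=DATABASE
        )["transactionId"]
        try:
            rds.execute_statement(
                resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN, database=DATABASE,
                transactionId=tx_id,
                sql="INSERT INTO transactions (trans_num, cc_num, amount) "
                    "VALUES (:trans_num, :cc_num, :amount)",
                parameters=[
                    {"name": "trans_num", "value": {"stringValue": txn["trans_num"]}},
                    {"name": "cc_num", "value": {"stringValue": txn["cc_num"]}},
                    {"name": "amount", "value": {"doubleValue": float(txn["amt"])}},
                ],
            )
            # ... further inserts/updates into the other OLTP tables go here ...
            rds.commit_transaction(
                resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN, transactionId=tx_id
            )
        except Exception:
            rds.rollback_transaction(
                resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN, transactionId=tx_id
            )
            raise
```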
AWS RDS — Aurora Serverless
A serverless Aurora RDS cluster has been created to store the transactional data. Below are the DDL statements of the OLTP database tables.
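The exact DDL follows the OLTP data model from the earlier article; the snippet below is only an illustrative sketch of how such statements can be run through the RDS Data API, with hypothetical table and column names.

```python
# Illustrative only: the real DDL follows the OLTP model from the earlier article.
# Table and column names here are placeholders.
import boto3

rds = boto3.client("rds-data")

CLUSTER_ARN = "arn:aws:rds:<region>:<account>:cluster:<cluster-name>"   # hypothetical
SECRET_ARN = "arn:aws:secretsmanager:<region>:<account>:secret:<name>"  # hypothetical

DDL_STATEMENTS = [
    """CREATE TABLE IF NOT EXISTS customers (
           customer_id BIGINT PRIMARY KEY,
           first_name  VARCHAR(100),
           last_name   VARCHAR(100)
       )""",
    """CREATE TABLE IF NOT EXISTS transactions (
           trans_num   VARCHAR(64) PRIMARY KEY,
           cc_num      VARCHAR(32) NOT NULL,
           amount      DECIMAL(10, 2),
           trans_ts    TIMESTAMP
       )""",
]

for ddl in DDL_STATEMENTS:
    rds.execute_statement(
        resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN, database="oltp", sql=ddl
    )
```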
Summary
This is a simple stream processing pipeline created in AWS. Once the local Python script has been started, the pipeline ends after the transaction data has been written to the respective tables in the OLTP database. The CloudWatch logs can be used to monitor the Lambda function executions.