Building a Stream Processing Pipeline in AWS

Arockia Nirmal Amala Doss
3 min read · Aug 1, 2021


In my earlier article on Medium, I took a Kaggle credit card transaction dataset and modeled it for OLTP and OLAP databases:

https://arockianirmal26.medium.com/oltp-olap-database-modelling-on-a-kaggle-dataset-3d52c2e02ac7

In this article, I describe how I created a simple stream processing pipeline in AWS for the OLTP data model from that earlier article.

Overview

Credit Card Transaction Test Data

For simplicity and testing, I took around 17 rows from the Kaggle dataset. The sample also contains some invalid rows (for example, rows where the credit card number or the transaction number is null, even though those fields are mandatory for every valid transaction). This dataset is stored on the local PC.

https://github.com/arockianirmal26/MediumBlogPosts/blob/main/AWS_Stream_Processing_Pipeline/Transaction_c1.csv

Local Python Script

A Python script has been created locally to send the transaction data to the AWS API Gateway as a PUT request.

https://github.com/arockianirmal26/MediumBlogPosts/blob/main/AWS_Stream_Processing_Pipeline/put_streaming_data_to_API.py
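At a high level, the script reads the CSV file row by row and sends each row as a JSON document to the API endpoint. Below is a minimal sketch of that idea; the endpoint URL is a placeholder and the column handling is simplified, so refer to the script linked above for the actual implementation.

```python
# Minimal sketch of the local sender script (the endpoint URL and payload shape
# are placeholders; see the linked repository for the actual implementation).
import csv
import json
import requests

API_URL = "https://<api-id>.execute-api.<region>.amazonaws.com/<stage>/transactions"  # placeholder

def send_transactions(csv_path: str) -> None:
    with open(csv_path, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Each CSV row is sent as one JSON document via a PUT request.
            response = requests.put(
                API_URL,
                data=json.dumps(row),
                headers={"Content-Type": "application/json"},
            )
            print(response.status_code, response.text)

if __name__ == "__main__":
    send_transactions("Transaction_c1.csv")
```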

AWS API Gateway

The API Gateway acts as a bridge between the local CSV data and the Kinesis data stream.

API Gateway (POST)

Lambda Function — Write to Kinesis Data Streams

Once a transaction record reaches the API Gateway, this Lambda function is triggered. The incoming JSON data is validated against the JSON schema defined in the Lambda function. If the validation succeeds, the transaction data is sent to the respective Kinesis data stream; if not, it is diverted to an S3 bucket. For this function, a role needs to be created with the attached policies (S3 full access, put records to Kinesis stream, Lambda basic execution role). The Lambda basic execution role allows the creation of a log group in CloudWatch so that the Lambda function executions can be monitored for debugging purposes.

https://github.com/arockianirmal26/MediumBlogPosts/blob/main/AWS_Stream_Processing_Pipeline/lambda_write_to_kinesis.py
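The logic of this function can be sketched as follows. The stream name, bucket name, field names and the schema shown here are simplified assumptions for illustration; the actual function is in the repository linked above.

```python
# Sketch of the validation Lambda (stream name, bucket name, field names and schema
# are assumptions; see the linked repository for the actual function).
import json
from datetime import datetime

import boto3
from jsonschema import validate, ValidationError

kinesis = boto3.client("kinesis")
s3 = boto3.client("s3")

STREAM_NAME = "transactions-stream"   # assumed stream name
BUCKET_NAME = "invalid-transactions"  # assumed bucket name

# Simplified schema: credit card and transaction numbers are mandatory.
SCHEMA = {
    "type": "object",
    "properties": {
        "cc_num": {"type": "string"},
        "trans_num": {"type": "string"},
    },
    "required": ["cc_num", "trans_num"],
}

def lambda_handler(event, context):
    # Assumes the API Gateway passes the request body directly as the event
    # (non-proxy integration).
    try:
        validate(instance=event, schema=SCHEMA)
        # Valid record: forward it to the Kinesis data stream.
        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(event),
            PartitionKey=event["cc_num"],
        )
        return {"statusCode": 200, "body": "sent to Kinesis"}
    except ValidationError:
        # Invalid record: store it in S3 as a separate file named with the current timestamp.
        key = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S-%f") + ".json"
        s3.put_object(Bucket=BUCKET_NAME, Key=key, Body=json.dumps(event))
        return {"statusCode": 400, "body": "stored in S3 as invalid"}
```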

Amazon S3

An S3 bucket has been created to store all the invalid transactions. Each invalid transaction is stored as a separate file whose name is the current timestamp.

Sample Invalid Transactions in S3 Bucket

AWS Kinesis Data Stream

A basic Kinesis data stream has been created with one open shard and a data retention period of one day. Once an event reaches Kinesis, the Lambda function that writes to the Aurora database is triggered.
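For reference, the equivalent stream setup can be expressed with a couple of boto3 calls (a sketch; the stream name is a placeholder):

```python
# Sketch: creating the stream programmatically (stream name is a placeholder).
import boto3

kinesis = boto3.client("kinesis")

# One open shard; the default retention period of 24 hours matches the one-day setting above.
kinesis.create_stream(StreamName="transactions-stream", ShardCount=1)

# Wait until the stream is ACTIVE before attaching the Lambda trigger.
waiter = kinesis.get_waiter("stream_exists")
waiter.wait(StreamName="transactions-stream")
```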

Lambda Function — Write to RDS Aurora Serverless

This function takes an event received from the Kinesis data stream and inserts the data into the different OLTP tables in the Aurora Serverless database. Because the OLTP database contains many tables, the transaction is committed only if all the inserts/updates on the tables for a particular event succeed; otherwise it is rolled back. Therefore, all the SQL DML statements are enclosed within a transaction, as in the code below. For this function, a role needs to be created with the attached policies (write to Aurora, Kinesis stream read-only access, Lambda basic execution role).

https://github.com/arockianirmal26/MediumBlogPosts/blob/main/AWS_Stream_Processing_Pipeline/lambda_write_to_Aurora.py
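The transactional insert pattern looks roughly like the sketch below. Here the RDS Data API is used as one way to get transactions with Aurora Serverless; the cluster ARN, secret ARN, database name, table and column names are placeholders, and the actual function (linked above) may use a different database driver.

```python
# Sketch of the transactional insert pattern using the RDS Data API
# (ARNs, database name, table and column names are placeholders;
# see the linked repository for the actual function).
import base64
import json

import boto3

rds_data = boto3.client("rds-data")

CLUSTER_ARN = "arn:aws:rds:<region>:<account>:cluster:<cluster-name>"    # placeholder
SECRET_ARN = "arn:aws:secretsmanager:<region>:<account>:secret:<name>"   # placeholder
DATABASE = "oltp_db"                                                      # placeholder

def lambda_handler(event, context):
    for record in event["Records"]:
        # Kinesis delivers the payload base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))

        # All DML statements for one event share a single transaction.
        tx = rds_data.begin_transaction(
            resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN, database=DATABASE
        )
        try:
            rds_data.execute_statement(
                resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN, database=DATABASE,
                transactionId=tx["transactionId"],
                sql="INSERT INTO transactions (trans_num, cc_num) VALUES (:trans_num, :cc_num)",
                parameters=[
                    {"name": "trans_num", "value": {"stringValue": payload["trans_num"]}},
                    {"name": "cc_num", "value": {"stringValue": payload["cc_num"]}},
                ],
            )
            # ... further inserts/updates on the other OLTP tables go here ...
            rds_data.commit_transaction(
                resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN,
                transactionId=tx["transactionId"],
            )
        except Exception:
            # Any failure rolls back everything done for this event.
            rds_data.rollback_transaction(
                resourceArn=CLUSTER_ARN, secretArn=SECRET_ARN,
                transactionId=tx["transactionId"],
            )
            raise
```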

AWS RDS — Aurora Serverless

A serverless Aurora RDS cluster has been created to store the transaction data. Below are the DDL statements for the OLTP database tables.

https://github.com/arockianirmal26/MediumBlogPosts/blob/main/AWS_Stream_Processing_Pipeline/OLTP_Tables.txt

Summary

This is a simple stream processing pipeline created in AWS. Once the local Python script is started, the pipeline runs end to end and finishes after writing the transaction data to the respective tables in the OLTP database. The CloudWatch logs can be used to monitor the Lambda function executions.
