Categories
Data & Analytics

SQL Workbench: On-Demand DuckDB-Wasm With No Bill

In this post, I try Tobias Müller’s free SQL Workbench tool powered by DuckDB-Wasm. And maybe make one or two gratuitous duck puns.

Table of Contents

Introduction

Like many people in tech (I hope), I have a collection of articles, repos, projects and whatnot saved under the banner of “That sounds cool / looks interesting / feels useful – I should check that out at some point.” Sometimes things even come off that list…

DuckDB was on my list, and went to the top when I heard about Tobias Müller’s free SQL Workbench tool powered by DuckDB-Wasm. What I heard was very impressive and pushed me to finally examine DuckDB up close.

So what is DuckDB?

About DuckDB

This section examines DuckDB and DuckDB-Wasm – core components of SQL Workbench.

DuckDB

DuckDB is an open-source SQL Online Analytical Processing (OLAP) database management system. It is intended for analytical workloads, and common use cases include in-process analytics, exploration of large datasets and machine learning model prototyping. DuckDB was released in 2019 and reached v1.0.0 on June 03 2024.

DuckDB is an embedded database that runs within a host process. It is lightweight, portable and requires no separate server. It can be embedded into applications, scripts, and notebooks without bespoke infrastructure.

DuckDB uses a columnar storage format and supports standard SQL features like complex queries, joins, aggregations and window functions. It can be used with programming languages like Python and R.

In this Python example, DuckDB:

  • Creates an in-memory database
  • Defines a users table.
  • Inserts data into users.
  • Runs a query to fetch results.
Python
import duckdb

# Create a new DuckDB database in memory
con = duckdb.connect(database=':memory:')

# Create a table and insert some data
con.execute("""
CREATE TABLE users (
    user_id INTEGER,
    user_name VARCHAR,
    age INTEGER
);
""")

con.execute("INSERT INTO users VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)")

# Run a query
results = con.execute("SELECT * FROM users WHERE age > 25").fetchall()
print(results)
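
DuckDB can also query files directly with no load step, which is worth knowing before we get to SQL Workbench's remote Parquet scans. Here's a quick sketch along the same lines as the example above, assuming a hypothetical local library.csv file:

Python
import duckdb

con = duckdb.connect(database=':memory:')

# Query a local CSV file directly - no CREATE TABLE or INSERT step needed
results = con.execute("SELECT count(*) AS row_count FROM 'library.csv'").fetchall()
print(results)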

DuckDB-Wasm

Launched in 2021, DuckDB WebAssembly (Wasm) is a version of the DuckDB database that has been compiled to run in WebAssembly. This lets DuckDB run in web browser processes and other environments with WebAssembly support.

So what’s WebAssembly? Don’t worry – I didn’t know either and so deferred to an expert:

Since all data processing happens in the browser, DuckDB-Wasm brings new benefits to DuckDB. In-browser dashboards and data analysis tools with DuckDB-Wasm enabled can operate without any server-side processing, improving their speed and security. If data is being sent somewhere then DuckDB-Wasm can handle ETL operations first, reducing both processing time and cost.

Running in-browser SQL queries also removes the need for setting up database servers and IDEs, reducing patching and maintenance (but don’t forget about the browser!) while increasing availability and convenience for remote work and educational purposes.

Like DuckDB, it is open-source and the code is on GitHub. A DuckDB Web Shell is available at shell.duckdb.org and there are also more technical details on DuckDB’s blog.

SQL Workbench

This section examines SQL Workbench and tries out some of its core features.

About SQL Workbench

Tobias Müller produced SQL Workbench in January 2024. He integrated DuckDB-Wasm into an AWS serverless static site and built an online interactive tool for querying data, showing results and generating visualizations.

SQL Workbench is available at sql-workbench.com. Note that it doesn’t currently support mobile browsers. Tobias has also written a great tutorial post covering loads of content that I won’t reproduce here. So go and give Tobias some traffic!

Layout

The core SQL Workbench components are:

  • The Object Explorer section shows databases, schema and tables:
2024 06 26 SQLWorkbenchSchema
  • The Tools section shows links, settings and a drag-and-drop section for adding files (more on that later):
2024 06 26 SQLWorkbenchOptions
  • The Query Pane for writing SQL, which preloads with the below script upon each browser refresh:
2024 06 26 SQLWorkbenchQueryWindow
  • And finally, the Results Pane shows query results and visuals:
2024 06 26 SQLWorkbenchResultsWindow

Next, let’s try it out!

Sample Queries

SQL Workbench opens with a pre-loaded script that features:

  • Instructions:
SQL
-- WELCOME TO THE ONLINE SQL WORKBENCH!
-- To run a SQL query in your browser, select the query text and press:
-- CTRL + Enter (Windows/Linux) / CMD + Enter (Mac OS)

-- See https://duckdb.org/docs/sql/introduction for more info about DuckDB SQL syntax
  • Example queries using Parquet files:
SQL
-- Remote Parquet scans:
SELECT * FROM 'https://shell.duckdb.org/data/tpch/0_01/parquet/orders.parquet' LIMIT 1000;

SELECT avg(c_acctbal) FROM 'https://shell.duckdb.org/data/tpch/0_01/parquet/customer.parquet';

SELECT count(*)::int as aws_service_cnt FROM 'https://raw.githubusercontent.com/tobilg/aws-iam-data/main/data/parquet/aws_services.parquet';

SELECT * FROM 'https://raw.githubusercontent.com/tobilg/aws-edge-locations/main/data/aws-edge-locations.parquet';

SELECT cloud_provider, sum(ip_address_cnt)::int as cnt FROM 'https://raw.githubusercontent.com/tobilg/public-cloud-provider-ip-ranges/main/data/providers/all.parquet' GROUP BY cloud_provider;

SELECT * FROM 'https://raw.githubusercontent.com/tripl-ai/tpch/main/parquet/lineitem/part-0.parquet';
  • Example query using a CSV file:
SQL
-- Remote CSV scan
SELECT * FROM read_csv_auto('https://raw.githubusercontent.com/tobilg/public-cloud-provider-ip-ranges/main/data/providers/all.csv');

These queries are designed to show SQL Workbench’s capabilities and the style of SQL queries it can run (expanded on further here). They can also be used for experimentation, so here goes!

Firstly, I’ve added an ORDER BY to the first Parquet query to make sure the first 1000 orders are returned:

SQL
-- Show first 1000 orders for Parquet file
SELECT * 
FROM 'https://shell.duckdb.org/data/tpch/0_01/parquet/orders.parquet'
ORDER BY o_orderkey
LIMIT 1000;
2024 06 26 SQL WorkbenchQueryLimit1000

Secondly, I’ve used the COUNT and CAST functions to count the total orders and return the value as an integer:

SQL
-- Show number of orders for Parquet file
SELECT CAST(COUNT(o_orderkey) AS INT) AS orderkeycount 
FROM 'https://shell.duckdb.org/data/tpch/0_01/parquet/orders.parquet';
2024 06 26 SQL WorkbenchQueryOrderCount

Finally, this query captures all orders from 1996 in a CTE, then uses it to calculate 1996’s order price totals grouped by order priority:

SQL
-- Show 1996 order totals in priority order
WITH cte_orders1996 AS (
    SELECT o_orderkey, o_custkey, o_totalprice, o_orderdate, o_orderpriority
    FROM 'https://shell.duckdb.org/data/tpch/0_01/parquet/orders.parquet'
    WHERE YEAR(o_orderdate) = 1996
)

SELECT SUM(o_totalprice) AS totalpriorityprice, o_orderpriority 
FROM cte_orders1996 
GROUP BY o_orderpriority 
ORDER BY o_orderpriority;
2024 06 26 SQL WorkbenchQueryCTE

Now let’s try some of my data!

File Imports

SQL Workbench can also query data stored locally (as well as remote data which is out of this post’s scope). For this section I’ll be using some Parquet files generated by my WordPress bronze data orchestration process: posts.parquet and statistics_pages.parquet.

After dragging each file onto the assigned section they quickly appear as new tables:

2024 06 26 SQLWorkbenchSchemaWordPress

These tables show column names and inferred data types when expanded:

2024 06 26 SQLWorkbenchSchemaWordPressExpanded

Queries can be written manually or by right-clicking the tables in the Object Explorer. A statistics_pages SELECT * test query shows the expected results:

2024 06 26 SQLWorkbenchQueryStatsPages

As posts.parquet’s ID primary key column matches statistics_pages.parquet’s ID foreign key column, the two files can be joined on this column using an INNER JOIN:

SQL
SELECT 
  p.post_title,
  sp.type, 
  sp.date as viewdate,
  sp.count
FROM 
  'statistics_pages.parquet' AS sp
  INNER JOIN 'posts.parquet' AS p ON sp.id = p.id
ORDER BY sp.date;
2024 06 26 SQLWorkbenchQueryJoin

I can use this query to start making visuals!

Visuals

Next, let’s examine SQL Workbench’s visualisation capabilities. To help things along, I’ve written a new query showing the cumulative view count for my Using Athena To Query S3 Inventory Parquet Objects post (ID 92):

SQL
 SELECT 
  sp.date AS viewdate,
  SUM(sp.count) OVER (PARTITION BY sp.id ORDER BY sp.date) AS cumulative_views
FROM 
  'statistics_pages.parquet' AS sp
  INNER JOIN 'posts.parquet' AS p ON sp.id = p.id
WHERE p.id = 92
ORDER BY sp.date;
2024 06 26 SQLWorkbenchQueryCumulViews

(On reflection the join wasn’t necessary for this output, but anyway… – Ed)

Tobias has written visualisation instructions here so I’ll focus more on how my visual turns out. After the query finishes running, several visual types can be selected:

2024 06 26 SQLWorkbenchVisualizationsOptions

Selecting a visual quickly renders it in the browser. Customisation options are also available. That said, in my case the first chart was mostly fine!

2024 06 26 SQLWorkbenchVisualizationsLineChart

All I need to do is move viewdate (bottom right) to the Group By bin (top right) to add dates to the X-axis.

Exports & Cleanup

Finally, let’s examine SQL Workbench’s export options. And there’s plenty to choose from!

2024 06 26 SQLWorkbenchExports

The CSV and JSON options export the query results in their respective formats. The Arrow option exports query results in Apache Arrow format, which is designed for high-speed in-memory data processing and so complements DuckDB very well. Dremio wrote a great Arrow technical guide for those curious.
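
Outside the browser, DuckDB's Python client can also hand query results straight to Arrow, which hints at why the pairing works so well. A minimal sketch, assuming pyarrow is installed and a hypothetical local orders.parquet file:

Python
import duckdb

con = duckdb.connect(database=':memory:')

# Fetch a query result as an Apache Arrow table (requires the pyarrow package)
arrow_table = con.execute("SELECT * FROM 'orders.parquet' LIMIT 100").fetch_arrow_table()
print(arrow_table.schema)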

The HTML option produces an interactive graph with tooltips and configuration options. It’s handy for sharing, and I’m sure there’ll be a way to use the backend code for embedding if I take a closer look. The PNG option has me covered in the meantime, immediately producing this image:

athenainventorycumulativeCropped
(Image cropped for visibility – Ed)

Finally there’s the Config.JSON option, which exports the visual’s settings for version control and reproducibility:

JSON
{
  "version": "2.10.0",
  "plugin": "Y Line",
  "plugin_config": {},
  "columns_config": {},
  "settings": true,
  "theme": "Pro Light",
  "title": "Using Athena To Query S3 Inventory Parquet Objects Cumulative Views",
  "group_by": ["viewdate"],
  "split_by": [],
  "columns": ["cumulative_views"],
  "filter": [],
  "sort": [],
  "expressions": {},
  "aggregates": {}
}

All done! So how do I remove my data from SQL Workbench? I…refresh the site! Because SQL Workbench uses DuckDB, my data has only ever been stored locally and in memory. This means the data only persists until the SQL Workbench page is closed or reloaded.

Summary

In this post, I tried Tobias Müller’s free SQL Workbench tool powered by DuckDB-Wasm.

SQL Workbench and DuckDB are both very impressive! The results they produce with no servers or data movement are remarkable, and their tooling greatly simplifies working with modern data formats.

DuckDB makes lots of sense as an AWS Lambda layer, and DuckDB-Wasm simplifies in-browser analytics at a time when being data-driven and latency-averse are constantly gaining importance. I don’t feel like I’ve even scratched the surface of SQL Workbench either, so go and check it out!

Phew – no duck puns. No one has time for that quack. If this post has been useful then the button below has links for contact, socials, projects and sessions:


Thanks for reading ~~^~~

Categories
Developing & Application Integration

WordPress Bronze Data Orchestration With AWS

In this post, I create my WordPress pipeline’s bronze data orchestration process using AWS Lambda layers and AWS Step Functions.

Table of Contents

Introduction

In recent posts, I’ve written a Python script to extract WordPress API data and automated the script’s invocation with AWS services. This script creates five JSON objects in an S3 bucket at 07:00 each morning.

Now, I want to transform the data from semi-structured raw JSON into a more structured and query-friendly ‘bronze’ format to prepare it for downstream partitioning, cleansing and filtration.

Firstly, I’ll cover the additions and changes to my pipeline architecture. Next, I’ll examine both my new bronze Python function and the changes made to the existing raw function.

Finally, I’ll deploy the bronze script to AWS Lambda and create my WordPress pipeline orchestration process with AWS Step Functions. This process will ensure both Lambdas run in a set order each day.

Let’s start by examining my latest architectural decisions.

Architectural Decisions

In this section, I examine my architectural decisions for the bronze AWS Lambda function and the WordPress pipeline orchestration. Note that these decisions are in addition to my previous ones here and here.

AWS SDK For pandas

AWS SDK For pandas is an open-source Python initiative using the pandas library. It integrates with AWS services including Athena, Glue, Redshift, DynamoDB and S3, offering abstracted functions to execute various data processes.

AWS SDK For pandas used to be called awswrangler until AWS renamed it for clarity. It now exists as AWS SDK For pandas in documentation and awswrangler in code.
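
As a quick illustration of that abstraction (with hypothetical bucket and database names), reading S3 objects or Athena results into a pandas DataFrame is a one-liner each:

Python
import awswrangler as wr

# Read a Parquet object from S3 straight into a pandas DataFrame
df = wr.s3.read_parquet(path="s3://example-bucket/example-prefix/example.parquet")

# Run an Athena query and return the results as a DataFrame
df_athena = wr.athena.read_sql_query(sql="SELECT 1 AS test", database="example_database")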

AWS Lambda Layers

A Lambda layer is an archive containing code like libraries, dependencies, or custom runtimes. Layers can be both created manually and provided by AWS and third parties. Each Lambda function can include up to five layers.

Layers can be shared between functions, reducing code duplication and package sizes. This reduces storage costs and lets the smaller packages deploy markedly faster. Layers also separate dependencies from function code, supporting decoupling and separation of concerns.

AWS Step Functions

AWS Step Functions is a serverless orchestration service that integrates with other AWS services to build application workflows as a series of event-driven steps. For example, chaining Athena queries and ML model training.

Central to the Step Functions service are the concepts of States and State Machines:

  • States represent single steps or tasks in a workflow, and can be one of several types. The Step Functions Developer Guide has a full list of states.
  • State Machines are complete workflows made up of those states, defined using the JSON-based Amazon States Language.

The AWS Step Functions Developer Guide’s welcome page has more details including workflow types, use cases and a variety of sample projects.

Apache Parquet

Onto the data architecture! Let’s start by choosing a structured file type for the bronze data:

Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.

Databricks: What is Parquet?

There’s a more detailed explanation in the Parquet documentation too. So why choose Parquet over something like CSV? Well:

  • Size: Parquet supports highly efficient compression, so files take up less space and are cheaper to store than CSVs.
  • Performance: Parquet files store metadata about the data they hold. Query engines can use this metadata to find only the data needed, whereas with CSVs the whole file must be read first. This reduces the amount of processed data and enhances query performance.
  • Compatibility: Parquet is an open standard supported by various data processing frameworks including Apache Spark, Apache Hive and Presto. This means that data stored in Parquet format can be read and processed across many platforms and services.
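
To make the size and column-pruning points concrete, here's a small hedged pandas sketch (the file names are hypothetical, and pandas needs a Parquet engine such as pyarrow installed):

Python
import pandas as pd

df = pd.DataFrame({"id": range(100_000), "value": ["example"] * 100_000})

# The same data written twice - the Parquet file is typically much smaller on disk
df.to_csv("example.csv", index=False)
df.to_parquet("example.parquet", index=False)

# Column pruning: read only the column needed instead of the whole file
ids_only = pd.read_parquet("example.parquet", columns=["id"])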

Data Lakehouse

A Data Lakehouse is an emerging data architecture combining the centralized storage of raw data synonymous with Data Lakes with the transactional and analytical processing associated with Data Warehouses.

The result is a unified platform for efficient data management, analytics, and insights. Lakehouses have gained popularity as cloud services increasingly support them, with AWS, Azure and GCP all providing Lakehouse services.

This segues neatly into…

Medallion Architecture

Medallion Architecture is a data design pattern for logically organizing data in a Lakehouse. It aims to improve data quality as it flows through various layers incrementally. Names for these layers vary, tending to be Bronze, Silver, and Gold.

Implementations of the Medallion Architecture also vary. I like this Advancing Analytics video, which maps the Medallion Architecture to their approach. Despite the title it’s not a negative video, instead outlining how the three layers don’t necessarily fit every use case.

I’m using Raw and Bronze layers here because they best fit what I’m doing with my data.

Architectural Updates

In this section, I examine the changes made to my existing architecture.

Amazon S3

I’ve created a new data-lakehouse-bronze S3 bucket in the same region as the data-lakehouse-raw bucket to separate the two data layers.

Why use two buckets instead of one bucket with two prefixes? Well, after much research I’ve not found a right or wrong answer for this. There’s no difference in cost, performance or availability as long as all objects are stored in the same AWS region.

I chose two buckets because I find it easier to manage multiple buckets with flat structures and small bucket policies, as opposed to single buckets with deep structures and large bucket policies.

The truest answer is ‘it depends’, as other factors can come into play like:

  • Data Sovereignty: S3 bucket prefixes exist in the same region as the parent bucket. Regulations like GDPR and CCPA may require using separate buckets in order to isolate data within designated locations.

AWS SNS

I previously had two standard SNS Topics:

  • wordpress-api-raw for Lambda function alerts
  • failure-lambda for Lambda Destination alerts.

Firstly, there’s now an additional failure-stepfunction topic for any state machine failures.

Secondly, I’ve replaced my wordpress-api-raw topic with a data-lakehouse-raw topic to simplify my alerting channels and allow resource reuse. I’ve also created a new data-lakehouse-bronze topic for bronze process alerts.

Why two data topics? Well, different teams and services care about different things. A bronze-level failure may only concern the Data Engineering team as no other teams consume the data. Conversely, a gold-level failure will concern the AI and MI teams as it impacts their models and reports. Having separate SNS topics for each layer type enables granular monitoring controls.

AWS Parameter Store

Finally, Parameter Store needs the new S3 bucket name and SNS ARNs. I’ve replaced the /sns/pipeline/wordpressapi/raw parameter with /sns/data/lakehouse/raw to preserve the name schema.

I’m now storing five parameters:

  • 2x S3 Bucket names (Raw and Bronze)
  • 2x SNS Topic ARNs (Raw and Bronze notifications)
  • WordPress API Endpoints (unchanged)

Architectural Diagram

There are two diagrams this time! Firstly, here is the data_wordpressapi_bronze AWS Lambda function:

Where:

  1. AWS Lambda calls Parameter Store for S3 and SNS parameters. Parameter Store returns these to AWS Lambda.
  2. Lambda function gets raw WordPress JSON data from S3 Raw Bucket.
  3. Lambda function transforms the raw WordPress JSON data to bronze WordPress Parquet data and puts the new object in the S3 Bronze Bucket.

Meanwhile, Lambda is writing to a CloudWatch Log Group throughout its invocation. If there’s a failure, the Lambda function publishes a message to an SNS topic. SNS then delivers this message to the user’s subscribed email address.

Next, this is the AWS Step Functions WordPress bronze orchestration process:

Where:

  1. EventBridge Schedule invokes the State Machine.
  2. State Machine invokes the Raw Lambda function.
  3. State Machine invokes the Bronze Lambda function.

The State Machine also has its own logging and alerting channels.

Python

In this section, I work on my raw and bronze Python scripts for the WordPress pipeline orchestration process.

Raw Script Updates

I try to update my existing resources when I find something pertinent online. My latest find was this Indently video that covers, amongst other things, type annotations:

So how are type annotations different from type hints? Type annotations were introduced in 2006 (PEP 3107) and aimed to standardize function parameter and return value annotations. Type hints (introduced in 2014 by PEP 484) then added updated definitions and conventions to enrich type annotations further.

The type hints PEP shows this difference between the two:

When used in a type hint, the expression None is considered equivalent to type(None)

https://peps.python.org/pep-0484/#using-none

So in this function:

Python
def send_email(name: str, message: str) -> None:
  • name: str is an example of type annotation because the parameter name is of type string.
  • -> None is an example of a type hint because although None isn’t a type, it confirms that the function has no output.

So what’s changed in my raw script?

Updated Import & Functions

Let’s open with a new import:

Python
from botocore.client import BaseClient

BaseClient serves as a foundational base class for AWS service clients within botocore – a low-level library providing the core functionality of boto3 (the AWS Python SDK) and the AWS CLI.

I’m using it here to add type annotations to my boto3 clients. For example, send_sns_message already had these annotations:

Python
def send_sns_message(sns_client, topic_arn: str, subject: str, message: str):

I’ve now annotated sns_client with BaseClient to indicate its boto3 relation. I’ve also added a -> None type hint to confirm the function has no output:

Python
def send_sns_message(sns_client: BaseClient, topic_arn: str, subject: str, message: str) -> None:

Elsewhere, I’ve added the BaseClient annotation to get_parameter_from_ssm‘s ssm_client parameter:

Python
def get_parameter_from_ssm(ssm_client: BaseClient, parameter_name: str) -> str:

And put_s3_object‘s s3_client parameter:

Python
def put_s3_object(s3_client: BaseClient, bucket: str, prefix: str, name: str, json_data: str, suffix: str) -> bool:

put_s3_object also has new prefix and suffix parameters. Before this, it was hard-coded to create JSON objects in a wordpress-api S3 prefix:

Python
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket...")
        s3_client.put_object(
            Body = json_data,
            Bucket = bucket,
            Key = f"wordpress-api/{name}.json"
        )

Not any more! The S3 prefix and object suffix can now be changed dynamically:

Python
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket's {prefix}/{name} prefix...")
        s3_client.put_object(
            Body = json_data,
            Bucket = bucket,
            Key = f"{prefix}/{name}/{name}.{suffix}"
        )

This improves put_s3_object‘s reusability as I can now pass any prefix and suffix to it during a function call. For example, this call creates a JSON object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'json')

While this creates a CSV object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'csv')

Likewise, this creates a TXT object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'txt')

I can also set data_source (which I’ll cover shortly) to any S3 prefix, giving total control over where the object is stored.

Updated Variables

Next, some of my variables need to change. My SNS parameter name needs updating from:

Python
# AWS Parameter Store Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_snstopic = '/sns/pipeline/wordpressapi/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

To:

Python
# AWS Parameter Store Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_snstopic = '/sns/data/lakehouse/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

I also need to lay the groundwork for put_s3_object‘s new prefix parameter. I used to have a lambdaname variable that was used in the logs:

Python
# Lambda name for messages
lambdaname = 'data_wordpressapi_raw'

I’ve replaced this with two new variables. data_source records the data’s origin, which matches my S3 prefix naming schema. function_name then adds context to data_source to match my Lambda function naming schema:

Python
# Lambda name for messages
data_source = 'wordpress_api'
function_name = f'data_{data_source}_raw'

data_source is then passed to the put_s3_object function call when creating raw objects:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'json')

While function_name is used in the logs when referring to the Lambda function:

Python
    # Check an S3 bucket has been returned.
    if not s3_bucket_raw:
        message = f"{function_name}: No S3 Raw bucket returned."
        subject = f"{function_name}: Failed"

Updated Script Body

My variables all now have type annotations. They’ve gone from:

Python
    # AWS Parameter Store Names
    parametername_s3bucket = '/s3/lakehouse/name/raw'
    parametername_snstopic = '/sns/data/lakehouse/raw'
    parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

    # Lambda name for messages
    data_source = 'wordpress_api'
    function_name = f'data_{data_source}_raw'

    # Counters
    api_call_timeout = 30
    endpoint_count_all = 0
    endpoint_count_failure = 0
    endpoint_count_success = 0

To:

Python
    # AWS Parameter Store Names
    parametername_s3bucket: str = '/s3/lakehouse/name/raw'
    parametername_snstopic: str = '/sns/data/lakehouse/raw'
    parametername_wordpressapi: str = '/wordpress/amazonwebshark/api/mysqlendpoints'

    # Lambda name for messages
    data_source: str = 'wordpress_api'
    function_name: str = f'data_{data_source}_raw'

    # Counters
    api_call_timeout: int = 30
    endpoint_count_all: int = 0
    endpoint_count_failure: int = 0
    endpoint_count_success: int = 0

This is helpful when variables are passed in from settings files or external services and their types are not immediately apparent. So it’s a good habit to get into!

Bronze Script

Now let’s talk about the new script, which transforms raw S3 JSON objects into bronze S3 Parquet objects. Both raw and bronze WordPress scripts will then feed into an AWS orchestration workflow.

Reused Raw Functions

The following functions are re-used from the Raw script with no changes:

Get Filename Function

Here, I want to get each S3 path’s object name. The object name has some important uses:

  • Using it instead of the full S3 path makes the logs easier to read and cheaper to store.
  • Using it during bronze S3 object creation ensures consistent naming.

A typical S3 path has the schema s3://bucket/prefix/object.suffix, from which I want object.

This function is a remake of the raw script’s Get Filename function. This time, the source string is an S3 path instead of an API endpoint:

I define a get_objectname_from_s3_path function, which expects a path argument with a string type hint and returns a new string.

Firstly, my name_full variable uses the rsplit method to capture the substring I need, using forward slashes as separators. This converts s3://bucket/prefix/object.suffix to object.suffix.

Next, my name_full_last_period_index variable uses the rfind method to find the last occurrence of the period character in the name_full string.

Finally, my name_partial variable uses slicing to extract a substring from the beginning of the name_full string up to (but not including) the index specified by name_full_last_period_index. This converts object.suffix to object.

If the function cannot return a string, an exception is logged and a blank string is returned instead.
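
The full script is in my GitHub repo, but based purely on the description above, a minimal sketch of this function might look something like this:

Python
import logging

def get_objectname_from_s3_path(path: str) -> str:
    """Return the object name from an S3 path, e.g. s3://bucket/prefix/object.suffix -> object."""
    try:
        # s3://bucket/prefix/object.suffix -> object.suffix
        name_full = path.rsplit('/', 1)[-1]

        # Index of the last period, so names containing periods survive intact
        name_full_last_period_index = name_full.rfind('.')

        # object.suffix -> object
        name_partial = name_full[:name_full_last_period_index]
        return name_partial

    except Exception as error:
        logging.error(f"Unable to parse object name from S3 path: {error}")
        return ""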

Get Data Function

Next, I want to read data from an S3 JSON object in my Raw bucket and store it in a pandas DataFrame.

Here, I define a get_data_from_s3_object function that returns a pandas DataFrame and expects three arguments:

  • boto3_session: the authenticated session to use with a BaseClient type hint.
  • s3_object: the S3 object path with a string type hint.
  • name: the S3 object name with a string type hint (used for logging).

This function uses AWS SDK For pandas s3.read_json to read the data from the S3 object path using the existing boto3_session authentication.

If data is found then get_data_from_s3_object returns a populated DataFrame. Otherwise, an empty DataFrame is returned instead.
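
Again hedging that this is a sketch of the description rather than the script itself, get_data_from_s3_object might look like this:

Python
import logging
import awswrangler as wr
import pandas as pd
from botocore.client import BaseClient
from botocore.exceptions import ClientError

def get_data_from_s3_object(boto3_session: BaseClient, s3_object: str, name: str) -> pd.DataFrame:
    """Read a raw JSON object from S3 into a DataFrame, returning an empty DataFrame on failure."""
    try:
        logging.info(f"Reading {name} data from S3...")
        return wr.s3.read_json(path=s3_object, boto3_session=boto3_session)

    except ClientError as error:
        logging.error(f"Unable to read {name} data from S3: {error}")
        return pd.DataFrame()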

Put Data Function

Finally, I want to convert the DataFrame to Parquet and store it in my bronze S3 bucket.

I define a put_s3_parquet_object function that expects four arguments:

  • df: the pandas DataFrame containing the raw data.
  • name: the S3 object name.
  • s3_object_bronze: the S3 path for the new bronze object
  • session: the authenticated boto3 session to use.

I give string type hints to the name and s3_object_bronze parameters. session gets the same BaseClient hint as before, and df is identified as a pandas DataFrame.

I open a try except block that uses s3.to_parquet with the existing boto3_session to upload the DataFrame data to S3 as a Parquet object. If this operation succeeds, the function returns True. If it fails, a botocore exception is logged and the function returns False.
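
And a matching sketch of put_s3_parquet_object, again assuming the behaviour described above:

Python
import logging
import awswrangler as wr
import pandas as pd
from botocore.client import BaseClient
from botocore.exceptions import ClientError

def put_s3_parquet_object(df: pd.DataFrame, name: str, s3_object_bronze: str, session: BaseClient) -> bool:
    """Write a DataFrame to the bronze bucket as a Parquet object, returning True on success."""
    try:
        logging.info(f"Attempting {name} S3 Bronze upload...")
        wr.s3.to_parquet(df=df, path=s3_object_bronze, boto3_session=session)
        return True

    except ClientError as error:
        logging.error(f"Unable to upload {name} to S3: {error}")
        return False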

Imports & Variables

The bronze script has two new imports to examine: awswrangler and pandas:

Python
import logging
import boto3
import botocore
import awswrangler as wr
import pandas as pd
from botocore.client import BaseClient

I’ve used both before. Here, pandas handles my in-memory data storage and awswrangler handles my S3 interactions.

There are also parameter changes. I’ve added Parameter Store names for both the bronze S3 bucket and the SNS topic. I’ve kept the raw S3 bucket parameter as awswrangler needs it for the get_data_from_s3_object function.

Python
parametername_s3bucket_raw: str = '/s3/lakehouse/name/raw'
parametername_s3bucket_bronze: str = '/s3/lakehouse/name/bronze'
parametername_snstopic: str = '/sns/data/lakehouse/bronze'

I’ve also swapped out _raw for _bronze in function_name, and renamed the counters from endpoint_count to object_count to reflect their new function:

Python
    # Lambda name for messages
    data_source: str = 'wordpress_api'
    function_name: str = f'data_{data_source}_bronze'

    # Counters
    object_count_all: int = 0
    object_count_failure: int = 0
    object_count_success: int = 0

Script Body

Most of the bronze script is reused from the raw script. Tasks like logging config, name parsing and validation checks only needed the updated parameters! There are some changes though, as S3 is now my data source and I’m also doing additional tasks.

Firstly, I need to get the raw S3 objects. The AWS SDK For pandas S3 class has a list_objects function which is purpose-built for this:

Python
s3_objects_raw = wr.s3.list_objects(
      path = f's3://{s3_bucket_raw}/{data_source}',
      suffix = 'json',
      boto3_session = session)
  • path is the S3 location to list – in this case the raw S3 bucket’s wordpress_api prefix.
  • suffix filters the list by the specified suffix.
  • boto3_session specifies my existing boto3_session to prevent unnecessary re-authentication.

During the loop, my script checks if the pandas DataFrame returned from get_data_from_s3_object contains data. If it’s empty then the loop ends, otherwise the column and row counts are logged:

Python
if df.empty:
  logging.warning(f"{object_name} DataFrame is empty!")
  object_count_failure += 1
  continue
  
logging.info(f'{object_name} DataFrame has {len(df.columns)} columns and {len(df)} rows.')

Assuming all checks succeed, I want to put a new Parquet object into my bronze S3 bucket. AWS SDK For pandas has an s3.to_parquet function that does this using a pandas DataFrame and an S3 path.

I already have the DataFrame so let’s make the path. This is handled by the s3_object_bronze variable, which joins existing parameters with additional characters. This is then passed to put_s3_parquet_object:

Python
s3_object_bronze = f's3://{s3_bucket_bronze}/{data_source}/{object_name}/{object_name}.parquet'

logging.info(f"Attempting {object_name} S3 Bronze upload...")
ok = put_s3_parquet_object(df, object_name, s3_object_bronze, session)

That’s the bronze script done. Now to deploy it to AWS Lambda.

Lambda

In this section, I configure and deploy my Bronze Lambda function.

Hitting Size Limits

So, remember when I said that I expected my future Lambda deployments to improve? Well, this was the result of my retrying the virtual environment deployment process the Raw Lambda used:

2024 03 08 LambdaError

While my zipped raw function is 19.1 MB, my zipped bronze function is over five times bigger at 101.6 MB! My poorly optimised package wouldn’t cut it this time, so I prepared for some pruning. Until I discovered something…

Using A Layer

There’s a managed AWS SDK for pandas Lambda layer!

2024 03 05 LayerAWSSDKPandas

It can be selected in the Lambda console or programmatically called from this list of AWS SDK for pandas Managed Layer ARNs, which covers:

  • All AWS commercial regions.
  • All Python versions currently supported by Lambda (3.8 and newer at the time of writing).
  • Both Lambda architectures.

Additionally, the Lambda Python 3.12 runtime includes boto3 and botocore. So by using this runtime and the managed layer, I’ve gone from a large deployment package to no deployment package! And because my function is now basically just code, I can view and edit that code in the Lambda console directly.

Lambda Config

My Bronze Lambda function borrows several config settings from the raw one.

Where it differs is the IAM setup. I needed additional permissions anyway because this function is reading from two S3 buckets now, but by the time I was done the policy was hard to read, maintain and troubleshoot:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "sns:Publish",
                "s3:ListBucket",
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-raw",
                "arn:aws:s3:::data-lakehouse-bronze",
                "arn:aws:logs:eu-west-1:REDACTED:*",
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-raw",
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "ssm:GetParameter"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:log-group:/aws/lambda/data_wordpressapi_bronze:*",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/bronze",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/bronze"
            ]
        }
    ]
}

So let’s refactor it! The below policy has the same actions, grouped by service and with appropriately named Sids:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchLogGroupActions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:*"
            ]
        },
        {
            "Sid": "CloudWatchLogStreamActions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:log-group:/aws/lambda/data_wordpressapi_bronze:*"
            ]
        },
        {
            "Sid": "S3BucketActions",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw",
                "arn:aws:s3:::data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "S3ObjectActions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*"
            ]
        },
        {
            "Sid": "SNSActions",
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": [
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "ParameterStoreActions",
            "Effect": "Allow",
            "Action": [
                "ssm:GetParameter"
            ],
            "Resource": [
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/bronze",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/bronze"
            ]
        }
    ]
}

Much better! This policy is now far easier to read and update.

There’s also a clear distinction between the bucket-level s3:ListBucket operation and the object-level s3:PutObject and s3:GetObject operations now. Getting these wrong can have big consequences, so the clearer the better!

One deployment and test later, and I have some new S3 objects!

[INFO]: WordPress API Bronze process complete: 5 Successful | 0 Failed.

REPORT RequestId: 899d1658-f7de-4e74-8d64-b4f029fe2bec	Duration: 7108.50 ms	Billed Duration: 7109 ms	Memory Size: 250 MB	Max Memory Used: 250 MB	Init Duration: 4747.38 ms

So now I have two Lambda functions with some requirements around them:

  • They need to run sequentially.
  • The Raw Lambda must finish before the Bronze Lambda starts.
  • If the Raw Lambda fails then the Bronze Lambda shouldn’t run at all.

Now that AWS Lambda is creating WordPress raw and bronze objects, it’s time to start thinking about orchestration!

Step Functions & EventBridge

In this section, I create both an AWS Step Functions State Machine and an Amazon EventBridge Schedule for my WordPress bronze orchestration process.

State Machine Requirements

Before writing any code, let’s outline the steps I need the state machine to perform:

  1. data_wordpressapi_raw Lambda function is invoked. If it succeeds then move to the next step. If it fails then send a notification and end the workflow reporting failure.
  2. data_wordpressapi_bronze Lambda function is invoked. If it succeeds then end the workflow reporting success. If it fails then send a notification and end the workflow reporting failure.

With the states defined, it’s time to create the state machine.

State Machine Creation

The following state machine was created using Step Functions Workflow Studio – a low-code visual designer released in 2021, with drag-and-drop functionality that auto-generates code in real-time:

Workflow Studio produced this section’s code and diagrams.

Firstly I create a data_wordpressapi_raw task state to invoke my Raw Lambda. This task uses the lambda:invoke action to invoke my data_wordpressapi_raw function. I set the next state as data_wordpressapi_bronze and add a Catch block that sends all errors to a PublishFailure state (which I’ll define later):

JSON
    "data_wordpressapi_raw": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_raw:$LATEST"
      },
      "Next": "data_wordpressapi_bronze",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.Error",
          "Next": "PublishFailure"
        }
      ],
      "TimeoutSeconds": 120
    }

Note the TimeoutSeconds parameter. All my task states will have 120-second timeouts. These stop the state machine from waiting indefinitely if the task becomes unresponsive, and are recommended best practice. Also note that state machines wait for Lambda invocations to finish by default, so no additional config is needed for this.

Next, I create a data_wordpressapi_bronze task state to invoke my Bronze Lambda. This task uses the lambda:invoke action to invoke my data_wordpressapi_bronze function. I then add a Catch block that sends all errors to a PublishFailure state.

Finally, "End": true designates this state as a terminal state which ends the execution if the task is successful:

JSON
    "data_wordpressapi_bronze": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_bronze:$LATEST"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.Error",
          "Next": "PublishFailure"
        }
      ],
      "TimeoutSeconds": 120,
      "End": true
    }

Finally, I create a PublishFailure task state that publishes failure notifications. This task uses the sns:Publish action to publish a simple message to the failure-stepfunction SNS Topic ARN. "End": true marks this task as the other potential way the state machine execution can end:

JSON
    "PublishFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:eu-west-1:REDACTED:failure-stepfunction",
        "Message": "An error occurred in the state machine: { \"error\": \"$.Error\" }"
      },
      "End": true,
      "TimeoutSeconds": 120
    }

While both Lambdas already have SNS alerting, the state machine itself may also fail so the added observability is justified. This Marcia Villalba video was very helpful here:

And that’s everything I need! At this point Workflow Studio gives me two things – firstly the state machine’s code, which I’ve committed to GitHub. And secondly this handy downloadable diagram:

stepfunctions graph

State Machine Config

It’s now time to think about security and monitoring.

When new state machines are created in the AWS Step Functions console, an IAM Role is created with policies based on the state machine’s resources. The nuances and templates are covered in the Step Functions Developer Guide, so let’s examine my WordPress_Raw_To_Bronze state machine’s auto-generated IAM Role consisting of two policies:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "xray:PutTraceSegments",
                "xray:PutTelemetryRecords",
                "xray:GetSamplingRules",
                "xray:GetSamplingTargets"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

This supports the AWS X-Ray integration with AWS Step Functions. If X-Ray tracing is never enabled then this policy is unused.

Besides X-Ray tracing, there is also an option to log a state machine’s execution history to CloudWatch Logs. There are three log levels available plus a fourth default choice: OFF. Each state machine retains recent execution history anyway and I’ve got no need to keep that history long-term, so I leave logging switched off. Remember – CloudWatch Logs is only free for the first 5GB!

State Machine Testing

There are various ways to test a state machine. There’s a testing and debugging section in the developer guide that goes into further detail; here, I’ll focus on console testing.

Both individual states and the entire state machine can be tested in the console. Each state can be tested in isolation (using the TestState API under the hood) with customisable inputs and IAM roles. This is great for checking the state outputs are correct, and that the attached IAM role is sufficient.

The state machine itself can also be tested via on-demand execution. The Execution Details page shows the state machine’s statistics and events, and has great coverage in the developer guide.

During testing, my WordPress_Raw_To_Bronze state machine returned this error:

States.Runtime in step: data_wordpressapi_bronze.

An error occurred while executing the state 'data_wordpressapi_bronze' (entered at the event id #7). Unable to apply Path transformation to null or empty input.

This turned out to be a problem with the OutputPath parameter, which Workflow Studio enables by default:

2024 03 04 StepFunctionsOutPutPath

I’m not using this setting for anything, so I disabled it to solve this problem.

Eventbridge Schedule

Finally, I want to automate the execution of my state machine. This calls for an EventBridge Schedule!

EventBridge makes this quite simple, using mostly the same process as last time. The Step Functions StartExecution operation is a templated target like Lambda’s Invoke operation, so it’s a case of selecting the WordPress_Raw_To_Bronze state machine from the list and updating the schedule’s IAM role accordingly.

And that’s it! EventBridge now executes the state machine at 07:00 each morning. The state machine then sequentially invokes both Lambda functions and catches any errors.
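
For anyone preferring code over the console, a hedged boto3 sketch of an equivalent schedule might look like this (the ARNs and role name are hypothetical placeholders):

Python
import boto3

scheduler = boto3.client("scheduler")

scheduler.create_schedule(
    Name="wordpress-raw-to-bronze-daily",
    ScheduleExpression="cron(0 7 * * ? *)",  # 07:00 each morning
    FlexibleTimeWindow={"Mode": "OFF"},
    Target={
        # Placeholder ARNs - use the real state machine and scheduler role ARNs
        "Arn": "arn:aws:states:eu-west-1:REDACTED:stateMachine:WordPress_Raw_To_Bronze",
        "RoleArn": "arn:aws:iam::REDACTED:role/eventbridge-scheduler-stepfunctions",
    },
)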

Costs

In this section, I’ll examine my recent AWS WordPress bronze orchestration process costs.

Let’s start with Step Functions. There are two kinds of Step Function workflow:

  • Standard workflows are charged based on the number of state transitions. These are counted each time a workflow step is executed. The first 4000 transitions each month are free. After that, every 1000 transitions cost $0.025.
  • Express workflows are priced by the number of executions, duration, and memory consumption. The specifics of these criteria, coupled with full details of all charges are on the Step Functions pricing page.

I’m using standard workflows, and as of 26 March I’ve used 118 state transitions. In other words, free! Elsewhere, my costs are broadly on par with previous months. These are my S3 costs from 2024-02-01 to 2024-03-26:

S3 Actions                        | Month   | Usage   | Cost ($)
PUT, COPY, POST, or LIST requests | 2024-02 | 64,196  | 0.32
PUT, COPY, POST, or LIST requests | 2024-03 | 17,566  | 0.09
GET and all other requests        | 2024-02 | 101,462 | 0.04
GET and all other requests        | 2024-03 | 8,656   | 0.00
GB-month of storage used          | 2024-02 | 0.109   | 0.00
GB-month of storage used          | 2024-03 | 0.161   | 0.00

And this is my recent free tier usage from 2024-02-01 to 2024-03-26:

Service         | Month   | Usage
EventBridge     | 2024-02 | 31 Invocations
EventBridge     | 2024-03 | 25 Invocations
Lambda          | 2024-02 | 122.563 Second Compute
Lambda          | 2024-02 | 84 Requests
Lambda          | 2024-03 | 82.376 Second Compute
Lambda          | 2024-03 | 58 Requests
Parameter Store | 2024-02 | 34 API Requests
Parameter Store | 2024-03 | 25 API Requests
SNS             | 2024-02 | 8 Email-JSON Notifications
SNS             | 2024-02 | 438 API Requests
SNS             | 2024-03 | 3 Email-JSON Notifications
SNS             | 2024-03 | 205 API Requests

So my only costs are still for storage.

Resources

The following items have been checked into the amazonwebshark GitHub repo for the AWS WordPress bronze orchestration process, available via the button below:

  • Updated data_wordpressapi_raw Python script & requirements.txt file.
  • New data_wordpressapi_bronze Python script & requirements.txt file.
  • WordPress_Raw_To_Bronze state machine JSON.

Summary

In this post, I created my WordPress pipeline’s bronze data orchestration process using AWS Lambda layers and AWS Step Functions.

I’ve wanted to try Step Functions out for a while, and all things considered they’re great! Workflow Studio is easy to use, and the templates and tutorials undoubtedly highlight the value that Step Functions can bring.

Additionally, the integration with both EventBridge Scheduler and other AWS services makes Step Functions a compelling orchestration service for both my ongoing WordPress bronze work and the future projects in my pipeline. This combined with some extra Lambda layers will reduce my future dev and test time.

If this post has been useful then the button below has links for contact, socials, projects and sessions:


Thanks for reading ~~^~~

Categories
Data & Analytics

Ingesting iTunes Data Into AWS With Python And Athena

In this post, I will update my existing iTunes Python ETL to return a Parquet file, which I will then upload to S3 and view using Athena.

Table of Contents

Introduction

In my last post, I made an ETL that exported data from a CSV into a Pandas DataFrame using AWS Data Wrangler. That post ended with the transformed data being saved locally as a new CSV.

It’s time to do something with that data! I want to analyse my iTunes data and look for trends and insights into my listening habits. I also want to access these insights in the cloud, as my laptop is a bit bulky and quite slow. Finally, I’d prefer to keep my costs to a minimum.

Here, I’ll show how AWS and Python can be used together to meet these requirements. Let’s start with AWS.

Amazon S3

In this section, I will update my S3 setup. I’ll create some new buckets and explain my approach.

New S3 Buckets

Currently, I have a single S3 bucket containing my iTunes Export CSV. Moving forward, this bucket will contain all of my unmodified source objects, otherwise known as raw data.

To partner the raw objects bucket, I now have an ingested objects bucket. This bucket will contain objects where the data has been transformed in some way. My analytics tools and Athena tables will point here for their data.

Speaking of Athena, the other new bucket will be used for Athena’s query results. Although Athena is serverless, it still needs a place to record queries and store results. Creating this bucket now will save time later on.

Having separate buckets for each of these functions isn’t a requirement, although it is something I prefer to do. Before moving on, I’d like to run through some of the benefits I find with this approach.

Advantages Of Multiple Buckets

Firstly, having buckets with clearly defined purposes makes navigation way easier. I always know where to find objects, and rarely lose track of or misplace them.

Secondly, having multiple buckets usually makes my S3 paths shorter. This doesn’t sound like much of a benefit upfront, but the S3 path textboxes in the AWS console are quite small, and using long S3 paths in the command line can be a pain.

Finally, I find security and access controls are far simpler to implement with a multi-bucket setup. Personally I prefer “You can’t come into this house/bucket” over “You can come into this house/bucket, but you can’t go into this room/prefix”. However, both S3 buckets and S3 prefixes can be used as IAM policy resources so there’s technically no difference.

That concludes the S3 section. Next, let’s set up Athena.

Amazon Athena

In this section, I’ll get Athena ready for use. I’ll show the process I followed and explain my key decisions. Let’s start with my reasons for choosing Athena.

Why Athena?

Plenty has been written about Athena’s benefits over the years. So instead of retreading old ground, I’ll discuss what makes Athena a good choice for this particular use case.

Firstly, Athena is cheap. The serverless nature of Athena means I only pay for what I query, scan and store, and I’ve yet to see a charge for Athena in the three years I’ve been an AWS customer.

Secondly, like S3, Athena’s security is managed by IAM. I can use IAM policies to control who and what can access my Athena data, and can monitor that access in CloudTrail. This also means I can manage access to Athena independently of S3.

Finally, Athena is highly available. Authorised calls to the service have a 99.9% Monthly Uptime Percentage SLA and Athena benefits from S3’s availability and durability. This allows 24/7 access to Athena data for users and applications.

Setting Up Athena

To start this section, I recommend reading the AWS Athena Getting Started documentation for a great Athena introduction. I’ll cover some basics here, but I can’t improve on the AWS documentation.

Athena needs three things to get off the ground:

  • An S3 path for Athena query results.
  • A database for Athena tables.
  • A table for interacting with S3 data objects.

I’ve already talked about the S3 path, so let’s move on to the database. A database in Athena is a logical grouping for the tables created in it. Here, I create a blog_amazonwebshark database using the following script:

CREATE DATABASE blog_amazonwebshark

Next, I enter the column names from my iTunes Export CSV into Athena’s Create Table form, along with appropriate data types for each column. In response, the form creates this Athena table:

The form adds several table properties to the table’s DDL. These, along with the data types, are expanded on in the Athena Create Table documentation.

Please note that I have removed the S3 path from the LOCATION property to protect my data. The actual Athena table is pointing at an S3 prefix in my ingested objects bucket that will receive my transformed iTunes data.

Speaking of data, the form offers several choices of source data format including CSV, JSON and Parquet. I chose Parquet, but why do this when I’m already getting a CSV? Why create extra work?

Let me explain.

About Parquet

Apache Parquet is a file format that supports fast processing for complex data. It can essentially be seen as the next generation of CSV. Both formats have their place, but at scale CSV files have large file sizes and slow performance.

In contrast, Parquet files have built-in compression and indexing for rapid data location and retrieval. In addition, the data in Parquet files is organized by column, resulting in smaller sizes and faster queries.

This also results in Athena cost savings as Athena only needs to read the columns relevant to the queries being run. If the same data was in a CSV, Athena would have to read the entire CSV whether the data is needed or not.

For further reading, Databricks have a great Parquet section in their glossary.

That’s everything for Athena. Now I need to update my Python scripts.

Python

In this section, I’ll make changes to my Basic iTunes ETL to include my new S3 and Athena resources and to replace the CSV output with a Parquet file. Let’s start with some variables.

New Python Variables

My first update is a change to ETL_ITU_Play_Variables.py, which contains my global variables. Originally there were two S3 global variables – S3_BUCKET containing the bucket name and S3_PREFIX containing the S3 prefix path leading to the raw data:

S3_BUCKET
S3_PREFIX

Now I have two buckets and two prefixes, so it makes sense to update the variable names. I’ve renamed the originals with a _RAW suffix for clarity, and added two new variables with an _INGESTED suffix for the ingested bucket and prefix:

S3_BUCKET_RAW
S3_PREFIX_RAW

S3_BUCKET_INGESTED
S3_PREFIX_INGESTED

Changing CSV To Parquet

The next change is to ETL_ITU_Play.py. The initial version converts a Pandas DataFrame to CSV using pandas.DataFrame.to_csv. I’m now replacing this with awswrangler.s3.to_parquet, which needs three parameters: the DataFrame, an authenticated boto3 session and the destination S3 path.

Put together, it looks like this:

wr.s3.to_parquet(
    df = df,
    boto3_session = session,
    path = s3_path_ingested
)

Before committing my changes, I took the time to put the main workings of my ETL in a class. This provides a clean structure for my Python script and will make it easier to reuse in future projects.
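
The committed script is linked below, but as a rough idea of the shape, the class might look something like this (the names here are illustrative rather than the real ones):

import boto3
import pandas as pd
import awswrangler as wr

class BasicITunesETL:
    """Illustrative skeleton only - the real class is in ETL_ITU_Play.py on GitHub."""

    def __init__(self, session: boto3.session.Session, s3_path_raw: str, s3_path_ingested: str):
        self.session = session
        self.s3_path_raw = s3_path_raw
        self.s3_path_ingested = s3_path_ingested

    def extract(self) -> pd.DataFrame:
        # Read the raw iTunes export CSV from S3
        return wr.s3.read_csv(path=self.s3_path_raw, boto3_session=self.session)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        # Column renames, date reformatting and data type fixes go here
        return df

    def load(self, df: pd.DataFrame) -> None:
        # Write the transformed DataFrame to the ingested bucket as Parquet
        wr.s3.to_parquet(df=df, path=self.s3_path_ingested, boto3_session=self.session)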

That completes the changes. Let’s review what has been created.

Architecture

Here is an architectural diagram of how everything fits together:

Here is a breakdown of the processes involved:

  1. User runs the Python ETL script locally.
  2. Python reads the CSV object in datalake-raw S3 bucket.
  3. Python extracts data from CSV into a DataFrame and transforms several columns.
  4. Python writes the DataFrame to datalake-ingested S3 bucket as a Parquet file.
  5. Python notifies User of a successful run.
  6. User sends query to Athena.
  7. Athena reads data from datalake-ingested S3 bucket.
  8. Athena returns query results to User.

Testing

In this section, I will test my resources to make sure they work as expected. Bear in mind that this setup hasn’t been designed with production use in mind, so my testing is somewhat limited and would be insufficient for production deployment.

Testing Python

TEST: Upload a CSV to the datalake-raw S3 bucket, then run the Python script. The Python script must run successfully and print updates in the terminal throughout.

RESULT: I upload an iTunes Export CSV to the datalake-raw S3 bucket:

The Python script runs, printing the following output in the terminal:

Creating DataFrame.
DataFrame columns are Index(['Name', 'Artist', 'Album', 'Genre', 'Time', 'Track Number', 'Year', 'Date Modified', 'Date Added', 'Bit Rate', 'Plays', 'Last Played', 'Skips', 'Last Skipped', 'My Rating', 'Location'], dtype='object')
Deleting unnecessary DataFrame columns.
Renaming DataFrame columns.
Reformatting DateTime DataFrame columns.
Creating Date Columns From DateTime Columns.
Creating MyRatingDigit Column.
Replacing blank values to prevent IntCastingNaN errors.
Setting Data Types.
Creating Parquet file from DataFrame.
Processes complete.

Testing S3

TEST: After the Python script successfully runs, the datalake-ingested S3 bucket must contain an itunesdata.parquet object.

RESULT: Upon accessing the datalake-ingested S3 bucket, an itunesdata.parquet object is found:

(On an unrelated note, look at the size difference between the Parquet and CSV files!)

Testing Athena

TEST: When the datalake-ingested S3 bucket contains an itunesdata.parquet object, data from the iTunes Export CSV must be shown when the following Athena query is run:

SELECT * FROM basic_itunes_python_etl;

RESULT: Most of the Athena results match the iTunes Export data. However, the transformed dates did not match expectations:

This appears to be a formatting problem, as some parts of a date format are still visible.

To diagnose the problem I wanted to see how these columns were being stored in the Parquet file. I used mukunku’s ParquetViewer for this, which is described in the GitHub repo as:

…a quick and dirty utility that I created to easily view Apache Parquet files on Windows desktop machines.

It works very well!

Here is a screenshot of the data. The lastplayed column has dates and times, while the datamodifieddate column has dates only:

The cause of the problem becomes apparent when the date columns are viewed using the ISO 8601 format:

The date columns are all using timestamps, even when no times are included!

A potential fix would be to change the section of my Python ETL script that handles data types. Instead, I update the data types used in my Athena table from date:

  `datemodifieddate` date, 
  `dateaddeddate` date, 
  `lastplayeddate` date, 

To timestamp:

  `datemodifieddate` timestamp, 
  `dateaddeddate` timestamp, 
  `lastplayeddate` timestamp, 

This time, when I view my Athena table the values all appear as expected:

Scripts

My ETL_ITU_Play.py file commit from 2022-08-08 can be viewed here:

ETL_ITU_Play.py on GitHub

My updated repo readme can be viewed here:

README.md on GitHub

Summary

In this post, I updated my existing iTunes Python ETL to return a Parquet file, which I then uploaded to S3 and viewed using Athena. I explained my reasoning for choosing S3, Athena and the Parquet file format, and I handled a data formatting issue.

If this post has been useful, please feel free to follow me on the following platforms for future updates:

Thanks for reading ~~^~~