WordPress Bronze Data Orchestration With AWS

In this post, I create my WordPress pipeline’s bronze data orchestration process using AWS Lambda layers and AWS Step Functions.

Introduction

In recent posts, I’ve written a Python script to extract WordPress API data and automated the script’s invocation with AWS services. This script creates five JSON objects in an S3 bucket at 07:00 each morning.

Now, I want to transform the data from semi-structured raw JSON into a more structured and query-friendly ‘bronze’ format to prepare it for downstream partitioning, cleansing and filtration.

Firstly, I’ll cover the additions and changes to my pipeline architecture. Next, I’ll examine both my new bronze Python function and the changes made to the existing raw function.

Finally, I’ll deploy the bronze script to AWS Lambda and create my WordPress pipeline orchestration process with AWS Step Functions. This process will ensure both Lambdas run in a set order each day.

Let’s start by examining my latest architectural decisions.

Architectural Decisions

In this section, I examine my architectural decisions for the bronze AWS Lambda function and the WordPress pipeline orchestration. Note that these decisions are in addition to my previous ones here and here.

AWS SDK For pandas

AWS SDK For pandas is an open-source Python initiative using the pandas library. It integrates with AWS services including Athena, Glue, Redshift, DynamoDB and S3, offering abstracted functions to execute various data processes.

AWS SDK For pandas used to be called AWS Data Wrangler until AWS renamed it for clarity. It now exists as AWS SDK For pandas in documentation and awswrangler in code.

AWS Lambda Layers

A Lambda layer is an archive containing code like libraries, dependencies, or custom runtimes. Layers can be both created manually and provided by AWS and third parties. Each Lambda function can include up to five layers.

Layers can be shared between functions, reducing code duplication and package sizes. This reduces storage costs and lets the smaller packages deploy markedly faster. Layers also separate dependencies from function code, supporting decoupling and separation of concerns.

AWS Step Functions

AWS Step Functions is a serverless orchestration service that integrates with other AWS services to build application workflows as a series of event-driven steps. Examples include chaining Athena queries and orchestrating ML model training.

Central to the Step Functions service are the concepts of States and State Machines:

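  • States represent single steps or tasks in a workflow, and can be one of several types. The Step Functions Developer Guide has a full list of states.
  • State Machines are the workflows themselves: a collection of states and the transitions between them, defined in Amazon States Language.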

The AWS Step Functions Developer Guide’s welcome page has more details including workflow types, use cases and a variety of sample projects.

Apache Parquet

Onto the data architecture! Let’s start by choosing a structured file type for the bronze data:

Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.

Databricks: What is Parquet?

There’s a more detailed explanation in the Parquet documentation too. So why choose Parquet over something like CSV? Well:

  • Size: Parquet supports highly efficient compression, so files take up less space and are cheaper to store than CSVs.
  • Performance: Parquet files store metadata about the data they hold. Query engines can use this metadata to find only the data needed, whereas with CSVs the whole file must be read first. This reduces the amount of processed data and enhances query performance.
  • Compatibility: Parquet is an open standard supported by various data processing frameworks including Apache Spark, Apache Hive and Presto. This means that data stored in Parquet format can be read and processed across many platforms and services.

Data Lakehouse

A Data Lakehouse is an emerging data architecture combining the centralized storage of raw data synonymous with Data Lakes with the transactional and analytical processing associated with Data Warehouses.

The result is a unified platform for efficient data management, analytics, and insights. Lakehouses have gained popularity as cloud services increasingly support them, with AWS, Azure and GCP all providing Lakehouse services.

This segues neatly into…

Medallion Architecture

Medallion Architecture is a data design pattern for logically organizing data in a Lakehouse. It aims to improve data quality incrementally as data flows through successive layers. Names for these layers vary, but tend to be Bronze, Silver, and Gold.

Implementations of the Medallion Architecture also vary. I like this Advancing Analytics video, which maps the Medallion Architecture to their approach. Despite the title it’s not a negative video, instead outlining how the three layers don’t necessarily fit every use case.

I’m using Raw and Bronze layers here because they best fit what I’m doing with my data.

Architectural Updates

In this section, I examine the changes made to my existing architecture.

Amazon S3

I’ve created a new data-lakehouse-bronze S3 bucket in the same region as the data-lakehouse-raw bucket to separate the two data layers.

Why use two buckets instead of one bucket with two prefixes? Well, after much research I’ve not found a right or wrong answer for this. There’s no difference in cost, performance or availability as long as all objects are stored in the same AWS region.

I chose two buckets because I find it easier to manage multiple buckets with flat structures and small bucket policies, as opposed to single buckets with deep structures and large bucket policies.

The truest answer is ‘it depends’, as other factors can come into play like:

  • Data Sovereignty: S3 bucket prefixes exist in the same region as the parent bucket. Regulations like GDPR and CCPA may require using separate buckets in order to isolate data within designated locations.

AWS SNS

I previously had two standard SNS Topics:

  • wordpress-api-raw for Lambda function alerts
  • failure-lambda for Lambda Destination alerts.

Firstly, there’s now an additional failure-stepfunction topic for any state machine failures.

Secondly, I’ve replaced my wordpress-api-raw topic with a data-lakehouse-raw topic to simplify my alerting channels and allow resource reuse. I’ve also created a new data-lakehouse-bronze topic for bronze process alerts.

Why two data topics? Well, different teams and services care about different things. A bronze-level failure may only concern the Data Engineering team as no other teams consume the data. Conversely, a gold-level failure will concern the AI and MI teams as it impacts their models and reports. Having separate SNS topics for each layer type enables granular monitoring controls.

AWS Parameter Store

Finally, Parameter Store needs the new S3 bucket name and SNS ARNs. I’ve replaced the /sns/pipeline/wordpressapi/raw parameter with /sns/data/lakehouse/raw to preserve the name schema.

I’m now storing five parameters:

  • 2x S3 Bucket names (Raw and Bronze)
  • 2x SNS Topic ARNs (Raw and Bronze notifications)
  • WordPress API Endpoints (unchanged)

Architectural Diagram

There are two diagrams this time! Firstly, here is the data_wordpressapi_bronze AWS Lambda function:

Where:

  1. AWS Lambda calls Parameter Store for S3 and SNS parameters. Parameter Store returns these to AWS Lambda.
  2. Lambda function gets raw WordPress JSON data from S3 Raw Bucket.
  3. Lambda function transforms the raw WordPress JSON data to bronze WordPress Parquet data and puts the new object in the S3 Bronze Bucket.

Meanwhile, Lambda is writing to a CloudWatch Log Group throughout its invocation. If there’s a failure, the Lambda function publishes a message to an SNS topic. SNS then delivers this message to the user’s subscribed email address.

Next, this is the AWS Step Functions WordPress bronze orchestration process:

Where:

  1. EventBridge Schedule invokes the State Machine.
  2. State Machine invokes the Raw Lambda function.
  3. State Machine invokes the Bronze Lambda function.

The State Machine also has its own logging and alerting channels.

Python

In this section, I work on my raw and bronze Python scripts for the WordPress pipeline orchestration process.

Raw Script Updates

I try to update my existing resources when I find something pertinent online. My latest find was this Indently video that covers, amongst other things, type annotations:

So how are type annotations different from type hints? Function annotations were introduced by PEP 3107 in 2006 as a general syntax for annotating function parameters and return values. Type hints (PEP 484, 2014) then added standard definitions and conventions for using those annotations to describe types.

The type hints PEP shows this difference between the two:

When used in a type hint, the expression None is considered equivalent to type(None)

https://peps.python.org/pep-0484/#using-none

So in this function:

Python
def send_email(name: str, message: str) -> None:
  • name: str is an example of type annotation because the parameter name is of type string.
  • -> None is an example of a type hint because although None isn’t a type, it confirms that the function has no output.
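
To see the PEP's None rule in action, here is a minimal sketch that compares the raw annotations on send_email with the hints that the standard library's typing.get_type_hints resolves from them. The function body is a placeholder I've assumed purely so the example runs.

```python
from typing import get_type_hints


def send_email(name: str, message: str) -> None:
    """Placeholder body (an assumption): a real version would send the email."""
    print(f"To {name}: {message}")


# Raw annotations are stored exactly as written in the signature.
raw_annotations = send_email.__annotations__

# get_type_hints applies the PEP 484 rule that None is read as type(None).
resolved_hints = get_type_hints(send_email)

print(raw_annotations["return"])   # None
print(resolved_hints["return"])    # <class 'NoneType'>
```

So the annotation itself is still the value None, and it is the type hint tooling that interprets it as the NoneType type.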

So what’s changed in my raw script?

Updated Import & Functions

Let’s open with a new import:

Python
from botocore.client import BaseClient

BaseClient serves as a foundational base class for AWS service clients within botocore – a low-level library providing the core functionality of boto3 (the AWS Python SDK) and the AWS CLI.

I’m using it here to add type annotations to my boto3 clients. For example, send_sns_message already had these annotations:

Python
def send_sns_message(sns_client, topic_arn: str, subject: str, message: str):

I’ve now annotated sns_client with BaseClient to indicate its boto3 relation. I’ve also added a -> None type hint to confirm the function has no output:

Python
def send_sns_message(sns_client: BaseClient, topic_arn: str, subject: str, message: str) -> None:

Elsewhere, I’ve added the BaseClient annotation to get_parameter_from_ssm‘s ssm_client parameter:

Python
def get_parameter_from_ssm(ssm_client: BaseClient, parameter_name: str) -> str:

And put_s3_object‘s s3_client parameter:

Python
def put_s3_object(s3_client: BaseClient, bucket: str, prefix: str, name: str, json_data: str, suffix: str) -> bool:

put_s3_object also has new prefix and suffix parameters. Before this, it was hard-coded to create JSON objects in a wordpress-api S3 prefix:

Python
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket...")
        s3_client.put_object(
            Body = json_data,
            Bucket = bucket,
            Key = f"wordpress-api/{name}.json"
        )

Not any more! The S3 prefix and object suffix can now be changed dynamically:

Python
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket's {prefix}/{name} prefix...")
        s3_client.put_object(
            Body = json_data,
            Bucket = bucket,
            Key = f"{prefix}/{name}/{name}.{suffix}"
        )

This improves put_s3_object‘s reusability as I can now pass any prefix and suffix to it during a function call. For example, this call creates a JSON object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'json')

While this creates an object with a .csv suffix (the body is still whatever string is passed in):

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'csv')

Likewise, this creates an object with a .txt suffix:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'txt')

I can also set data_source (which I’ll cover shortly) to any S3 prefix, giving total control over where the object is stored.
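
As a rough illustration of how the prefix and suffix parameters shape the object key, here is a self-contained sketch of put_s3_object that runs without AWS credentials. FakeS3Client, SupportsPutObject and the 'posts' object name are hypothetical stand-ins of mine, and the broad except clause is an assumption: the real script uses a boto3 client and its own error handling.

```python
import logging
from typing import Any, Protocol


class SupportsPutObject(Protocol):
    """Hypothetical helper type: anything with an S3-style put_object method."""

    def put_object(self, **kwargs: Any) -> Any: ...


class FakeS3Client:
    """Hypothetical stand-in for a boto3 S3 client that records calls."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    def put_object(self, **kwargs: Any) -> dict[str, Any]:
        self.calls.append(kwargs)
        return {}


def put_s3_object(s3_client: SupportsPutObject, bucket: str, prefix: str,
                  name: str, json_data: str, suffix: str) -> bool:
    # The key follows the prefix/name/name.suffix schema shown above.
    key = f"{prefix}/{name}/{name}.{suffix}"
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket's {prefix}/{name} prefix...")
        s3_client.put_object(Body=json_data, Bucket=bucket, Key=key)
        return True
    except Exception:  # assumption: the real script likely catches a botocore error
        logging.exception(f"Failed to put {name} data in {bucket} bucket.")
        return False


fake_client = FakeS3Client()
ok = put_s3_object(fake_client, "data-lakehouse-raw", "wordpress_api", "posts", "[]", "json")
print(ok, fake_client.calls[0]["Key"])
```

Swapping the last argument for 'csv' or 'txt' only changes the end of the key. The body is passed through untouched.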

Updated Variables

Next, some of my variables need to change. My SNS parameter name needs updating from:

Python
# AWS Parameter Store Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_snstopic = '/sns/pipeline/wordpressapi/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

To:

Python
# AWS Parameter Store Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_snstopic = '/sns/data/lakehouse/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

I also need to lay the groundwork for put_s3_object‘s new prefix parameter. I used to have a lambdaname variable that was used in the logs:

Python
# Lambda name for messages
lambdaname = 'data_wordpressapi_raw'

I’ve replaced this with two new variables. data_source records the data’s origin, which matches my S3 prefix naming schema. function_name then adds context to data_source to match my Lambda function naming schema:

Python
# Lambda name for messages
data_source = 'wordpress_api'
function_name = f'data_{data_source}_raw'

data_source is then passed to the put_s3_object function call when creating raw objects:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'json')

While function_name is used in the logs when referring to the Lambda function:

Python
    # Check an S3 bucket has been returned.
    if not s3_bucket_raw:
        message = f"{function_name}: No S3 Raw bucket returned."
        subject = f"{function_name}: Failed"

Updated Script Body

My variables all now have type annotations. They’ve gone from:

Python
    # AWS Parameter Store Names
    parametername_s3bucket = '/s3/lakehouse/name/raw'
    parametername_snstopic = '/sns/data/lakehouse/raw'
    parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

    # Lambda name for messages
    data_source = 'wordpress_api'
    function_name = f'data_{data_source}_raw'

    # Counters
    api_call_timeout = 30
    endpoint_count_all = 0
    endpoint_count_failure = 0
    endpoint_count_success = 0

To:

Python
    # AWS Parameter Store Names
    parametername_s3bucket: str = '/s3/lakehouse/name/raw'
    parametername_snstopic: str = '/sns/data/lakehouse/raw'
    parametername_wordpressapi: str = '/wordpress/amazonwebshark/api/mysqlendpoints'

    # Lambda name for messages
    data_source: str = 'wordpress_api'
    function_name: str = f'data_{data_source}_raw'

    # Counters
    api_call_timeout: int = 30
    endpoint_count_all: int = 0
    endpoint_count_failure: int = 0
    endpoint_count_success: int = 0

This is helpful when values are passed in from settings files or external services and their types are not immediately apparent. So it’s a good habit to get into!

Bronze Script

Now let’s talk about the new script, which transforms raw S3 JSON objects into bronze S3 Parquet objects. Both raw and bronze WordPress scripts will then feed into an AWS orchestration workflow.

Reused Raw Functions

The following functions are re-used from the Raw script with no changes:

Get Filename Function

Here, I want to get each S3 path’s object name. The object name has some important uses:

  • Using it instead of the full S3 path makes the logs easier to read and cheaper to store.
  • Using it during bronze S3 object creation ensures consistent naming.

A typical S3 path has the schema s3://bucket/prefix/object.suffix, from which I want object.

This function is a remake of the raw script’s Get Filename function. This time, the source string is an S3 path instead of an API endpoint:

I define a get_objectname_from_s3_path function, which expects a path argument with a string type hint and returns a new string.

Firstly, my name_full variable uses the rsplit method to capture the substring I need, using forward slashes as separators. This converts s3://bucket/prefix/object.suffix to object.suffix.

Next, my name_full_last_period_index variable uses the rfind method to find the last occurrence of the period character in the name_full string.

Finally, my name_partial variable uses slicing to extract a substring from the beginning of the name_full string up to (but not including) the index specified by name_full_last_period_index. This converts object.suffix to object.

If the function cannot return a string, an exception is logged and a blank string is returned instead.
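
The steps above can be sketched roughly as follows. This is a reconstruction from the description rather than the original code, and the guard for names without a period is my own addition (without it, rfind's -1 result would make the slice drop the final character).

```python
import logging


def get_objectname_from_s3_path(path: str) -> str:
    try:
        # 's3://bucket/prefix/object.suffix' -> 'object.suffix'
        name_full = path.rsplit('/', 1)[-1]

        # Index of the last period, or -1 if there isn't one.
        name_full_last_period_index = name_full.rfind('.')
        if name_full_last_period_index == -1:
            # Assumed behaviour: no suffix means the name is returned as-is.
            return name_full

        # 'object.suffix' -> 'object'
        name_partial = name_full[:name_full_last_period_index]
        return name_partial

    except Exception:
        logging.exception(f"Unable to get an object name from {path!r}.")
        return ''


print(get_objectname_from_s3_path('s3://bucket/prefix/object.suffix'))  # object
```

Because rfind looks for the last period, a name such as object.v2.suffix keeps its inner period and becomes object.v2.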

Get Data Function

Next, I want to read data from an S3 JSON object in my Raw bucket and store it in a pandas DataFrame.

Here, I define a get_data_from_s3_object function that returns a pandas DataFrame and expects three arguments:

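  • boto3_session: the authenticated session to use with a BaseClient type hint (strictly, a session is a boto3.Session object rather than a client, so boto3.Session would be the more precise hint).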
  • s3_object: the S3 object path with a string type hint.
  • name: the S3 object name with a string type hint (used for logging).

This function uses AWS SDK For pandas s3.read_json to read the data from the S3 object path using the existing boto3_session authentication.

If data is found then get_data_from_s3_object returns a populated DataFrame. Otherwise, an empty DataFrame is returned instead.

Put Data Function

Finally, I want to convert the DataFrame to Parquet and store it in my bronze S3 bucket.

I define a put_s3_parquet_object function that expects four arguments:

  • df: the pandas DataFrame containing the raw data.
  • name: the S3 object name.
  • s3_object_bronze: the S3 path for the new bronze object
  • session: the authenticated boto3 session to use.

I give string type hints to the name and s3_object_bronze parameters. session gets the same BaseClient hint as before, and df is identified as a pandas DataFrame.

I open a try except block that uses s3.to_parquet with the existing session to upload the DataFrame to S3 as a Parquet object. If this operation succeeds, the function returns True. If it fails, a botocore exception is logged and the function returns False.

Imports & Variables

The bronze script has two new imports to examine: awswrangler and pandas:

Python
import logging
import boto3
import botocore
import awswrangler as wr
import pandas as pd
from botocore.client import BaseClient

I’ve used both before. Here, pandas handles my in-memory data storage and awswrangler handles my S3 interactions.

There are also parameter changes. I’ve added Parameter Store names for both the bronze S3 bucket and the SNS topic. I’ve kept the raw S3 bucket parameter as awswrangler needs it for the get_data_from_s3_object function.

Python
parametername_s3bucket_raw: str = '/s3/lakehouse/name/raw'
parametername_s3bucket_bronze: str = '/s3/lakehouse/name/bronze'
parametername_snstopic: str = '/sns/data/lakehouse/bronze'

I’ve also swapped out _raw for _bronze in function_name, and renamed the counters from endpoint_count to object_count to reflect their new function:

Python
    # Lambda name for messages
    data_source: str = 'wordpress_api'
    function_name: str = f'data_{data_source}_bronze'

    # Counters
    object_count_all: int = 0
    object_count_failure: int = 0
    object_count_success: int = 0

Script Body

Most of the bronze script is reused from the raw script. Tasks like logging config, name parsing and validation checks only needed the updated parameters! There are some changes though, as S3 is now my data source and I’m also doing additional tasks.

Firstly, I need to get the raw S3 objects. The AWS SDK For pandas s3 module has a list_objects function which is purpose-built for this:

Python
s3_objects_raw = wr.s3.list_objects(
      path = f's3://{s3_bucket_raw}/{data_source}',
      suffix = 'json',
      boto3_session = session)
  • path is the S3 location to list – in this case the raw S3 bucket’s wordpress_api prefix.
  • suffix filters the list by the specified suffix.
  • boto3_session specifies my existing boto3_session to prevent unnecessary re-authentication.

During the loop, my script checks if the pandas DataFrame returned from get_data_from_s3_object contains data. If it’s empty then the failure counter is incremented and the loop skips to the next object. Otherwise, the column and row counts are logged:

Python
if df.empty:
  logging.warning(f"{object_name} DataFrame is empty!")
  object_count_failure += 1
  continue
  
logging.info(f'{object_name} DataFrame has {len(df.columns)} columns and {len(df)} rows.')
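
To show the control flow without needing pandas or AWS, here is a small sketch of the same loop using plain lists of rows in place of DataFrames. The stand-in data, the 'posts', 'pages' and 'tags' names and the summary message are illustrative assumptions. Only the counter names come from the script.

```python
import logging

logging.basicConfig(level=logging.INFO)

# Stand-in data (assumption): object name -> rows. The real script holds DataFrames.
raw_objects: dict[str, list[dict]] = {
    'posts': [{'id': 1, 'title': 'Hello'}, {'id': 2, 'title': 'World'}],
    'pages': [],
    'tags': [{'id': 7, 'name': 'aws'}],
}

object_count_all: int = 0
object_count_failure: int = 0
object_count_success: int = 0

for object_name, rows in raw_objects.items():
    object_count_all += 1

    if not rows:  # plays the role of df.empty
        logging.warning(f"{object_name} data is empty!")
        object_count_failure += 1
        continue  # skip this object and carry on with the next one

    logging.info(f"{object_name} data has {len(rows[0])} columns and {len(rows)} rows.")
    object_count_success += 1

logging.info(f"Process complete: {object_count_success} Successful | {object_count_failure} Failed.")
```

The key point is that continue only abandons the current object, so one empty object doesn't stop the remaining ones from being processed.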

Assuming all checks succeed, I want to put a new Parquet object into my bronze S3 bucket. AWS SDK For pandas has an s3.to_parquet function that does this using a pandas DataFrame and an S3 path.

I already have the DataFrame so let’s make the path. This is done by the s3_object_bronze variable, which combines existing variables in an f-string to follow the same prefix schema as the raw bucket. This is then passed to put_s3_parquet_object:

Python
s3_object_bronze = f's3://{s3_bucket_bronze}/{data_source}/{object_name}/{object_name}.parquet'

logging.info(f"Attempting {object_name} S3 Bronze upload...")
ok = put_s3_parquet_object(df, object_name, s3_object_bronze, session)
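
As a quick check of the naming schema, this sketch maps a raw object path to its bronze counterpart. build_bronze_path is a hypothetical helper, the example paths are assumptions based on the bucket and prefix names in this post, and I've swapped in pathlib's stem property to get the object name instead of the script's own name function.

```python
from pathlib import PurePosixPath


def build_bronze_path(s3_object_raw: str, s3_bucket_bronze: str, data_source: str) -> str:
    """Hypothetical helper: derive the bronze Parquet path from a raw JSON path."""
    # stem drops the directory part and the final suffix: '.../posts.json' -> 'posts'
    object_name = PurePosixPath(s3_object_raw).stem
    return f's3://{s3_bucket_bronze}/{data_source}/{object_name}/{object_name}.parquet'


raw_paths = [
    's3://data-lakehouse-raw/wordpress_api/posts/posts.json',
    's3://data-lakehouse-raw/wordpress_api/pages/pages.json',
]

for raw_path in raw_paths:
    print(raw_path, '->', build_bronze_path(raw_path, 'data-lakehouse-bronze', 'wordpress_api'))
```

Each raw object therefore lands under the same prefix and name in the bronze bucket, with only the bucket and suffix changing.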

That’s the bronze script done. Now to deploy it to AWS Lambda.

Lambda

In this section, I configure and deploy my Bronze Lambda function.

Hitting Size Limits

So, remember when I said that I expected my future Lambda deployments to improve? Well, this was the result of my retrying the virtual environment deployment process the Raw Lambda used:

2024 03 08 LambdaError

While my zipped raw function is 19.1 MB, my zipped bronze function is over five times bigger at 101.6 MB! My poorly optimised package wouldn’t cut it this time, so I prepared for some pruning. Until I discovered something…

Using A Layer

There’s a managed AWS SDK for pandas Lambda layer!

2024 03 05 LayerAWSSDKPandas

It can be selected in the Lambda console or programmatically called from this list of AWS SDK for pandas Managed Layer ARNs, which covers:

  • All AWS commercial regions.
  • All Python versions supported by Lambda (3.8+ at the time of writing).
  • Both Lambda architectures.

Additionally, the Lambda Python 3.12 runtime includes boto3 and botocore. So by using this runtime and the managed layer, I’ve gone from a large deployment package to no deployment package! And because my function is now basically just code, I can view and edit that code in the Lambda console directly.

Lambda Config

My Bronze Lambda function borrows several config settings from the raw one, including:

Where it differs is the IAM setup. I needed additional permissions anyway because this function now works with two S3 buckets (reading from raw and writing to bronze), but by the time I was done the policy was hard to read, maintain and troubleshoot:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "sns:Publish",
                "s3:ListBucket",
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-raw",
                "arn:aws:s3:::data-lakehouse-bronze",
                "arn:aws:logs:eu-west-1:REDACTED:*",
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-raw",
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "ssm:GetParameter"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:log-group:/aws/lambda/data_wordpressapi_bronze:*",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/bronze",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/bronze"
            ]
        }
    ]
}

So let’s refactor it! The below policy has the same actions, grouped by service and with appropriately named Sids:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchLogGroupActions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:*"
            ]
        },
        {
            "Sid": "CloudWatchLogStreamActions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:log-group:/aws/lambda/data_wordpressapi_bronze:*"
            ]
        },
        {
            "Sid": "S3BucketActions",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw",
                "arn:aws:s3:::data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "S3ObjectActions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*"
            ]
        },
        {
            "Sid": "SNSActions",
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": [
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "ParameterStoreActions",
            "Effect": "Allow",
            "Action": [
                "ssm:GetParameter"
            ],
            "Resource": [
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/bronze",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/bronze"
            ]
        }
    ]
}

Much better! This policy is now far easier to read and update.

There’s also a clear distinction between the bucket-level s3:ListBucket operation and the object-level s3:PutObject and s3:GetObject operations now. Getting these wrong can have big consequences, so the clearer the better!
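
One way to guard against mixing the two levels up is a small check over each policy statement. The sketch below is only an illustration under narrow assumptions: it knows just the three S3 actions used in this post, treats any S3 ARN containing a slash as an object ARN, and ignores wildcarded bucket names, conditions and NotResource. The function names are hypothetical.

```python
BUCKET_LEVEL_ACTIONS = {"s3:ListBucket"}
OBJECT_LEVEL_ACTIONS = {"s3:GetObject", "s3:PutObject"}
S3_ARN_PREFIX = "arn:aws:s3:::"


def is_object_arn(arn: str) -> bool:
    """True for arn:aws:s3:::bucket/key style ARNs, False for bare bucket ARNs."""
    return "/" in arn[len(S3_ARN_PREFIX):]


def unmatched_s3_actions(statement: dict) -> list[str]:
    """Return S3 actions with no resource of the matching level in the statement."""
    s3_arns = [arn for arn in statement["Resource"] if arn.startswith(S3_ARN_PREFIX)]
    has_bucket_arn = any(not is_object_arn(arn) for arn in s3_arns)
    has_object_arn = any(is_object_arn(arn) for arn in s3_arns)

    unmatched = []
    for action in statement["Action"]:
        if action in BUCKET_LEVEL_ACTIONS and not has_bucket_arn:
            unmatched.append(action)
        elif action in OBJECT_LEVEL_ACTIONS and not has_object_arn:
            unmatched.append(action)
    return unmatched


good_statement = {
    "Sid": "S3BucketActions",
    "Effect": "Allow",
    "Action": ["s3:ListBucket"],
    "Resource": ["arn:aws:s3:::data-lakehouse-raw", "arn:aws:s3:::data-lakehouse-bronze"],
}

# Deliberately broken: a bucket-level action paired only with an object ARN.
broken_statement = {
    "Sid": "S3BucketActions",
    "Effect": "Allow",
    "Action": ["s3:ListBucket"],
    "Resource": ["arn:aws:s3:::data-lakehouse-raw/wordpress_api/*"],
}

print(unmatched_s3_actions(good_statement))    # []
print(unmatched_s3_actions(broken_statement))  # ['s3:ListBucket']
```

A statement like the broken one is accepted by IAM but grants nothing useful, which is the kind of mistake the grouped policy makes easier to spot by eye.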

One deployment and test later, and I have some new S3 objects!

[INFO]: WordPress API Bronze process complete: 5 Successful | 0 Failed.

REPORT RequestId: 899d1658-f7de-4e74-8d64-b4f029fe2bec	Duration: 7108.50 ms	Billed Duration: 7109 ms	Memory Size: 250 MB	Max Memory Used: 250 MB	Init Duration: 4747.38 ms

So now I have two Lambda functions with some requirements around them:

  • They need to run sequentially.
  • The Raw Lambda must finish before the Bronze Lambda starts.
  • If the Raw Lambda fails then the Bronze Lambda shouldn’t run at all.

Now that AWS Lambda is creating WordPress raw and bronze objects, it’s time to start thinking about orchestration!

Step Functions & EventBridge

In this section, I create both an AWS Step Functions State Machine and an Amazon EventBridge Schedule for my WordPress bronze orchestration process.

State Machine Requirements

Before writing any code, let’s outline the steps I need the state machine to perform:

  1. data_wordpressapi_raw Lambda function is invoked. If it succeeds then move to the next step. If it fails then send a notification and end the workflow reporting failure.
  2. data_wordpressapi_bronze Lambda function is invoked. If it succeeds then end the workflow reporting success. If it fails then send a notification and end the workflow reporting failure.

With the states defined, it’s time to create the state machine.

State Machine Creation

The following state machine was created using Step Functions Workflow Studio – a low-code visual designer released in 2021, with drag-and-drop functionality that auto-generates code in real-time:

Workflow Studio produced this section’s code and diagrams.

Firstly I create a data_wordpressapi_raw task state to invoke my Raw Lambda. This task uses the lambda:invoke action to invoke my data_wordpressapi_raw function. I set the next state as data_wordpressapi_bronze and add a Catch block that sends all errors to a PublishFailure state (which I’ll define later):

JSON
    "data_wordpressapi_raw": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_raw:$LATEST"
      },
      "Next": "data_wordpressapi_bronze",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.Error",
          "Next": "PublishFailure"
        }
      ],
      "TimeoutSeconds": 120
    }

Note the TimeoutSeconds parameter. All my task states will have 120-second timeouts. These stop the state machine from waiting indefinitely if the task becomes unresponsive, and are recommended best practice. Also note that state machines wait for Lambda invocations to finish by default, so no additional config is needed for this.

Next, I create a data_wordpressapi_bronze task state to invoke my Bronze Lambda. This task uses the lambda:invoke action to invoke my data_wordpressapi_bronze function. I then add a Catch block that sends all errors to a PublishFailure state.

Finally, "End": true designates this state as a terminal state which ends the execution if the task is successful:

JSON
    "data_wordpressapi_bronze": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_bronze:$LATEST"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.Error",
          "Next": "PublishFailure"
        }
      ],
      "TimeoutSeconds": 120,
      "End": true
    }

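Lastly, I create a PublishFailure task state that publishes failure notifications. This task uses the sns:Publish action to publish a simple message to the failure-stepfunction SNS topic. Note that because the Message key has no .$ suffix, Step Functions treats its value as a static string, so the notification contains the literal text $.Error rather than the caught error details. "End": true marks this task as the other way the state machine execution can end: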

JSON
    "PublishFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:eu-west-1:REDACTED:failure-stepfunction",
        "Message": "An error occurred in the state machine: { \"error\": \"$.Error\" }"
      },
      "End": true,
      "TimeoutSeconds": 120
    }
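
To show how the three states fit together, here is a sketch that assembles them into one definition as a Python dictionary and checks that every transition points at a state that exists. The top-level StartAt and States keys are standard Amazon States Language fields, but this wrapper is my assumption as the post only shows the individual states. lambda_task and find_dangling_targets are hypothetical helpers, and the SNS message is shortened.

```python
import json
from typing import Optional

# Placeholder prefix: the account ID is redacted, as in the post.
LAMBDA_ARN_PREFIX = "arn:aws:lambda:eu-west-1:REDACTED:function"


def lambda_task(function_name: str, next_state: Optional[str] = None) -> dict:
    """Build a Lambda task state that routes every error to PublishFailure."""
    state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {
            "Payload.$": "$",
            "FunctionName": f"{LAMBDA_ARN_PREFIX}:{function_name}:$LATEST",
        },
        "Catch": [
            {"ErrorEquals": ["States.ALL"], "ResultPath": "$.Error", "Next": "PublishFailure"}
        ],
        "TimeoutSeconds": 120,
    }
    # A state either names its successor or ends the execution.
    if next_state is None:
        state["End"] = True
    else:
        state["Next"] = next_state
    return state


definition = {
    "StartAt": "data_wordpressapi_raw",
    "States": {
        "data_wordpressapi_raw": lambda_task("data_wordpressapi_raw", "data_wordpressapi_bronze"),
        "data_wordpressapi_bronze": lambda_task("data_wordpressapi_bronze"),
        "PublishFailure": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sns:publish",
            "Parameters": {
                "TopicArn": "arn:aws:sns:eu-west-1:REDACTED:failure-stepfunction",
                "Message": "An error occurred in the state machine.",
            },
            "End": True,
            "TimeoutSeconds": 120,
        },
    },
}


def find_dangling_targets(definition: dict) -> set:
    """Return any StartAt, Next or Catch target that is not a defined state."""
    targets = {definition["StartAt"]}
    for state in definition["States"].values():
        if "Next" in state:
            targets.add(state["Next"])
        for catcher in state.get("Catch", []):
            targets.add(catcher["Next"])
    return targets - set(definition["States"])


print(find_dangling_targets(definition))  # set()
print(json.dumps(definition, indent=2))
```

Building both Lambda states from one helper keeps their Catch and timeout settings identical, and the printed JSON has the same overall shape as Workflow Studio's code view.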

While both Lambdas already have SNS alerting, the state machine itself may also fail so the added observability is justified. This Marcia Villalba video was very helpful here:

And that’s everything I need! At this point Workflow Studio gives me two things: firstly the state machine’s code, which I’ve committed to GitHub, and secondly this handy downloadable diagram:

stepfunctions graph

State Machine Config

It’s now time to think about security and monitoring.

When new state machines are created in the AWS Step Functions console, an IAM Role is created with policies based on the state machine’s resources. The nuances and templates are covered in the Step Functions Developer Guide, so let’s examine my WordPress_Raw_To_Bronze state machine’s auto-generated IAM Role consisting of two policies:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "xray:PutTraceSegments",
                "xray:PutTelemetryRecords",
                "xray:GetSamplingRules",
                "xray:GetSamplingTargets"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

This supports the AWS X-Ray integration with AWS Step Functions. If X-Ray tracing is never enabled then this policy is unused.

Besides X-Ray tracing, there is also an option to log a state machine’s execution history to CloudWatch Logs. There are three log levels available plus a fourth default choice: OFF. Each state machine retains recent execution history and I’ve got no need to keep that history long-term, so I leave logging disabled. Remember – CloudWatch Logs is only free for the first 5GB!

State Machine Testing

There are various ways to test a state machine. There’s a testing and debugging section in the developer guide that goes into further detail on the three main options.

I’ll focus on console testing here.

Both individual states and the entire state machine can be tested in the console. Each state can be tested in isolation (using the TestState API under the hood) with customisable inputs and IAM roles. This is great for checking the state outputs are correct, and that the attached IAM role is sufficient.

The state machine itself can also be tested via on-demand execution. The Execution Details page shows the state machine’s statistics and events, and has great coverage in the developer guide.

During testing, my WordPress_Raw_To_Bronze state machine returned this error:

States.Runtime in step: data_wordpressapi_bronze.

An error occurred while executing the state 'data_wordpressapi_bronze' (entered at the event id #7). Unable to apply Path transformation to null or empty input.

This turned out to be a problem with the OutputPath parameter, which Workflow Studio enables by default:

2024 03 04 StepFunctionsOutPutPath

I’m not using this setting for anything, so I disabled it to solve this problem.

EventBridge Schedule

Finally, I want to automate the execution of my state machine. This calls for an EventBridge Schedule!

EventBridge makes this quite simple, using mostly the same process as last time. The Step Functions StartExecution operation is a templated target like Lambda’s Invoke operation, so it’s a case of selecting the WordPress_Raw_To_Bronze state machine from the list and updating the schedule’s IAM role accordingly.

And that’s it! EventBridge now executes the state machine at 07:00 each morning. The state machine then sequentially invokes both Lambda functions and catches any errors.

Costs

In this section, I’ll examine my recent AWS WordPress bronze orchestration process costs.

Let’s start with Step Functions. There are two kinds of Step Function workflow:

  • Standard workflows are charged based on the number of state transitions. These are counted each time a workflow step is executed. The first 4000 transitions each month are free. After that, every 1000 transitions cost $0.025.
  • Express workflows are priced by the number of executions, duration, and memory consumption. The specifics of these criteria, coupled with full details of all charges are on the Step Functions pricing page.
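
As a rough worked example of the Standard workflow pricing, the sketch below applies the free allowance and per-1000 rate quoted in this post. The function and constant names are hypothetical, and the figures are the ones stated here rather than a live pricing lookup:

```python
FREE_TRANSITIONS_PER_MONTH = 4000   # free monthly allowance quoted in this post
PRICE_PER_1000_TRANSITIONS = 0.025  # dollars per 1000 transitions, as quoted in this post


def standard_workflow_cost(transitions: int) -> float:
    """Estimate a month's Standard workflow charge from its state transitions."""
    # Only transitions beyond the free allowance are billable.
    billable = max(0, transitions - FREE_TRANSITIONS_PER_MONTH)
    return billable / 1000 * PRICE_PER_1000_TRANSITIONS


print(standard_workflow_cost(118))     # well inside the free allowance
print(standard_workflow_cost(10_000))  # 6000 billable transitions
```

A month with 118 transitions sits comfortably inside the free allowance, so the estimate is zero.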

I’m using standard workflows, and as of 26 March I’ve used 118 state transitions. In other words, free! Elsewhere, my costs are broadly on par with previous months. These are my S3 costs from 2024-02-01 to 2024-03-26:

S3 Actions | Month | Usage | Cost
PUT, COPY, POST, or LIST requests | 2024-02 | 64,196 | 0.32
PUT, COPY, POST, or LIST requests | 2024-03 | 17,566 | 0.09
GET and all other requests | 2024-02 | 101,462 | 0.04
GET and all other requests | 2024-03 | 8,656 | 0.00
GB month of storage used | 2024-02 | 0.109 | 0.00
GB month of storage used | 2024-03 | 0.161 | 0.00

And this is my recent free tier usage from 2024-02-01 to 2024-03-26:

Service | Month | Usage
EventBridge | 2024-02 | 31 Invocations
EventBridge | 2024-03 | 25 Invocations
Lambda | 2024-02 | 122.563 Second Compute
Lambda | 2024-02 | 84 Requests
Lambda | 2024-03 | 82.376 Second Compute
Lambda | 2024-03 | 58 Requests
Parameter Store | 2024-02 | 34 API Requests
Parameter Store | 2024-03 | 25 API Requests
SNS | 2024-02 | 8 Email-JSON Notifications
SNS | 2024-02 | 438 API Requests
SNS | 2024-03 | 3 Email-JSON Notifications
SNS | 2024-03 | 205 API Requests

So my only costs are still for S3.

Resources

The following items have been checked into the amazonwebshark GitHub repo for the AWS WordPress bronze orchestration process, available via the button below:

  • Updated data_wordpressapi_raw Python script & requirements.txt file.
  • New data_wordpressapi_bronze Python script & requirements.txt file.
  • WordPress_Raw_To_Bronze state machine JSON.
GitHub-BannerSmall

Summary

In this post, I created my WordPress pipeline’s bronze data orchestration process using AWS Lambda layers and AWS Step Functions.

I’ve wanted to try Step Functions out for a while, and all things considered they’re great! Workflow Studio is easy to use, and the templates and tutorials undoubtedly highlight the value that Step Functions can bring.

Additionally, the integration with both EventBridge Scheduler and other AWS services makes Step Functions a compelling orchestration service for both my ongoing WordPress bronze work and the future projects in my pipeline. This combined with some extra Lambda layers will reduce my future dev and test time.

If this post has been useful then the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~

Categories
Data & Analytics

Using Python & AWS To Extract WordPress API Data

In this post, I use popular Python modules & AWS managed serverless services to extract WordPress API data.

Table of Contents

Introduction

Last year, I tested my Python skills by analysing amazonwebshark’s MySQL database with Python. I could have done the same in 2024, but I wouldn’t have learned anything new and it felt a bit pointless. One of my YearCompass 2023-2024 goals is to build more, so I instead decided to create a data pipeline using popular Python modules & AWS services to extract my WordPress data using their API.

A data pipeline involves many aspects, which future posts will explore. This post focuses on extracting data from my WordPress database and storing it as flat files in AWS.

Firstly, I’ll discuss my architectural decisions for this part of the pipeline. Then I’ll examine the functions in my Python script that interact with AWS and perform data extraction. Finally, I’ll bring everything together and explain how it all works.

Architectural Decisions

In this section, I examine my architectural decisions and outline the pipeline’s processes.

Programming Language

My first decision concerned which programming language to use. I’m using Python here for several reasons:

  • I use Python at work and am always looking to refine my skills.
  • Several AWS services natively support Python.
  • Python SDKs like Boto3 and awswrangler support my use case.

Data Extraction

Next, I chose what data to extract from my WordPress MySQL database. I’m interested in the following tables, which are explained in greater detail in 2023’s Deep Dive post:

In November I migrated amazonwebshark to Hostinger, whose MySQL remote access policy requires an IP address. While this isn’t a problem locally, AWS is a different story. I’d either need an EC2 instance with a static IP, or a Lambda function with several networking components. These are time and money costs I’d prefer to avoid, so no calling the database.

Fortunately, WordPress has an API!

WordPress API

The WordPress REST API lets applications interact with WordPress sites by sending and receiving data as JSON objects. Public content like posts and comments is publicly accessible via the API, while private and password-protected content requires authentication.

While researching options, I stumbled across MiniOrange’s Custom API for WordPress plugin. It has a simple interface and a good feature list:

Custom API for WordPress plugin allows you to create WordPress APIs / custom endpoints / REST APIs. You can Fetch / Modify / Create / Delete data with an easy-to-use graphical interface. You can also build custom APIs by writing custom SQL queries for your WP APIs.

https://plugins.miniorange.com/custom-api-for-wordpress

This meant I could start using it straight away!

The free plan lets users create as many endpoints as needed. But it also has a pretty vital limitation – API key authentication is only possible on their Premium plan. In the free plan, all endpoints are public!

Now let me be clear – this isn’t necessarily a problem. After all, the WordPress API is public! And my WordPress data doesn’t contain any PII or sensitive data. No – the risk I’m trying to address here isn’t a security one.

Public endpoints can be called by anyone or anything at any time. WordPress has dedicated, optimised resources that auto-scale on demand, whereas I have one Hostinger server doing every site process. Could it be DDoSed into oblivion by tons of API calls from bad actors? Do I want to find out?

As I’m using the plugin’s free tier here, I’ll mitigate my risks by:

  • Adding random strings to the endpoints to make them less guessable.
  • Not showing the endpoints in my script or this post.

So ok – how will I get the API endpoints then?

Parameters

Next, I need to decide how my script will get the endpoints to query and the S3 bucket name to store the results.

With previous scripts, I’ve used features like gitignore and dot sourcing to hide parameters I don’t want to expose. While this works, it isn’t ideal. Dot sourcing breaks if the file paths change, and even with gitignore any credentials are still hardcoded into the script locally.

A better approach is to use a process similar to a password manager, where an authenticated user or role can request and receive credentials using secure channels. AWS has two services for this requirement: AWS Secrets Manager and AWS Systems Manager Parameter Store.

Secrets Manager Vs Parameter Store

Secrets Manager is designed for managing and rotating sensitive information like database credentials, API keys, and other types of secrets. Conversely, Parameter Store is designed for storing configuration data, including plaintext or sensitive information, in a hierarchical structure.

I’m using Parameter Store here for two reasons:

Storage

Next, I need to decide where to store the API data. And I’m already using AWS for parameters, so I was always going to end up using S3. But what makes S3 an obvious fit here?

  • Integration: S3 is one of the oldest and most mature AWS services. It is well supported by both the Python SDK and other AWS services like EventBridge, Glue and Athena for processing and analysis.
  • Scalability: S3 will accept objects from a couple of bytes to terabytes in size (although if I’m generating terabytes of data here something is very wrong!). I can run my script at any time and as often as I want, and S3 will handle all the data it receives.
  • Cost: S3 won’t be entirely free here because I’ll be creating and accessing lots of data during testing. But even so, I expect it to cost me pence. I’m not keeping versions at this stage either, so my costs will only be for the current objects.

Much has been written about S3 over the years, so I’ll leave it at this.

Use Of Flat Files

Finally, let’s examine the decision to store flat files in the first place. The data is already in a database – why duplicate it?

Decoupling: Putting raw data into S3 at an early stage of the pipeline decouples the database at that point. Databases can become inaccessible, corrupted or restricted. The S3 data would be completely unaffected by these database issues, allowing the pipeline to persist with the available data.

Reduced Server Load: Storing data in S3 means the rest of the pipeline reads the S3 objects instead of the database tables. This reduces the Hostinger server’s load, letting it focus on transactional queries and site processes. S3 is almost serving as a read replica here.

Security: It is simpler for AWS services to access data stored in S3 than the same data stored on Hostinger’s server. AWS services accessing server data require MySQL credentials and a whitelisted IP. In contrast, AWS services accessing S3 data require…an IAM policy.

Architectural Diagram

This is an architectural diagram of the expected process:

  1. User triggers the Python function.
  2. Python interacts with AWS Python SDK.
  3. SDK calls Parameter Store for WordPress & S3 parameters. These are returned to Python via the SDK.
  4. Python calls WordPress API. WordPress API returns data.
  5. Python writes API data to S3 bucket via the SDK.

Setup & Config

I completed some local and cloud configurations before I started writing my Python script to extract WordPress API data. This section explores my laptop setup and AWS infrastructure.

Local Machine

I’m using Windows 10 and WSL version 2 to create a Linux environment with the Ubuntu 22.04.3 LTS distribution. I’m using Python 3.12, with a fresh Python virtual environment for installing my dependencies.

AWS Data Storage

I already have an S3 bucket for ingesting raw data, so that’s sorted. I made a wordpress-api prefix in that bucket to partition the uploaded data.

This bucket doesn’t have versioning enabled because it has a high object turnover. Versioning is unneeded and could get very expensive without a good lifecycle policy! While this would be simple to do, it’s a wasted effort at this point in the pipeline.

Another factor against versioning is that I can recreate S3 objects from the MySQL database. As objects are reproducible, there is no need for the delete protection offered by versioning.

AWS Parameters

I’m using Parameter Store to hold two parameters: my S3 bucket name and my WordPress API endpoints. Each of these uses a different parameter type.

The S3 bucket name is a simple string that uses the String Parameter Type. This is intended for blocks of text up to 4096 characters (4 KB). The API endpoints are a collection of strings generated by the WordPress plugin. I use the StringList Parameter Type here, which is intended for comma-separated lists of values. This lets me store all the endpoints in a single parameter, simplifying my code and reducing my AWS API calls.

Python Script

In this section, I examine the various parts of my Python script that will extract data from the WordPress API. This includes functions, methods and intended functionality.

Advisory

Before continuing I want to make something clear. This advisory is on my amazonwebshark artefacts GitHub repo, but it bears repeating here too:

Artefacts within this post have been created at a certain point in my learning journey. They do not represent best practices, may be poorly optimised or include unexpected bugs, and may become obsolete.

If I find better ways of doing these processes in future then I’ll link to or update posts where appropriate.

Logging

Firstly, I’ll sort out some logging.

The logging module is a core Python library, so I can import it without a pip install command. I then use logging’s basicConfig function to set my desired parameters:

Python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s]: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

level sets the logging level to start at. logging.INFO records information about events like authentications, conversions and confirmations.

format sets how the logs will appear in the console. Sections written as %(name)s are placeholders that will be formatted as strings. Other characters are printed as-is. Here, my logs will return as Date/Time [Log Level]: Log Message.

datefmt sets the date/time format for format’s asctime using the same directives as time.strftime().

These settings will give me logs in the style of:

2024-01-11 09:44:39 [INFO]: Parameter found.
2024-01-11 09:44:39 [INFO]: API endpoints returned.
2024-01-11 09:44:39 [INFO]: Getting S3 parameter...
2024-01-11 09:44:39 [WARNING]: S3 parameter not found!

This lets me keep track of what stage Python is at when I extract WordPress API data.
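
To see how the format and datefmt strings combine, this minimal sketch applies the same two strings to a single hand-built log record using logging.Formatter. The record's name, pathname and message are placeholders chosen for illustration:

```python
import logging

# Same format and datefmt strings as the basicConfig call in this post.
formatter = logging.Formatter(
    fmt="%(asctime)s [%(levelname)s]: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Build one record by hand so the formatted line can be inspected directly.
record = logging.LogRecord(
    name="example",
    level=logging.INFO,
    pathname="example.py",
    lineno=1,
    msg="Parameter found.",
    args=(),
    exc_info=None,
)

formatted_line = formatter.format(record)
print(formatted_line)
```

The printed line uses the current date and time, followed by [INFO]: Parameter found.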

boto3 Session

To call the AWS services I want to use, I need to create a boto3 session. This object represents a single connection to AWS, encapsulating options including the configuration settings and credentials. Without this, Python cannot access AWS Parameter Store, and so cannot extract WordPress API data.

To begin, I run pip install boto3 in the terminal. I then script the following:

Python
import logging
import boto3

session = boto3.Session()

This code snippet performs two new actions:

  • Imports the boto3 module
  • Creates an instance of the boto3 module’s Session class.

As Session has no arguments, it will use the first AWS credentials it finds. In AWS, these will be from the Lambda function’s IAM role. No problems there. But I have several AWS profiles on my laptop, and my default profile is for a different AWS account!

In response, I can set an AWS profile using VSCode’s launch.json debugging object. By adding "env": {"AWS_PROFILE": "{my_profile_name}"} to the end of the configurations list, I can specify which local AWS profile to use without altering the Python script itself:

JSON
{
	"version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "justMyCode": true,
            "env": {"AWS_PROFILE": "profile"}
        }
    ]
}

Functions

This section examines my Python functions that extract WordPress API data. Each function has an embedded GitHub Gist and an explanation of the arguments and processes.

Get Parameters Function

Firstly, I need to get my parameter values from AWS Parameter Store.

Here, I define a get_parameter_from_ssm function that expects two arguments:

  • ssm_client: the boto3 client used to contact AWS.
  • parameter_name: the name of the required parameter.

I use type hints to annotate parameter_name and the returned object type as strings. For a great introduction to type hints, take a look at this short video from AWS Mad Lad Matheus Guimaraes:

I then create a try except block containing a response object which uses the ssm_client.get_parameter function to try getting the requested parameter. If this fails, the AWS error is logged and a blank string is returned. The parameter value is returned if successful.

I am capturing the AWS exceptions using the botocore module because it provides access to the underlying error information returned by AWS services. When an AWS service operation fails, it usually returns an error response that includes details about what went wrong. botocore can access these responses programmatically and log more exception details than the Python default.

I now have two additional changes to my main script:

Python
import logging
import boto3
import botocore

session = boto3.Session()
client_ssm = session.client('ssm')
  • botocore needs to be imported, so I add import botocore to the script. I don’t need to install botocore because it was installed with boto3.
  • I need a Simple Systems Manager (SSM) client to interact with AWS Systems Manager Parameter Store. I create an instance of the SSM client using my existing session and assign it to client_ssm. I can now use client_ssm throughout my script.

Get Filename Function

Next, I want to get each API endpoint’s filename. The filename has some important uses:

  • Logging processes without using the full endpoint.
  • Creating S3 objects.

A typical endpoint has the schema https://site/endpointname_12345/. There are two challenges here:

  • Extracting the name from the string.
  • Removing the name’s random characters.

I define a get_filename_from_endpoint function, which expects an endpoint argument with a string type hint and returns a new string.

Firstly, my name_full variable uses the rsplit method to capture the substring I need, using forward slashes as separators. This converts https://site/endpointname_12345/ to endpointname_12345.

Next, my name_full_last_underscore_index variable uses the rfind method to find the last occurrence of the underscore character in the name_full string.

Finally, my name_partial variable uses slicing to extract a substring from the beginning of the name_full string up to (but not including) the index specified by name_full_last_underscore_index. This converts endpointname_12345 to endpointname.

If the function is unable to return a string, an exception is logged and a blank string is returned instead.
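
The steps above can be sketched as follows. This is a minimal reconstruction from the description rather than the exact Gist: the variable names come from the text, while the log message and the assumption that every endpoint ends with a trailing slash are mine:

```python
import logging


def get_filename_from_endpoint(endpoint: str) -> str:
    """Return the endpoint's name without its random suffix, or '' on failure."""
    try:
        # 'https://site/endpointname_12345/' -> 'endpointname_12345'
        # (assumes the endpoint ends with a trailing slash)
        name_full = endpoint.rsplit("/", 2)[1]

        # Index of the last underscore, which starts the random characters.
        name_full_last_underscore_index = name_full.rfind("_")

        # Keep everything before that underscore.
        name_partial = name_full[:name_full_last_underscore_index]
        return name_partial
    except Exception:
        logging.exception("Unable to get filename from endpoint.")
        return ""


print(get_filename_from_endpoint("https://site/endpointname_12345/"))
```

One edge case to be aware of in this sketch: rfind returns -1 when there is no underscore, so a name without a random suffix would lose its final character.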

No new imports are needed here. So let’s move on!

Call WordPress API Function

My next function queries a given API endpoint and handles the response.

Here, I define a get_wordpress_api_json function that expects three arguments:

  • requests_session
  • api_url: the WordPress API URL with a string type hint.
  • api_call_timeout: the number of seconds to wait for a response before timing out.

requests.Session is a part of the Requests library, and creates a session object that persists across multiple requests. I can now use the same session throughout the script instead of constantly creating new ones.

I open a try except block and create a response object. requests.Session attempts to call the API URL. If the response status code is 200 OK then the response is returned as a raw JSON dictionary.

This function can fail in three ways:

  • The status code isn’t 200. While this includes 3xx, 4xx and 5xx codes, it also includes the other 2xx codes. This was deliberate, as any 2xx responses other than 200 are still unusual, and something I want to know about.
  • The API call times out.
  • Requests throws an exception.

In all cases, the function raises an exception and doesn’t proceed. This was a conscious choice, as an API call failure represents a critical and unrecoverable problem with the WordPress API that should ring alarm bells.
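
A minimal sketch of this behaviour is below. It is an approximation of the described function, not the Gist itself: the RuntimeError type and the log message are assumptions, and the session is passed in so the function only needs an object with a get method, such as a requests.Session:

```python
import logging


def get_wordpress_api_json(requests_session, api_url: str, api_call_timeout: int) -> dict:
    """Call a WordPress API endpoint and return its JSON, raising on any failure."""
    try:
        response = requests_session.get(api_url, timeout=api_call_timeout)

        # Anything other than 200 OK is treated as a failure, including other 2xx codes.
        if response.status_code != 200:
            raise RuntimeError(f"Unexpected status code: {response.status_code}")

        return response.json()
    except Exception:
        # Timeouts and other Requests exceptions also arrive here.
        # Log the details, then re-raise so the caller cannot carry on.
        logging.exception("WordPress API call failed.")
        raise
```

Passing the session in as an argument also makes the function easy to exercise with a stand-in object during testing, without calling the real API.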

As I’m using the Requests module now, I need to run pip install requests in the terminal and add import requests to my script. I then create my requests session in the same way as my boto3 session.

I’m also now using json – another pre-installed core Python module ready for import:

Python
import logging
import json
import requests
import boto3
import botocore

session = boto3.Session()
client_ssm = session.client('ssm')
requests_session = requests.Session()

S3 Upload Function

Finally, I need to put my JSON data into S3.

I define a put_s3_object function that expects four arguments:

  • s3_client: the boto3 client used to contact AWS.
  • bucket: the S3 bucket to create the new object in
  • name: the name to use for the new object
  • json_data: the data to upload

I give string type hints to the bucket, name and json_data arguments. This is especially important for json_data because of what I plan to do with it.

I open a try except block and try to use the S3 client’s put_object function to upload the JSON data to S3. In this context:

  • Body is the JSON data I want to store.
  • Bucket is the S3 bucket name from AWS Parameter Store.
  • Key is the S3 object key, using an f-string that includes the name from my get_filename_from_endpoint function.

The JSON data is created by my get_wordpress_api_json function, which returns that data as a dictionary. Passing a dictionary to put_object’s Body argument will throw a parameter validation error because its type is invalid for the Body parameter. json_data’s string type hint will help prevent this scenario.

Moving on, the S3 client’s put_object function attempts to upload the data to the S3 bucket’s wordpress-api prefix as a new JSON object. If this operation succeeds, the function returns True. If it fails, a botocore exception is logged and the function returns False.

While no new imports are needed, I do now need an S3 client alongside the SSM one to allow S3 interactions:

Python
session = boto3.Session()
client_ssm = session.client('ssm')
client_s3 = session.client('s3')
requests_session = requests.Session()

Script Body

This section examines the body of my Python script. I look at the script’s flow, the objects passed to the functions and the responses to successful and failed processes.

Variables

In addition to the imports and sessions already listed, I have some additions:

  • The S3 bucket and WordPress API Parameter Store names.
  • An api_call_timeout value for the WordPress API requests in seconds.
  • Three endpoint counts used for monitoring failures, successes and overall progress.
Python
# Parameter Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

# API call timeout (seconds)
api_call_timeout = 30

# Counters
endpoint_count_all = 0
endpoint_count_failure = 0
endpoint_count_success = 0

Getting The Parameters

The first part of the script’s body handles getting the AWS parameters.

Firstly, I pass my SSM client and WordPress API parameter name to my get_parameter_from_ssm function.

If successful, the function returns a comma-separated string of API endpoints. I transform this string into a list using .split(",") and assign the list to api_endpoints_list. Otherwise, an empty string is returned.

Splitting this empty string with .split(",") produces a list containing a single empty string, which is assigned to api_endpoints_list. This is why get_parameter_from_ssm returns a blank string if it hits an exception: .split(",") has no issues with a blank string, but throws attribute errors with returns like False and None.

I then check if api_endpoints_list contains anything using if not any(api_endpoints_list). return ends the script execution if the list contains no values, otherwise the number of endpoints is recorded.
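
The short sketch below shows why the blank string and the any() check work together. Strictly, splitting an empty string returns a list holding one empty string, which any() treats as containing no values. The endpoint URLs are made-up placeholders:

```python
# A found parameter: a comma-separated string of (placeholder) endpoints.
endpoints_found = "https://site/posts_abc/,https://site/comments_def/".split(",")

# A missing parameter: the blank string returned after an exception.
endpoints_missing = "".split(",")

print(endpoints_found)
print(endpoints_missing)
print(any(endpoints_found))
print(any(endpoints_missing))

# A return value like None has no split method at all.
parameter_value = None
try:
    parameter_value.split(",")
except AttributeError as error:
    print(f"AttributeError: {error}")
```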

A similar process happens with the S3 bucket parameter. My get_parameter_from_ssm function is called with the same SSM client and the S3 parameter name. This time a simple string is returned, so no splitting is needed. This string is assigned to s3_bucket, and if it’s found to be empty then return ends the current execution.

If both api_endpoints_list and s3_bucket pass their tests, the script moves on to the next section.

Getting The Data

The second part of the script’s body handles getting data from the API endpoints.

Firstly, I open a for loop for each endpoint in api_endpoints_list. I pass each endpoint to my get_filename_from_endpoint function to get the name to use for logging and object creation. This name is assigned to object_name.

object_name is then checked. If found to be empty, the loop skips that endpoint to prevent any useless API calls and to preserve the existing S3 data. The failure counter increments by 1, and continue ends the current iteration of the for loop.

Once the name is parsed, my Requests session, timeout value and current API endpoint are passed to the get_wordpress_api_json function. This function returns a JSON dictionary that I assign to api_json. api_json is then checked and, if empty, skipped from the loop using continue.

Next, I need to transform the api_json dictionary object before an S3 upload attempt. If I pass api_json to S3’s put_object as is, the Body parameter throws a ParamValidationError because it can’t accept dictionaries. I use the json.dumps function to transform api_json to a JSON-formatted string and assign it to api_json_string, which put_object’s Body parameter can accept.
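
As a small illustration of that conversion, the sketch below serialises a placeholder dictionary (not real API output) and confirms the result is a string that round-trips back to the same data:

```python
import json

# Placeholder standing in for the dictionary returned by the API call.
api_json = {"id": 1, "title": "Example post"}

# json.dumps serialises the dictionary to a JSON-formatted string.
api_json_string = json.dumps(api_json)

print(type(api_json).__name__)
print(type(api_json_string).__name__)
print(api_json_string)
```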

I can now pass my S3 client, S3 bucket name, object_name and api_json_string to my put_s3_object function. This function’s output is assigned to ok, which is then checked and updates the success or failure counter as appropriate.

Once all APIs are processed, the loop ends and the final success and failure totals are logged.
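
The loop described in this section can be sketched roughly as follows. This is an outline under stated assumptions rather than the script itself: process_endpoints and its three callable arguments are hypothetical stand-ins for the real functions, and the post doesn't say whether a counter changes when api_json is empty, so none does here:

```python
import json
import logging


def process_endpoints(api_endpoints_list, get_name, get_json, upload):
    """Process each endpoint and return (success_count, failure_count)."""
    endpoint_count_success = 0
    endpoint_count_failure = 0

    for api_endpoint in api_endpoints_list:
        object_name = get_name(api_endpoint)
        if not object_name:
            # No usable name: skip the API call and leave existing S3 data alone.
            endpoint_count_failure += 1
            continue

        api_json = get_json(api_endpoint)
        if not api_json:
            # Nothing to upload for this endpoint.
            continue

        # put_object's Body needs a string, not a dictionary.
        api_json_string = json.dumps(api_json)

        ok = upload(object_name, api_json_string)
        if ok:
            endpoint_count_success += 1
        else:
            endpoint_count_failure += 1

    logging.info("Successes: %s", endpoint_count_success)
    logging.info("Failures: %s", endpoint_count_failure)
    return endpoint_count_success, endpoint_count_failure
```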

Adding A Handler

Finally, I encapsulate the script’s body into a lambda_handler function. Handlers let AWS Lambda run invoked functions, so I’ll need one when I deploy my script to the cloud.

Resources

The full Python script has been checked into the amazonwebshark GitHub repo, available via the button below. Included is a requirements.txt file for the Python libraries used to extract the WordPress API data.

GitHub-BannerSmall

Summary

In this post, I used popular Python modules & AWS managed serverless services to extract WordPress API data.

I took a lot away from this! The script was a good opportunity to practise my Python skills and try out unfamiliar features like type hints, continue and requests.Session. Additionally, I made several revisions to control flows, logging and error handling that were triggered by writing this post. The script is clearer and faster as a result.

With the script complete, my next step will be deploying it to AWS Lambda and automating its execution. So keep an eye out for that! If this post has been useful, the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~

Categories
Data & Analytics

Creating A Basic iTunes ETL With Python And AWS Data Wrangler

In this post I will use Python and AWS Data Wrangler to create a basic iTunes ETL that extracts data from an iTunes export file into a Pandas DataFrame.

Table of Contents

Introduction

For many years I have enjoyed various forms of dance music. Starting with my first compilation CDs in 2000, I’ve since amassed a large collection of records, CDs and virtual media ranging from the late 80s to modern times.

I started using iTunes as my main media player in 2010. Since then I have built up a large database of iTunes metadata that includes various counts, ratings and timestamps.

Currently I use this data for a series of iTunes Smart Playlists. To derive further meaning from the data and to practise my Python skills, I want to extract this data from iTunes and analyse it using the various data tools at my disposal.

To get the ball rolling I’m going to build a basic iTunes ETL, which I will continue to develop over the coming months.

Let’s start by looking at the iTunes export process.

iTunes Export Files

I use iTunes 12.6.4.3. This isn’t by choice – iTunes 12.6.4.3 is the last version with a built-in App Store, allowing my battered old iPhone 3GS to live on in its second life as an iPod Touch:

Still works!

I mention this as newer versions of iTunes may be different, or may not offer an export feature at all. Why do I persist with this ageing setup? That…is a post for another time.

Every week I sync my Not-iPhone via iTunes, and then create an export of my master playlist:

iTunes doesn’t have many export options, and exports playlists as tab-delimited txt files by default:

To give myself an easier time for this post, I manually made the following changes to a recent iTunes export file:

  • Imported the txt file into Microsoft Excel.
  • Removed columns I didn’t want.
  • Saved the altered file as a csv.
  • Uploaded the csv to Amazon S3.

This Franken-File will be what I use to build my basic iTunes ETL. I understand there are ways of dealing with txt files in Python – I’ll be exploring this in future posts.

Setup

Before starting to write any code, I have done the following:

Advisory

During this post, I will make several decisions that will be revisited in the coming months as my skills improve. I have taken steps to protect my AWS credentials (more on that shortly) but at this stage my basic iTunes ETL Python script is a work in progress and should not be used in a Production environment.

Creating Secure Variables

My first job is to create the variables I’m going to need. As these variables can compromise my AWS account in the wrong hands, I want to create them as securely as possible.

The topic of security is something I will be returning to in future posts. For now, I’m using a similar method to PowerShell’s Dot Sourcing in last month’s post.

Python’s import statement can import other Python scripts in the same way as modules. With this in mind, I create a new ETL_ITU_Play_Variables.py file for my variables.

Importing ETL_ITU_Play_Variables into my main script will allow Python to locate the variables and call them successfully:

import ETL_ITU_Play_Variables

aws_accesskey = ETL_ITU_Play_Variables.AWS_ACCESSKEY
aws_secret = ETL_ITU_Play_Variables.AWS_SECRET

Next I create a .gitignore file and add ETL_ITU_Play_Variables.py to it. I can now use these variables in my local environment, safe in the knowledge that Git will not track ETL_ITU_Play_Variables.py and will not include it in any commits.

With that taken care of, I need two sets of variables.

Creating Authentication Variables

AWS authenticates every request before completing it. As none of my AWS resources are public, I need to provide credentials that have the necessary IAM permissions.

There are various ways to provide these credentials – in this case I’m using an AWS Access Key / Secret Key combination with a variable for each string:

AWS_ACCESSKEY = 'accesskey123456789'
AWS_SECRET = 'secretkey123456789'

As additional security, these keys belong to a new IAM user that only has permission to read S3 objects in the appropriate bucket.

I now need a way to pass these keys to AWS. I use the AWS SDK for Python (Boto3) for this, creating a session variable using boto3.session.Session:

import boto3

# The opening parenthesis must sit on the same line as the call.
session = boto3.session.Session(
    aws_access_key_id = aws_accesskey,
    aws_secret_access_key = aws_secret
)

Creating S3 Variables

Next I create the S3 variables I need. I use s3_bucket for the bucket name and s3_prefix for the iTunes export csv's bucket prefix.

s3_bucket = 'example-my-bucket'
s3_prefix = 'Example/MyPath/'

I then use these variables to create s3_path for AWS Data Wrangler to use:

s3_path = f"s3://{s3_bucket}/{s3_prefix}"

Making The ETL

With my variables in place, I can start working on my basic iTunes ETL! AWS is now accepting my requests, so let’s start configuring AWS Data Wrangler.

Creating The DataFrame

AWS Data Wrangler is essentially Pandas on AWS, and the two tools share many commands. This DataEng Uncomplicated AWS Data Wrangler Overview does a great job of explaining the fundamentals:

I read the iTunes export csv's contents by using awswrangler.s3.read_csv with the following parameters:

  • path: My s3_path variable.
  • path_suffix: The files I want to read, in this case .csv.
  • boto3_session: My session variable.

This reads all the csv files in the S3 path, which is fine for now.

import awswrangler as wr

df = wr.s3.read_csv(path = s3_path,
                    path_suffix = ".csv",
                    boto3_session = session
                    )

I can then print the columns in the DataFrame:

print (f'Dataframe columns are {df.columns}')
Dataframe columns are Index(['Name', 'Artist', 'Album', 'Genre', 'Time', 'Track Number', 'Year', 'Date Modified', 'Date Added', 'Bit Rate', 'Plays', 'Last Played', 'Skips', 'Last Skipped', 'My Rating', 'Location'], dtype='object')

Deleting Unnecessary Columns

Having seen the list of columns, there are some I don’t need. I can get rid of them using pandas.DataFrame.drop:

df = df.drop(columns=
    [
        'Time',
        'Bit Rate',
        'Skips',
        'Last Skipped',
        'Location'
    ]
)

Now, when I print the list of columns, the removed columns are no longer included:

print (f'Dataframe columns are now {df.columns}')
Dataframe columns are now Index(['Name', 'Artist', 'Album', 'Genre', 'Track Number', 'Year', 'Date Modified', 'Date Added', 'Plays', 'Last Played', 'My Rating'], dtype='object')

Renaming Columns

Next, I want to rename the columns. I use pandas.DataFrame.rename to map the current column names to the new ones:

df = df.rename(columns=
    {
        'Name' : 'name',
        'Artist' : 'artist',
        'Album' : 'album',
        'Genre' : 'genre',
        'Track Number' : 'tracknumber',
        'Year' : 'year',
        'Date Modified' : 'datemodified',
        'Date Added' : 'dateadded',
        'Plays' : 'plays',
        'Last Played' : 'lastplayed',
        'My Rating' : 'myrating'
    }
)
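
Since every new name here is just the old name lowercased with the spaces removed, the mapping could also be derived rather than typed out. This is a minimal sketch on a hypothetical df_demo DataFrame, not what my script does:

```python
import pandas as pd

# Hypothetical empty DataFrame with a few of the export's column names.
df_demo = pd.DataFrame(columns=['Name', 'Track Number', 'Date Added', 'My Rating'])

# Strip the spaces and lowercase every column name in one pass.
df_demo.columns = df_demo.columns.str.replace(' ', '', regex=False).str.lower()

print(list(df_demo.columns))
```

The explicit dictionary is still the better choice if the new names ever stop following a simple rule.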

The columns are now changed to:

print (f'Dataframe columns are now named {df.columns}')
Dataframe columns are now named Index(['name', 'artist', 'album', 'genre', 'tracknumber', 'year', 'datemodified', 'dateadded', 'plays', 'lastplayed', 'myrating'], dtype='object')

Reformatting DateTime Columns

I now want to make sure that the dates in my DataFrame are stored in ISO 8601 format, as this will make them easier to work with and report against.

When I print the dateadded column as an example, the dates are not currently in this format:

print (f'Dataframe Date Added column is {df.dateadded}')
1       05/04/2021 13:29
2       26/01/2019 18:25
3       30/12/2016 17:34
4       12/12/2015 00:43

I can resolve this using the dayfirst and yearfirst arguments of pandas.to_datetime:

df['dateadded'] = pd.to_datetime(df['dateadded'],yearfirst=False,dayfirst=True)

This tells Pandas how to interpret the dates. In the case of 05/04/2021, dayfirst=True tells Pandas this is 5th April 2021, as opposed to 4th May 2021.

Pandas then parses the rest of my dates in the same way, giving me the formatting I want:

1      2021-04-05 13:29:00
2      2019-01-26 18:25:00
3      2016-12-30 17:34:00
4      2015-12-12 00:43:00

I repeat this for the datemodified and lastplayed columns.
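
The repetition could be folded into a loop. The sketch below uses a hypothetical df_demo DataFrame and swaps dayfirst for an explicit format string, which is an assumption on my part about how the export is always laid out:

```python
import pandas as pd

# Hypothetical sample using the same day/month/year layout as the export.
df_demo = pd.DataFrame({
    'datemodified': ['05/04/2021 13:29', '26/01/2019 18:25'],
    'dateadded': ['05/04/2021 13:29', '30/12/2016 17:34'],
    'lastplayed': ['12/12/2015 00:43', None],
})

# Parse each date column with one explicit format rather than letting pandas infer it.
for column in ['datemodified', 'dateadded', 'lastplayed']:
    df_demo[column] = pd.to_datetime(df_demo[column], format='%d/%m/%Y %H:%M')

print(df_demo.dtypes)
```

Missing values, such as a track that has never been played, come through as NaT.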

Creating Date Columns From DateTime Columns

I now want to create some new columns in my DataFrame.

The first of these new columns will mirror the values in the existing date columns. However, these columns will not contain the full timestamp – they will only contain the date instead. This will make it easier to aggregate my data.

To do this, I use pandas.Series.dt.date to create three new columns in the DataFrame:

df['datemodifieddate'] = df['datemodified'].dt.date
df['dateaddeddate'] = df['dateadded'].dt.date
df['lastplayeddate'] = df['lastplayed'].dt.date

The new columns retain the original date values and remove the unneeded time values:

print (f'Dataframe Date Added Date column is {df.dateaddeddate}')
1       2021-04-05
2       2019-01-26
3       2016-12-30
4       2015-12-12
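
One thing worth knowing is that dt.date produces plain Python date objects, so the new columns have the object dtype. A possible alternative is pandas.Series.dt.normalize, which keeps the datetime64 dtype and sets the time to midnight. This is a sketch with made-up values and hypothetical variable names:

```python
import pandas as pd

stamps = pd.Series(pd.to_datetime(['2021-04-05 13:29:00', '2019-01-26 18:25:00']))

# dt.date returns Python date objects, so the Series dtype becomes object.
as_date = stamps.dt.date

# dt.normalize keeps a datetime64 dtype and resets the time to 00:00:00.
as_midnight = stamps.dt.normalize()

print(as_date.dtype)
print(as_midnight.dtype)
```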

Creating Simplified Rating Columns

I now want to add another column to the DataFrame to simplify reporting against a track’s rating. Ratings in iTunes export files appear in multiples of twenty:

  • 1 star = 20
  • 2 stars = 40
  • 3 stars = 60
  • 4 stars = 80
  • 5 stars = 100

In my current DataFrame, printing myrating produces this:

print (f'Dataframe My Rating is {df.myrating}')
1        40.0
2        40.0
3        60.0
4        80.0

This produces a disconnect between the data in the DataFrame and the data in the iTunes GUI. I would prefer to keep things simple by having a column where the rating value mirrors the iTunes GUI.

This can be added to my DataFrame by using a function. I define an itunes_rating function that will return an integer based on the value that is passed to it:

def itunes_rating(r):
    """Converts ratings in export file to familiar format"""
    if r == 20:
        return 1
    elif r == 40:
        return 2
    elif r == 60:
        return 3
    elif r == 80:
        return 4
    elif r == 100:
        return 5
    else:
        return 0

I then create a new myratingdigit column in my DataFrame by passing each value in the myrating column to the itunes_rating function and capturing the result:

df['myratingdigit'] = df['myrating'].apply(itunes_rating)

And when I print the new column, the results are as expected:

print (f'Dataframe My Rating Digit is {df.myratingdigit}')
1       2
2       2
3       3
4       4
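
The same conversion could probably be written without a function by mapping a dictionary over the column. This is a sketch on a hypothetical ratings Series rather than my real data, with anything missing or unexpected falling back to zero just as itunes_rating does:

```python
import pandas as pd

# Lookup from the export's rating values to star counts.
rating_map = {20: 1, 40: 2, 60: 3, 80: 4, 100: 5}

# Hypothetical sample including a missing value and an unexpected value.
ratings = pd.Series([40.0, 60.0, None, 100.0, 50.0])

# Values not found in the mapping become NaN, which fillna turns into zero.
digits = ratings.map(rating_map).fillna(0).astype(int)

print(digits.tolist())
```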

Setting Data Types

Finally, I want to make sure the DataFrame is using the correct data types for each column. Pandas will usually infer data types correctly but doesn’t always get it right.

I can use pandas.DataFrame.dtypes to see the current data types in my DataFrame. At the moment they are:

name                        object
artist                      object
album                       object
genre                       object
tracknumber                  int64
year                         int64
datemodified        datetime64[ns]
dateadded           datetime64[ns]
plays                      float64
lastplayed          datetime64[ns]
myrating                   float64
datemodifieddate            object
dateaddeddate               object
lastplayeddate              object
myratingdigit                int64

Most of these are correct but some need changing. For example, plays will never have decimal places so should be int, and columns like datemodifieddate should be datetime64.

Pandas has several options for this, which are laid out in this helpful Stack Overflow thread. Here, I use astype to assign data types to my dataframe:

df = df.astype(
    {
        'name' : str,
        'artist' : str,
        'album' : str,
        'genre' : str,
        'tracknumber' : int,
        'year' : int,
        'datemodified' : datetime64,
        'dateadded' : datetime64,
        'plays' : int,
        'lastplayed' : datetime64,
        'myrating' : int,
        'datemodifieddate' : datetime64,
        'dateaddeddate' : datetime64,
        'lastplayeddate' : datetime64,
        'myratingdigit' : int
    }
)

Pandas uses NumPy datetime64 dtypes for working with time series data, so I import it at the top of my script:

from numpy import datetime64
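
As I understand it, some newer pandas releases reject a datetime64 dtype that has no unit, so dtype strings with an explicit unit may be the safer choice there. The sketch below uses a hypothetical two-column df_demo DataFrame and is an assumption about how the same cast could be written, not a copy of my script:

```python
import datetime
import pandas as pd

# Hypothetical sample: a float column and a column of Python date objects.
df_demo = pd.DataFrame({
    'plays': [3.0, 0.0],
    'dateaddeddate': [datetime.date(2021, 4, 5), datetime.date(2019, 1, 26)],
})

# Dtype strings avoid the NumPy import and state the datetime unit explicitly.
df_demo = df_demo.astype({'plays': 'int64', 'dateaddeddate': 'datetime64[ns]'})

print(df_demo.dtypes)
```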

Fixing A Casting Exception

Unfortunately, while testing the newly assigned dtypes I started getting an error:

Exception has occurred: IntCastingNaNError
Cannot convert non-finite values (NA or inf) to integer

This error means that at least one of the columns I’m trying to cast as int contains an empty value. An infinite value is possible, but unlikely due to the various integrity checks iTunes performs on its library.

To find the empty values, I create a second DataFrame using the data in the first, using pandas.DataFrame.isna and pandas.DataFrame.any to find any NA values:

df1 = df[df.isna().any(axis=1)]

Included within the resulting DataFrame were the following tracks:

3571	7 Hours (Original Mix)	Dan Stone	07A-Dm	...	2019-01-26	NaT	1

3575	8th Wonder (Espen & Stian Remix)	8 Wonders	04A-Fm	...	2019-01-26	NaT	1
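
Filtering rows shows which tracks are affected, but not which columns. A quick way to see that might be to count the missing values per column, sketched here on a hypothetical df_demo DataFrame:

```python
import pandas as pd

# Hypothetical sample with one unplayed track.
df_demo = pd.DataFrame({
    'name': ['7 Hours (Original Mix)', 'Example Track'],
    'plays': [None, 12],
    'lastplayed': pd.to_datetime([None, '2021-04-05 13:29:00']),
})

# isna() flags each missing cell and sum() counts the flags per column.
missing_per_column = df_demo.isna().sum()

# Show only the columns that contain at least one missing value.
print(missing_per_column[missing_per_column > 0])
```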

Checking iTunes shows that these tracks have no plays:

iTunes represents no plays as an empty string as opposed to a zero. This is then extracted into the DataFrame as NA, causing the IntCastingNaNError.

To fix this, I use pandas.DataFrame.fillna to replace the empty fields with zero. Although only the plays column is generating the error, I apply fillna to all the columns being cast as int to prevent any future problems for the ETL:

df['tracknumber'] = df['tracknumber'].fillna(0)
df['year'] = df['year'].fillna(0)
df['plays'] = df['plays'].fillna(0)
df['myrating'] = df['myrating'].fillna(0)

The myratingdigit column doesn’t need this approach, since my itunes_rating function always returns zero if no conditions are met.
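
Filling with zero does blur the difference between "no value" and "zero". If that ever matters, pandas also has a nullable Int64 dtype that can hold integers alongside missing values. This is a sketch of both options on a hypothetical plays Series, not something my script currently does:

```python
import pandas as pd

# Hypothetical plays values with one missing entry.
plays = pd.Series([12.0, None, 3.0])

# Option used in this post: replace missing values with zero, then cast.
plays_zero_filled = plays.fillna(0).astype(int)

# Alternative: the nullable Int64 dtype keeps the missing value as <NA>.
plays_nullable = plays.astype('Int64')

print(plays_zero_filled.tolist())
print(plays_nullable)
```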

This time, printing the data types shows an acceptable list:

name                        object
artist                      object
album                       object
genre                       object
tracknumber                  int64
year                         int64
datemodified        datetime64[ns]
dateadded           datetime64[ns]
plays                        int64
lastplayed          datetime64[ns]
myrating                     int64
datemodifieddate    datetime64[ns]
dateaddeddate       datetime64[ns]
lastplayeddate      datetime64[ns]
myratingdigit                int64

Exporting The DataFrame As A CSV

This is as far as I’m going to take the DataFrame in this post. As a final check, I want to extract the DataFrame in some form to confirm its suitability for future work I have planned.

The quickest way to do this is with pandas.DataFrame.to_csv. This writes the entire DataFrame to a csv file. When I run:

df.to_csv('ETL-ITU.csv')

An ETL-ITU.csv file is created in the terminal’s working directory that can be viewed and sandboxed as needed.
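
By default to_csv also writes the DataFrame's index as an unnamed first column. If that isn't wanted, index=False leaves it out. The sketch below writes a hypothetical df_demo DataFrame to an in-memory buffer instead of a file so the result is easy to inspect:

```python
import io
import pandas as pd

# Hypothetical one-row sample.
df_demo = pd.DataFrame({'name': ['Example Track'], 'plays': [12]})

# Write to an in-memory buffer; index=False omits the row index column.
buffer = io.StringIO()
df_demo.to_csv(buffer, index=False)
csv_text = buffer.getvalue()

print(csv_text)
```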

Scripts

My gitignore file commit from 2022-07-17 can be viewed here:

Basic_iTunes_Python_ETL .gitignore on GitHub

My ETL_ITU_Play.py file commit from 2022-07-17 can be viewed here:

ETL_ITU_Play.py on GitHub

A requirements.txt file has also been created to aid installation. The file commit from 2022-07-20 can be viewed here:

Basic_iTunes_Python_ETL requirements.txt on GitHub

Summary

In this post I used Python and AWS Data Wrangler to create a basic iTunes ETL that extracts data from an iTunes export file into a Pandas DataFrame. I have used various Python modules to extract and transform the data, and the data is now ready to be loaded to a staging area of my choosing.

Expect to see further posts on this in the coming months. This basic iTunes ETL probably won’t stay basic for long!

Thanks for reading ~~^~~