Event-Based Cost Control In AWS Glue: Build

In this post, I build my event-based AWS Glue automated cost control process using serverless managed services.

Introduction

Last time, I examined some unexpected AWS Glue costs and designed an event-based cost control process architecture. I also wrote this user story:

As an AWS account owner, I want Glue interactive sessions to stop automatically after a chosen duration so that I don’t accidentally generate unexpected and avoidable costs.

Here, I’m going to build my event-based Glue cost control process using these AWS services:

  • SNS
  • CloudTrail
  • Step Functions
  • EventBridge
  • CloudWatch

The order is based on dependencies, which I will explain shortly. Some of these resources already exist, so let’s start by reviewing those.

Existing Resources

I have two existing SNS topics that this process will use. These are general-purpose topics used for all my Step Functions notifications. They are:

  • failure-stepfunction
  • success-stepfunction

Both topics are largely alike, with the main difference being the distinct subaddressing in their respective email endpoints.

CloudTrail

Let’s start by examining an AWS Glue CreateSession CloudTrail event record. I haven’t included a full record here because:

  • They’re around 90 lines long. Each.
  • They contain sensitive data.

The AWS documentation covers CloudTrail record contents in full for those curious.

Here’s part of a Glue CreateSession CloudTrail event record. This one shows session glue-studio-datapreview-e09f88a9-4d7f-4e64-95f2-e435fbd1963a:

JSON
{
    "eventSource": "glue.amazonaws.com",
    "eventName": "CreateSession",
    "requestParameters": {
        "id": "glue-studio-datapreview-e09f88a9-4d7f-4e64-95f2-e435fbd1963a",
        "command": {
            "name": "glueetl",
            "pythonVersion": "3"
        },
        "idleTimeout": 30,
        "maxCapacity": 2,
        "glueVersion": "4.0",
        "requestOrigin": "GlueStudioDataPreview"
    },
}

Here, requestParameters contains the new session’s details, including its ID:

JSON
"requestParameters": {
    "id": "glue-studio-datapreview-e09f88a9-4d7f-4e64-95f2-e435fbd1963a",
....

This is the Glue Interactive Session’s unique identifier. I’ll be using this in my event-based Glue cost control build shortly. For now, understand that:

  • The Glue Interactive Session’s ID is found in the event record’s requestParameters object.
  • Once EventBridge receives the record, the requestParameters object sits in turn inside the event’s detail object.

This is represented as:

JSON
detail.requestParameters.id

I’m going to pass this ID to a Step Functions state machine later. Speaking of which…

Step Functions

In this section, I start creating my event-based Glue cost control build automation. This consists of two components:

  • An event router – built with an EventBridge rule.
  • A service orchestrator – built with a Step Functions state machine.

Since the state machine will be the EventBridge rule’s target, I must create the state machine first.

State Machine Actions

The state machine’s architecture was covered in my previous post. As a reminder, when given a Glue SessionID the state machine must:

  • Wait for a set period.
  • Stop the Glue session.
  • Trigger a confirmation email.

So let’s run through each step, starting with how the Glue SessionID is acquired.

Getting Glue Session ID

When executing a Step Functions state machine, an optional JSON input can be specified. There are several ways to supply this input:

[Image: State machine input JSON]

For my event-based Glue cost control build, a typical JSON input will be:

JSON
{
  "session_id": "glue-studio-datapreview-123-456-789"
}

This can then be used in the other states as "$.session_id".
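One way to supply this input outside the console is through an SDK. The following is a minimal sketch rather than part of the build: the helper names are hypothetical, and sfn_client is assumed to be a boto3 Step Functions client (boto3.client("stepfunctions")), whose start_execution call takes the input as a JSON string.

```python
import json


def build_execution_input(session_id: str) -> str:
    """Serialise the state machine input as the JSON string Step Functions expects."""
    return json.dumps({"session_id": session_id})


def start_wait_and_stop(sfn_client, state_machine_arn: str, session_id: str) -> dict:
    """Start one execution for one Glue session.

    sfn_client is assumed to be boto3.client("stepfunctions"). It is passed in
    so the function can be exercised without AWS credentials.
    """
    return sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=build_execution_input(session_id),
    )


print(build_execution_input("glue-studio-datapreview-123-456-789"))
```

Passing the client in as an argument keeps the helper easy to test with a stand-in object before pointing it at a real account.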

The state machine must then enter a wait state.

Wait

Step Functions has a built-in Wait state for handling delays. I want a thirty-second delay. This is configurable both in Workflow Studio and Amazon States Language (ASL):

JSON
    "Wait": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "StopGlueSession"
    },

The state machine must then stop the Glue session.

Glue: Stop Session

To understand what’s needed here, let’s review the Glue StopSession API reference. Id is the only required parameter, and its value comes from the earlier JSON input.

This is represented in ASL as:

JSON
{
  "Id.$": "$.session_id"
}

Now, as discussed previously, this action can fail. In the example below, a Glue StopSession request fails because the session is still being provisioned. Since nothing has started, there is nothing to stop:

JSON
{
  "cause": "Session is in PROVISIONING status (Service: Glue, Status Code: 400, Request ID: null)",
  "error": "Glue.IllegalSessionStateException",
  "resource": "stopSession",
  "resourceType": "aws-sdk:glue"
}

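To that end, I’ve added retry parameters. Upon error, StopGlueSession will retry up to three times, waiting ten seconds before the first retry. As no BackoffRate is set, Step Functions applies its default of 2.0, so the wait doubles before each later retry (ten, twenty, then forty seconds). If the third retry fails, then the state machine’s error handling will be invoked.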

This is the state’s ASL:

JSON
    "StopGlueSession": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:stopSession",
      "Parameters": {
        "Id.$": "$.session_id"
      },
      "Next": "SNS Publish",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 10,
          "MaxAttempts": 3
        }
      ]
    },

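Where:

  • "Id.$": "$.session_id" is the Glue SessionID from the JSON input.
  • "ErrorEquals": ["States.ALL"] matches any known error name. It does not cover States.Runtime or States.DataLimitExceeded, which can’t be retried.
  • "IntervalSeconds": 10 sets the wait before the first retry, and "MaxAttempts": 3 caps the number of retries.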
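To see what these retry parameters mean in practice, here is a small sketch of the wait schedule. It assumes the ASL default BackoffRate of 2.0, which applies because the Retry block above doesn't set one. The function name is hypothetical.

```python
def retry_delays(interval_seconds: int, max_attempts: int, backoff_rate: float = 2.0) -> list[float]:
    """Return the wait before each retry: interval * backoff_rate ** (retry number - 1)."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


delays = retry_delays(interval_seconds=10, max_attempts=3)
print(delays)       # [10.0, 20.0, 40.0]
print(sum(delays))  # 70.0 seconds of waiting in total if every retry fails
```

Adding "BackoffRate": 1 to the Retry block would give a flat ten-second wait between every attempt instead.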

Finally, the state machine must trigger a confirmation email.

SNS: Publish

I usually avoid state machine success notifications to avoid alarm fatigue, but I decided to include them here for two reasons:

  • I can check the state machine is working without accessing AWS.
  • I can see excessive activity without viewing logs.

Here, I publish a message to my existing success-stepfunction SNS topic using SNS’s optimised integration:

JSON
    "SNS Publish": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:eu-west-1:[REDACTED]:success-stepfunction",
        "Message.$": "States.Format('Hi! AWS Step Functions has stopped this Glue session for you: {}', $)"
      },
      "End": true
    }

I customised the Message.$ parameter using the States.Format intrinsic function:

  • The string starting with 'Hi!... is the message I want SNS to use.
  • {} is a placeholder for the value I want to insert.
  • $ is the state’s input to insert into {}. Here, that is the StopSession response, hence the {Id=...} in the email below.

This produces a better email notification for the user:

Hi! AWS Step Functions has stopped this Glue session for you: {Id=glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282}

Finally, "End": true stops the state machine.

Final Workflow

The state machine is now as follows:

[Image: Step Functions graph]

With this auto-generated ASL:

JSON
{
  "StartAt": "Wait",
  "States": {
    "Wait": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "StopGlueSession"
    },
    "StopGlueSession": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:stopSession",
      "Parameters": {
        "Id.$": "$.session_id"
      },
      "Next": "SNS Publish",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 10,
          "MaxAttempts": 3
        }
      ]
    },
    "SNS Publish": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:eu-west-1:[REDACTED]:success-stepfunction",
        "Message.$": "States.Format('Hi! AWS Step Functions has stopped this Glue session for you: {}', $)"
      },
      "End": true
    }
  },
  "Comment": "When given a Glue SessionID start a wait, stop the session and send an SNS message."
}

There’s one more aspect to sort out. What happens if the state machine fails?

Error Logging

Firstly, let’s examine the state of events if the state machine fails:

  • A Glue session must have started.
  • An EventBridge Rule must have sent the event to Step Functions.
  • One of the state machine states must have failed.

Unless the failing state is SNS Publish, there is an active Glue session still incurring costs. Therefore, triggering an alarm is much more appropriate than a notification. Alarm creation requires sending the state machine logs to CloudWatch.

By default, new state machines do not enable logging due to storage expenses. However, in this case, the log storage cost will be significantly lower than that of an unattended Glue Session. So I activate the logging for my state machine.

Step Functions log levels range from ALL to ERROR to FATAL to OFF, which are explained in the AWS documentation. As I’m only interested in failures, I select ERROR and include the execution data. This consists of execution input, data passed between states and execution output:

[Image: State machine logging]

Next, I create a new CloudWatch log group called /aws/vendedlogs/states/GlueSession-WaitAndStop-Logs. This will form the basis of my failure alerting.

CloudWatch

Here, I configure the CloudWatch resources for my event-based Glue cost control build.

Log Groups & Metrics

The previously configured GlueSession-WaitAndStop-Logs group receives all the Step Functions state machine’s ERROR events. In most cases, these are Glue.IllegalSessionStateException events:

JSON
{
    "id": "7",
    "type": "TaskFailed",
    "details": {
        "cause": "Session is in PROVISIONING status (Service: Glue, Status Code: 400, Request ID: b1baaf14-ae89-4106-a286-87cf5445de6c)",
        "error": "Glue.IllegalSessionStateException",
        "resource": "stopSession",
        "resourceType": "aws-sdk:glue"
    },

Note the TaskFailed event type – it indicates the failure of a single state, not the entire state machine. Thus, I don’t need alerts for those events.

However, there are also ExecutionFailed events like these:

JSON
{
    "id": "5",
    "type": "ExecutionFailed",
    "details": {
        "cause": "An error occurred while executing the state 'StopGlueSession' (entered at the event id #4). The JSONPath '$.session_id' specified for the field 'Id.$' could not be found in the input '{\n  \"sessionId\": \"\"\n}'",
        "error": "States.Runtime"
    },

I definitely want to know about these! ExecutionFailed means the entire state machine failed, and there’s probably a Glue Session still running!

These events are captured as ExecutionsFailed CloudWatch metrics. Keep in mind that Step Functions automatically publishes various metrics to CloudWatch irrespective of logging configuration, including ExecutionsFailed. However, in my experience, having both the metrics and failure logs centralised in CloudWatch simplifies troubleshooting.

Next, let’s use these metrics to create an alarm.

Alarm

Creating a CloudWatch alarm begins with selecting the ExecutionsFailed metric from States > Execution Metrics:

[Image: CloudWatch metrics]

This alarm uses a static threshold: it checks every minute whether the metric’s value is greater than zero. When the alarm’s state is In Alarm, it publishes to my failure-stepfunction SNS topic, which sends me an email notification.
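For anyone who prefers code to the console, the same alarm can be described as parameters for CloudWatch's put_metric_alarm API. This is a sketch under assumptions: the alarm name, the function name and the TreatMissingData choice are mine for illustration, and the ARNs in the example are placeholders.

```python
def executions_failed_alarm(state_machine_arn: str, topic_arn: str) -> dict:
    """Build keyword arguments for CloudWatch's put_metric_alarm API."""
    return {
        "AlarmName": "GlueSession-WaitAndStop-ExecutionsFailed",  # hypothetical name
        "Namespace": "AWS/States",
        "MetricName": "ExecutionsFailed",
        "Dimensions": [{"Name": "StateMachineArn", "Value": state_machine_arn}],
        "Statistic": "Sum",
        "Period": 60,  # evaluate one-minute periods
        "EvaluationPeriods": 1,
        "Threshold": 0,
        "ComparisonOperator": "GreaterThanThreshold",  # Sum > 0
        "TreatMissingData": "notBreaching",  # assumption: no data means no failures
        "AlarmActions": [topic_arn],  # SNS topic notified when In Alarm
    }


params = executions_failed_alarm(
    "arn:aws:states:eu-west-1:123456789012:stateMachine:GlueSession-WaitAndStop",
    "arn:aws:sns:eu-west-1:123456789012:failure-stepfunction",
)
print(params["MetricName"], params["ComparisonOperator"], params["Threshold"])
```

With boto3 installed and credentials configured, the dictionary could be passed as boto3.client("cloudwatch").put_metric_alarm(**params).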

Finally, CloudWatch creates a new alarm graph:

[Image: CloudWatch alarm]

So that’s everything the state machine needs. Next, how do I pass the Glue SessionID to it?

EventBridge

In this section, I create the EventBridge Rule responsible for handling my event-based Glue cost control build’s events.

EventBridge Rule Anatomy

EventBridge Rules specify the criteria for routing events from an event bus to designated targets like Lambda functions, Step Functions and SQS queues. They use event patterns to filter incoming events and identify targets to route to, enabling event-driven and event-based workflows without custom processing logic.

Creating an EventBridge Rule involves three steps:

  • Define rule detail
  • Build event pattern
  • Select target

Define Rule Detail

Besides the name and description, this section is mainly concerned with:

  • Event Bus: The event bus to monitor for events. Default is fine.
  • Rule Type: EventBridge’s rule type. This can either match an event pattern or operate on a schedule (this is different from EventBridge Scheduler – Ed).

Next, let’s discuss event patterns!

Build Event Pattern

Firstly, event patterns are a very expansive topic, so please refer to the EventBridge user guide afterwards for definitions and examples.

Event patterns act as filters, defining how EventBridge identifies whether to send an event to a target. The EventBridge console provides options for sample events and testing patterns.

As a reminder, this is part of a typical CreateSession event record from which I want to capture ID:

JSON
"eventSource": "glue.amazonaws.com",
"eventName": "CreateSession",
"requestParameters": {
    "id": "glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282",
    "role": "arn:aws:iam::[REDACTED]:role/service-role/AWSGlueServiceRole-wordpress_bronze",
    "command": {
        "name": "glueetl",
        "pythonVersion": "3"
    },
    "idleTimeout": 30
....

EventBridge currently has three pattern creation methods:

  • Schema: Using either manual entry or the schema registry.
  • Pattern Form: Using pre-defined EventBridge templates.
  • Custom Pattern: Using a manual JSON editor.

Pattern Form offers a series of dropdowns that quickly construct the desired pattern:

[Image: EventBridge event pattern]

Selecting AWS Services > Glue > AWS API Call via CloudTrail creates this event pattern:

JSON
{
  "source": ["aws.glue"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["glue.amazonaws.com"]
  }
}

This will send every Glue API call that CloudTrail records to the target, so it could use some refinement. An eventName can be added to the pattern either by manual editing or via the Specific Operation(s) setting.

The updated pattern will now only send Glue CreateSession events:

JSON
{
  "source": ["aws.glue"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["glue.amazonaws.com"],
    "eventName": ["CreateSession"]
  }
}
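To illustrate how this pattern filters events, here is a deliberately simplified matcher. It is not EventBridge's implementation: it only handles exact-value lists and nested objects, and the glue_event helper builds cut-down events for demonstration.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Simplified EventBridge-style matching: exact values and nested objects only."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: recurse into the matching nested object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # Leaf pattern: a list of accepted values.
            return False
    return True


PATTERN = {
    "source": ["aws.glue"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["glue.amazonaws.com"],
        "eventName": ["CreateSession"],
    },
}


def glue_event(event_name: str) -> dict:
    """Build a cut-down Glue API event for demonstration purposes."""
    return {
        "source": "aws.glue",
        "detail-type": "AWS API Call via CloudTrail",
        "detail": {"eventSource": "glue.amazonaws.com", "eventName": event_name},
    }


print(matches(PATTERN, glue_event("CreateSession")))  # True
print(matches(PATTERN, glue_event("StopSession")))    # False
```

Fields that the pattern doesn't mention are ignored, which is why a full CloudTrail record with dozens of extra keys would still match.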

Select Target

Finally, I must select the EventBridge Rule’s target – my state machine. This is why I created the state machine first; for it to be an EventBridge target it must first exist.

At this point, I could pass the whole event to the state machine. However, the state machine had no way to parse the SessionID from the event. While JSONata could now meet this requirement, it wasn’t a Step Functions feature back in June.

Luckily, EventBridge offers relevant settings here. One of these – an Input Transformer – can customise an event’s text before EventBridge sends it to the rule’s target. Input Transformers consist of an Input Path and Input Template.

An Input Path uses a JSON path and key-value pairs to reference items in events and store them as variables. For instance, capturing ID from this event:

JSON
"eventSource": "glue.amazonaws.com",
"eventName": "CreateSession",
"requestParameters": {
    "id": "glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282",
    "role": "arn:aws:iam::[REDACTED]:role/service-role/AWSGlueServiceRole-wordpress_bronze",
    "command": {
        "name": "glueetl",
        "pythonVersion": "3"
    },
    "idleTimeout": 30
....

Requires this Input Path:

JSON
{
  "id": "$.detail.requestParameters.id"
}

In which:

  1. $.detail accesses the detail object of the CloudTrail event record.
  2. $.detail.requestParameters accesses the requestParameters object within detail.
  3. Finally, $.detail.requestParameters.id accesses the id value within requestParameters.

These variables are then passed to an Input Template, which defines the text sent to the rule’s target. EventBridge sends the template verbatim, except that each <placeholder> is replaced with the matching Input Path value.

So this template:

JSON
{
  "session_id": "<id>"
}

Produces a JSON object comprising a "session_id": string and the Input Path’s Glue SessionID value:

JSON
{
  "session_id": "glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282"
}

This will be passed as the JSON input when executing the state machine.
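The Input Path and Input Template steps can be mimicked locally to check the result before saving the rule. This is a rough sketch, not EventBridge's own logic: resolve only understands simple dotted paths, and both function names are hypothetical.

```python
import json


def resolve(path: str, event: dict):
    """Follow a simple dotted path such as '$.detail.requestParameters.id'."""
    value = event
    for key in path.removeprefix("$.").split("."):
        value = value[key]
    return value


def transform(input_paths: dict, template: str, event: dict) -> str:
    """Replace each <name> placeholder with the value found at its path."""
    for name, path in input_paths.items():
        template = template.replace(f"<{name}>", str(resolve(path, event)))
    return template


event = {
    "detail": {
        "requestParameters": {
            "id": "glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282"
        }
    }
}
result = transform(
    {"id": "$.detail.requestParameters.id"},
    '{"session_id": "<id>"}',
    event,
)
print(json.loads(result)["session_id"])
```

Parsing the result with json.loads is a quick check that the transformed text is valid JSON, which the state machine requires of its input.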

That’s everything done now. So let’s see if it works!

Testing

This section tests my event-based Glue cost control build.

In the following tests, a Glue Interactive Session was started with the build fully active and was observed in the AWS console. AWS assigned the SessionID glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282.

EventBridge Rule

Expectation: When a Glue CreateSession CloudTrail event record is created:

  • EventBridge matches the CloudTrail event record to my EventBridge Rule.
  • The EventBridge Rule triggers and defines a session_id variable.
  • The EventBridge Rule executes my target state machine with session_id JSON input.

Result: CloudWatch indicates EventBridge matched the CloudTrail Event Record to my EventBridge Rule’s Event Pattern, executing the intended actions:

[Image: EventBridge CloudWatch graph]

The EventBridge Rule extracts the glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282 SessionID from the CloudTrail Event Record and passes it as JSON input when executing the targeted GlueSession-WaitAndStop state machine.

Step Functions State Machine

Expectation: When a Glue CreateSession CloudTrail event record is created:

  • State machine is executed with session_id JSON input.
  • Glue StopSession API is called after 30 seconds.
  • If the first StopSession API call fails, a retry occurs after ten seconds.
  • A confirmation email is sent to the user.

Result: State machine executes successfully:

[Image: Step Functions graph]

The state machine logs also correctly show a thirty-second wait between rows 2 and 3 (the start and end of the Wait state):

[Image: Step Functions execution]

Additionally, if a Glue.IllegalSessionStateException error occurs, a retry occurs after ten seconds (see rows 7 and 8):

[Image: Step Functions retry]

Finally, SNS sends the correct email to the user:

[Image: Gmail notification]

The failure alarm is tested later.

Glue Session

Expectation: When an Interactive Session starts while the EventBridge Rule is enabled, it is automatically stopped thirty seconds after becoming active.

Result: This session runs for seventy seconds. Although this exceeds thirty seconds, keep in mind that the session needs to be provisioned before it can be stopped.

[Image: Glue session console]

These results can also be verified using the Glue Get-Session AWS CLI command:

Bash
[cloudshell-user@ip-[REDACTED] ~]$ aws glue get-session --id glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282

{
    "Session": {
        "Id": "glue-studio-datapreview-3f905608-50f1-4b9e-80e2-f4071feb2282",
        "CreatedOn": "2024-06-11T12:23:04.586000+00:00",
        "Status": "STOPPED",
        
	[REDACTED]
	
        "WorkerType": "G.1X",
        "CompletedOn": "2024-06-11T12:24:30.210000+00:00",
        "ExecutionTime": 70.384,
        "DPUSeconds": 140.768,
        "IdleTimeout": 30
    }
}
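The figures in this output can be turned into a rough cost for the stopped session. This sketch assumes the charge is simply DPU-hours multiplied by the 0.44 USD per DPU-Hour rate used in the Cost Analysis section, and ignores any minimum billing duration.

```python
from datetime import datetime

PRICE_PER_DPU_HOUR = 0.44  # USD; the rate used in the Cost Analysis section

# Values copied from the get-session output above.
created_on = datetime.fromisoformat("2024-06-11T12:23:04.586000+00:00")
completed_on = datetime.fromisoformat("2024-06-11T12:24:30.210000+00:00")
execution_time = 70.384  # seconds
dpu_seconds = 140.768

wall_clock_seconds = (completed_on - created_on).total_seconds()
dpus = dpu_seconds / execution_time
# Assumption: the charge is DPU-hours multiplied by the hourly rate.
estimated_cost = dpu_seconds / 3600 * PRICE_PER_DPU_HOUR

print(f"Wall clock: {wall_clock_seconds:.3f} s")  # 85.624 s
print(f"DPUs: {dpus:.1f}")                         # 2.0
print(f"Estimated cost: ${estimated_cost:.4f}")    # $0.0172
```

The gap between the wall-clock time and ExecutionTime is consistent with the session spending its first seconds being provisioned.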

CloudWatch Alarm

The CloudWatch Alarm was tested by briefly changing the Step Functions state machine’s IAM policy to deny the StopSession action and then starting a new Interactive Session, forcing the desired failure without altering the cost control process itself.

Expectation: If the state machine fails, then a CloudWatch alarm notification is sent to the user.

Result: Upon the state machine’s failure, an ExecutionsFailed metric is emitted to CloudWatch, shown in this chart:

[Image: CloudWatch metric]

This triggers the CloudWatch Alarm when its Sum > 0 threshold condition is met, changing the alarm’s state to In Alarm and sending an email notification using my failure-stepfunction SNS topic:

[Image: CloudWatch alerting]

And with that, all tests are successful. Now let’s look at the costs.

Cost Analysis

This section analyses the costs of my event-based Glue cost control build. There are two aspects to this:

  • Cost Expenditure: How much is the cost control process costing me to run?
  • Cost Savings: How much money am I saving on the stopped Glue Sessions?

Because the biggest test of all is whether this build satisfies the user story. Does it prevent unexpected and avoidable costs?

Cost Expenditure

Firstly, let’s examine my event-based Glue cost control build costs between June 2024 and November 2024:

[Image: Zero costs]

So I guess this kinda makes my point. Zero cost doesn’t mean zero usage though, so let’s check the bills for that period.

Caveat: I didn’t tag any of my resources (yes ok I know), so this usage is for the entire account.

CloudTrail & CloudWatch Usage

CloudTrail FreeEventsRecorded:

Service     Period   Metric              Quantity
CloudTrail  2024-06  FreeEventsRecorded  33,217
CloudTrail  2024-07  FreeEventsRecorded  28,993
CloudTrail  2024-08  FreeEventsRecorded  40,682
CloudTrail  2024-09  FreeEventsRecorded  29,891
CloudTrail  2024-10  FreeEventsRecorded  36,208
CloudTrail  2024-11  FreeEventsRecorded  28,630

CloudWatch Alarms:

Service     Period   Metric  Quantity
CloudWatch  2024-06  Alarms  0.919
CloudWatch  2024-07  Alarms  2
CloudWatch  2024-08  Alarms  2.126
CloudWatch  2024-09  Alarms  2
CloudWatch  2024-10  Alarms  2
CloudWatch  2024-11  Alarms  2

CloudWatch Metrics:

Service     Period   Metric   Quantity
CloudWatch  2024-06  Metrics  5.29
CloudWatch  2024-07  Metrics  0.372
CloudWatch  2024-08  Metrics  4.766
CloudWatch  2024-09  Metrics  0.003
CloudWatch  2024-10  Metrics  4.003
CloudWatch  2024-11  Metrics  4.626

CloudWatch Requests:

Service     Period   Metric    Quantity
CloudWatch  2024-06  Requests  696
CloudWatch  2024-07  Requests  15
CloudWatch  2024-08  Requests  230
CloudWatch  2024-09  Requests  0
CloudWatch  2024-10  Requests  181
CloudWatch  2024-11  Requests  122

EventBridge, SNS & Step Functions Usage

EventBridge EventsInvocation:

Service      Period   Metric            Quantity
EventBridge  2024-06  EventsInvocation  30
EventBridge  2024-07  EventsInvocation  31
EventBridge  2024-08  EventsInvocation  31
EventBridge  2024-09  EventsInvocation  30
EventBridge  2024-10  EventsInvocation  31
EventBridge  2024-11  EventsInvocation  30

SNS NotificationDeliveryAttempts-SMTP:

Service  Period   Metric                             Quantity
SNS      2024-06  NotificationDeliveryAttempts-SMTP  52
SNS      2024-07  NotificationDeliveryAttempts-SMTP  29
SNS      2024-08  NotificationDeliveryAttempts-SMTP  85
SNS      2024-09  NotificationDeliveryAttempts-SMTP  2
SNS      2024-10  NotificationDeliveryAttempts-SMTP  58
SNS      2024-11  NotificationDeliveryAttempts-SMTP  11

SNS Requests:

Service  Period   Metric          Quantity
SNS      2024-06  Requests-Tier1  315
SNS      2024-07  Requests-Tier1  542
SNS      2024-08  Requests-Tier1  553
SNS      2024-09  Requests-Tier1  325
SNS      2024-10  Requests-Tier1  366
SNS      2024-11  Requests-Tier1  299

Step Functions StateTransition:

Service         Period   Metric           Quantity
Step Functions  2024-06  StateTransition  388
Step Functions  2024-07  StateTransition  180
Step Functions  2024-08  StateTransition  566
Step Functions  2024-09  StateTransition  300
Step Functions  2024-10  StateTransition  616
Step Functions  2024-11  StateTransition  362

All within free tier. So how did Glue fare?

Cost Savings

Next, let’s pull my InteractiveSessions costs between June 2024 and November 2024:

[Image: Glue costs]

The high June costs kickstarted this process, and there’s a massive difference between June and the others! September isn’t a mistake – I was kinda busy.

Glue Costs

Here are the actual costs:

Service  Period   Metric               Quantity (DPU-Hour)  Cost ($)
Glue     2024-06  InteractiveSessions  5.731                2.52
Glue     2024-07  InteractiveSessions  0.197                0.09
Glue     2024-08  InteractiveSessions  2.615                1.15
Glue     2024-09  InteractiveSessions  0.000                0.00
Glue     2024-10  InteractiveSessions  2.567                1.13
Glue     2024-11  InteractiveSessions  0.079                0.03
TOTAL                                                       4.92

While these aren’t exactly huge sums, there are two items to consider here:

Glue Estimated Savings

Finally, what saving does this represent? While I can’t get a value from AWS Billing, I can reasonably estimate one. Firstly, using the AWS Calculator for Glue I calculated the cost of an Interactive Session that times out:

2 DPUs x 0.50 hours x 0.44 USD per DPU-Hour = 0.44 USD

https://calculator.aws/#/createCalculator/Glue

Next, I went back through my records and found how many sessions had been stopped each month:

Period   Stops
2024-06  11
2024-07  5
2024-08  61
2024-09  0
2024-10  53
2024-11  2

Caveat: To be fair to AWS, some sessions were created while I was working on a Glue ETL job with automation enabled. So, while the automation was continually stopping sessions, I was constantly starting new ones. Thus, Glue isn’t the money pit I perhaps make out, and I’m not that careless with leaving them on!

By multiplying the number of stopped sessions by 0.44, I can determine each month’s potential cost, then subtract the actual cost to find the estimated savings:

Period   Stops  Potential Cost ($)  Actual Cost ($)  Est. Saving ($)
2024-06  11     4.84                2.52             2.32
2024-07  5      2.20                0.09             2.11
2024-08  61     26.84               1.15             25.69
2024-09  0      0.00                0.00             0.00
2024-10  53     23.32               1.13             22.19
2024-11  2      0.88                0.03             0.85
TOTAL    132    58.08               4.92             53.16

Just over $53! Even if I halve that based on the caveat, that’s still a saving of around $26. And with no setup costs!
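For anyone wanting to rerun the estimate with their own numbers, the calculation can be sketched as below. The stop counts and actual costs are copied from the tables above. The variable names are illustrative only.

```python
COST_PER_TIMED_OUT_SESSION = 2 * 0.50 * 0.44  # DPUs x hours x USD per DPU-Hour

# period: (sessions stopped, actual cost in USD), copied from the tables above
months = {
    "2024-06": (11, 2.52),
    "2024-07": (5, 0.09),
    "2024-08": (61, 1.15),
    "2024-09": (0, 0.00),
    "2024-10": (53, 1.13),
    "2024-11": (2, 0.03),
}

total_saving = 0.0
for period, (stops, actual) in months.items():
    potential = stops * COST_PER_TIMED_OUT_SESSION
    saving = potential - actual
    total_saving += saving
    print(f"{period}  {stops:>3}  {potential:>6.2f}  {actual:>5.2f}  {saving:>6.2f}")

print(f"TOTAL saving: {total_saving:.2f}")  # 53.16
```

Changing the DPU count, timeout length or hourly rate in the first line is enough to adapt the estimate to a different session configuration.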

Summary

In this post, I built my event-based AWS Glue automated cost control process using serverless managed services.

I’m pleased with the outcome! My generally busy Summer and Autumn inadvertently tested this process for six months, and it’s been fine throughout! I may soon extend the state machine’s waiting duration, which only needs a parameter change for one state.

The great thing about this process is that it isn’t limited to Glue; EventBridge can use nearly all AWS services as event sources. I’m seriously impressed with EventBridge. It’s poked me about Glacier restores, scheduled my ETLs and now is also saving me a few quid!

Thanks for reading ~~^~~

Event-Based Cost Control In AWS Glue: Architecture

In this post, I examine some unexpected AWS Glue costs and design an event-based cost control process architecture.

Introduction

Last month, I finished a series of data pipeline posts using, among other services, AWS Glue. During this series I made many discoveries – some more desirable than others. One such undesirable was a cost spike in early June! Not enough to trigger a budget alarm, but still higher than expected at that time.

To Cost Explorer! These were the results:

[Image: AWS costs, start of June]

Those Glue costs were…unexpected. While this doesn’t look like much, in contrast my entire May 2024 bill was $1.08. So June saw an almost 150% cost increase over just three days!

This post has two sections. Firstly, the Discovery section examines the costs in closer detail and considers potential solutions. Secondly, the Architecture section examines the decisions made for and the technical implementation of the chosen solution.

Discovery

This section examines the costs in closer detail and considers potential solutions. I’ll structure the cost analysis using three questions:

  • How are the costs made up?
  • What specifically is generating the costs?
  • Why are the costs being generated?

The How

Question 1: How are the costs made up?

Firstly, let’s break down the costs. The earlier chart shows that Glue is the main cost driver – I now want to drill down into the API-level costs. I can do this by changing the chart’s dimension to API Operation.

This updates it to:

[Image: AWS costs, start of June, by API operation]

And the raw data to:

[Image: AWS costs, start of June, table]

The main costs here are all Glue APIs, with the top two being:

  • GlueInteractiveSession
  • Jobrun

No operation is tax – Ed

Jobrun was easy to account for, as I was testing some Glue ETL jobs at the time. But I was unfamiliar with GlueInteractiveSession, and as it was the biggest cost driver it became the focus of my ongoing investigation.

The What

Question 2: What specifically is generating the costs?

So what is the GlueInteractiveSession API? What does it do? And how does it accrue costs? Let’s begin with the AWS User Guide definition:

The interactive sessions API describes the AWS Glue API related to using AWS Glue interactive sessions to build and test extract, transform, and load (ETL) scripts for data integration.

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-interactive-sessions.html

AWS Glue Interactive Sessions offer serverless, on-demand Apache Spark environments that work seamlessly with Glue ETL jobs. These sessions allow for the live development, testing, and enhancement of data processing steps and ETL tasks. They can easily connect to data from various AWS services such as S3, DynamoDB, and Redshift.

Interactive Sessions let users preview data without running full ETL jobs. This offers several benefits during development and testing:

  • Data modifications are only temporary during an Interactive Session, protecting the original data from undesired and unintended changes.
  • Jobs can be evaluated step by step rather than after each complete run, allowing for quicker development and testing compared to always executing the full job. And because of this…
  • When testing ETL steps, interactive sessions usually use fewer resources than a Glue job, thus reducing costs.

Speaking of costs, Glue Interactive Sessions billing is similar to Glue ETL Job billing and is based on the following factors:

  1. Duration: How long the session runs, measured in seconds.
  2. Resource Usage: The number of Data Processing Units (DPUs) allocated to the session. Each DPU bundles compute and memory.

This all sounds good. So why is my bill so high?

The Why

Question 3: Why are the costs being generated?

So I now know that:

  • The GlueInteractiveSession API is the main cost driver.
  • My Glue Interactive Sessions are linked to my AWS Glue ETL Jobs.

Let’s now examine why the GlueInteractiveSession API is suddenly generating higher costs.

The How chart shows that GlueInteractiveSession costs can happen irrespective of Jobrun costs. Indeed – on June 03 there were no Jobrun costs. So running Glue ETL jobs isn’t causing these charges.

Helpfully, the AWS Glue console has a dedicated Interactive Sessions section that shows session instance histories. Upon inspection, I found lots of this:

[Image: Glue session timeout]

So, timeouts. Timeouts are good. They stop Interactive Sessions from running indefinitely, and sessions started from the Glue console automatically get a 30-minute timeout.

What was more concerning was the number of timeouts I found: three on June 02 and six on June 03. That’s nine sessions, each timing out after 30 minutes. That’s four and a half hours of unused compute I’m being billed for! How are these timeouts happening?

…About that. I often open multiple browser tabs to compare screens quickly when I’m trying things out. Here, each new Glue ETL Job browser tab starts a new interactive session based on my commands, and I forget to close these sessions afterwards. Oops!

Solutions

So now I know the cost’s root cause is my own ineptitude, how do I fix this? There are several options:

Permission Blocking: I could deny CreateSession requests using IAM and SCPs. This solution works for non-data-facing AWS accounts but creates unreasonable barriers for Glue-based console workstreams elsewhere.

Parameter Adjustment: The CreateSession API has an IdleTimeout parameter that controls the number of minutes when idle before the session times out. Although this can be easily configured through the CLI or SDK, I haven’t found a way to adjust it in the console yet.
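As a minimal sketch of the SDK route, the function below passes IdleTimeout alongside the other CreateSession request fields. It assumes a boto3 Glue client is supplied by the caller, and the function name, session ID and role ARN are hypothetical. The Command values mirror those in the CloudTrail record shown earlier.

```python
from typing import Any


def create_session_with_idle_timeout(
    glue_client: Any,
    session_id: str,
    role_arn: str,
    idle_timeout_minutes: int,
) -> dict:
    """Create a Glue interactive session with an explicit idle timeout.

    glue_client is expected to behave like boto3.client("glue").
    """
    return glue_client.create_session(
        Id=session_id,
        Role=role_arn,
        Command={"Name": "glueetl", "PythonVersion": "3"},
        IdleTimeout=idle_timeout_minutes,  # minutes of idleness before timeout
    )
```

With real credentials, this would be called as `create_session_with_idle_timeout(boto3.client("glue"), "my-session", role_arn, 10)`. Passing the client in keeps the function easy to exercise with a stub.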

Local Sessions: AWS maintains a Glue Labs Docker image intended for local AWS Glue job script development and testing. This would replace the cloud-based Interactive Sessions entirely and is arguably the best solution for data teams and at scale. The main reason I’m not using it here is that I’m the only user of this particular AWS account.

Event-Based Automation: All Interactive Sessions are stopped using the StopSession API regardless of reason. This includes the timeout process. An automated mechanism that invokes this API after a set period would effectively emulate a timeout. Additionally, since I oversee this process, I’m able to swiftly adjust the duration as needed.

And so I finally have a user story:

As an AWS account owner, I want Glue interactive sessions to stop automatically after a chosen duration so that I don’t accidentally generate unexpected and avoidable costs.

Finally, there is one further topic I want to address…

Event-Based Vs Event-Driven

Let’s examine the difference between event-based and event-driven. Mainly because I thought this was an event-driven process for months until I did some digging.

Now, I’m no expert on this. However, James Eastham is. Go watch this. It’s only six minutes – I’ll wait.

Ok good. For those who are time-strapped or want the highlights:

  • Event-based systems react to technical events. In a data context, these are API calls like ObjectCreated and CrawlerStarted.
  • Event-driven systems react to business events. In a data context, these are processes like Refresh Started and Sales Data Ingested.

My Glue Cost Control system is event-based because it is governed entirely by AWS events and API calls: CreateSession will trigger some AWS automation that ultimately invokes StopSession.

So what does that automation look like? Well…

Architecture

This section examines the decision-making and technical implementation of my AWS Glue event-based cost control architecture. In my investigations, I discovered that AWS is way ahead of me!

Existing AWS Solution

The AWS Big Data blog has a 2023 post about enforcing boundaries on AWS Glue interactive sessions using this architecture:

The whole process is listed here, and the post’s code is in a GitHub repo. In summary:

  • The Glue Interactive Session creates a CloudTrail Event Record.
  • An EventBridge Rule captures the event and invokes a Lambda function.
  • The Lambda function inspects the event and acts depending on set boundaries.
  • SNS handles user notifications.
  • SQS and CloudWatch handle errors.
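The rule in the second step could use an event pattern along these lines. This is a sketch rather than the pattern from the AWS post: the field values follow the CloudTrail record shown earlier, while the `matches` helper and the sample event are simplified, hypothetical stand-ins for local experimentation. Real EventBridge matching supports many more operators.

```python
import json

# Event pattern for Glue CreateSession calls recorded by CloudTrail.
CREATE_SESSION_PATTERN = {
    "source": ["aws.glue"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["glue.amazonaws.com"],
        "eventName": ["CreateSession"],
    },
}


def matches(pattern: dict, event: dict) -> bool:
    """Simplified local matcher: every pattern key must exist in the event,
    and the event's value must be one of the values the pattern lists."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True


sample_event = {
    "source": "aws.glue",
    "detail-type": "AWS API Call via CloudTrail",
    "detail": {
        "eventSource": "glue.amazonaws.com",
        "eventName": "CreateSession",
        "requestParameters": {"id": "example-session-id"},  # hypothetical ID
    },
}

print(json.dumps(CREATE_SESSION_PATTERN, indent=2))
print(matches(CREATE_SESSION_PATTERN, sample_event))
```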

I’m using this architecture as a basis for my event-based Glue cost control process with some changes.

Architectural Decisions

This section outlines my adjustments to the AWS architecture to better align with my event-based Glue cost control process.

Replace Lambda With Step Functions

The AWS solution uses a Lambda function for event inspection and API interaction. This function has lots going on. But my needs are far simpler and fall well within the remit of a Step Functions workflow.

Many AWS heavyweights evangelize Step Functions over Lambda. Most recently, Eric Johnson dedicated a slide of his 2024 re:Invent session to this mantra:

“Step Functions first,
Step Functions always.”

For this use case, I’m inclined to agree. Step Functions offers several advantages over Lambda here:

Service Integration: Lambda’s interactivity with other AWS services requires manual code (e.g. a Python boto3 client). Step Functions offer no-code AWS service integrations that interact directly with AWS APIs. So my Step Function will be faster to develop.

Error Handling: Lambda relies on the function code for error handling and retries. In contrast, Step Functions offer configurable built-in no-code error handling and retry mechanisms, making my Step Function more resilient.

Ongoing Maintenance: While AWS manages the Lambda service, the function code still needs runtime maintenance, security patching and general refactoring as it ages. Conversely, Step Functions use static JSON and YAML-based ASL, so my Step Function will require less ongoing maintenance.

Step Function Model

There are two Step Function models: Standard Workflows and Express Workflows. I’ll be using a Standard workflow here. Two factors drive this decision:

API Behaviour: Changing a Glue Interactive Session is not an idempotent action. Requesting a change to a session in an invalid state produces an IllegalSessionState exception. For example, the below error is thrown when trying to stop a Glue session that hasn’t yet been fully provisioned:

JSON
{
  "cause": "Session is in PROVISIONING status (Service: Glue, Status Code: 400, Request ID: null)",
  "error": "Glue.IllegalSessionStateException",
  "resource": "stopSession",
  "resourceType": "aws-sdk:glue"
}

Express Workflows utilize an at-least-once model, meaning an execution might run multiple times. Sending several requests that are very likely to fail will create confusion and waste resources. In contrast, Standard Workflows adhere to an exactly-once model with optional retries, significantly reducing the likelihood of these problems.

And speaking of resource use…

Cost: Express Workflow executions are charged according to how often they run, the duration of each run and the memory consumed during the process. Standard Workflow executions are billed based on the number of state transitions and feature a generous and indefinite free tier.

Standard Workflows are a better option here because my workflow requires waiting. While Express Workflows may not be too costly, I’d still be paying for the wait. And remember – the whole point is to reduce avoidable costs! Conversely, Standard Workflows would stay entirely within the free tier at the expected volumes.

Remove The SQS Queue

I’ve removed the SQS queue simply because I don’t need it here. It was originally intended to record events that triggered a Lambda function failure. However, the Step Function workflow’s inbuilt auditing will now capture this.

Considering the Frugal Architect Mindset and AWS Well-Architected Framework’s Cost Optimization Pillar, the SQS queue’s financial and development costs are no longer justified. This cements its removal.

Architecture Diagram

This is my event-based Glue Cost Control process architecture diagram:

In this solution:

  1. User interacts with a Glue ETL Job and creates an Interactive Session.
  2. Glue CreateSession event is created.
  3. Glue CreateSession event creates a CloudTrail event record.
  4. EventBridge matches the event record to an event rule.
  5. EventBridge extracts the event’s SessionID and passes it to the Step Functions workflow, which waits for the set duration.
  6. Workflow passes SessionID to the Glue StopSession API. This action retries twice if it is unsuccessful.
  7. Finally, Workflow triggers an SNS email confirming the session’s stop.

Additionally, several services send logs to CloudWatch and gain permissions using IAM. If the Step Function fails, a CloudWatch alarm triggers a user email.
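Steps 5 to 7 can be sketched as an Amazon States Language definition, built here as a Python dictionary so it can be dumped to JSON. This is an illustrative outline, not the finished workflow. The state names, the `session_id` input key, the retry interval and the `stop_result` path are all assumptions, and the topic ARN passed in is a placeholder. The error name is taken from the IllegalSessionStateException example above.

```python
import json


def build_state_machine(wait_seconds: int, topic_arn: str) -> dict:
    """Return an ASL definition: wait, stop the session, then notify."""
    return {
        "Comment": "Stop a Glue interactive session after a fixed wait",
        "StartAt": "WaitForDuration",
        "States": {
            "WaitForDuration": {
                "Type": "Wait",
                "Seconds": wait_seconds,
                "Next": "StopSession",
            },
            "StopSession": {
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:glue:stopSession",
                # ASSUMPTION: the workflow input carries the ID as "session_id".
                "Parameters": {"Id.$": "$.session_id"},
                # Keep the original input so the next state can read session_id.
                "ResultPath": "$.stop_result",
                "Retry": [
                    {
                        "ErrorEquals": ["Glue.IllegalSessionStateException"],
                        "IntervalSeconds": 60,  # assumed interval
                        "MaxAttempts": 2,  # retry twice, as in step 6
                        "BackoffRate": 1.0,
                    }
                ],
                "Next": "NotifySuccess",
            },
            "NotifySuccess": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sns:publish",
                "Parameters": {
                    "TopicArn": topic_arn,
                    "Message.$": "States.Format('Stopped Glue session {}', $.session_id)",
                },
                "End": True,
            },
        },
    }


definition = build_state_machine(
    wait_seconds=1800,
    topic_arn="arn:aws:sns:eu-west-1:123456789012:success-stepfunction",  # placeholder account
)
print(json.dumps(definition, indent=2))
```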

Summary

In this post, I examined some unexpected AWS Glue costs and designed an event-based cost control process architecture.

Once I understood the problem clearly, I iterated on an existing AWS architecture to build my bespoke event-based process. My architecture diagram shows how the key components work together and provides a clear implementation roadmap. In the next post I’ll start the build!


Thanks for reading ~~^~~

Categories
Developing & Application Integration

WordPress Bronze Data Orchestration With AWS

In this post, I create my WordPress pipeline’s bronze data orchestration process using AWS Lambda layers and AWS Step Functions.

Table of Contents

Introduction

In recent posts, I’ve written a Python script to extract WordPress API data and automated the script’s invocation with AWS services. This script creates five JSON objects in an S3 bucket at 07:00 each morning.

Now, I want to transform the data from semi-structured raw JSON into a more structured and query-friendly ‘bronze’ format to prepare it for downstream partitioning, cleansing and filtration.

Firstly, I’ll cover the additions and changes to my pipeline architecture. Next, I’ll examine both my new bronze Python function and the changes made to the existing raw function.

Finally, I’ll deploy the bronze script to AWS Lambda and create my WordPress pipeline orchestration process with AWS Step Functions. This process will ensure both Lambdas run in a set order each day.

Let’s start by examining my latest architectural decisions.

Architectural Decisions

In this section, I examine my architectural decisions for the bronze AWS Lambda function and the WordPress pipeline orchestration. Note that these decisions are in addition to my previous ones here and here.

AWS SDK For pandas

AWS SDK For pandas is an open-source Python initiative using the pandas library. It integrates with AWS services including Athena, Glue, Redshift, DynamoDB and S3, offering abstracted functions to execute various data processes.

AWS SDK For pandas used to be called awswrangler until AWS renamed it for clarity. It now exists as AWS SDK For pandas in documentation and awswrangler in code.

AWS Lambda Layers

A Lambda layer is an archive containing code like libraries, dependencies, or custom runtimes. Layers can be both created manually and provided by AWS and third parties. Each Lambda function can include up to five layers.

Layers can be shared between functions, reducing code duplication and package sizes. This reduces storage costs and lets the smaller packages deploy markedly faster. Layers also separate dependencies from function code, supporting decoupling and separation of concerns.

AWS Step Functions

AWS Step Functions is a serverless orchestration service that integrates with other AWS services to build application workflows as a series of event-driven steps. For example, chaining Athena queries and ML model training.

Central to the Step Functions service are the concepts of States and State Machines:

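  • States represent single steps or tasks in a workflow, and can be one of several types. The Step Functions Developer Guide has a full list of states.
  • State Machines are the workflows themselves: a collection of states and the transitions between them, defined in Amazon States Language (ASL).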

The AWS Step Functions Developer Guide’s welcome page has more details including workflow types, use cases and a variety of sample projects.

Apache Parquet

Onto the data architecture! Let’s start by choosing a structured file type for the bronze data:

Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.

Databricks: What is Parquet?

There’s a more detailed explanation in the Parquet documentation too. So why choose Parquet over something like CSV? Well:

  • Size: Parquet supports highly efficient compression, so files take up less space and are cheaper to store than CSVs.
  • Performance: Parquet files store metadata about the data they hold. Query engines can use this metadata to find only the data needed, whereas with CSVs the whole file must be read first. This reduces the amount of processed data and enhances query performance.
  • Compatibility: Parquet is an open standard supported by various data processing frameworks including Apache Spark, Apache Hive and Presto. This means that data stored in Parquet format can be read and processed across many platforms and services.

Data Lakehouse

A Data Lakehouse is an emerging data architecture combining the centralized storage of raw data synonymous with Data Lakes with the transactional and analytical processing associated with Data Warehouses.

The result is a unified platform for efficient data management, analytics, and insights. Lakehouses have gained popularity as cloud services increasingly support them, with AWS, Azure and GCP all providing Lakehouse services.

This segues neatly into…

Medallion Architecture

Medallion Architecture is a data design pattern for logically organizing data in a Lakehouse. It aims to improve data quality as it flows through various layers incrementally. Names for these layers vary, tending to be Bronze, Silver, and Gold.

Implementations of the Medallion Architecture also vary. I like this Advancing Analytics video, which maps the Medallion Architecture to their approach. Despite the title it’s not a negative video, instead outlining how the three layers don’t necessarily fit every use case.

I’m using Raw and Bronze layers here because they best fit what I’m doing with my data.

Architectural Updates

In this section, I examine the changes made to my existing architecture.

Amazon S3

I’ve created a new data-lakehouse-bronze S3 bucket in the same region as the data-lakehouse-raw bucket to separate the two data layers.

Why use two buckets instead of one bucket with two prefixes? Well, after much research I’ve not found a right or wrong answer for this. There’s no difference in cost, performance or availability as long as all objects are stored in the same AWS region.

I chose two buckets because I find it easier to manage multiple buckets with flat structures and small bucket policies, as opposed to single buckets with deep structures and large bucket policies.

The truest answer is ‘it depends’, as other factors can come into play like:

  • Data Sovereignty: S3 bucket prefixes exist in the same region as the parent bucket. Regulations like GDPR and CCPA may require using separate buckets in order to isolate data within designated locations.

AWS SNS

I previously had two standard SNS Topics:

  • wordpress-api-raw for Lambda function alerts
  • failure-lambda for Lambda Destination alerts.

Firstly, there’s now an additional failure-stepfunction topic for any state machine failures.

Secondly, I’ve replaced my wordpress-api-raw topic with a data-lakehouse-raw topic to simplify my alerting channels and allow resource reuse. I’ve also created a new data-lakehouse-bronze topic for bronze process alerts.

Why two data topics? Well, different teams and services care about different things. A bronze-level failure may only concern the Data Engineering team as no other teams consume the data. Conversely, a gold-level failure will concern the AI and MI teams as it impacts their models and reports. Having separate SNS topics for each layer type enables granular monitoring controls.

AWS Parameter Store

Finally, Parameter Store needs the new S3 bucket name and SNS ARNs. I’ve replaced the /sns/pipeline/wordpressapi/raw parameter with /sns/data/lakehouse/raw to preserve the name schema.

I’m now storing five parameters:

  • 2x S3 Bucket names (Raw and Bronze)
  • 2x SNS Topic ARNs (Raw and Bronze notifications)
  • WordPress API Endpoints (unchanged)

Architectural Diagram

There are two diagrams this time! Firstly, here is the data_wordpressapi_bronze AWS Lambda function:

Where:

  1. AWS Lambda calls Parameter Store for S3 and SNS parameters. Parameter Store returns these to AWS Lambda.
  2. Lambda function gets raw WordPress JSON data from S3 Raw Bucket.
  3. Lambda function transforms the raw WordPress JSON data to bronze WordPress Parquet data and puts the new object in the S3 Bronze Bucket.

Meanwhile, Lambda is writing to a CloudWatch Log Group throughout its invocation. If there’s a failure, the Lambda function publishes a message to an SNS topic. SNS then delivers this message to the user’s subscribed email address.

Next, this is the AWS Step Functions WordPress bronze orchestration process:

Where:

  1. EventBridge Schedule invokes the State Machine.
  2. State Machine invokes the Raw Lambda function.
  3. State Machine invokes the Bronze Lambda function.

The State Machine also has its own logging and alerting channels.

Python

In this section, I work on my raw and bronze Python scripts for the WordPress pipeline orchestration process.

Raw Script Updates

I try to update my existing resources when I find something pertinent online. My latest find was this Indently video that covers, amongst other things, type annotations:

So how are type annotations different from type hints? Type annotations were released in 2006 and aimed to standardize function parameters and return value annotation. Type hints (released in 2014) then added updated definitions and conventions to enrich type annotations further.

The type hints PEP shows this difference between the two:

When used in a type hint, the expression None is considered equivalent to type(None)

https://peps.python.org/pep-0484/#using-none

So in this function:

Python
def send_email(name: str, message: str) -> None:

  • name: str is an example of type annotation because the parameter name is of type string.
  • -> None is an example of a type hint because although None isn’t a type, it confirms that the function has no output.
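The PEP’s point about None can be checked directly. In this small sketch, send_email has a placeholder body that prints instead of sending anything, and typing.get_type_hints shows how Python resolves the annotations:

```python
from typing import get_type_hints


def send_email(name: str, message: str) -> None:
    """Pretend to send an email; prints instead of calling a mail service."""
    print(f"To {name}: {message}")


hints = get_type_hints(send_email)

# The parameter annotation resolves to the str type itself.
print(hints["name"] is str)

# The None return annotation resolves to type(None), as PEP 484 describes.
print(hints["return"] is type(None))
```

Both checks print True.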

So what’s changed in my raw script?

Updated Import & Functions

Let’s open with a new import:

Python
from botocore.client import BaseClient

BaseClient serves as a foundational base class for AWS service clients within botocore – a low-level library providing the core functionality of boto3 (the AWS Python SDK) and the AWS CLI.

I’m using it here to add type annotations to my boto3 clients. For example, send_sns_message already had these annotations:

Python
def send_sns_message(sns_client, topic_arn: str, subject: str, message: str):

I’ve now annotated sns_client with BaseClient to indicate its boto3 relation. I’ve also added a -> None type hint to confirm the function has no output:

Python
def send_sns_message(sns_client: BaseClient, topic_arn: str, subject: str, message: str) -> None:

Elsewhere, I’ve added the BaseClient annotation to get_parameter_from_ssm’s ssm_client parameter:

Python
def get_parameter_from_ssm(ssm_client: BaseClient, parameter_name: str) -> str:

And put_s3_object’s s3_client parameter:

Python
def put_s3_object(s3_client: BaseClient, bucket: str, prefix: str, name: str, json_data: str, suffix: str) -> bool:

put_s3_object also has new prefix and suffix parameters. Before this, it was hard-coded to create JSON objects in a wordpress-api S3 prefix:

Python
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket...")
        s3_client.put_object(
            Body = json_data,
            Bucket = bucket,
            Key = f"wordpress-api/{name}.json"
        )

Not any more! The S3 prefix and object suffix can now be changed dynamically:

Python
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket's {prefix}/{name} prefix...")
        s3_client.put_object(
            Body = json_data,
            Bucket = bucket,
            Key = f"{prefix}/{name}/{name}.{suffix}"
        )

This improves put_s3_object’s reusability as I can now pass any prefix and suffix to it during a function call. For example, this call creates a JSON object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'json')

While this creates a CSV object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'csv')

Likewise, this creates a TXT object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'txt')

I can also set data_source (which I’ll cover shortly) to any S3 prefix, giving total control over where the object is stored.
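To make the effect on the object key concrete, here is a small sketch that mirrors the Key f-string from put_s3_object. The build_object_key helper and the posts object name are hypothetical and exist only for illustration:

```python
def build_object_key(prefix: str, name: str, suffix: str) -> str:
    """Mirror the Key f-string used in put_s3_object."""
    return f"{prefix}/{name}/{name}.{suffix}"


# 'posts' is a hypothetical object name; 'wordpress_api' is the data_source value.
for suffix in ("json", "csv", "txt"):
    print(build_object_key("wordpress_api", "posts", suffix))
```

Note that the suffix only changes the object’s name. The body is whatever string is passed in, so the caller still needs to supply data in the matching format.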

Updated Variables

Next, some of my variables need to change. My SNS parameter name needs updating from:

Python
# AWS Parameter Store Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_snstopic = '/sns/pipeline/wordpressapi/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

To:

Python
# AWS Parameter Store Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_snstopic = '/sns/data/lakehouse/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

I also need to lay the groundwork for put_s3_object’s new prefix parameter. I used to have a lambdaname variable that was used in the logs:

Python
# Lambda name for messages
lambdaname = 'data_wordpressapi_raw'

I’ve replaced this with two new variables. data_source records the data’s origin, which matches my S3 prefix naming schema. function_name then adds context to data_source to match my Lambda function naming schema:

Python
# Lambda name for messages
data_source = 'wordpress_api'
function_name = f'data_{data_source}_raw'

data_source is then passed to the put_s3_object function call when creating raw objects:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'json')

While function_name is used in the logs when referring to the Lambda function:

Python
    # Check an S3 bucket has been returned.
    if not s3_bucket_raw:
        message = f"{function_name}: No S3 Raw bucket returned."
        subject = f"{function_name}: Failed"

Updated Script Body

My variables all now have type annotations. They’ve gone from:

Python
    # AWS Parameter Store Names
    parametername_s3bucket = '/s3/lakehouse/name/raw'
    parametername_snstopic = '/sns/data/lakehouse/raw'
    parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

    # Lambda name for messages
    data_source = 'wordpress_api'
    function_name = f'data_{data_source}_raw'

    # Counters
    api_call_timeout = 30
    endpoint_count_all = 0
    endpoint_count_failure = 0
    endpoint_count_success = 0

To:

Python
    # AWS Parameter Store Names
    parametername_s3bucket: str = '/s3/lakehouse/name/raw'
    parametername_snstopic: str = '/sns/data/lakehouse/raw'
    parametername_wordpressapi: str = '/wordpress/amazonwebshark/api/mysqlendpoints'

    # Lambda name for messages
    data_source: str = 'wordpress_api'
    function_name: str = f'data_{data_source}_raw'

    # Counters
    api_call_timeout: int = 30
    endpoint_count_all: int = 0
    endpoint_count_failure: int = 0
    endpoint_count_success: int = 0

This is helpful when the variables are passed in from settings files or external services and are not immediately apparent. So a good habit to get into!

Bronze Script

Now let’s talk about the new script, which transforms raw S3 JSON objects into bronze S3 Parquet objects. Both raw and bronze WordPress scripts will then feed into an AWS orchestration workflow.

Reused Raw Functions

The following functions are re-used from the Raw script with no changes:

Get Filename Function

Here, I want to get each S3 path’s object name. The object name has some important uses:

  • Using it instead of the full S3 path makes the logs easier to read and cheaper to store.
  • Using it during bronze S3 object creation ensures consistent naming.

A typical S3 path has the schema s3://bucket/prefix/object.suffix, from which I want object.

This function is a remake of the raw script’s Get Filename function. This time, the source string is an S3 path instead of an API endpoint:

I define a get_objectname_from_s3_path function, which expects a path argument with a string type hint and returns a new string.

Firstly, my name_full variable uses the rsplit method to capture the substring I need, using forward slashes as separators. This converts s3://bucket/prefix/object.suffix to object.suffix.

Next, my name_full_last_period_index variable uses the rfind method to find the last occurrence of the period character in the name_full string.

Finally, my name_partial variable uses slicing to extract a substring from the beginning of the name_full string up to (but not including) the index specified by name_full_last_period_index. This converts object.suffix to object.

If the function cannot return a string, an exception is logged and a blank string is returned instead.
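A minimal sketch of the function as described above could look like this. The variable names follow the description, while the handling of names without a period is an added assumption that the description doesn’t cover:

```python
import logging


def get_objectname_from_s3_path(path: str) -> str:
    """Return the object name from an S3 path, without prefix or suffix."""
    try:
        # 's3://bucket/prefix/object.suffix' -> 'object.suffix'
        name_full = path.rsplit("/", 1)[-1]

        # Index of the last period in 'object.suffix'
        name_full_last_period_index = name_full.rfind(".")

        # ASSUMPTION (not in the description): with no period, rfind
        # returns -1, so return the name unchanged instead of slicing.
        if name_full_last_period_index == -1:
            return name_full

        # 'object.suffix' -> 'object'
        name_partial = name_full[:name_full_last_period_index]
        return name_partial

    except Exception as e:
        logging.exception(f"Unable to get object name from {path}: {e}")
        return ""


print(get_objectname_from_s3_path("s3://bucket/prefix/object.suffix"))
```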

Get Data Function

Next, I want to read data from an S3 JSON object in my Raw bucket and store it in a pandas DataFrame.

Here, I define a get_data_from_s3_object function that returns a pandas DataFrame and expects three arguments:

  • boto3_session: the authenticated session to use with a BaseClient type hint.
  • s3_object: the S3 object path with a string type hint.
  • name: the S3 object name with a string type hint (used for logging).

This function uses AWS SDK For pandas s3.read_json to read the data from the S3 object path using the existing boto3_session authentication.

If data is found then get_data_from_s3_object returns a populated DataFrame. Otherwise, an empty DataFrame is returned instead.

Put Data Function

Finally, I want to convert the DataFrame to Parquet and store it in my bronze S3 bucket.

I define a put_s3_parquet_object function that expects four arguments:

  • df: the pandas DataFrame containing the raw data.
  • name: the S3 object name.
  • s3_object_bronze: the S3 path for the new bronze object
  • session: the authenticated boto3 session to use.

I give string type hints to the name and s3_object_bronze parameters. session gets the same BaseClient hint as before, and df is identified as a pandas DataFrame.

I open a try except block that uses s3.to_parquet with the existing boto3_session to upload the DataFrame data to S3 as a Parquet object. If this operation succeeds, the function returns True. If it fails, a botocore exception is logged and the function returns False.

Imports & Variables

The bronze script has two new imports to examine: awswrangler and pandas:

Python
import logging
import boto3
import botocore
import awswrangler as wr
import pandas as pd
from botocore.client import BaseClient

I’ve used both before. Here, pandas handles my in-memory data storage and awswrangler handles my S3 interactions.

There are also parameter changes. I’ve added Parameter Store names for both the bronze S3 bucket and the SNS topic. I’ve kept the raw S3 bucket parameter as awswrangler needs it for the get_data_from_s3_object function.

Python
parametername_s3bucket_raw: str = '/s3/lakehouse/name/raw'
parametername_s3bucket_bronze: str = '/s3/lakehouse/name/bronze'
parametername_snstopic: str = '/sns/data/lakehouse/bronze'

I’ve also swapped out _raw for _bronze in function_name, and renamed the counters from endpoint_count to object_count to reflect their new function:

Python
    # Lambda name for messages
    data_source: str = 'wordpress_api'
    function_name: str = f'data_{data_source}_bronze'

    # Counters
    object_count_all: int = 0
    object_count_failure: int = 0
    object_count_success: int = 0

Script Body

Most of the bronze script is reused from the raw script. Tasks like logging config, name parsing and validation checks only needed the updated parameters! There are some changes though, as S3 is now my data source and I’m also doing additional tasks.

Firstly, I need to get the raw S3 objects. The AWS SDK For pandas s3 module has a list_objects function which is purpose-built for this:

Python
s3_objects_raw = wr.s3.list_objects(
    path = f's3://{s3_bucket_raw}/{data_source}',
    suffix = 'json',
    boto3_session = session)

  • path is the S3 location to list – in this case the raw S3 bucket’s wordpress_api prefix.
  • suffix filters the list by the specified suffix.
  • boto3_session specifies my existing boto3_session to prevent unnecessary re-authentication.

During the loop, my script checks if the pandas DataFrame returned from get_data_from_s3_object contains data. If it’s empty then the failure is counted and the loop skips to the next object, otherwise the column and row counts are logged:

Python
if df.empty:
    # Count the failure and move on to the next object.
    logging.warning(f"{object_name} DataFrame is empty!")
    object_count_failure += 1
    continue

logging.info(f'{object_name} DataFrame has {len(df.columns)} columns and {len(df)} rows.')

Assuming all checks succeed, I want to put a new Parquet object into my bronze S3 bucket. AWS SDK For pandas has an s3.to_parquet function that does this using a pandas DataFrame and an S3 path.

I already have the DataFrame so let’s make the path. This is done by the s3_object_bronze variable, which joins existing variables with additional characters. This is then passed to put_s3_parquet_object:

Python
s3_object_bronze = f's3://{s3_bucket_bronze}/{data_source}/{object_name}/{object_name}.parquet'

logging.info(f"Attempting {object_name} S3 Bronze upload...")
ok = put_s3_parquet_object(df, object_name, s3_object_bronze, session)

That’s the bronze script done. Now to deploy it to AWS Lambda.

Lambda

In this section, I configure and deploy my Bronze Lambda function.

Hitting Size Limits

So, remember when I said that I expected my future Lambda deployments to improve? Well, this was the result of my retrying the virtual environment deployment process the Raw Lambda used:

2024 03 08 LambdaError

While my zipped raw function is 19.1 MB, my zipped bronze function is over five times bigger at 101.6 MB! My poorly optimised package wouldn’t cut it this time, so I prepared for some pruning. Until I discovered something…

Using A Layer

There’s a managed AWS SDK for pandas Lambda layer!

2024 03 05 LayerAWSSDKPandas

It can be selected in the Lambda console or programmatically called from this list of AWS SDK for pandas Managed Layer ARNs, which covers:

  • All AWS commercial regions.
  • All Python versions supported by Lambda (3.8+ at the time of writing).
  • Both Lambda architectures.

Additionally, the Lambda Python 3.12 runtime includes boto3 and botocore. So by using this runtime and the managed layer, I’ve gone from a large deployment package to no deployment package! And because my function is now basically just code, I can view and edit that code in the Lambda console directly.

Lambda Config

My Bronze Lambda function borrows several config settings from the raw one, including:

Where it differs is the IAM setup. I needed additional permissions anyway because this function is working with two S3 buckets now, but by the time I was done the policy was hard to read, maintain and troubleshoot:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "sns:Publish",
                "s3:ListBucket",
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-raw",
                "arn:aws:s3:::data-lakehouse-bronze",
                "arn:aws:logs:eu-west-1:REDACTED:*",
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-raw",
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "ssm:GetParameter"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:log-group:/aws/lambda/data_wordpressapi_bronze:*",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/bronze",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/bronze"
            ]
        }
    ]
}

So let’s refactor it! The below policy has the same actions, grouped by service and with appropriately named Sids:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchLogGroupActions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:*"
            ]
        },
        {
            "Sid": "CloudWatchLogStreamActions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:log-group:/aws/lambda/data_wordpressapi_bronze:*"
            ]
        },
        {
            "Sid": "S3BucketActions",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw",
                "arn:aws:s3:::data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "S3ObjectActions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*"
            ]
        },
        {
            "Sid": "SNSActions",
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": [
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "ParameterStoreActions",
            "Effect": "Allow",
            "Action": [
                "ssm:GetParameter"
            ],
            "Resource": [
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/bronze",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/bronze"
            ]
        }
    ]
}

Much better! This policy is now far easier to read and update.
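A refactor like this is easy to sanity-check by comparing the set of actions each version allows. The following is a minimal sketch, not part of the original deployment: `policy_actions` is a hypothetical helper, and the two dictionaries are trimmed to the only fields it reads.

```python
def policy_actions(policy: dict) -> set:
    """Collect every action named in a policy's Allow statements."""
    actions = set()
    for statement in policy["Statement"]:
        if statement.get("Effect") != "Allow":
            continue
        listed = statement["Action"]
        # IAM accepts Action as a single string or a list of strings.
        actions.update([listed] if isinstance(listed, str) else listed)
    return actions


# Trimmed copies of the two policies above (resources omitted for brevity).
before = {"Statement": [
    {"Effect": "Allow", "Action": ["s3:PutObject", "s3:GetObject", "sns:Publish",
                                   "s3:ListBucket", "logs:CreateLogGroup"]},
    {"Effect": "Allow", "Action": ["logs:CreateLogStream", "logs:PutLogEvents",
                                   "ssm:GetParameter"]},
]}
after = {"Statement": [
    {"Effect": "Allow", "Action": ["logs:CreateLogGroup"]},
    {"Effect": "Allow", "Action": ["logs:CreateLogStream", "logs:PutLogEvents"]},
    {"Effect": "Allow", "Action": ["s3:ListBucket"]},
    {"Effect": "Allow", "Action": ["s3:PutObject", "s3:GetObject"]},
    {"Effect": "Allow", "Action": ["sns:Publish"]},
    {"Effect": "Allow", "Action": ["ssm:GetParameter"]},
]}

print(policy_actions(before) == policy_actions(after))
```

This only compares action names. It says nothing about which resources each action applies to, so it complements a manual review and doesn't replace one.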

There’s also a clear distinction between the bucket-level s3:ListBucket operation and the object-level s3:PutObject and s3:GetObject operations now. Getting these wrong can have big consequences, so the clearer the better!
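The bucket-versus-object distinction can be checked mechanically too. This is a rough sketch under stated assumptions: it only knows about the three S3 actions used in this post, it expects `Action` and `Resource` to be lists, and `ineffective_s3_pairs` is a hypothetical name. It flags action and resource pairings that grant nothing, such as s3:ListBucket against an object ARN.

```python
S3_PREFIX = "arn:aws:s3:::"
BUCKET_LEVEL = {"s3:ListBucket"}                  # applies to bucket ARNs
OBJECT_LEVEL = {"s3:GetObject", "s3:PutObject"}   # applies to object ARNs


def ineffective_s3_pairs(policy: dict) -> list:
    """Return (action, resource) pairs where the S3 ARN type doesn't fit the action."""
    pairs = []
    for statement in policy["Statement"]:
        for action in statement["Action"]:
            for resource in statement["Resource"]:
                if not resource.startswith(S3_PREFIX):
                    continue
                # Object ARNs have a key part after the bucket name.
                targets_objects = "/" in resource[len(S3_PREFIX):]
                if action in BUCKET_LEVEL and targets_objects:
                    pairs.append((action, resource))
                elif action in OBJECT_LEVEL and not targets_objects:
                    pairs.append((action, resource))
    return pairs


refactored = {"Statement": [
    {"Action": ["s3:ListBucket"],
     "Resource": ["arn:aws:s3:::data-lakehouse-raw", "arn:aws:s3:::data-lakehouse-bronze"]},
    {"Action": ["s3:PutObject", "s3:GetObject"],
     "Resource": ["arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                  "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*"]},
]}

print(ineffective_s3_pairs(refactored))
```

Run against a statement that mixes bucket and object ARNs under one action list, the same function reports every pairing that has no effect, which is one way to see why the single-statement version was hard to reason about.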

One deployment and test later, and I have some new S3 objects!

[INFO]: WordPress API Bronze process complete: 5 Successful | 0 Failed.

REPORT RequestId: 899d1658-f7de-4e74-8d64-b4f029fe2bec	Duration: 7108.50 ms	Billed Duration: 7109 ms	Memory Size: 250 MB	Max Memory Used: 250 MB	Init Duration: 4747.38 ms

So now I have two Lambda functions with some requirements around them:

  • They need to run sequentially.
  • The Raw Lambda must finish before the Bronze Lambda starts.
  • If the Raw Lambda fails then the Bronze Lambda shouldn’t run at all.

Now that AWS Lambda is creating WordPress raw and bronze objects, it’s time to start thinking about orchestration!

Step Functions & EventBridge

In this section, I create both an AWS Step Functions State Machine and an Amazon EventBridge Schedule for my WordPress bronze orchestration process.

State Machine Requirements

Before writing any code, let’s outline the steps I need the state machine to perform:

  1. data_wordpressapi_raw Lambda function is invoked. If it succeeds then move to the next step. If it fails then send a notification and end the workflow reporting failure.
  2. data_wordpressapi_bronze Lambda function is invoked. If it succeeds then end the workflow reporting success. If it fails then send a notification and end the workflow reporting failure.
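The two steps above can be sketched as plain Python control flow before any state machine code exists. This is only an illustration of the requirements: `run_pipeline`, `raw_step`, `bronze_step` and `notify_failure` are hypothetical names, not real resources.

```python
def run_pipeline(raw_step, bronze_step, notify_failure) -> str:
    """Run the steps in order, stopping and notifying on the first failure."""
    for step in (raw_step, bronze_step):
        try:
            step()
        except Exception as error:
            # A failed step sends a notification and ends the workflow,
            # so the bronze step never runs if the raw step fails.
            notify_failure(f"{step.__name__} failed: {error}")
            return "FAILED"
    return "SUCCEEDED"


def raw():
    print("raw objects written")


def bronze():
    print("bronze objects written")


print(run_pipeline(raw, bronze, print))
```

The state machine expresses the same flow declaratively, with `Next` standing in for the loop and `Catch` standing in for the `except` block.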

With the states defined, it’s time to create the state machine.

State Machine Creation

The following state machine was created using Step Functions Workflow Studio – a low-code visual designer released in 2021, with drag-and-drop functionality that auto-generates code in real-time:

Workflow Studio produced this section’s code and diagrams.

Firstly, I create a data_wordpressapi_raw task state to invoke my Raw Lambda. This task uses the lambda:invoke action to invoke my data_wordpressapi_raw function. I set the next state as data_wordpressapi_bronze and add a Catch block that sends all errors to a PublishFailure state (which I’ll define later):

JSON
    "data_wordpressapi_raw": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_raw:$LATEST"
      },
      "Next": "data_wordpressapi_bronze",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.Error",
          "Next": "PublishFailure"
        }
      ],
      "TimeoutSeconds": 120
    }

Note the TimeoutSeconds parameter. All my task states will have 120-second timeouts. These stop the state machine from waiting indefinitely if the task becomes unresponsive, and are recommended best practice. Also note that state machines wait for Lambda invocations to finish by default, so no additional config is needed for this.
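Since the timeout is easy to forget on a new state, a small check over the exported definition can catch omissions. A minimal sketch, assuming the definition has been loaded into a dictionary and that only top-level states matter (nested Parallel or Map branches are not walked); `tasks_missing_timeout` is a hypothetical helper.

```python
def tasks_missing_timeout(definition: dict) -> list:
    """Return the names of top-level Task states that set no timeout."""
    return sorted(
        name
        for name, state in definition["States"].items()
        if state.get("Type") == "Task"
        and "TimeoutSeconds" not in state
        and "TimeoutSecondsPath" not in state
    )


example = {
    "StartAt": "WithTimeout",
    "States": {
        "WithTimeout": {"Type": "Task", "TimeoutSeconds": 120, "Next": "WithoutTimeout"},
        "WithoutTimeout": {"Type": "Task", "End": True},
    },
}

print(tasks_missing_timeout(example))
```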

Next, I create a data_wordpressapi_bronze task state to invoke my Bronze Lambda. This task uses the lambda:invoke action to invoke my data_wordpressapi_bronze function. I then add a Catch block that sends all errors to a PublishFailure state.

Setting "End": true designates this state as a terminal state, which ends the execution if the task is successful:

JSON
    "data_wordpressapi_bronze": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_bronze:$LATEST"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.Error",
          "Next": "PublishFailure"
        }
      ],
      "TimeoutSeconds": 120,
      "End": true
    }

Finally, I create a PublishFailure task state that publishes failure notifications. This task uses the sns:Publish action to publish a simple message to the failure-stepfunction SNS Topic ARN. "End": true marks this task as the other potential way the state machine execution can end:

JSON
    "PublishFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:eu-west-1:REDACTED:failure-stepfunction",
        "Message": "An error occurred in the state machine: { \"error\": \"$.Error\" }"
      },
      "End": true,
      "TimeoutSeconds": 120
    }
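One detail worth noting about the state above, offered as a reading of the Amazon States Language and not something confirmed by the post: a Parameters field is only evaluated as a path or intrinsic function when its key ends in `.$`. As written, `Message` is a static string, so the notification would contain the literal text `$.Error`. The sketch below builds a variant that interpolates the caught error with the `States.Format` and `States.JsonToString` intrinsic functions. The helper name `publish_failure_state` is hypothetical, and whether this behaviour is wanted is an assumption.

```python
import json


def publish_failure_state(topic_arn: str) -> dict:
    """Build an SNS publish state whose message includes the caught error."""
    return {
        "Type": "Task",
        "Resource": "arn:aws:states:::sns:publish",
        "Parameters": {
            "TopicArn": topic_arn,
            # The ".$" suffix tells Step Functions to evaluate the value
            # instead of publishing it as literal text.
            "Message.$": (
                "States.Format('An error occurred in the state machine: {}', "
                "States.JsonToString($.Error))"
            ),
        },
        "End": True,
        "TimeoutSeconds": 120,
    }


state = publish_failure_state("arn:aws:sns:eu-west-1:REDACTED:failure-stepfunction")
print(json.dumps(state, indent=2))
```

The static message still does its main job of signalling that something failed, so this is a refinement and not a required change.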

While both Lambdas already have SNS alerting, the state machine itself may also fail so the added observability is justified. This Marcia Villalba video was very helpful here:

And that’s everything I need! At this point, Workflow Studio gives me two things: the state machine’s code, which I’ve committed to GitHub, and this handy downloadable diagram:

stepfunctions graph
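For reference, the three states could be assembled into a single definition roughly as follows. This is a sketch only: the `StartAt` value, the helper names `lambda_task` and `build_definition`, and the placeholder ARNs are assumptions, and the JSON committed to the repo may differ.

```python
import json


def lambda_task(function_arn: str, next_state: str = None) -> dict:
    """Build a Lambda invoke task that routes all errors to PublishFailure."""
    state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {"Payload.$": "$", "FunctionName": function_arn},
        "Catch": [
            {"ErrorEquals": ["States.ALL"], "ResultPath": "$.Error", "Next": "PublishFailure"}
        ],
        "TimeoutSeconds": 120,
    }
    # A state either hands over to the next state or ends the execution.
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state


def build_definition(raw_arn: str, bronze_arn: str, topic_arn: str) -> dict:
    """Assemble the raw -> bronze workflow with a shared failure state."""
    return {
        "StartAt": "data_wordpressapi_raw",
        "States": {
            "data_wordpressapi_raw": lambda_task(raw_arn, "data_wordpressapi_bronze"),
            "data_wordpressapi_bronze": lambda_task(bronze_arn),
            "PublishFailure": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sns:publish",
                "Parameters": {
                    "TopicArn": topic_arn,
                    "Message": "An error occurred in the state machine: { \"error\": \"$.Error\" }",
                },
                "End": True,
                "TimeoutSeconds": 120,
            },
        },
    }


definition = build_definition(
    "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_raw:$LATEST",
    "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_bronze:$LATEST",
    "arn:aws:sns:eu-west-1:REDACTED:failure-stepfunction",
)
print(json.dumps(definition, indent=2))
```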

State Machine Config

It’s now time to think about security and monitoring.

When new state machines are created in the AWS Step Functions console, an IAM Role is created with policies based on the state machine’s resources. The nuances and templates are covered in the Step Functions Developer Guide, so let’s examine my WordPress_Raw_To_Bronze state machine’s auto-generated IAM Role consisting of two policies:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "xray:PutTraceSegments",
                "xray:PutTelemetryRecords",
                "xray:GetSamplingRules",
                "xray:GetSamplingTargets"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

This supports the AWS X-Ray integration with AWS Step Functions. If X-Ray tracing is never enabled then this policy is unused.

Besides X-Ray tracing, there is also an option to log a state machine’s execution history to CloudWatch Logs. There are three log levels available plus a fourth default choice: OFF. Each state machine retains recent execution history and I’ve got no need to keep that history long-term, so I leave logging set to OFF. Remember: CloudWatch Logs is only free for the first 5GB!

State Machine Testing

There are various ways to test a state machine. There’s a testing and debugging section in the developer guide that goes into further details, the three main options being:

I’ll focus on console testing here.

Both individual states and the entire state machine can be tested in the console. Each state can be tested in isolation (using the TestState API under the hood) with customisable inputs and IAM roles. This is great for checking the state outputs are correct, and that the attached IAM role is sufficient.

The state machine itself can also be tested via on-demand execution. The Execution Details page shows the state machine’s statistics and events, and has great coverage in the developer guide.

During testing, my WordPress_Raw_To_Bronze state machine returned this error:

States.Runtime in step: data_wordpressapi_bronze.

An error occurred while executing the state 'data_wordpressapi_bronze' (entered at the event id #7). Unable to apply Path transformation to null or empty input.

This turned out to be a problem with the OutputPath parameter, which Workflow Studio enables by default:

2024 03 04 StepFunctionsOutPutPath

I’m not using this setting for anything, so I disabled it to solve this problem.
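In the exported JSON, disabling the setting amounts to removing the `OutputPath` field from the affected states. A minimal sketch of doing that programmatically: `without_output_path` is a hypothetical helper, and the `"$.Payload"` value in the example is an assumption about what was generated, since the post doesn't show it.

```python
import copy


def without_output_path(definition: dict) -> dict:
    """Return a copy of the definition with OutputPath removed from every top-level state."""
    cleaned = copy.deepcopy(definition)  # leave the caller's definition untouched
    for state in cleaned["States"].values():
        state.pop("OutputPath", None)
    return cleaned


generated = {
    "StartAt": "data_wordpressapi_bronze",
    "States": {
        "data_wordpressapi_bronze": {
            "Type": "Task",
            "OutputPath": "$.Payload",  # assumed value, for illustration only
            "End": True,
        }
    },
}

print(without_output_path(generated))
```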

EventBridge Schedule

Finally, I want to automate the execution of my state machine. This calls for an EventBridge Schedule!

EventBridge makes this quite simple, using mostly the same process as last time. The Step Functions StartExecution operation is a templated target like Lambda’s Invoke operation, so it’s a case of selecting the WordPress_Raw_To_Bronze state machine from the list and updating the schedule’s IAM role accordingly.

And that’s it! EventBridge now executes the state machine at 07:00 each morning. The state machine then sequentially invokes both Lambda functions and catches any errors.
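The schedule was set up in the console, but an equivalent request can be sketched for the EventBridge Scheduler `create_schedule` API. The helper name `daily_schedule_kwargs`, the schedule name and both ARNs are assumptions for illustration. No timezone is set here, so the cron expression would be evaluated in UTC.

```python
def daily_schedule_kwargs(name: str, state_machine_arn: str, role_arn: str, hour: int = 7) -> dict:
    """Build create_schedule arguments for a once-a-day state machine execution."""
    return {
        "Name": name,
        # Scheduler cron fields: minutes hours day-of-month month day-of-week year
        "ScheduleExpression": f"cron(0 {hour} * * ? *)",
        "FlexibleTimeWindow": {"Mode": "OFF"},
        # For the templated StartExecution target, Arn is the state machine's ARN
        # and RoleArn is the role the scheduler assumes to start it.
        "Target": {"Arn": state_machine_arn, "RoleArn": role_arn},
    }


kwargs = daily_schedule_kwargs(
    "example-daily-schedule",
    "arn:aws:states:eu-west-1:REDACTED:stateMachine:WordPress_Raw_To_Bronze",
    "arn:aws:iam::REDACTED:role/example-scheduler-role",
)
print(kwargs["ScheduleExpression"])

# To create the schedule for real (needs AWS credentials):
# import boto3
# boto3.client("scheduler").create_schedule(**kwargs)
```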

Costs

In this section, I’ll examine my recent AWS WordPress bronze orchestration process costs.

Let’s start with Step Functions. There are two kinds of Step Functions workflow:

  • Standard workflows are charged based on the number of state transitions. These are counted each time a workflow step is executed. The first 4000 transitions each month are free. After that, every 1000 transitions cost $0.025.
  • Express workflows are priced by the number of executions, duration, and memory consumption. The specifics of these criteria, along with full details of all charges, are on the Step Functions pricing page.
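The standard workflow pricing described above comes down to simple arithmetic. A small sketch using only the figures quoted in this post (4000 free transitions a month, then $0.025 per 1000); the function name is hypothetical, and the current pricing page should be treated as the source of truth.

```python
FREE_TRANSITIONS = 4000   # free state transitions per month
PRICE_PER_1000 = 0.025    # USD per 1000 transitions beyond the free allowance


def standard_workflow_cost(transitions: int) -> float:
    """Estimate the monthly standard workflow charge in USD."""
    billable = max(0, transitions - FREE_TRANSITIONS)
    return billable / 1000 * PRICE_PER_1000


print(standard_workflow_cost(118))  # 0.0, well inside the free allowance
```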

I’m using standard workflows, and as of 26 March I’ve used 118 state transitions. In other words, free! Elsewhere, my costs are broadly on par with previous months. These are my S3 costs from 2024-02-01 to 2024-03-26:

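| S3 Actions                        | Month   | Usage   | Cost |
|-----------------------------------|---------|---------|------|
| PUT, COPY, POST, or LIST requests | 2024-02 | 64,196  | 0.32 |
| PUT, COPY, POST, or LIST requests | 2024-03 | 17,566  | 0.09 |
| GET and all other requests        | 2024-02 | 101,462 | 0.04 |
| GET and all other requests        | 2024-03 | 8,656   | 0.00 |
| GB month of storage used          | 2024-02 | 0.109   | 0.00 |
| GB month of storage used          | 2024-03 | 0.161   | 0.00 |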

And this is my recent free tier usage from 2024-02-01 to 2024-03-26:

| Service         | Month   | Usage                      |
|-----------------|---------|----------------------------|
| EventBridge     | 2024-02 | 31 Invocations             |
| EventBridge     | 2024-03 | 25 Invocations             |
| Lambda          | 2024-02 | 122.563 Second Compute     |
| Lambda          | 2024-02 | 84 Requests                |
| Lambda          | 2024-03 | 82.376 Second Compute      |
| Lambda          | 2024-03 | 58 Requests                |
| Parameter Store | 2024-02 | 34 API Requests            |
| Parameter Store | 2024-03 | 25 API Requests            |
| SNS             | 2024-02 | 8 Email-JSON Notifications |
| SNS             | 2024-02 | 438 API Requests           |
| SNS             | 2024-03 | 3 Email-JSON Notifications |
| SNS             | 2024-03 | 205 API Requests           |

So my only costs are still for storage.

Resources

The following items have been checked into the amazonwebshark GitHub repo for the AWS WordPress bronze orchestration process, available via the button below:

  • Updated data_wordpressapi_raw Python script & requirements.txt file.
  • New data_wordpressapi_bronze Python script & requirements.txt file.
  • WordPress_Raw_To_Bronze state machine JSON.
GitHub-BannerSmall

Summary

In this post, I created my WordPress pipeline’s bronze data orchestration process using AWS Lambda layers and AWS Step Functions.

I’ve wanted to try Step Functions out for a while, and all things considered they’re great! Workflow Studio is easy to use, and the templates and tutorials undoubtedly highlight the value that Step Functions can bring.

Additionally, the integration with both EventBridge Scheduler and other AWS services makes Step Functions a compelling orchestration service for both my ongoing WordPress bronze work and the future projects in my pipeline. This combined with some extra Lambda layers will reduce my future dev and test time.

If this post has been useful then the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~