Categories
Developing & Application Integration

Simplified Data Workflows With AWS Step Functions Variables

In this post, I use AWS Step Functions variables and JSONata to create a simplified API data capture workflow with Lambda and DynamoDB.

Introduction

I’ve become an AWS Step Functions convert in recent times. Back in 2020 when I first studied it for some AWS certifications, Step Functions defined workflows entirely in JSON, making it less approachable and often overlooked.

How times change! With 2021’s inclusion of a visual editor, Step Functions became far more accessible, helping it become a key tool in serverless application design. And in 2024 two major updates significantly enhanced Step Functions’ flexibility: JSONata support, which I recently explored, and built-in variables, which simplify state transitions and data management. This post focuses on the latter.

To demonstrate the power of Step Functions variables, I’ll walk through a practical example: fetching API data, verifying the response, and inserting it into DynamoDB. Firstly, I’ll examine the services and features I’ll use. Then I’ll create a state machine and examine each state’s use of variables. Finally, I’ll complete some test executions to ensure everything works as expected.

If a ‘simplified’ workflow seems hard to justify as a 20-minute read…that’s fair. But mastering Step Functions variables now can save hours of debugging and development in the long run! – Ed

Also, special thanks to AWS Community Builder Md. Mostafa Al Mahmud for generously providing AWS credits to support this and future posts!

Architecture

This section provides a top-level view of the architecture behind my simplified Step Functions variables workflow, highlighting the main AWS services involved in getting and processing API data. I’ll briefly cover the data being used, the role of Step Functions variables and the integration of DynamoDB within the workflow.

API Data

The data comes from a RESTful API that provides UK car details. The API needs both an authentication key and query parameters. Response data is provided in JSON.

The data used in this post is about my car. As some of it is sensitive, I will only use data that is already publicly available:

JSON
{
    "make": "FORD",
    "yearOfManufacture": 2014,
    "engineCapacity": 1242,
    "co2Emissions": 120,
    "fuelType": "PETROL",
    "markedForExport": false,
    "colour": "GREY"
}

There are several data types here. This will be important when writing to DynamoDB!

AWS Step Functions Variables

In my last post, I talked about JSONata in AWS Step Functions. This time let’s talk about Step Functions variables, which were introduced alongside JSONata in November 2024.

Step Functions variables offer a simple way to store and reuse data within a state machine, enabling dynamic workflows without complex transformations. They work well with both JSONata and JSONPath and are available at no extra cost in all AWS regions that support Step Functions.

Variables are set using Assign. They can hold static values:

JSON
"Assign": {
    "productName": "product1",
    "count" : 42,
    "available" : true
}

Variables can also hold dynamic values. To set these, Step Functions uses JSONata expressions within {% ... %}. The following example extracts productName and available from the state input using the JSONata $states reserved variable:

JSON
"Assign": {
    "product": "{% $states.input.productName %}",
    "available": "{% $states.input.available %}"
}

Variables are then referenced using dollar signs ($), e.g. $productName.

There’s tonnes more to this. For details on name syntax, ASL integration and creating JSONPath variables, check the Step Functions Developer Guide variables section. AWS Principal Developer Advocate Eric Johnson also has a related video on the topic.

With Step Functions variables handling data transformation and persistence, the next step is storing processed data efficiently. This is where Amazon DynamoDB comes in.

Amazon DynamoDB

DynamoDB is a fully managed NoSQL database built for high performance and seamless scalability. Its flexible, schema-less design makes it perfect for storing and retrieving JSON-like data with minimal overhead.

DynamoDB can automatically scale to manage millions of requests per second while maintaining low latency. It integrates seamlessly with AWS services like Lambda and API Gateway, providing built-in security, automated backups, and global replication to ensure reliability at any scale.

Popular use cases include:

  • Serverless backends (paired with AWS Lambda/API Gateway) for API-driven apps.
  • Real-time workloads like user sessions, shopping carts, or live leaderboards.
  • High-velocity data streams from IoT devices or clickstream analytics.

Diagram

Finally, here is an architectural diagram of my simplified Step Functions variables workflow:

In which:

  1. The user triggers an AWS Step Functions state machine with a JSON key-value pair as input.
  2. A Lambda function is invoked with the input payload.
  3. The Lambda function sends a POST request to a third-party API.
  4. The API server responds with JSON data.
  5. The Lambda Invoke state assigns Step Functions variables to store API response values, and the workflow enters a Choice state that checks the API response code.
  6. If the Choice state condition fails, SNS publishes a failure notification email.
  7. The state machine terminates with an ExecutionFailed status.
  8. If the Choice state condition passes, the processed API response data is written to a DynamoDB table.
  9. SNS publishes a success notification email.
  10. The state machine terminates with an ExecutionSucceeded status.

If an error occurs at any point in execution, SNS publishes a failure notification email and the state machine terminates with an ExecutionFailed status.
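The numbered flow above can be sketched in plain Python. This is only an illustration of the control flow and not how Step Functions runs it: run_workflow and the injected callables call_api, put_item and notify are hypothetical names standing in for the Lambda, DynamoDB and SNS steps.

```python
from typing import Any, Callable, Dict


def run_workflow(
    event: Dict[str, Any],
    call_api: Callable[[Dict[str, Any]], Dict[str, Any]],
    put_item: Callable[[Dict[str, Any]], None],
    notify: Callable[[str], None],
) -> str:
    """Mirror the success and failure paths of the state machine."""
    try:
        result = call_api(event)              # steps 2-4: Lambda calls the API
        status_code = result["statusCode"]    # step 5: keep values for later states
        body = result["body"]
        if status_code != 200:                # step 6: Choice condition fails
            notify("failure: status " + str(status_code))
            return "ExecutionFailed"          # step 7
        put_item(body)                        # step 8: write to the table
        notify("success")                     # step 9
        return "ExecutionSucceeded"           # step 10
    except Exception as error:                # any other error: notify and fail
        notify("failure: " + str(error))
        return "ExecutionFailed"
```

Passing the three service calls in as functions keeps the sketch runnable without any AWS access.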

Resources

In this section, I create and configure my simplified Step Functions variables workflow resources, which are:

  • AWS Lambda function
  • AWS Step Functions Choice state
  • Amazon DynamoDB table
  • Amazon SNS topic

AWS Lambda

This section details how I created an AWS Lambda function to interact with a REST API. The function makes a secure POST request, manages potential errors and parses the response for use in downstream states. Additionally, I will discuss an issue I encountered while attempting to create variables from the function’s output using JSONata.

Lambda Function

First, I need to write a function. It’s nothing special – I have the API link and security key I need, and I just want to retrieve some data. So I decided to give Amazon Q Developer a try!

I used this prompt:

I have an HTTPS hyperlink for a REST API and an API key. I want to send a POST request to the API using the key. I have a USERID parameter that I will pass as JSON.

Please write a Python function for AWS Lambda with error handling. Please use security best practises and Pythonic standards.

Yes I do say Please in my prompts.

Amazon Q produced a decent script with:

  • API endpoint and key stored as environment variables.
  • Use of the requests module with a timeout to prevent indefinite hangs.
  • Error handling for request failures, validation errors, and unexpected issues.
  • A structured JSON response including the status code and parsed response.

Python
import json
import os
import logging
import requests
from typing import Dict, Any
from botocore.exceptions import ClientError

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda function to make POST request to API endpoint
    Args:
        event: Lambda event object containing request data
        context: Lambda context object
    Returns:
        API response as dictionary
    """
    try:
        # Get API endpoint and key from environment variables
        api_endpoint = os.environ['API_ENDPOINT']
        api_key = os.environ['API_KEY']

        # Extract user ID from event
        user_id = event.get('userId')
        if not user_id:
            raise ValueError("userId is required in the request")

        # Set request headers
        headers = {
            'Content-Type': 'application/json',
            'x-api-key': api_key
        }

        # Prepare request payload
        payload = {
            'userId': user_id
        }

        # Make POST request
        response = requests.post(
            api_endpoint,
            headers=headers,
            json=payload,
            timeout=30  # Set timeout for the request
        )

        # Raise exception for bad status codes
        response.raise_for_status()

        return {
            'statusCode': response.status_code,
            'body': response.json()
        }

    except requests.exceptions.RequestException as e:
        logger.error(f"API request failed: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Failed to make API request'})
        }

    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }

It needed some tweaks for my purposes, but was still faster than typing it all out manually!

Step Functions Config

The Lambda: Invoke action defaults to using the state input as the payload, so "Payload": "{% $states.input %}" is scripted automatically:

JSON
    "Lambda Invoke": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Output": "{% $states.result.Payload %}",
      "Arguments": {
        "FunctionName": "[LAMBDA_ARN]:$LATEST",
        "Payload": "{% $states.input %}"
      },
      "Next": "Check API Status Code"
    }

This is going to be helpful in the next section!

Step Functions manages retries and error handling. If my Lambda function fails, it will retry up to three times with exponential backoff before sending a failure notification through SNS:

JSON
    "Lambda Invoke": {
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2,
          "JitterStrategy": "FULL"
        }
      ],
      "Next": "Check API Status Code",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "SNS Publish: Fail"
        }
      ]
    }
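To get a feel for what those Retry settings mean in practice, here is a rough Python sketch of the wait before each retry. It assumes the delay is IntervalSeconds multiplied by BackoffRate for each further attempt, and that FULL jitter picks a random wait between zero and that delay. retry_delays is a hypothetical helper, not part of any AWS SDK.

```python
import random
from typing import List


def retry_delays(interval_seconds: float, max_attempts: int,
                 backoff_rate: float, full_jitter: bool = False) -> List[float]:
    """Return the wait before each retry attempt, in seconds."""
    delays = []
    for attempt in range(max_attempts):
        # The delay grows by backoff_rate with every further attempt
        delay = interval_seconds * (backoff_rate ** attempt)
        if full_jitter:
            # Assumed FULL jitter behaviour: random wait up to the computed delay
            delay = random.uniform(0, delay)
        delays.append(delay)
    return delays


print(retry_delays(1, 3, 2))  # [1, 2, 4]
```

So without jitter the three retries would wait roughly 1, 2 and 4 seconds, and with jitter each wait should be no longer than those values.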

Next, let’s talk about the function’s outputs.

Outputs & JSONata Variables

The Lambda function returns a nested JSON structure. Here’s a redacted example of it:

JSON
{
  "output": {
    "ExecutedVersion": "$LATEST",
    "Payload": {
      "statusCode": 200,
      "body": {
        "make": "FORD",
        "yearOfManufacture": 2014,
        "engineCapacity": 1242,
        "co2Emissions": 120,
        "fuelType": "PETROL",
        "markedForExport": false,
        "colour": "GREY"
      }
    },
    "SdkHttpMetadata": {
      "AllHttpHeaders": {
        "REDACTED": "REDACTED"
      },
      "HttpHeaders": {
        "REDACTED": "REDACTED"
      },
      "HttpStatusCode": 200
    },
    "SdkResponseMetadata": {
      "REDACTED": "REDACTED"
    },
    "StatusCode": 200
  }
}

Earlier, I mentioned Lambda: Invoke’s default Payload setting. This default creates a {% $states.result.Payload %} JSONata expression output that I can use to assign variables for downstream states.

In this example, the Payload part of the task result ($states.result) looks like this:

JSON
{
  "Payload": {
      "statusCode": 200,
      "body": {
        "make": "FORD",
        "yearOfManufacture": 2014,
        "engineCapacity": 1242,
        "co2Emissions": 120,
        "fuelType": "PETROL",
        "markedForExport": false,
        "colour": "GREY"
      }
    }
}

Let’s make a variable for statusCode. In the response, statusCode is a property of Payload:

JSON
{
  "Payload": {
      "statusCode": 200
    }
}

In JSONata this is expressed as {% $states.result.Payload.statusCode %}. Then I can assign the JSONata expression to a statusCode variable via JSON. In the AWS console, I do this via:

JSON
{
  "statusCode": "{% $states.result.Payload.statusCode %}"
}

And in Step Functions ASL via:

JSON
"Assign": {"statusCode": "{% $states.result.Payload.statusCode %}"}

I can then call this variable using $statusCode. Here, this will return 200.

Next, let’s make a make variable. This is slightly more involved as make is a property of body, which is itself a property of Payload:

JSON
{
  "Payload": {
      "body": {
        "make": "FORD"
      }
    }
}

So this time I need:

JSON
CONSOLE:
"make": "{% $states.result.Payload.body.make %}"

ASL:
"Assign": {"make": "{% $states.result.Payload.body.make %}"}

And now $make will return "FORD".

So let’s do the other values:

JSON
"Assign": {
    "statusCode": "{% $states.result.Payload.statusCode %}",
    "make": "{% $states.result.Payload.body.make %}",
    "yearOfManufacture": "{% $string($states.result.Payload.body.yearOfManufacture) %}",
    "engineCapacity": "{% $string($states.result.Payload.body.engineCapacity) %}",
    "co2Emissions": "{% $string($states.result.Payload.body.co2Emissions) %}",
    "fuelType": "{% $states.result.Payload.body.fuelType %}",
    "markedForExport": "{% $states.result.Payload.body.markedForExport %}",
    "colour": "{% $states.result.Payload.body.colour %}"
}

Note that variables returning numbers from the response body like yearOfManufacture have an additional $string JSONata expression. I’ll explain the reason for this in the DynamoDB section.
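If the dot notation feels abstract, this Python sketch does roughly the same job as the Assign block: it walks a dotted path through nested dictionaries and stores the results. resolve is a hypothetical helper written for this illustration, and str() stands in for JSONata's $string.

```python
from typing import Any, Dict


def resolve(data: Dict[str, Any], path: str) -> Any:
    """Walk a dotted path such as 'Payload.body.make' through nested dictionaries."""
    current: Any = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            raise KeyError(f"'{path}' returned nothing at '{key}'")
        current = current[key]
    return current


# A cut-down copy of the task result shown earlier
result = {
    "Payload": {
        "statusCode": 200,
        "body": {"make": "FORD", "yearOfManufacture": 2014, "markedForExport": False},
    }
}

variables = {
    "statusCode": resolve(result, "Payload.statusCode"),
    "make": resolve(result, "Payload.body.make"),
    # str() plays the role of $string here
    "yearOfManufacture": str(resolve(result, "Payload.body.yearOfManufacture")),
    "markedForExport": resolve(result, "Payload.body.markedForExport"),
}
```

After this runs, variables holds 200, "FORD", "2014" and False, matching what the state machine variables would contain.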

Lambda Issues

When I first started using Step Functions variables, I used a different Lambda function for the API call and kept getting this error:

An error occurred.

The JSONata expression '$states.input.body.make' specified for the field 'Assign/make' returned nothing (undefined).

After getting myself confused, I checked the function’s return statement and found this:

Python
return {
    'statusCode': response.status_code,
    'body': response.text
}

Here, response.text returns the response body as a JSON-formatted string rather than as a nested dictionary:

Plaintext
{
  "statusCode": 200,
  "body": "{\"make\":\"FORD\",\"yearOfManufacture\":2014,\"engineCapacity\":1242,\"co2Emissions\":120,\"fuelType\":\"PETROL\",\"markedForExport\":false,\"colour\":\"GREY\"}"
}

That string isn’t compatible with dot notation. So while $states.input.body will match the whole body, $states.input.body.make can’t match anything because the string can’t be traversed. So nothing is returned, causing the error.

Using response.json() fixes this, as the response is now correctly structured for JSONata expressions:

Python
return {
    'statusCode': response.status_code,
    'body': response.json()
}
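The difference is easy to reproduce outside of Lambda. In this small sketch, json.loads stands in for what response.json() does to the response text, and get_make is a hypothetical helper that only looks up make when the body is a dictionary.

```python
import json

body_as_text = '{"make": "FORD", "yearOfManufacture": 2014}'  # like response.text
body_as_dict = json.loads(body_as_text)                       # like response.json()


def get_make(body):
    """Return body['make'], or None when the body cannot be traversed."""
    return body.get("make") if isinstance(body, dict) else None


print(get_make(body_as_text))  # None
print(get_make(body_as_dict))  # FORD
```

The string version holds the same characters, but there is nothing for a lookup to traverse until it has been parsed.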

Choice State

The Choice state here is very similar to a previous one. This Choice state checks the Lambda function’s API response and routes accordingly.

Here, the Choice state uses the JSONata expression {% $statusCode = 200 %} to check the $statusCode variable value. By default, it will transition to the SNS Publish: Fail state. However, if $statusCode equals 200, then the Choice state will transition to the DynamoDB PutItem state instead:

JSON
    "Check API Status Code": {
      "Type": "Choice",
      "Choices": [
        {
          "Next": "DynamoDB PutItem",
          "Condition": "{% $statusCode = 200 %}"
        }
      ],
      "Default": "SNS Publish: Fail"
    }

This step prevents silent failures by ensuring unsuccessful API responses trigger an SNS notification instead of proceeding to DynamoDB. It also helps maintain data integrity by isolating success and failure paths, and ensuring only valid responses are saved in DynamoDB.
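As a mental model, a Choice state behaves like a list of rules checked in order, with a fallback. The Python below is a loose sketch of that idea using my state names; choose_next and the rules list are hypothetical and are not how Step Functions is implemented.

```python
from typing import Any, Callable, Dict, List, Tuple

Rule = Tuple[Callable[[Dict[str, Any]], bool], str]


def choose_next(variables: Dict[str, Any], rules: List[Rule], default: str) -> str:
    """Return the Next state of the first rule whose condition is true."""
    for condition, next_state in rules:
        if condition(variables):
            return next_state
    return default  # no rule matched, so fall back to Default


# One rule, mirroring the condition $statusCode = 200
rules: List[Rule] = [
    (lambda v: v.get("statusCode") == 200, "DynamoDB PutItem"),
]

print(choose_next({"statusCode": 200}, rules, "SNS Publish: Fail"))  # DynamoDB PutItem
print(choose_next({"statusCode": 500}, rules, "SNS Publish: Fail"))  # SNS Publish: Fail
```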

So now I’ve captured the data and confirmed its integrity. Next, let’s store it somewhere!

Amazon DynamoDB

It’s time to think about storing the API data. Enter DynamoDB! This section covers creating a table, writing data and integrating DynamoDB with AWS Step Functions and JSONata. I’ll share key lessons learned, especially about handling data types correctly.

Let’s start by creating a table.

Creating A Table

Before inserting data into DynamoDB, I need to create a table. Since DynamoDB is a schemaless database, all that is required to create a new table is a table name and a primary key. Naming the table is straightforward, so let’s focus on the key.

DynamoDB has two types of key:

  • Partition key (required): Part of the table’s primary key. DynamoDB hashes its value to decide where an item is stored, which is how it retrieves items from the table and allocates data across hosts for scalability and availability.
  • Sort key (optional): The second part of a table’s primary key. The sort key enables sorting or searching among all items sharing the same partition key.

Let’s look at an example using a Login table. In this table, the user ID serves as the partition key, while the login date acts as the sort key. This structure enables efficient lookups and sorting, allowing quick retrieval of a user’s login history while minimizing operational overhead.

To use a physical analogy, consider the DynamoDB table as a filing cabinet, the Partition key as a drawer, and the Sort key as a folder. If I wanted to retrieve User 123’s logins for 2025, I would:

  • Access the Logins filing cabinet (DynamoDB table).
  • Find User 123’s drawer (Partition Key).
  • Get User 123’s 2025 folder (Sort Key).
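The filing cabinet analogy maps quite neatly onto nested dictionaries. This is a toy in-memory sketch and not how DynamoDB stores data: logins, put_login and query_logins are hypothetical names, and the items are made up for the example.

```python
from collections import defaultdict
from typing import Dict, List

# Filing cabinet: partition key (drawer) -> sort key (folder) -> item
logins: Dict[str, Dict[str, dict]] = defaultdict(dict)


def put_login(user_id: str, login_date: str, device: str) -> None:
    """Store one login under its user (partition key) and date (sort key)."""
    logins[user_id][login_date] = {
        "UserID": user_id, "LoginDate": login_date, "Device": device,
    }


def query_logins(user_id: str, date_prefix: str) -> List[dict]:
    """Open one user's drawer and return the folders matching the prefix, in date order."""
    drawer = logins.get(user_id, {})
    return [drawer[date] for date in sorted(drawer) if date.startswith(date_prefix)]


put_login("123", "2025-02-25T12:00:00Z", "Laptop")
put_login("123", "2024-12-31T23:59:00Z", "Phone")
put_login("456", "2025-01-01T08:00:00Z", "Tablet")

print(query_logins("123", "2025"))  # only User 123's 2025 login
```

Because the sort key is an ISO 8601 timestamp string, sorting it alphabetically also sorts it chronologically.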

DynamoDB provides many features beyond those discussed here. For the latest features, please refer to the Amazon DynamoDB Developer Guide.

Writing Data

So now I have a table, how do I put data in it?

DynamoDB offers several ways to write data, and a common one is PutItem. This lets me insert or replace an item in my table. Here’s a basic example of adding a login event to a UserLogins table:

JSON
{
    "TableName": "UserLogins",
    "Item": {
        "UserID": { "S": "123" },
        "LoginDate": { "S": "2025-02-25T12:00:00Z" },
        "Device": { "S": "Laptop" }
    }
}

Here:

  • TableName specifies the name of the DynamoDB table where the item will be stored.
  • Item represents the data being inserted into the table. It contains key-value pairs, where the attributes (e.g. UserID) are mapped to their corresponding data types (e.g. "S") and values (e.g. "123").
  • UserID is an attribute in the item being inserted.
  • "S" is a data type descriptor, ensuring that DynamoDB knows how to store and index it.
  • "123" is the value assigned to the UserID attribute.

While DynamoDB is NoSQL, it still enforces strict data types and naming rules to ensure consistency. These are detailed in the DynamoDB Developer Guide, but here’s a quick rundown of supported data types as of March 2025:

  • S – String
  • N – Number
  • B – Binary
  • BOOL – Boolean
  • NULL – Null
  • M – Map
  • L – List
  • SS – String Set
  • NS – Number Set
  • BS – Binary Set

Step Functions Config

So how do I apply this to Step Functions? Well, remember when I set variables in the output of the Lambda function? Step Functions lets me reference those variables here.

Here’s how I store a make attribute in DynamoDB, using my $make variable in a JSONata expression:

JSON
{
    "TableName": "REDACTED",
    "Item": {
        "make": { "S": "{% $make %}" }
    }
}

This is equivalent to:

JSON
{
    "TableName": "REDACTED",
    "Item": {
        "make": { "S": "FORD" }
    }
}

Using JSONata, I can dynamically inject values during execution instead of hardcoding them.

Now let’s add a yearOfManufacture attribute:

JSON
{
    "TableName": "REDACTED",
    "Item": {
        "make": { "S": "{% $make %}" },
        "yearOfManufacture": { "N": "{% $yearOfManufacture %}" }
    }
}

This pattern continues for my other attributes:

JSON
{
  "TableName": "REDACTED",
  "Item": {
    "make": {
      "S": "{% $make %}"
    },
    "yearOfManufacture": {
      "N": "{% $yearOfManufacture %}"
    },
    "engineCapacity": {
      "N": "{% $engineCapacity %}"
    },
    "co2Emissions": {
      "N": "{% $co2Emissions %}"
    },
    "fuelType": {
      "S": "{% $fuelType %}"
    },
    "markedForExport": {
      "BOOL": "{% $markedForExport %}"
    },
    "colour": {
      "S": "{% $colour %}"
    }
  }
}

All this is then passed in the Arguments field of the DynamoDB: PutItem action in the state machine’s ASL:

JSON
    "DynamoDB PutItem": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Arguments": {
        "TableName": "REDACTED",
        "Item": {
          "make": {
            "S": "{% $make %}"
          },
          "yearOfManufacture": {
            "N": "{% $yearOfManufacture %}"
          },
          "engineCapacity": {
            "N": "{% $engineCapacity %}"
          },
          "co2Emissions": {
            "N": "{% $co2Emissions %}"
          },
          "fuelType": {
            "S": "{% $fuelType %}"
          },
          "markedForExport": {
            "BOOL": "{% $markedForExport %}"
          },
          "colour": {
            "S": "{% $colour %}"
          }
        }
      }
    }

Finally, DynamoDB: PutItem gets the same error handling as Lambda: Invoke.

So I got all this working first time, right? Well…

DynamoDB Issues

During my first attempts, I got this error:

An error occurred while executing the state 'DynamoDB PutItem'.

The Parameters '{"TableName":"REDACTED","Item":{"make":{"S":"FORD"},"yearOfManufacture":{"N":2014}}}' could not be used to start the Task:

[The value for the field 'N' must be a STRING]

Ok. Not the first time I’ve seen data type problems. I’ll just change the yearOfManufacture data type to "S" (string) and try again…

An error occurred while executing the state 'DynamoDB PutItem'.

The Parameters '{"TableName":"REDACTED","Item":{"make":{"S":"FORD"},"yearOfManufacture":{"S":2014}}}' could not be used to start the Task:

[The value for the field 'S' must be a STRING]

DynamoDB rejected both approaches (╯°□°)╯︵ ┻━┻

The issue wasn’t the data type, but how the value was formatted. In DynamoDB’s JSON request format, number values are sent as strings, so even an "N" attribute must have its value wrapped in quotes.

In the case of yearOfManufacture, where I was providing 2014:

Plaintext
"yearOfManufacture": {"N": 2014}

DynamoDB needed "2014":

Plaintext
"yearOfManufacture": {"N": "2014"}

Thankfully, JSONata came to the rescue again! Remember the $string function from the Lambda section? Well, $string casts the given argument to a string!

So this:

JSON
"yearOfManufacture": "{% $states.result.Payload.body.yearOfManufacture %}"

> 2014

Becomes this:

JSON
"yearOfManufacture": "{% $string($states.result.Payload.body.yearOfManufacture) %}"

> "2014"

This solved the problem with no Lambda function changes or additional states!
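The same rule applies when building items in code. Here is a minimal Python sketch of the conversion, assuming only the four simplest data types are needed. to_attribute_value is a hypothetical helper written for this post rather than an AWS SDK function.

```python
from typing import Any, Dict


def to_attribute_value(value: Any) -> Dict[str, Any]:
    """Wrap a Python value in a DynamoDB data type descriptor."""
    if isinstance(value, bool):          # check bool first: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}         # numbers are sent as strings
    if isinstance(value, str):
        return {"S": value}
    if value is None:
        return {"NULL": True}
    raise TypeError(f"Unsupported type: {type(value).__name__}")


car = {"make": "FORD", "yearOfManufacture": 2014, "markedForExport": False}
item = {name: to_attribute_value(value) for name, value in car.items()}

print(item["yearOfManufacture"])  # {'N': '2014'}
```

Note the order of the checks. In Python, False is also an int, so testing for numbers first would store markedForExport as "N" instead of "BOOL".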

Amazon SNS

After successfully writing data to DynamoDB, I want to include a confirmation step by sending a notification through Amazon SNS.

While this approach is not recommended for high-volume use cases because of potential costs and notification fatigue, it can be helpful for testing, monitoring, and debugging. Additionally, it offers an opportunity to reuse variables from previous states and dynamically format a message using JSONata.

The goal is to send an email notification like this:

A 2014 GREY FORD has been added to DynamoDB on (current date and time)

To do this, I’ll use:

  • $yearOfManufacture for the vehicle’s year (2014)
  • $colour for the vehicle’s colour (GREY)
  • $make for the manufacturer (FORD)

Plus the JSONata $now() function for the current date and time. This generates a UTC timestamp in ISO 8601-compatible format and returns it as a string. E.g. "2025-02-25T19:12:59.152Z"

So the code will look something like:

A $yearOfManufacture $colour $make has been added to DynamoDB on $now()

Which translates to this JSONata expression:

Plaintext
{% 'A ' & $yearOfManufacture & ' ' & $colour & ' ' & $make & ' has been added to DynamoDB on ' & $now() %}

Let’s analyse each part of the JSONata expression to understand how it builds the final message:

Plaintext
{%

  'A '
& 
  $yearOfManufacture 
& 
  ' ' 
& 
  $colour 
& 
  ' ' 
& 
  $make 
& 
  ' has been added to DynamoDB on ' 
& 
  $now() 
  
%}

Each part of this expression plays a specific role:

  • 'A ' | ' has been added to DynamoDB on ': Static strings & spaces.
  • $yearOfManufacture | $colour | $make: Dynamic values.
  • $now(): JSONata function.
  • ' ': Static spaces to separate JSONata variable outputs.

The static spaces are important! Without them, I’d get this:

2014GREYFORD

Instead of the expected:

2014 GREY FORD

This JSONata expression is passed as the Message argument in the SNS:Publish action, ensuring the notification contains the correctly formatted message:

JSON
"Message": "{% 'A ' & $yearOfManufacture & ' ' & $colour & ' ' & $make & ' has been added to DynamoDB on ' & $now() %}"

Finally, to integrate this with Step Functions it is included in the SNS Publish: Success task ASL:

JSON
"SNS Publish: Success": {
    "Type": "Task",
    "Resource": "arn:aws:states:::sns:publish",
    "Arguments": {
      "Message": "{% 'A ' & $yearOfManufacture & ' ' & $colour & ' ' & $make & ' has been added to DynamoDB on ' & $now() %}",
      "TopicArn": "arn:aws:sns:REDACTED:success-stepfunction"
    }
}
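For comparison, the same message can be built in Python. This is only a sketch for readers more at home with Python than JSONata: build_message is a hypothetical function, and the timestamp formatting is my attempt to imitate the $now() example shown earlier.

```python
from datetime import datetime, timezone
from typing import Optional


def build_message(year: str, colour: str, make: str,
                  now: Optional[datetime] = None) -> str:
    """Join static text and dynamic values, like the & operator in JSONata."""
    moment = now or datetime.now(timezone.utc)
    # UTC timestamp with milliseconds and a trailing Z, e.g. 2025-02-25T19:12:59.152Z
    timestamp = (
        moment.astimezone(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )
    return ("A " + year + " " + colour + " " + make
            + " has been added to DynamoDB on " + timestamp)


print(build_message("2014", "GREY", "FORD"))
```

As with the JSONata version, the single-space strings between the values are what stop the output collapsing into 2014GREYFORD.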

Final Workflow

Finally, let’s see what the workflows look like. Here’s the workflow graph:

stepfunctions graph

And here’s the workflow ASL on GitHub.

Testing

In this section, I run some test executions against my simplified Step Functions workflow and check the variables. I’ll test four requests – two valid and two invalid.

Valid Request: Ford

Firstly, what happens when a valid API request is made and everything works as expected?

The Step Functions execution succeeds:

stepfunctions graph testsuccess

Each state completes successfully:

2025 02 26 StateViewSuccess

My DynamoDB table now contains one item:

2025 02 26 DyDBTable1

I receive a confirmation email from SNS:

2025 02 26 SNSSuccessFord

If I send the same request again, the existing DynamoDB item is overwritten because the primary key remains the same.

Valid Request: Audi

Next, what happens if I make a valid request for a different car? The steps repeat as above, and my DynamoDB table now has two items:

2025 02 26 DyDBTable2

And I get a different email:

2025 02 26 SNSSuccessAudi

Invalid Request

Next, what happens if the car in my request doesn’t exist? Well, it does fail, but in an unexpected way:

stepfunctions graphfail

The API returns an error response:

JSON
"Payload": {
      "statusCode": 500,
      "body": "{\"error\": \"API request failed: 400 Client Error: Bad Request for url\"}"
    }

I’d expected the response to be passed to the Choice state, which would then notice the 500 status code and start the Fail process. But this happened instead:

2025 02 26 StateViewFail

The failure occurs when the Lambda Invoke state assigns its variables! It attempts to assign a yearOfManufacture value from the API response body to a variable, but since the error response body has no yearOfManufacture value, the assignment fails:

JSON
{
  "cause": "An error occurred while executing the state 'Lambda Invoke' (entered at the event id #2). The JSONata expression '$states.result.Payload.body.yearOfManufacture ' specified for the field 'Assign/yearOfManufacture ' returned nothing (undefined).",
  "error": "States.QueryEvaluationError",
  "location": "Assign/registrationNumber",
  "state": "Lambda Invoke"
}

I also get an email, but this one is less fancy as it just dumps the whole output:

2025 02 26 SNSFail

So I still get my Fail outcome – just not in the expected way. Despite this, the Choice state remains valuable for preventing invalid data from entering DynamoDB.

No Request

Finally, what happens if no data is passed to the state machine at all?

Actually, this situation is very similar to the invalid request! There’s a different error message in the log:

JSON
"Payload": {
      "statusCode": 400,
      "body": "{\"error\": \"Registration number not provided\"}"
    }

But otherwise it’s the same events and outcome. The Lambda variable assignment fails, triggering an SNS email and an ExecutionFailed result.

Cost Analysis

This section examines the costs of my simplified Step Functions variables workflow. It’s brief, since all services used in this workflow fall within the AWS Free Tier! For transparency, I’ll include my billing metrics for the month. These are account-wide, and I’m still nowhere near paying AWS anything!

DynamoDB:

$0.1415 per million read request units (EU (Ireland)): 30.5 ReadRequestUnits
$0.705 per million write request units (EU (Ireland)): 13 WriteRequestUnits

Lambda:

AWS Lambda – Compute Free Tier – 400,000 GB-Seconds – EU (Ireland): 76.219 Second
AWS Lambda – Requests Free Tier – 1,000,000 Requests – EU (Ireland): 110 Request

SNS:

First 1,000 Amazon SNS Email/Email-JSON Notifications per month are free: 19 Notifications
First 1,000,000 Amazon SNS API Requests per month are free: 289 Requests

Step Functions:

$0 for first 4,000 state transitions: 431 StateTransitions
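To put the DynamoDB figures in perspective, here is the arithmetic if those request units were billed at the listed rates. request_cost is a hypothetical helper, and the sketch ignores any Free Tier allowance.

```python
def request_cost(units: float, price_per_million: float) -> float:
    """Cost in USD for a number of request units at a per-million price."""
    return units / 1_000_000 * price_per_million


read_cost = request_cost(30.5, 0.1415)   # read request units for the month
write_cost = request_cost(13, 0.705)     # write request units for the month
total = read_cost + write_cost

print(f"${total:.8f}")
```

That works out at roughly a thousandth of a cent for the whole month.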

This experiment demonstrates how cost-effective Step Functions can be. As long as my usage remains within the Free Tier, I pay nothing! If my workflow grows, I’ll monitor costs and optimise accordingly.

Summary

In this post, I used AWS Step Functions variables and JSONata to create a simplified API data capture workflow with Lambda and DynamoDB.

With a background in SQL and Python, I’m no stranger to variables, and I love that they’re now a native part of Step Functions. AWS keeps enhancing Step Functions every few months, making it more powerful and versatile. The introduction of variables unlocks new possibilities for data manipulation, serverless applications and event-driven workflows, and I’m excited to explore them further in the coming months!

For a visual walkthrough of Step Functions variables and JSONata, check out the Serverless Office Hours episode with AWS Principal Developer Advocates Eric Johnson and Julian Wood.


Thanks for reading ~~^~~

Categories
Data & Analytics

Silver Layer Python ETL With The AWS Glue ETL Job Script Editor

In this post, I create my WordPress data pipeline’s Silver ETL process using Python and the AWS Glue ETL Job Script Editor.

Introduction

Last time I worked on my WordPress AWS data pipeline, I produced my Bronze layer data and created a Glue Crawler to derive the schema of the Bronze S3 objects. It’s now time to start cleaning that data to prepare it for reporting, aggregation and consumption.

I’m also currently studying for the AWS Certified Data Engineer – Associate certification. While revising for this I learned the capabilities of the AWS Glue ETL Job Script Editor, and it seemed an ideal fit for my Silver ETL process. So I decided to make a post out of it and see how things went!

Firstly, I’ll examine the AWS Glue ETL Job Script Editor and how it will benefit my Silver ETL process. Then I’ll define the architecture of the Silver ETL job and how it fits into the existing data pipeline. Next, I’ll script and test the job. Finally, I’ll integrate it into the pipeline and explore the job’s costs.

Glue ETL Job Script Editor

This section examines the AWS Glue ETL Job Script Editor and Python Shell and considers some of Python Shell’s benefits and limitations.

Script Editor & Python Shell

Script Editor is a feature of AWS Glue. It offers serverless Spark, Ray and Python shells, enabling data transformation, preparation and cleaning with no infrastructure management. Scripts can be both uploaded and created from scratch, and version control is configurable to several Git services.

This post focuses on AWS Glue Python Shell. Introduced in 2019, Python Shell jobs suit small to medium-sized tasks as part of an ETL workflow.

Python Shell Pros

This section examines some of Python Shell’s benefits.

Low Cost

Python Shell jobs are the cheapest of the Glue job types to run. Glue charges are based on data processing units (DPUs). A single standard DPU currently provides 4 vCPU and 16 GB of memory. While regular Glue ETL jobs using Apache Spark need at least 2 DPUs, Python Shell jobs default to using only 1/16 (or 0.0625) DPU!

This can also be extended to 1 DPU, resulting in faster completion times. Like AWS Lambda, charges accrue based on resource usage and duration. So increased resource allocation can potentially create further savings.

This section was correct as of August 2024 – the latest pricing data is on the AWS Glue pricing site.
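
To make the DPU arithmetic concrete, here is a minimal sketch of how a Python Shell run's cost could be estimated. The 0.44 USD per DPU-hour rate and the run durations are assumptions for illustration only (check the pricing page for the relevant region), and estimate_cost is a hypothetical helper, not an AWS API:

```python
import math

# Rough Glue Python Shell cost estimate: DPUs x billed hours x rate.
# ASSUMPTION: 0.44 USD per DPU-hour is illustrative only. Real rates vary
# by region and change over time.
ASSUMED_RATE_PER_DPU_HOUR = 0.44


def estimate_cost(duration_seconds: float, dpu: float,
                  rate: float = ASSUMED_RATE_PER_DPU_HOUR) -> float:
    """Return the estimated cost in USD of one job run (hypothetical helper)."""
    # Python Shell jobs are billed per second with a 1-minute minimum.
    billed_seconds = max(duration_seconds, 60)
    return dpu * (billed_seconds / 3600) * rate


# ASSUMPTION: a 1,600-second run on the default 1/16 DPU.
small = estimate_cost(1600, dpu=0.0625)

# 1 DPU is 16x the capacity, so it breaks even if the run is 16x shorter.
break_even = estimate_cost(100, dpu=1.0)

print(f"0.0625 DPU for 1600s: ${small:.4f}")
print(f"1 DPU for 100s:       ${break_even:.4f}")
print("Same cost:", math.isclose(small, break_even))
```

In other words, moving from 1/16 DPU to 1 DPU only saves money if the job finishes more than 16 times faster, and never below the 1-minute billing minimum.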

Low Barrier To Entry

Python Shell jobs offer accessibility for those from a scripting background. When creating a new script in the console, users only need to choose the engine (in this case Python) and whether the script is being uploaded or created fresh. And that’s it! No configuring interpreters, environments or dependencies.

Python Shell jobs also integrate with other AWS services. They can easily connect to data sources like S3, RDS and DynamoDB. They can be automated with Glue Workflows and Triggers. IAM can also control access to both the Python Shell job and the AWS services it interacts with.

Included Python Libraries

AWS Glue Python Shell includes a variety of built-in Python libraries that are useful for ETL tasks. These libraries cover a range of functionalities such as data processing, machine learning, and interacting with AWS services.

They include:

This AWS post has a full table of included libraries and their versions. Additional libraries can be installed with pip and then imported.

Some people will quickly see issues with this list though…

Python Shell Cons

This section examines some of Python Shell’s limitations.

Outdated Python Versions & Libraries

While the included libraries are welcome, they are also quite outdated. For example, boto3’s included version is 1.21.21 while the current version is 1.34.150. pandas is at 1.4.2 in the table and 2.2.2 online.

This is likely due to the supported Python versions – currently Python 3.6 and Python 3.9. Now, while Python 3.9 isn’t out of support until October 2025, it was released back in October 2020 and has had three major upgrades since. Worse, Python 3.6 ended life at Christmas 2021!

With the Data Engineer Associate certification drawing attention to various AWS data services, it’s a shame that this feature is so far behind. This would be a great modernisation tool for importing legacy Python scripts into Glue, but the last feature update was in 2022 and it’s really starting to lag behind now.

No Visual Editor

Yes I know it’s a script editor but hear me out.

Let’s briefly segue to AWS IAM. In the early days, updating IAM policies had the potential of losing afternoons to missing braces or errant commas. There was no native AWS validation tooling and the whole thing felt like a dark art for those less experienced.

Then AWS released an IAM visual policy editor. And things went from this:

2024 07 30 IAMPolicyJSON

To this!

2024 07 30 IAMPolicyDown

This transformed the IAM policy-writing process. The guesswork was gone – new policies could be written using dropdowns and checkboxes. And AWS would generate the same code each time, in the same way and to the same standard.

In today’s AWS console, IAM can be administered both visually and as JSON. Updates made in the visual editor reflect in the code in real-time, and vice versa. And the IAM IDE immediately flags syntax issues, unclosed keypairs and whatnot.

This interface would work so well with Glue Script Editor. It would simplify and encourage using Script Editor, creating standardised code by default and reducing development time. No more syntax violations, verbose comments or missing dependencies – AWS could handle all that.

This doesn’t even need AI – it would just be procedural code generation. Something like selecting awswrangler from a dropdown list, then selecting an S3 location to read or write and a file type to expect. Or even a list of code snippets for the included libraries. These features could all lighten the dev load.

Limited IDE

Let’s consider AWS Lambda’s IDE:

2024 07 30 LambdaIDE

Its benefits include:

  • Code autocompletion
  • Integrated testing
  • Integrated monitoring

And tons of other user-focused functionality. Conversely, this is the Glue Script Editor IDE:

2024 07 30 GlueIDE

Hmm.

Now don’t get me wrong – I’m not asking for Lambda Lite. But something a bit more than Notepad would be nice. AWS are currently making a massive deal of Amazon CodeWhisperer and Amazon Q Developer’s autocomplete actions, but here pandas isn’t even suggested when I type import pan. And it’s an included library!

The obvious solution is to just use Lambda. But Glue Script Editor offers a sweet spot where it runs custom Python while operating entirely within the AWS Glue service. This is helpful for features like Glue Triggers and Workflows that can’t currently trigger Lambda functions. It’s also helpful with AWS Organizations, where using Glue Script Editor for Python ETL can enable SCPs that entirely block access to AWS Lambda for data-centric accounts.

So Why Use It?

So are Glue Python Shell jobs worth considering with these limitations? Definitely! There are several use cases favouring them:

  • Legacy ETL jobs that either can’t use recent Python versions and libraries, or simply don’t need them.
  • Simple, lightweight tasks that don’t require the more advanced (and expensive) features of Apache Spark or Ray.
  • Tasks that need to run quickly, as Python Shells have faster startup times than the Spark environments used by regular Glue ETL jobs.
  • Long-running ETL tasks unsuitable for AWS Lambda, as Python Shell jobs can run for up to 48 hours compared to Lambda’s 15 minutes. Thanks to Yan Cui’s blog for that one!

For my requirements, a Python Shell job makes sense because I’m doing simple transformations on small volumes of data.

Architecture

This section examines the architecture of my proposed solution. Much of this architecture is similar to the Bronze layer. I’ll examine the new Silver ETL job, followed by the updated data pipeline Step Function workflow.

Glue Silver ETL Job

Firstly, this is the Glue Silver ETL job:

[Diagram: within the AWS Cloud, the AWS Glue Silver ETL Job reads from the Amazon S3 Bronze Bucket (1), writes to the Amazon S3 Silver Bucket (2) and sends logs to Amazon CloudWatch Logs.]

While updating CloudWatch Logs throughout:

  1. Silver Glue ETL job extracts data from wordpress-api Bronze S3 objects and performs Python transformations.
  2. Silver Glue ETL job loads the transformed data into Silver S3 bucket as Parquet objects.

Step Function Workflow

Next, the updated Step Function workflow:

[Diagram: within the AWS Cloud, an EventBridge Schedule (1) starts the AWS Step Functions state machine, which runs the AWS Lambda Raw Function (2), AWS Lambda Bronze Function (3), AWS Glue Bronze Crawler (4) and AWS Glue Silver ETL Job (5) in sequence. Failures (F) publish to an AWS SNS Topic that notifies the user, and CloudWatch Logs are updated throughout.]

While updating the workflow’s CloudWatch Log Group throughout:

  1. An EventBridge Schedule executes the Step Functions workflow.
  2. Raw Lambda function is invoked.
    • Invocation Fails: Publish SNS message. Workflow ends.
    • Invocation Succeeds: Invoke Bronze Lambda function.
  3. Bronze Lambda function is invoked.
    • Invocation Fails: Publish SNS message. Workflow ends.
    • Invocation Succeeds: Run Glue Crawler.
  4. Glue Crawler runs.
    • Run Fails: Publish SNS message. Workflow ends.
    • Run Succeeds: Update Glue Data Catalog. Run Glue Silver ETL job.
  5. Glue Silver ETL job runs.
    • Run Fails: Publish SNS message. Workflow ends.
    • Run Succeeds: Workflow ends.

An SNS message is published if the Step Functions workflow fails.
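
The fail-fast chain above can be sketched as an Amazon States Language definition built in Python. This is a simplified illustration rather than my actual workflow: every state name, function name, crawler name, job name and ARN value below is a hypothetical placeholder, and the crawler step only starts the crawler without waiting for it to finish.

```python
import json
from typing import Optional

FAILURE_STATE = "PublishFailure"  # hypothetical state name


def task(resource: str, parameters: dict, next_state: Optional[str]) -> dict:
    """Build a Task state that routes any error to the SNS failure state."""
    state = {
        "Type": "Task",
        "Resource": resource,
        "Parameters": parameters,
        # Any error in this step jumps straight to the SNS publish state.
        "Catch": [{"ErrorEquals": ["States.ALL"], "Next": FAILURE_STATE}],
    }
    if next_state is None:
        state["End"] = True
    else:
        state["Next"] = next_state
    return state


# ASSUMPTION: all names and the topic ARN are placeholders for illustration.
definition = {
    "StartAt": "RawLambda",
    "States": {
        "RawLambda": task("arn:aws:states:::lambda:invoke",
                          {"FunctionName": "raw-function"}, "BronzeLambda"),
        "BronzeLambda": task("arn:aws:states:::lambda:invoke",
                             {"FunctionName": "bronze-function"}, "BronzeCrawler"),
        # Simplification: this SDK call returns once the crawler has started.
        "BronzeCrawler": task("arn:aws:states:::aws-sdk:glue:startCrawler",
                              {"Name": "bronze-crawler"}, "SilverJob"),
        "SilverJob": task("arn:aws:states:::glue:startJobRun.sync",
                          {"JobName": "silver-job"}, None),
        FAILURE_STATE: {
            "Type": "Task",
            "Resource": "arn:aws:states:::sns:publish",
            "Parameters": {
                "TopicArn": "arn:aws:sns:eu-west-1:123456789012:example-topic",
                "Message": "Data pipeline step failed.",
            },
            "End": True,
        },
    },
}

print(json.dumps(definition, indent=2))
```

Using one shared Catch target keeps the failure handling in a single place, so adding a new step only means adding one more task entry.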

Silver ETL Job

In this section, I create the Silver ETL Python script for the AWS Glue Script Editor. Firstly I’ll define the script’s requirements. Next, I’ll translate them into Python code, and finally I’ll create the ETL script and upload it to Git.

Requirements

Firstly, let’s define the requirements for this data pipeline layer. So what does a typical Silver ETL process involve?

Databricks defines the Silver layer as cleansed and conformed data:

In the Silver layer of the lakehouse, the data from the Bronze layer is matched, merged, conformed and cleansed (“just-enough”) so that the Silver layer can provide an “Enterprise view” of all its key business entities, concepts and transactions. (e.g. master customers, stores, non-duplicated transactions and cross-reference tables).

https://www.databricks.com/glossary/medallion-architecture

Because my data source is a WordPress MySQL database, most of the cleansing and conforming work I’d expect to do has already been done there! That said, there’s data that I definitely won’t need, as well as other transformations I can apply to help downstream reporting.

Some of the following transformations can be done at the SQL reporting level with date and string functions. However, these add repetitive load and complexity to queries, which can be avoided by some cleaning transformations. Roche’s Maxim of Data Transformation applies here:

Data should be transformed as far upstream as possible, and as far downstream as necessary.

https://ssbipolar.com/2021/05/31/roches-maxim/

The Silver layer transformations I’m doing here are:

Column Removal

Many columns are empty or unneeded, so now is the time to remove them. This will reduce the data held in the Silver objects, making them cheaper to store and faster to query.

My script uses the pandas.DataFrame.drop function to remove columns by specifying column names. Here, a term_order column is dropped from the DataFrame df:

Python
df = df.drop(columns=['term_order'])

Date Splitting

Dates are tough to analyse and don’t aggregate well, as each date is effectively three different data points in one field. Splitting dates into years, months and days improves data bucketing, query granularity and time series analytics.

My script uses the pandas to_datetime function to convert scalar, array-like, Series or DataFrame/dict-like objects to pandas datetime objects.

Here, values in the date column of the DataFrame df are converted from strings to datetime objects and stored in a new date_todate column. Next, the year attribute of each date_todate column object is extracted and stored in a new date_year column. Finally, the same happens for month and day attributes:

Python
df['date_todate'] = pd.to_datetime(df['date'])

df['date_year'] = df['date_todate'].dt.year
df['date_month'] = df['date_todate'].dt.month
df['date_day'] = df['date_todate'].dt.day

String Editing

Some columns use HTML character entity names for reserved characters. For example, &amp; in place of &. This is great for rendering HTML but not great for analytics.

My script uses the pandas Series.str.replace method to return a copy of each string with all occurrences of the specified substring replaced by a new one. Here, all instances of &amp; in the name column are overwritten with &:

Python
df['name'] = df['name'].str.replace('&amp;', '&', regex=False)
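
As an aside, if more entity types than the ampersand ever turn up, the standard library's html.unescape function decodes all named and numeric entities in one pass. This is an alternative sketch and not what my script does, and the sample strings are made up for illustration:

```python
import html

# Sample values containing HTML character entities (illustrative only).
names = [
    "Data &amp; Analytics",
    "Developing &amp; Application Integration",
    "5 &gt; 3",
]

# html.unescape converts every entity back to its literal character.
cleaned = [html.unescape(name) for name in names]

for before, after in zip(names, cleaned):
    print(f"{before!r} -> {after!r}")
```

With pandas, the same function could be applied to a column with Series.map, for example df['name'].map(html.unescape), assuming the column contains only strings.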

So that’s the transformations. What else is the script doing?

Python Script

Most of the Silver script processes are similar to the Bronze script ones, including:

  • Logging
  • Getting parameters
  • Accessing S3 objects

So most functionality is reused from my Bronze Lambda function, which is fully documented in this post. To summarise the imports:

Python
import logging                          # Logging
import boto3                            # AWS Interactions
import botocore                         # AWS Exceptions
import awswrangler as wr                # S3 Interactions
import pandas as pd                     # Data Manipulation
from botocore.client import BaseClient  # AWS Type Hints

Some changes have been made for the Silver script:

  • Parameters, object names and logs have been updated from Bronze to Silver:
Python
parametername_snstopic: str = '/sns/data/lakehouse/silver'

logging.info("Getting S3 Silver parameter...")

s3_bucket_silver = get_parameter_from_ssm(client_ssm, parametername_s3bucket_silver)
  • New functionality identifies the AWS AccountID the script is running in:
Python
# Get & display AWS AccountID
identity = client_sts.get_caller_identity()
account_id = identity['Account']
logging.info(f"Starting in AWS Account ID {account_id}")

This is more of a sanity check for me – I have several AWS accounts and want to check I’ve accessed the right one!

  • A test that stops the current loop iteration if the object name doesn’t match one of the expected ones:
Python
# Check if object is mapped and bypass if not.
if object_name not in {'posts', 'statistics_pages', 'term_relationship', 'term_taxonomy', 'terms'}:
    logging.warning(f'{object_name} is not currently mapped.  Skipping transform...')
    object_count_failure += 1
    continue
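
To show how that check behaves inside a loop, here is a self-contained sketch. The process_objects function, the success counter and the sample object names are hypothetical stand-ins, and the real transformation step is replaced by a comment:

```python
import logging

logging.basicConfig(level=logging.INFO)

# Object names the transformation logic knows how to handle.
MAPPED_OBJECTS = {'posts', 'statistics_pages', 'term_relationship',
                  'term_taxonomy', 'terms'}


def process_objects(object_names):
    """Return (success_count, failure_count) for the supplied object names."""
    object_count_success = 0
    object_count_failure = 0

    for object_name in object_names:
        # Check if object is mapped and bypass if not.
        if object_name not in MAPPED_OBJECTS:
            logging.warning(f'{object_name} is not currently mapped.  Skipping transform...')
            object_count_failure += 1
            continue

        # The real script would transform and write the object here.
        object_count_success += 1

    return object_count_success, object_count_failure


# 'comments' is not mapped, so it is logged, counted as a failure and skipped.
print(process_objects(['posts', 'comments', 'terms']))
```

Because continue skips only the current iteration, one unexpected object doesn't stop the remaining objects from being processed.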

Finally, I wrote a new function for my Silver transformation logic. This isn’t included here (although it is in my repo) because it’s long. Very long! My first thought was to decouple the ETL processes from each other and write separate scripts for each object. So 5 in total.

However, Python Shell jobs are billed per second with a 1-minute minimum. So 5 jobs = 5 minutes billed. But the job only takes around 60 seconds to process all five objects! I’d have run up 5 times the usage and 5 times the cost for no real benefit.
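
The billing arithmetic can be sketched as follows. The even split of roughly 12 seconds per object is an assumption for illustration, since I only measured the combined run time:

```python
# Python Shell jobs are billed per second with a 1-minute minimum.
MINIMUM_BILLED_SECONDS = 60


def billed_seconds(run_seconds: int) -> int:
    """Return the seconds billed for one job run of the given duration."""
    return max(run_seconds, MINIMUM_BILLED_SECONDS)


# ASSUMPTION: each of the five objects takes about 12 seconds to process.
per_object_seconds = [12, 12, 12, 12, 12]

# Five separate jobs: each short run is rounded up to the 60-second minimum.
separate_jobs = sum(billed_seconds(seconds) for seconds in per_object_seconds)

# One combined job: the 60-second total meets the minimum exactly.
combined_job = billed_seconds(sum(per_object_seconds))

print(f"Five separate jobs: {separate_jobs} billed seconds")
print(f"One combined job:   {combined_job} billed seconds")
```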

The full script is in my GitHub repo.

Testing was quick because it was effectively repeating the Bronze script tests with new parameters. After successfully testing the script locally, it’s time to get it working in AWS!

Uploading & Testing

In this section I upload my Silver ETL script, integrate it with AWS Glue Script Editor and AWS Step Functions and test everything works as expected.

Creating The Python Shell Job

Firstly, let’s get my script into AWS Glue. There are several ways of doing this. If the script is uploaded to S3, a Glue ETL job can be created with the AWS CLI create-job command:

Bash
aws glue create-job --name python-job-cli --role Glue_DefaultRole \
    --command '{"Name": "pythonshell", "PythonVersion": "3.9", "ScriptLocation": "s3://DOC-EXAMPLE-BUCKET/scriptname.py"}' \
    --max-capacity 0.0625

And with the AWS CloudFormation AWS::Glue::Job resource:

YAML
AWSTemplateFormatVersion: 2010-09-09
Resources:
  Python39Job:
    Type: 'AWS::Glue::Job'
    Properties:
      Command:
        Name: pythonshell
        PythonVersion: '3.9'
        ScriptLocation: 's3://DOC-EXAMPLE-BUCKET/scriptname.py'
      MaxRetries: 0
      Name: python-39-job
      Role: RoleName           
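
For completeness, the same job definition could also be sketched with boto3's Glue create_job call. This is a minimal sketch that reuses the placeholder names from the CLI example. The build_create_job_request and create_job helpers are hypothetical, and create_job is not called here because it needs AWS credentials:

```python
def build_create_job_request(name: str, role: str, script_location: str,
                             max_capacity: float = 0.0625) -> dict:
    """Build keyword arguments for the Glue create_job API (hypothetical helper)."""
    return {
        "Name": name,
        "Role": role,
        "Command": {
            "Name": "pythonshell",       # Python Shell job type
            "PythonVersion": "3.9",
            "ScriptLocation": script_location,
        },
        "MaxCapacity": max_capacity,     # 0.0625 = 1/16 DPU
    }


def create_job(request: dict) -> dict:
    """Send the request to AWS Glue. Requires boto3 and valid credentials."""
    import boto3  # imported here so the sketch loads without boto3 installed
    return boto3.client("glue").create_job(**request)


# Placeholder values copied from the CLI example.
request = build_create_job_request(
    name="python-job-cli",
    role="Glue_DefaultRole",
    script_location="s3://DOC-EXAMPLE-BUCKET/scriptname.py",
)
print(request)
```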
        

Scripts can also be pulled from Git repositories. Here I’ll create my Silver ETL job in the Glue Script Editor console. This creates a new Python script in an S3 bucket location of s3://aws-glue-assets-[AWSAccountID]-[Region]/scripts/.

Next, the new job needs an IAM role with appropriate permissions for the AWS services the script interacts with. Other parameters, including maximum DPU, job timeout value and Python version, can also be set. In addition, Glue Data Quality checks are also supported. And, once saved, the Glue job can have a schedule applied.

Testing Job Execution

AWS Glue records data for each job execution and publishes extensive details and logs:

2024 08 09 AWSGlueJobRun

Glue stores details about the job and Python environment, and logs are published and stored in Amazon CloudWatch.

And so begins the testing! Initially, I was getting one of my own Python boto3 exceptions:

ValueError: No SNS topic returned.

Easy to fix. The job’s IAM policy was based on the one my Bronze Lambda function uses. But the Silver ETL script uses different AWS resources, so some IAM policy ARNs need to change. Specifically, the Silver ETL job’s IAM role needs to allow:

  • ssm:GetParameter on the required Parameter Store parameters.
  • sns:Publish on the required SNS topics.
  • s3:GetObject on the data-lakehouse-silver/wordpress_api/* objects.

With these changes, the Silver ETL job runs perfectly and creates new objects in the Silver S3 bucket:

2024 08 09 MonitoringTimeline

With the Glue job running and S3 object creation verified successfully, it’s time to validate the data.

Data Integration & Validation

Validating the data involves two processes:

  • Integrating the data into the Glue Data Catalog.
  • Querying the data with Amazon Athena.

There are several ways to update the Glue Data Catalog, and here I’ll create a new Glue Crawler using a similar setup to my Bronze Crawler. This time the crawler is reading objects from the Silver S3 bucket instead of the Bronze one, and the new Glue Data Catalog tables are prefixed with silver- instead of bronze-.
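
As a rough illustration, a crawler like this could be described as a request for boto3's Glue create_crawler call. The crawler name and IAM role below are hypothetical placeholders, the S3 path is inferred from the bucket and prefix mentioned earlier, and the create_crawler wrapper is not called here because it needs AWS credentials:

```python
# ASSUMPTIONS: "Name" and "Role" are placeholders, and the S3 path is
# inferred from the Silver bucket and prefix used elsewhere in this post.
crawler_request = {
    "Name": "example-silver-crawler",
    "Role": "ExampleGlueCrawlerRole",
    "DatabaseName": "wordpress_api",   # Glue Data Catalog database
    "TablePrefix": "silver-",          # prepended to each created table name
    "Targets": {
        "S3Targets": [{"Path": "s3://data-lakehouse-silver/wordpress_api/"}]
    },
}


def create_crawler(request: dict) -> dict:
    """Send the request to AWS Glue. Requires boto3 and valid credentials."""
    import boto3  # imported here so the sketch loads without boto3 installed
    return boto3.client("glue").create_crawler(**request)


print(crawler_request)
```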

The Silver crawler creates these new tables in the Glue Data Catalog’s wordpress_api database:

2024 08 06 GlueDataCatalog

This gives Athena visibility of the tables, enabling data validation via SQL query execution. Querying wordpress_api.silver-terms shows the removed column and updated strings:

2024 08 06 AthenaSilverTerms

And querying wordpress_api.silver-statistics_pages shows the split dates:

2024 08 06 AthenaSilverStatistics pages

Looks good! Now that everything has been validated, let’s add these steps to the WordPress Data Pipeline.

Step Function Update

The WordPress Data Pipeline Step Function workflow that I started back in March continues to grow. There’s a new job and a second crawler to add to it now!

The Silver crawler is added in the same way as the Bronze one (including the IAM changes) so let’s focus on adding the new Glue Python Shell ETL job.

Adding Glue ETL jobs to a Step Function workflow is well documented. The task uses the StartJobRun Glue API action under the hood and has an optimized integration that enables the .sync integration pattern. Enabling this means the Step Functions workflow waits for the Glue job run to complete before progressing to the next state.
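
As a sketch of what that looks like in the state machine definition, the only difference between the two integration patterns is the .sync suffix on the task's resource ARN. The glue_job_state helper is hypothetical, and the job name is the one used for my Silver ETL job:

```python
import json


def glue_job_state(job_name: str, wait_for_completion: bool) -> dict:
    """Build a Glue StartJobRun Task state (hypothetical helper)."""
    resource = "arn:aws:states:::glue:startJobRun"
    if wait_for_completion:
        # .sync makes Step Functions wait until the job run has finished.
        resource += ".sync"
    return {
        "Type": "Task",
        "Resource": resource,
        "Parameters": {"JobName": job_name},
        "End": True,
    }


# Request-response: the state completes as soon as the job run is started.
fire_and_forget = glue_job_state("wordpressapi_silver", wait_for_completion=False)

# Run a job (.sync): the state completes only when the job run finishes.
run_and_wait = glue_job_state("wordpressapi_silver", wait_for_completion=True)

print(json.dumps(run_and_wait, indent=2))
```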

However, my workflow currently lacks IAM permissions to run the Silver Glue ETL job. So I make a new IAM policy that allows the glue:StartJobRun action on the Silver Glue ETL job and attach it to the workflow’s IAM role:

JSON
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"glue:StartJobRun"
			],
			"Resource": "arn:aws:glue:eu-west-1:[REDACTED]:job/wordpressapi_silver"
		}
	]
}

My Step Function workflow now looks like this:

2024 08 09 stepfunctions graph

Let’s execute the Step Function workflow and check it works.

Step Function Test

Upon execution, everything works as intended. The new StartGlueJob action is triggered and the Glue ETL job is successful:

2024 08 06 GlueJobDetails

But the Step Function doesn’t transition to the next step. In fact, it kept running until I stopped it myself after several minutes:

2024 08 06 StepFunctionsStop

So what’s going on? I asked Amazon Q about this behaviour, and in its response were the following points:

  1. Step Functions uses a “sync” integration with AWS Glue, which means it relies on polling the status of the Glue job using the GetJobRun API call.
  2. The polling schedule is designed to be once per minute for the first 10 minutes, and then every 5 minutes thereafter. This is to avoid excessive API calls to Glue.
Amazon Q

Q also linked to this AWS repost answer with further details:

This is an expected behavior in case of .sync integration with AWS Glue. Service integrations that use the .sync pattern require additional IAM permissions where Step Functions will make use of a managed Eventbridge rule to monitor the status of the job. However, AWS Glue does not support Eventbridge integration and thus, Step Functions polls the job status using the GetJobRun API call to fetch the status of the job.

https://repost.aws/questions/QUFFlHcbvIQFe-bS3RAi7TWA/a-glue-job-in-a-step-function-is-taking-so-long-to-continue-the-next-step

This made things clearer. When Step Functions starts a Glue ETL job using a StartGlueJob action with optimized integration, Step Functions determines that job’s status (and thus when to transition to the next action) by calling Glue’s GetJobRun API.
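
The polling idea can be sketched in Python. This is not Step Functions' actual implementation, just an illustration of repeatedly calling GetJobRun until the run reaches a finished state. FakeGlueClient, the run ID and the fixed polling interval are assumptions made so the sketch runs without AWS access:

```python
import time

# Job run states that mean the run is over, one way or another.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def wait_for_job_run(glue_client, job_name, run_id,
                     poll_seconds=60, sleep=time.sleep):
    """Poll GetJobRun until the job run reaches a terminal state."""
    while True:
        response = glue_client.get_job_run(JobName=job_name, RunId=run_id)
        state = response["JobRun"]["JobRunState"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)


class FakeGlueClient:
    """Stand-in for boto3.client('glue') that replays a list of states."""

    def __init__(self, states):
        self._states = iter(states)

    def get_job_run(self, JobName, RunId):
        return {"JobRun": {"JobRunState": next(self._states)}}


# Simulate a run that starts, runs and then succeeds, without real sleeping.
fake_client = FakeGlueClient(["STARTING", "RUNNING", "SUCCEEDED"])
final_state = wait_for_job_run(fake_client, "wordpressapi_silver", "jr_example",
                               sleep=lambda seconds: None)
print(final_state)
```

If every status call were rejected, the caller would never learn that the run had finished, which matches the hanging behaviour described here.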

However, my workflow’s IAM role doesn’t have permission to do that! And because Step Functions can’t determine the ETL job’s status, it doesn’t know that the job has finished and the next state transition never happens! Everything stops!

This is resolved by adding the glue:GetJobRun action to the workflow’s IAM policy:

JSON
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"glue:StartJobRun",
				"glue:GetJobRun"
			],
			"Resource": "arn:aws:glue:eu-west-1:REDACTED:job/wordpressapi_silver"
		}
	]
}

This time, the Glue GetJobRun API calls are successful. The Step Functions workflow validates that the ETL job has finished, moves to the next state as intended and ultimately completes successfully:

2024 08 06 ExecutionSuccessFull

Thanks Amazon Q!

Costs

Finally, let’s look at the costs for my Glue Script Editor Silver ETL Job resources.

This graph shows all Glue API costs between 2024-07-31 (first AWS job execution) and 2024-09-09:

2024 08 09 CostExplorerGlue

Of the $0.38:

  • $0.37 is the CrawlerRun API for the two Glue Crawlers I’m running.
  • $0.01 is the Jobrun API for the 15 job runs between 2024-07-31 and 2024-09-09.

So all things considered, very manageable!

Summary

In this post, I created my WordPress data pipeline’s Silver ETL process using Python and the AWS Glue ETL Job Script Editor.

I found the Script Editor jobs very useful. They offer Lambda’s benefits of scalability, managed infrastructure and integration with other AWS services, combined with data-centric libraries and features that make it easier to hit the ground running development-wise. They have clear limitations and could do with some AWS TLC, but they were a good fit here and rival Lambda for some future ETL processes I have planned.

If this post has been useful then the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~