Categories
Data & Analytics

Python Data Validation And Observability As Code With Pydantic

In this post, I use the Pydantic Python library to create data validation and observability processes for my Project Wolfie iTunes data.

Table of Contents

Introduction

Data validation is a crucial component of any data project. It ensures that data is accurate, consistent and reliable. It verifies that data meets set criteria and rules to maintain its quality, and stops erroneous or unreliable information from entering downstream systems. I’ve written about it, scripted it and talked about it.

Validation will be a crucial aspect of Project Wolfie. It is an ongoing process that should occur from data ingestion to exposure, and should be automated wherever possible. Thankfully, most data processes within Project Wolfie are (and will be) built using Python, which provides several libraries to simplify data validation. These include Pandera, Great Expectations and the focus of this post – Pydantic (specifically, version 2).

Firstly, I’ll explore the purpose and benefits of Pydantic. Next, I’ll import some iTunes data and use it to explore key Pydantic validation concepts. Finally, I’ll explore how Pydantic handles observability and test its findings. The complete code will be in a GitHub repo.

Let’s begin!

Introducing Pydantic

This section introduces Pydantic and examines some of its benefits.

About Pydantic

Pydantic is an open-source data validation Python library. It uses established Python notation and constructs to define data structures, types and constraints. These can then validate the provided data, generating clear error messages when issues occur.

Pydantic is a widely used tool for managing application settings, validating API requests and responses, and streamlining data transfer between Python objects and formats like JSON. By integrating both existing and custom elements, it offers a powerful and Pythonic method for ensuring data quality and consistency within projects. This makes data handling in Python more reliable and reduces the likelihood of errors through its intuitive definition and validation processes.

Pydantic Benefits

Pydantic’s benefits are thoroughly documented, and the ones I want to highlight here are:

Intuitive: Pydantic’s use of type hints, functions and classes fits well with my current Python skill level, so I can focus on learning Pydantic without also having to explore unfamiliar Python concepts.

Fast: Pydantic’s core validation logic is written in Rust, which enables rapid development, testing, and validation. This speed has contributed towards…

Well-Supported: Pydantic has extensive community use and support from organisations like Anthropic, Netflix and OpenAI, as well as popular Python libraries like Airflow, FastAPI and LangChain. It also has extensive AWS Lambda support via user-configurable artefacts and the community-managed Powertools for AWS Lambda (Python)‘s Parser utility.

Preparation

Before I can start using Pydantic, I need some data. This section examines the data I am using and how I prepare it for Pydantic.

iTunes Data

Firstly, let’s extract some data from iTunes. I create iTunes Export files using the iTunes > Export Playlist command. Apple has documented this, but WikiHow’s documentation is more illustrative. The export file type choices are…interesting. The one closest to matching my needs is the txt format, although the files are technically tab-separated files (TSVs).

iTunes Exports contain many metadata columns. I’m not including them all here (after all, this is a Pydantic post not an iTunes one), but I will be using the following subset (using my existing metadata definitions):

Metadata TypeColumn NameData TypePurpose
TechnicalAlbumStringTrack key as Camelot Notation*
TechnicalLocationStringTrack file path
TechnicalTrack NumberIntegerTrack BPM*
DescriptiveArtistStringTrack artist(s)
DescriptiveGenreStringTrack genre
DescriptiveNameStringTrack name and mix
DescriptiveWorkStringPublishing record label
DescriptiveYearIntegerTrack release year
InteractionMy RatingIntegerTrack personal rating

Note that the starred Album and Track Number columns have purposes that differ from the column names. The reasons for this are…not ideal.

  • Track Number contains BPM data as, although iTunes does have a BPM column, it isn’t included in the exports. And the exports can’t be customised! To include BPMs in an export, I had to repurpose an existing column.

Great. But that’s not as bad as…

  • Album contains musical keys, as iTunes doesn’t even have a key column, despite MP3s having a native Initial Key metadata field! Approaches to dealing with this vary – I chose to use another donor column. I’ll explain Camelot Notations later on.

That’s enough about the iTunes data for now – I’ll go into more detail in future Project Wolfie posts. Now let’s focus on getting this data into memory for Python.

Data Capture

Next, let’s get the iTunes data into memory. Starting with a familiar library…

pandas

I’ll be using pandas to ingest the iTunes data. This is a well-established and widely supported module. It also has its own data validation functions and will assist with issues like handling spaces in column names.

While iTunes files aren’t CSVs, the pandas read_csv function can still read their data into a DataFrame. It needs some help though – the delimiter parameter must be \t to identify the tabs’ delimiting status.

So let’s read the iTunes metadata into memory and…

Python
df = pd.read_csv(csv_path, delimiter='\t')

>> UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte

Oh. pandas can’t read the file. The error says it’s trying the utf-8 codec, so the export must be using something else. Fortunately, there’s another Python library that can help!

charset_normalizer

charset_normalizer is an open-source encoding detector. It determines the encoding of a file or text and records the result. It’s related to the older chardet library but is faster, has a more permissive MIT license and supports more encodings.

Here, I’m using charset_normalizer.detect in a detect_file_encoding function to detect the export’s codec:

Python
def detect_file_encoding(file_path: Path) -> str:
    with open(file_path, 'rb') as file:
        raw_data = file.read()
        
    detection_result = charset_normalizer.detect(raw_data)
    return detection_result['encoding'] or 'utf-8'

In which:

  • I define a detect_file_encoding function that expects a filepath and returns a string.
  • detect_file_encoding opens the file, reads the data and stores it as raw_data.
  • charset_normalizer detects raw_data‘s codec and stores this as detection_result.
  • detect_file_encoding returns either the successfully detected codec, or the common utf-8 codec if the attempt fails.

I can then pass the export’s filepath to the detect_file_encoding function, capture the results as encoding and pass this as a parameter to pandas.read_csv:

Python
encoding = detect_file_encoding(csv_path)
    
df = pd.read_csv(csv_path, encoding=encoding, delimiter='\t')

>> Loaded 4407 rows

There’s one more action to take before moving on. Some columns contain spaces. This will become a problem as spaces are not allowed in Python identifiers!

As the data is now in a pandas DataFrame, I can use pandas.DataFrame.rename to remove these spaces:

Python
df = df.rename(columns={
        'Track Number': 'TrackNumber',
        'My Rating': 'MyRating'
    })

The metadata is now ready for Pydantic.

Installing Pydantic

Finally, let’s install Pydantic. This process is fully documented. My preferred method is via pip install in a local virtual environment:

Python
pip install pydantic

And then importing Pydantic into my script:

Python
import pydantic

Now I can start using Pydantic.

Pydantic Data Models

In this section, I tell Pydantic about my data model and the types of data it should expect for validation.

Introducing BaseModel

At the core of Pydantic is the BaseModel class – used for defining data models. Every Pydantic model inherits from it, and by doing so gains features like type enforcement, automatic data parsing and built-in validation.

By subclassing BaseModel, a schema for the data is defined using standard Python type hints. Pydantic uses these hints to validate and convert input data automatically.

Let’s explore BaseModel by creating a new Track class.

Creating A Track Class

Pydantic supports standard library types like string and integer. This reduces Pydantic’s learning curve and simplifies integration into existing Python processes.

Here are the very beginnings of my Track data model. I have a new Track class inheriting from Pydantic’s BaseModel, and a Name field with string data type:

Python
class Track(BaseModel):
    Name: str

Next, I add a Year field with integer data type:

Python
class Track(BaseModel):
    Name: str
    Year: int

And so on for each field I want to validate with Pydantic:

Python
class Track(BaseModel):
    Name: str
    Artist: str
    Album: str
    Work: str
    Genre: str
    TrackNumber: int
    Year: int
    MyRating: int
    Location: str

Now, if any field is missing or has the wrong type, Pydantic will raise a ValidationError. But there’s far more to Pydantic data types than this…

Defining Special Data Types

Where no standards exist or where validation rules are more complex to determine, Pydantic offers further type coverage. These include:

One of my Track fields will immediately benefit from this:

Python
class Track(BaseModel):
    Location: str

Currently, my Location field validation is highly permissive. It will accept any string. I can improve this using Pydantic’s FilePath data type:

Python
class Track(BaseModel):
    Location: FilePath

Now, Pydantic will check that the given location is a path that exists and links to a valid file. No custom code; no for loops – the FilePath type handles everything for me.

So I now have data type validation in my Pydantic data model. What else can I have?

Pydantic Built-In Validation

This section explores the native data validation features of Pydantic, including field annotation and constraints.

Introducing Field

In Pydantic models, data attributes are typically defined using Python type hints. The Field function enables further customisation like constraints, schema metadata and default values.

While type hints define what kind of data is allowed, Field defines how that data should behave, what happens if it’s missing and how it should be documented. It adds clarity to models and helps Pydantic enforce stricter rules.

Let’s run through some examples.

Custom Schema Metadata

One of the challenges in creating data pipelines is that the data fields can sometimes be unclear or difficult to explain. This can cause confusion and delay when building ETLs, examining repos and interacting with code.

Field helps here by adding custom fields to annotate data within Pydantic classes. Examples include description:

Python
class Track(BaseModel):
    Name: str = Field(
        description="Track's name and mix.")

And examples:

Python
class Track(BaseModel):
    Name: str = Field(
        description="Track's name and mix.",
        examples=["Track Title (Original Mix)", "Track Title (Extended Mix)"])

Using these throughout my Track class simplifies the code and reduces context switching:

Python
class Track(BaseModel):
    Name: str = Field(
        description="Track's name and mix.",
        examples=["Track Title (Original Mix)", "Track Title (Extended Mix)"])
    
    Artist: str = Field(
        description="The artist(s) of the track.",
        examples=["Above & Beyond", "Armin van Buuren"])
    
    Album: str = Field(
        description="Track's Camelot Notation indicating the key.",
        examples=["01A-Abm", "02B-GbM"])
    
    Work: str = Field(
        description="The record label that published the track.",
        examples=["Armada Music", "Anjunabeats"])
    
    Genre: str = Field(
        description="Track's musical genre.",
        examples=["Trance", "Progressive House"])
    
    TrackNumber: int = Field(
        description="Track's BPM (Beats Per Minute).",
        examples=[130, 140])
    
    Year: int = Field(
        description="Track's release year.",
        examples=[1998, 2004])
    
    MyRating: int = Field(
        description="Personal Rating.  Stars expressed as 0, 20, 40, 60, 80, or 100",
        examples=[60, 80])
    
    Location: FilePath = Field(
        description="Track's Location on the filesystem.",
        examples=[r"C:\Users\User\Music\iTunes\TranquilityBase-GettingAway-OriginalMix.mp3"])

This is especially useful for Album and TrackNumber given their unique properties.

Field Constraints

Field can also constrain the data that a class accepts. This includes string constraints:

  • max_length: Maximum length of the string.
  • min_length: Minimum length of the string.
  • pattern: A regular expression that the string must match.

and numeric constraints:

  • ge & le – greater than or equal to/less than or equal to
  • gt & lt – greater/less than
  • multiple_of – multiple of a given number

Constraints can also be combined as needed. For example, iTunes exports record MyRating values in increments of 20, where 1 star is 20 and 2 stars are 40, rising to the maximum 5 stars being 100.

I can express this within the Track class as:

Python
class Track(BaseModel):
    MyRating: int = Field(
        description="Personal Rating.  Stars expressed as 0, 20, 40, 60, 80, or 100",
        examples=[60, 80],
        ge=20,
        le=100,
        multiple_of=20)

Here, MyRating must be greater than or equal to 20 (ge=20), less than or equal to 100 (le=100), and must be a multiple of 20 (multiple_of=20).

I can also parameterise these constraints using variables instead of hard-coded values:

Python
ITUNES_RATING_RAW_LOWEST = 20
ITUNES_RATING_RAW_HIGHEST = 100

class Track(BaseModel):
    MyRating: int = Field(
        description="Personal Rating.  Stars expressed as 0, 20, 40, 60, 80, or 100",
        examples=[60, 80],
        ge=ITUNES_RATING_RAW_LOWEST,
        le=ITUNES_RATING_RAW_HIGHEST,
        multiple_of=20)

This property lets me use Pydantic with other Python libraries. Here, my Year validation checks for years greater than or equal to 1970 and less than or equal to the current year (using the datetime library):

Python
YEAR_EARLIEST = 1970
YEAR_CURRENT = datetime.datetime.now().year

class Track(BaseModel):
    Year: int = Field(
        description="Track's release year.",
        examples=[1998, 2004],
        ge=YEAR_EARLIEST,
        le=YEAR_CURRENT)

No track in the collection should exist beyond the current year – this constraint will now update itself as time passes.

Having applied other constraints, my Track class looks like this:

Python
class Track(BaseModel):
    """Pydantic model for validating iTunes track metadata."""
    
    Name: str = Field(
        description="Track's name and mix type.",
        examples=["Track Title (Original Mix)", "Track Title (Extended Mix)"])
    
    Artist: str = Field(
        description="The artist(s) of the track.",
        examples=["Above & Beyond", "Armin van Buuren"])
    
    Album: str = Field(
        description="Track's Camelot Notation indicating the key.",
        examples=["01A-Abm", "02B-GbM"])
    
    Work: str = Field(
        description="The record label that published the track.",
        examples=["Armada Music", "Anjunabeats"])
    
    Genre: str = Field(
        description="Track's musical genre.",
        examples=["Trance", "Progressive House"])
    
    TrackNumber: int = Field(
        description="Track's BPM (Beats Per Minute).",
        examples=[130, 140],
        ge=BPM_LOWEST,
        le=BPM_HIGHEST)
    
    Year: int = Field(
        description="Track's release year.",
        examples=[1998, 2004],
        ge=YEAR_EARLIEST,
        le=YEAR_CURRENT)
    
    MyRating: int = Field(
        description="Personal Rating. Stars expressed as 0, 20, 40, 60, 80, or 100",
        examples=[60, 80],
        ge=ITUNES_RATING_RAW_LOWEST,
        le=ITUNES_RATING_RAW_HIGHEST,
        multiple_of=20)
    
    Location: FilePath = Field(
        description="Track's Location on the filesystem.",
        examples=[r"C:\Users\User\Music\iTunes\AboveAndBeyond-AloneTonight-OriginalMix.mp3"])

This is already very helpful. Next, let’s examine my custom requirements.

Pydantic Custom Validation

This section discusses how to create custom data validation using Pydantic. I will outline what the requirements are, and then examine how these validations are defined and implemented.

Introducing Decorators

In Python, decorators modify or enhance the behaviour of functions or methods without changing their actual code. Decorators are usually written using the @ symbol followed by the decorator name, just above the function definition:

Python
@my_decorator
def my_function():
    ...

For example, consider this logger_decorator function:

Python
def logger_decorator(func):
    def wrapper():
        print(f"Running {func.__name__}...")
        func()  # Execute the supplied function
        print("Done!")
    return wrapper

This function takes another function (func) as an argument, printing a message before and after execution. If the logger_decorator function is then used as a decorator when running this greet function:

Python
@logger_decorator
def greet():
    print("Hello, world!")

greet()

Python will add the logging behaviour of logger_decorator without modifying greet:

Python
Running greet...
Hello, world!
Done!

Introducing Field Validators

In addition to the built-in data validation capabilities of Pydantic, custom validators with more specific rules can be defined for individual fields using Field Validators. These use the field_validator() decorator, and are declared as class methods within a class inheriting from Pydantic’s BaseModel.

Here’s a basic example using my Track model:

Python
class Track(BaseModel):
    Name: str = Field(
        description="Track's name and mix.",
        examples=["Track Title (Original Mix)", "Track Title (Extended Mix)"]
    )

    @field_validator("Name")
    @classmethod
    def validate_name(cls, value):
        # custom validation logic here
        return value

Where:

  • @field_validator("Name") tells Pydantic to use the function to validate the Name field.
  • @classmethod lets the validator access the Track class (cls).
  • The validator executes the validate_name function with the field value (in this case Name) as input, performs the checks and must either:
    • return the validated value, or
    • raise a ValueError or TypeError if validation fails.

Let’s see this in action.

Null Checks

Firstly, let’s perform a common data validation check by identifying empty fields. I have two variants of this – one for strings and another for numbers.

The first – validate_non_empty_string – uses pandas.isna to catch missing values and strip() to catch empty strings. This field validator applies to the Artist, Work and Genre columns:

Python
    @field_validator("Artist", "Work", "Genre")
    @classmethod
    def validate_non_empty_string(cls, value, info):
        """Validate that a string field is not empty."""
        if pd.isna(value) or str(value).strip() == "":
            raise ValueError(f"{info.field_name} must not be null or empty")
        return value

The second – validate_non_null_numeric – checks the TrackNumber, Year and MyRating numeric columns for empty values using pandas.isna:

Python
    @field_validator("TrackNumber", "Year", "MyRating", mode="before")
    @classmethod
    def validate_non_null_numeric(cls, value, info):
        """Validate that a numeric field is not null."""
        if pd.isna(value):
            raise ValueError(f"{info.field_name} must not be null")
        return value

Also, it uses Pydantic’s before validator (mode="before"), ensuring the data validation happens before Pydantic coerces types. This catches edge cases like "" or "NaN" before they become None or float("nan") values.

Character Check

Now let’s create a validator for something a little more challenging to define. All tracks in my collection follow a Track Name (Mix) schema. This can take many forms:

  • Original track: Getting Away (Original Mix)
  • Remixed track: Shapes (Oliver Smith Remix)
  • Updated remixed track: Distant Planet (Menno de Jong Interpretation) (2020 Remaster)
  • …and many more variants.

But generally, there should be at least one instance of text enclosed by parentheses. However, some tracks have no remixer and are released with just a title:

  • Getting Away
  • Shapes
  • Distant Planet

This not only looks untidy (eww!), but also breaks some of my downstream automation that expects the Track Name (Mix) schema. So any track without a remixer gets (Original Mix) added to the Name field upon download:

  • Getting Away (Original Mix)
  • Shapes (Original Mix)
  • Distant Planet (Original Mix)

Expressing this is possible with RegEx, but I can make a more straightforward and more understandable check with a field validator:

Python
    @field_validator("Name")
    @classmethod
    def validate_name(cls, value):
        if pd.isna(value) or str(value).strip() == "":
            raise ValueError("Name must not be null or empty")
        
        value_str = str(value)
        if '(' not in value_str:
            raise ValueError("Name must contain an opening parenthesis '('")
        if ')' not in value_str:
            raise ValueError("Name must contain a closing parenthesis ')'")
        return value

This validator checks that the value isn’t empty and then performs additional checks for parentheses. This could be one check, but having it as two checks improves log readability (insert foreshadowing – Ed). I could also have added Name to the validate_non_empty_string validation, but this way I have all my Name checks in the same place.

Parameterised Checks

Like constraints, field validators can also be parameterised. Let’s examine Album.

As iTunes exports can’t be customised, I use Album for a track’s Camelot Notation. These are based on the Camelot WheelMixedInKey‘s representation of the Circle Of Fifths. DJs generally favour Camelot Notation as it is simpler than traditional music notation for human understanding and application sorting.

Importantly, there are only twenty-four possible notations:

For example:

  • 1A (A-Flat Minor)
  • 6A (G Minor)
  • 6B (B-Flat Major)
  • 10A (B Minor)

So let’s capture these values in a CAMELOT_NOTATIONS list:

Python
CAMELOT_NOTATIONS = {
    '01A-Abm', '01B-BM', '02A-Ebm', '02B-GbM', '03A-Bbm', '03B-DbM',
    '04A-Fm', '04B-AbM', '05A-Cm', '05B-EbM', '06A-Gm', '06B-BbM',
    '07A-Dm', '07B-FM', '08A-Am', '08B-CM', '09A-Em', '09B-GM',
    '10A-Bm', '10B-DM', '11A-Gbm', '11B-AM', '12A-Dbm', '12B-EM'
}

(Note the leading zeros. Without them, iTunes sorts the Album column as (10, 11, 12, 1, 2, 3…) – you can imagine how I felt about that – Ed)

Next, I pass the CAMELOT_NOTATIONS list to an Album field validator that checks if the given value is in the list:

Python
    @field_validator("Album")
    @classmethod
    def validate_album(cls, value):
        if pd.isna(value) or str(value).strip() == "":
            raise ValueError("Album must not be null or empty")
        
        if str(value) not in CAMELOT_NOTATIONS:
            raise ValueError(f"Album must be a valid Camelot notation: {value} is not in the valid list")
        return value

Pydantic now fails any value not found in the CAMELOT_NOTATIONS list.

Now I have my validation needs fully covered. What observability does Pydantic give me over these data validation checks?

Pydantic Observability

In this section, I assess and adjust the default Pydantic observability abilities to ensure my data validation is accurately recorded.

Default Output

Pydantic automatically generates data validation error messages if validation fails. These detailed messages provide a structured overview of the issues encountered, including:

  • The index of the failing input (e.g., a DataFrame row number).
  • The model class where the error occurred.
  • The field name that failed validation.
  • A human-readable explanation of the issue.
  • The offending input value and its type.
  • A direct link to relevant documentation for further guidance.

Here’s an example of Pydantic’s output when a string field receives a NaN value:

Python
Row 2353: 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

In this example:

  • Row 2353 indicates the problematic input row.
  • Track is the Pydantic model where validation failed.
  • Work is the failing field.
  • Pydantic detects that the input is nan (a float) and not a valid string.
  • Pydantic provides a URL to the string_type documentation.

Here’s another example, this time for a MyRating error:

Python
Row 3040: 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

In this case, a field validator raised a ValueError because MyRating must not be null.

Pydantic’s error reporting is clear and actionable, making it suitable for debugging and systemic data validation tasks. However, for larger datasets or more user-friendly outputs (such as reports or UI feedback), further customisation is helpful, such as…

Terminal Output Customisation

As good as Pydantic’s default output is, it’s not that human-readable. For example, in this Terminal output I have no idea which tracks are on rows 2353, 2495 and 3040:

Plaintext
Row 2353: 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
    
Row 2495: 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
    
Row 3040: 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

While I can find this out, it would be better to know at a glance. Fortunately, I can improve this when capturing the errors by appending the artist and name to each row of the errors object:

Python
except (ValidationError, ValueError) as e:
            artist = row['Artist'] if not pd.isna(row['Artist']) else "Unknown Artist"
            name = row['Name'] if not pd.isna(row['Name']) else "Unknown Name"
            errors.append((index, artist, name, str(e)))

Now, Artist and Name are added to each row:

Plaintext
Row 2353: Ben Stone - Mercure (Extended Mix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
    
Row 2495: DJ Hell - My Definition Of House Music (Resistance D Remix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
    
Row 3040: York - Reachers Of Civilisation (In Search Of Sunrise Mix): 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

This makes it far easier to find the problematic files in my collection. As long as there aren’t many findings…

Creating An Error File

There are three main problems with Pydantic printing all data validation errors in the Terminal:

  • They don’t persist outside of the Terminal session.
  • The Terminal isn’t that easy to read when it’s full of text.
  • The Terminal may run out of space if there are a large number of errors.

So let’s capture the errors in a file instead. This write_error_report function generates a text-based error report from validation failures, saving it in a logs subfolder adjacent to the input file:

Python
def write_error_report(
    csv_path: Path, 
    field_error_details: Dict[str, List[str]],  
    sorted_fields: List[str]
) -> Path:

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    logs_dir = csv_path.parent / "logs"
    logs_dir.mkdir(exist_ok=True)
    
    error_output_path = logs_dir / f"{timestamp}-PydanticErrors-{csv_path.stem}.txt"
    
    with open(error_output_path, 'w', encoding='utf-8') as f:
        f.write(f"Validation Error Report - {timestamp}\n")
        f.write("=" * 80 + "\n")

        for field in sorted_fields:
            messages = field_error_details.get(field, [])
            if messages:
                f.write(f"\n{field} Errors ({len(messages)}):\n")
                f.write("-" * 80 + "\n")
                for message in messages:
                    f.write(message + "\n\n")
    
    return error_output_path

Firstly, it constructs a timestamped filename using the original file’s stem (e.g., 20250529-142304-PydanticErrors-data.txt) and the logs subfolder, creating the latter if it doesn’t exist:

Python
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    logs_dir = csv_path.parent / "logs"
    logs_dir.mkdir(exist_ok=True)
    
    error_output_path = logs_dir / f"{timestamp}-PydanticErrors-{csv_path.stem}.txt"

Next, Python orders the errors by the sorted_fields input, displays error counts per field and formats each error message with clear section dividers. A structured report listing all validation errors by field is saved in the logs subfolder:

Python
    with open(error_output_path, 'w', encoding='utf-8') as f:
        f.write(f"Validation Error Report - {timestamp}\n")
        f.write("=" * 80 + "\n")

        for field in sorted_fields:
            messages = field_error_details.get(field, [])
            if messages:
                f.write(f"\n{field} Errors ({len(messages)}):\n")
                f.write("-" * 80 + "\n")
                for message in messages:
                    f.write(message + "\n\n")

Finally, the filesystem path of the generated report is returned:

Python
    return error_output_path

When executed, the Terminal tells me the error file path:

Plaintext
Detailed error log written to: 20250513-133743-PydanticErrors-iTunes-Elec-Dance-Club-Main.txt

And stores the findings in a local txt file, grouped by error type for simpler readability:

Plaintext
Validation Error Report - 20250513-133743
================================================================================

MyRating Errors (5):
--------------------------------------------------------------------------------
Row 3040: York - Reachers Of Civilisation (In Search Of Sunrise Mix): 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

Work Errors (22):
--------------------------------------------------------------------------------
Row 223: Dave Angel - Artech (Original Mix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

Adding A Terminal Summary

Finally, I created a Terminal summary of Pydantic’s findings:

Python
print("\nValidation Summary:\n")
sorted_fields = sorted(Track.model_fields.keys())

for field in sorted_fields:
  count = error_analysis['counts'].get(field, 0)
  print(f"{field} findings: {count}")

This shows feedback after each execution:

Plaintext
Validation Summary:

Album findings: 0
Artist findings: 0
Genre findings: 0
Location findings: 0
MyRating findings: 5
Name findings: 1
TrackNumber findings: 0
Work findings: 22
Year findings: 0

Now, let’s ensure everything works properly!

Testing Pydantic

In this section, I test that my Pydantic data validation and observability processes are working correctly using iTunes export files and pytest unit tests.

Recent File Test

The first test used a recent export from the end of April 2025. Here is the Terminal output:

Plaintext
Processing file: iTunes-Elec-Dance-Club-Main-2025-04-28.txt
Reading iTunes-Elec-Dance-Club-Main-2025-04-28.txt with detected encoding UTF-16
Loaded 4407 rows
Validated 4379 rows
Found 28 errors!

Validation Summary for iTunes-Elec-Dance-Club-Main-2025-04-28.txt:
Album errors: 0
Artist errors: 0
Genre errors: 0
Location errors: 0
MyRating errors: 5
Name errors: 1
TrackNumber errors: 0
Work errors: 22
Year errors: 0

Detailed error log written to: 20250521-164324-PydanticErrors-iTunes-Elec-Dance-Club-Main-2025-04-28.txt

Good first impressions – the 4407 row count matches the export file, the summary is shown in the Terminal and an error log is created. So what’s in the log?

Firstly, five tracks have no MyRating values. For example:

Plaintext
MyRating Errors (5):
--------------------------------------------------------------------------------
Row 558: Reel People Feat Angela Johnson - Can't Stop (Michael Gray Instrumental Remix): 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

This is correct, as this export was created when I added some new tracks to my collection.

Next, one track has a Name issue:

Plaintext
Name Errors (1):
--------------------------------------------------------------------------------
Row 1292: The Prodigy - Firestarter (Original Mix}: 1 validation error for Track
Name
  Value error, Name must contain a closing parenthesis ')' [type=value_error, input_value='Firestarter (Original Mix}', input_type=str]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

This one confused me at first, until I looked at the error more closely and realised the closing parenthesis is wrong! } is used instead of )! This is why my validate_name field validator has separate checks for each character – it makes it easier to understand the results!

Finally, twenty-two tracks are missing record label metadata in Work:

Plaintext
Work Errors (22):
--------------------------------------------------------------------------------
Row 223: Dave Angel - Artech (Original Mix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

This means some tracks are missing full metadata. This won’t break any downstream processes as I have no reliance on this field. That said, it’s good to know about this in case my future needs change.

Older File Test

The next test uses an older file from March 2025. Let’s see what the Terminal says this time…

Plaintext
Processing file: iTunes-AllTunesMaster-2025-03-01.txt      
Reading iTunes-AllTunesMaster-2025-03-01.txt with detected encoding UTF-16
Loaded 4381 rows
Validated 0 rows
Found 4381 errors!

Validation Summary for iTunes-AllTunesMaster-2025-03-01.txt:

Album errors: 0
Artist errors: 0
Genre errors: 0
Location errors: 4381
MyRating errors: 0
Name errors: 1
TrackNumber errors: 2
Work errors: 17
Year errors: 0
Detailed error log written to: 20250521-164322-PydanticErrors-iTunes-AllTunesMaster-2025-03-01.txt

There are fewer rows here – 4381 vs 4407. This is correct, as my collection was smaller in March. But no rows were validated successfully!

I don’t have to go far to find out why:

Plaintext
Location Errors (4381):
--------------------------------------------------------------------------------
Row 0: Ariel - A9 (Original Mix): 1 validation error for Track
Location
  Path does not point to a file [type=path_not_file, input_value='C:\\Users\\User\\Folder...riel-A9-OriginalMix.mp3', input_type=str]

All the location checks failed. But this is actually a successful test!

In the time between these two exports, I reorganised my music collection. As a result, the file paths in this export no longer exist. Remember – the Location field uses the FilePath data type, which checks that the given paths exist and link to valid files. And these don’t!

The Name results are the same as the first test. This has been around for a while apparently…

Plaintext
Name Errors (1):
--------------------------------------------------------------------------------
Row 1292: The Prodigy - Firestarter (Original Mix}: 1 validation error for Track
Name
  Value error, Name must contain a closing parenthesis ')' [type=value_error, input_value='Firestarter (Original Mix}', input_type=str]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

There are also TrackNumber errors in this export:

Plaintext
TrackNumber Errors (2):
--------------------------------------------------------------------------------
Row 485: Andrew Bayer Feat Alison May - Brick (Original Mix): 2 validation errors for Track
TrackNumber
  Input should be greater than or equal to 100 [type=greater_than_equal, input_value=90, input_type=int]
    For further information visit https://errors.pydantic.dev/2.11/v/greater_than_equal

Two tracks have BPM values lower than the set range. Both files were moved during my reorganisation, but were included in this export at the time and therefore fail this validation check.

Finally, the Work errors are the same as the first test (although more have crept in since!):

Plaintext
Work Errors (17):
--------------------------------------------------------------------------------
Row 223: Dave Angel - Artech (Original Mix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

Ultimately, both tests match expectations!

Unit Tests With Amazon Q

Finally, I wanted to include some unit tests for this project. Unit testing is always a good idea, especially in this context where I can verify function outputs and error generation without needing to create numerous test files.

I figured this was a good opportunity to test Amazon Q Developer and see what it came up with. I gave it a fairly basic prompt, using the @workspace context to allow Q access to my project’s entire workspace as context for its responses:

Plaintext
@workspace write unit tests for this script using pytest

I tend to use pytest for my Python testing, as I find it simpler and more flexible than Python’s standard unittest library.

Q promptly provided several reasonable tests in response. This initiated a half-hour exchange between us focused on calibrating the existing tests and creating new ones. To be fair to Q, my initial prompt was quite basic and could have been much more detailed.

Amongst Q’s tests was this one testing an empty Artist field:

Python
    @patch('pathlib.Path.exists')
    def test_empty_artist(self, mock_exists):
        """Test that an empty artist fails validation."""
        # Mock file existence check
        mock_exists.return_value = True
        
        invalid_track_data = {
            "Name": "Test Track (Original Mix)",
            "Artist": "",  # Empty artist
            "Album": "01A-Abm",
            "Work": "Test Label",
            "Genre": "Trance",
            "TrackNumber": 130,
            "Year": 2020,
            "MyRating": 80,
            "Location": "C:\\Music\\test_track.mp3"
        }

This one, checking an invalid Camelot Notation:

Python
@patch('pathlib.Path.exists')
    def test_invalid_album_not_camelot(self, mock_exists):
        """Test that an invalid Camelot notation fails validation."""
        # Mock file existence check
        mock_exists.return_value = True
        
        invalid_track_data = {
            "Name": "Test Track (Original Mix)",
            "Artist": "Test Artist",
            "Album": "Invalid Key",  # Not a valid Camelot notation
            "Work": "Test Label",
            "Genre": "Trance",
            "TrackNumber": 130,
            "Year": 2020,
            "MyRating": 80,
            "Location": "C:\\Music\\test_track.mp3"
        }
        
        with pytest.raises(ValueError, match="Album must be a valid Camelot notation"):
            Track(**invalid_track_data)

And this one, checking what happens with an incomplete DataFrame:

Python
    @patch('wolfie_exportvalidator_itunes.detect_file_encoding')
    @patch('pandas.read_csv')
    def test_load_itunes_data_missing_columns(self, mock_read_csv, mock_detect_encoding):
        """Test loading iTunes data with missing columns."""
        # Setup mocks
        mock_detect_encoding.return_value = 'utf-8'
        mock_df = pd.DataFrame({
            'Name': ['Test Track (Original Mix)'],
            'Artist': ['Test Artist'],
            # Missing required columns
        })
        mock_read_csv.return_value = mock_df
        
        # Call function and verify it raises an error
        with pytest.raises(ValueError, match="Missing expected columns"):
            load_itunes_data(Path('dummy_path.txt'))

I’ll include the whole test suite in my GitHub repo. Let’s conclude with pytest‘s output:

Plaintext
collected 41 items                                                                                                                                        

tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_valid_track PASSED                                                                [  2%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_valid_track_boundary_values PASSED                                                [  4%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_name_no_parentheses PASSED                                                [  7%]
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_empty_name PASSED                                                                 [  9%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_empty_artist PASSED                                                               [ 12%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_empty_work PASSED                                                                 [ 14%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_empty_genre PASSED                                                                [ 17%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_album_not_camelot PASSED                                                  [ 19%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_valid_camelot_notations PASSED                                                    [ 21%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_bpm_range_high PASSED                                                     [ 24%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_bpm_range_low PASSED                                                      [ 26%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_year_range_early PASSED                                                   [ 29%]
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_year_range_future PASSED                                                  [ 31%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_rating_not_multiple PASSED                                                [ 34%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_rating_too_low PASSED                                                     [ 36%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_rating_too_high PASSED                                                    [ 39%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_null_track_number PASSED                                                          [ 41%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_null_year PASSED                                                                  [ 43%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_null_rating PASSED                                                                [ 46%]
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_detect_file_encoding PASSED                                                   [ 48%] 
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_detect_file_encoding_latin1 PASSED                                            [ 51%] 
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_detect_file_encoding_no_result PASSED                                         [ 53%] 
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_load_itunes_data_success PASSED                                               [ 56%]
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_load_itunes_data_missing_columns PASSED                                       [ 58%] 
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_load_itunes_data_empty_dataframe PASSED                                       [ 60%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_validate_tracks_all_valid PASSED                                                  [ 63%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_validate_tracks_with_errors PASSED                                                [ 65%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_duplicate_location PASSED                                                         [ 68%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_analyze_errors PASSED                                                             [ 70%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_analyze_errors_with_general_error PASSED                                          [ 73%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_write_error_report PASSED                                                         [ 75%]
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_process_file_with_errors PASSED                                                   [ 78%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_process_file_no_errors PASSED                                                     [ 80%]
tests/test_wolfie_exportvalidator_itunes.py::TestParamsData::test_bpm_range_valid PASSED                                                            [ 82%] 
tests/test_wolfie_exportvalidator_itunes.py::TestParamsData::test_year_range_valid PASSED                                                           [ 85%] 
tests/test_wolfie_exportvalidator_itunes.py::TestParamsData::test_rating_range_valid PASSED                                                         [ 87%] 
tests/test_wolfie_exportvalidator_itunes.py::TestParamsData::test_camelot_notations_valid PASSED                                                    [ 90%] 
tests/test_wolfie_exportvalidator_itunes.py::TestMain::test_main_with_files PASSED                                                                  [ 92%] 
tests/test_wolfie_exportvalidator_itunes.py::TestMain::test_main_no_files PASSED                                                                    [ 95%] 
tests/test_wolfie_exportvalidator_itunes.py::TestMain::test_main_with_exception PASSED                                                              [ 97%]
tests/test_wolfie_exportvalidator_itunes.py::TestMain::test_main_with_critical_exception PASSED                                                     [100%] 

=================================================================== 41 passed in 0.20s =================================================================== 

I had a very positive experience overall! Working with Amazon Q allowed me to write the tests more quickly than I could have done on my own. We would have been even faster if I had put more thought into my initial prompt. Additionally, since Q Developer offers a generous free tier, it didn’t cost me anything.

GitHub Repo

I have committed my Pydantic data validation script, test suite and documentation in the repo below:

GitHub-BannerSmall

Note that the parameters are decoupled from the Pydantic script. This will allow me to reuse some parameters across future validation scripts and has enabled me to exclude the system parameters from the repository.

Summery

In this post, I used the Pydantic Python library to create data validation and observability processes for my Project Wolfie iTunes data.

I found Pydantic very impressive! Its simplicity, functionality and interoperability make it an attractive addition to Python data pipelines, and its strong community support keeps Pydantic relevant and current. Additionally, Pydantic’s presence in FastAPI, PydanticAI and a managed AWS Lambda layer enables rapid integration and seamless deployment. I see many applications for it within Project Wolfie.

There’s lots more to Pydantic – this Pixegami video is a great walkthrough of Pydantic in action:

If this post has been useful then the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~

Categories
Data & Analytics

Silver Layer Python ETL With The AWS Glue ETL Job Script Editor

In this post, I create my WordPress data pipeline’s Silver ETL process using Python and the AWS Glue ETL Job Script Editor.

Table of Contents

Introduction

Last time I worked on my WordPress AWS data pipeline, I produced my Bronze layer data and created a Glue Crawler to derive the schema of the Bronze S3 objects. It’s now time to start cleaning that data to prepare it for reporting, aggregation and consumption.

I’m also currently studying for the AWS Certified Data Engineer – Associate certification. While revising for this I learned the capabilities of the AWS Glue ETL Job Script Editor, and it seemed an ideal fit for my Silver ETL process. So I decided to make a post out of it and see how things went!

Firstly, I’ll examine the AWS Glue ETL Job Script Editor and how it will benefit my Silver ETL process. Then I’ll define the architecture of the Silver ETL job and how it fits into the existing data pipeline. Next, I’ll script and test the job. Finally, I’ll integrate it into the pipeline and explore the job’s costs.

Glue ETL Job Script Editor

This section examines the AWS Glue ETL Script Editor and Python Shell and considers some of Python Shell’s benefits and limitations.

Script Editor & Python Shell

Script Editor is a feature of AWS Glue. It offers serverless Spark, Ray and Python shells, enabling data transformation, preparation and cleaning with no infrastructure management. Scripts can be both uploaded and created from scratch, and version control is configurable to several Git services.

This post focuses on AWS Glue Python Shell. Introduced in 2019, Python Shell jobs suit small to medium-sized tasks as part of an ETL workflow.

Python Shell Pros

This section examines some of Python Shell’s benefits.

Low Cost

Python Shell jobs are the cheapest of the Glue job types to run. Glue charges are based on data processing units (DPUs). A single standard DPU currently provides 4 vCPU and 16 GB of memory. While regular Glue ETL jobs using Apache Spark need at least 2 DPUs, Python Shell jobs default to using only 1/16 (or 0.0625) DPU!

This can also be extended to 1 DPU, resulting in faster completion times. Like AWS Lambda, charges accrue based on resource usage and duration. So increased resource allocation can potentially create further savings.

This section was correct as of August 2024 – the latest pricing data is on the AWS Glue pricing site.

Low Barrier To Entry

Python Shell jobs offer accessibility for those from a scripting background. When creating a new script in the console, users only need to choose the engine (in this case Python) and whether the script is being uploaded or created fresh. And that’s it! No configuring interpreters, environments or dependencies.

Python Shell jobs also integrate with other AWS services. They can easily connect to data sources like S3, RDS and DynamoDB. They can be automated with Glue Workflows and Triggers. IAM can also control access to both the Python Shell job and the AWS services it interacts with.

Included Python Libraries

AWS Glue Python Shell includes a variety of built-in Python libraries that are useful for ETL tasks. These libraries cover a range of functionalities such as data processing, machine learning, and interacting with AWS services.

They include:

This AWS post has a full table of included libraries and their versions. Additional libraries can be installed and imported using PIP.

Some people will quickly see issues with this list though…

Python Shell Cons

This section examines some of Python Shell’s limitations.

Outdated Python Versions & Libraries

While the included libraries are welcome, they are also quite outdated. For example, boto3‘s included version is 1.21.21 while the current version is 1.34.150. pandas is at 1.4.2 in the table and 2.2.2 online.

This is likely due to the supported Python versions – currently Python 3.6 and Python 3.9. Now, while Python 3.9 isn’t out of support until October 2025, it was released back in October 2020 and has had three major upgrades since. Worse, Python 3.6 ended life at Christmas 2021!

With the Data Engineer Associate certification drawing attention to various AWS data services, it’s a shame that this feature is so far behind. This would be a great modernisation tool for importing legacy Python scripts into Glue, but the last feature update was in 2022 and it’s really starting to lag behind now.

No Visual Editor

Yes I know it’s a script editor but hear me out.

Let’s briefly segue to AWS IAM. In the early days, updating IAM policies had the potential of losing afternoons to missing braces or errant commas. There was no native AWS validation tooling and the whole thing felt like a dark art for those less experienced.

Then AWS released an IAM visual policy editor. And things went from this:

2024 07 30 IAMPolicyJSON

To this!

2024 07 30 IAMPolicyDown

This transformed the IAM policy-writing process. The guesswork was gone – new policies could be written using dropdowns and checkboxes. And AWS would generate the same code each time, in the same way and to the same standard.

In today’s AWS console, IAM can be administrated both visually and as JSON. Updates made in the visual editor reflect in the code in real-time, and vice versa. And the IAM IDE immediately flags syntax issues, unclosed keypairs and whatnot.

This interface would work so well with Glue Script Editor. It would simplify and encourage using Script Editor, creating standardised code by default and reducing development time. No more syntax violations, verbose comments or missing dependencies – AWS could handle all that.

This doesn’t even need AI – it would just be procedural code generation. Something like selecting awswrangler from a dropdown list, then selecting an S3 location to read or write and a file type to expect. Or even a list of code snippets for the included libraries. These features could all lighten the dev load.

Limited IDE

Let’s consider AWS Lambda’s IDE:

2024 07 30 LambdaIDE

Its benefits include:

  • Code autocompletion
  • Integrated testing
  • Integrated monitoring

And tons of other user-focused functionality. Conversely, this is the Glue Script Editor IDE:

2024 07 30 GlueIDE

Hmm.

Now don’t get me wrong – I’m not asking for Lambda Lite. But something a bit more than Notepad would be nice. AWS are currently making a massive deal of Amazon CodeWhisperer and Amazon Q Developer‘s autocomplete actions, but here pandas isn’t even suggested when I type import pan. And it’s an included library!

The obvious solution is to just use Lambda. But Glue Script Editor offers a sweet spot where it runs custom Python while operating entirely within the AWS Glue service. This is helpful for features like Glue Triggers and Workflows that can’t currently trigger Lambda functions. It’s also helpful with AWS Organisations, where using Glue Script Editor for Python ETL can enable SCPs that entirely block access to AWS Lambda for data-centric accounts.

So Why Use It?

So are Glue Python Shell jobs worth considering with these limitations? Definately! There are several use cases favouring them:

  • Legacy ETL jobs that either can’t use recent Python versions and libraries, or simply don’t need them.
  • Simple, lightweight tasks that don’t require the more advanced (and expensive) features of Apache Spark or Ray.
  • Tasks that need to run quickly, as Python Shells have faster startup times than the Spark environments used by regular Glue ETL jobs.
  • Long-running ETL tasks unsuitable for AWS Lambda, as Python Shell jobs can run for up to 48 hours compared to Lambda’s 15 minutes. Thanks to Yan Cui‘s blog for that one!

For my requirements, a Python Shell job makes sense because I’m doing simple transformations on small volumes of data.

Architecture

This section examines the architecture of my proposed solution. Much of this architecture is similar to the Bronze layer. I’ll examine the new Silver ELT job, followed by the updated data pipeline Step Function workflow.

Glue Silver ETL Job

Firstly, this is the Glue Silver ETL job:

Amazon S3
Bronze Bucket
Amazon S3…
Amazon S3
Silver Bucket
Amazon S3…
AWS Glue
Silver ETL Job
AWS Glue…
Amazon CloudWatch
Logs
Amazon CloudWatch…
1
1
2
2
AWS Cloud
AWS Cloud
Text is not SVG – cannot display

While updating CloudWatch Logs throughout:

  1. Silver Glue ETL job extracts data from wordpress-api Bronze S3 objects and performs Python transformations.
  2. Silver Glue ETL job loads the transformed data into Silver S3 bucket as Parquet objects.

Step Function Workflow

Next, the updated Step Function workflow:

AWS Cloud
AWS Cloud
EventBridge
Schedule
EventBridge…
AWS Step Functions workflow
AWS Step Functions workflow
3
3
AWS Lambda Raw Function
AWS Lambda Ra…
AWS SNS Topic
AWS SNS Topic
2
2
State
Machine
State…
AWS Lambda Bronze Function
AWS Lambda Br…
F
F
5
5
AWS Glue
Bronze Crawler
AWS Glue…
4
4
AWS Glue
Silver ETL Job
AWS Glue…
F
F
F
F
1
1
F
F
EventBridge
Scheduler
EventBridge…
AWS SNS Topic
AWS SNS Topic
User
User
CloudWatch Logs
CloudWatch Lo…
F
F
F
F
Text is not SVG – cannot display

While updating the workflow’s CloudWatch Log Group throughout:

  1. An EventBridge Schedule executes the Step Functions workflow.
  2. Raw Lambda function is invoked.
    • Invocation Fails: Publish SNS message. Workflow ends.
    • Invocation Succeeds: Invoke Bronze Lambda function.
  3. Bronze Lambda function is invoked.
    • Invocation Fails: Publish SNS message. Workflow ends.
    • Invocation Succeeds: Run Glue Crawler.
  4. Glue Crawler runs.
    • Run Fails: Publish SNS message. Workflow ends.
    • Run Succeeds: Update Glue Data Catalog. Run Glue Silver ETL job.
  5. Glue Silver ETL job runs.
    • Run Fails: Publish SNS message. Workflow ends.
    • Run Succeeds: Workflow ends.

An SNS message is published if the Step Functions workflow fails.

Silver ETL Job

In this section, I create the Silver ETL Python script for the AWS Glue Script Editor. Firstly I’ll define the script’s requirements. Next, I’ll translate them into Python code, and finally I’ll create the ETL script and upload it to Git.

Requirements

Firstly, let’s define the requirements for this data pipeline layer. So what does a typical Silver ETL process involve?

Databricks defines the Silver layer as cleansed and conformed data:

In the Silver layer of the lakehouse, the data from the Bronze layer is matched, merged, conformed and cleansed (“just-enough”) so that the Silver layer can provide an “Enterprise view” of all its key business entities, concepts and transactions. (e.g. master customers, stores, non-duplicated transactions and cross-reference tables).

https://www.databricks.com/glossary/medallion-architecture

Because my data source is a WordPress MySQL database, most of the cleansing and conforming work I’d expect to do has already been done there! That said, there’s data that I definitely won’t need, as well as other transformations I can apply to help downstream reporting.

Some of the following transformations can be done at the SQL reporting level with date and string functions. However, these add repetitive load and complexity to queries, which can be avoided by some cleaning transformations. Roche’s Maxim of Data Transformation applies here:

Data should be transformed as far upstream as possible, and as far downstream as necessary.

https://ssbipolar.com/2021/05/31/roches-maxim/

The Silver layer transformations I’m doing here are:

Column Removal

Many columns are empty or unneeded, so now is the time to remove them. This will reduce the data held in the Silver objects, making them cheaper to store and faster to query.

My script uses the pandas.DataFrame.drop function to remove columns by specifying column names. Here, a term_order column is dropped from the DataFrame df:

Python
df = df.drop(columns=['term_order'])

Date Splitting

Dates are tough to analyse and don’t aggregate well, as each date is effectively three different data points in one field. Splitting dates into years, months and days improves data bucketing, query granularity and time series analytics.

My script uses the pandas to_datetime function to convert scalar, array-like, Series or DataFrame/dict-like objects to pandas datetime objects.

Here, values in the date column of the DataFrame df are converted from strings to datetime objects and stored in a new date_todate column. Next, the year attribute of each date_todate column object is extracted and stored in a new date_year column. Finally, the same happens for month and day attributes:

Python
df['date_todate'] = pd.to_datetime(df['date'])

df['date_year'] = df['date_todate'].dt.year
df['date_month'] = df['date_todate'].dt.month
df['date_day'] = df['date_todate'].dt.day

String Editing

Some columns use HTML character entity names for reserved characters. For example, & in place of &. This is great for rendering HTML but not great for analytics.

My script uses the str.replace string method to return a copy of each string with all occurrences of the specified substring replaced by a new one. Here, all instances of & amp; in the name column are overwritten with &:

Python
df['name'] = df['name'].str.replace('& amp;','&')

So that’s the transformations. What else is the script doing?

Python Script

Most of the Silver script processes are similar to the Bronze script ones, including:

  • Logging
  • Getting parameters
  • Accessing S3 objects

So most functionality is reused from my Bronze Lambda function, which is fully documented in this post. To summarise the imports:

Python
import logging                          # Logging
import boto3                            # AWS Interactions
import botocore                         # AWS Exceptions
import awswrangler as wr                # S3 Interactions
import pandas as pd                     # Data Manipulation
from botocore.client import BaseClient  # AWS Type Hints

Some changes have been made for the Silver script:

  • Parameters, object names and logs have been updated from Bronze to Silver:
Python
parametername_snstopic: str = '/sns/data/lakehouse/silver'

logging.info("Getting S3 Silver parameter...")

s3_bucket_silver = get_parameter_from_ssm(client_ssm, parametername_s3bucket_silver)
  • New functionality identifies the AWS AccountID the script is running in:
Python
# Get & display AWS AccountID
identity = client_sts.get_caller_identity()
account_id = identity['Account']
logging.info(f"Starting in AWS Account ID {account_id}")

This is more of a sanity check for me – I have several AWS accounts and want to check I’ve accessed the right one!

  • A test that stops the current loop interaction if the object name doesn’t match one of the expected ones:
Python
# Check if object is mapped and bypass if not.
if object_name not in {'posts', 'statistics_pages', 'term_relationship', 'term_taxonomy', 'terms'}:

logging.warning(f'{object_name} is not currently mapped.  Skipping transform...')

object_count_failure += 1
continue

Finally, I wrote a new function for my Silver transformation logic. This isn’t included here (although it is in my repo) because it’s long. Very long! My first thought was to decouple the ETL processes from each other and write separate scripts for each object. So 5 in total.

However, Python Shell jobs are billed per second with a 1-minute minimum. So 5 jobs = 5 minutes billed. But the job only takes around 60 seconds to process all five objects! I’d have run up 5 times the usage and 5 times the cost for no real benefit.

The full script is in my Github repo.

Testing was quick because it was effectively repeating the Bronze script tests with new parameters. After successfully testing the script locally, it’s time to get it working in AWS!

Uploading & Testing

In this section I upload my Silver ETL script, integrate it with AWS Glue Script Editor and AWS Step Functions and test everything works as expected.

Creating The Python Shell Job

Firstly, let’s get my script into AWS Glue. There are several ways of doing this. If the script is uploaded to S3 then AWS can create a Glue ETL job with the AWS CLI create-job command:

Bash

 aws glue create-job --name python-job-cli --role Glue_DefaultRole 
     --command '{"Name" :  "pythonshell", "PythonVersion": "3.9", "ScriptLocation" : "s3://DOC-EXAMPLE-BUCKET/scriptname.py"}'  
     --max-capacity 0.0625

And with the AWS CloudFormation AWS::Glue::Job resource:

YAML
AWSTemplateFormatVersion: 2010-09-09
Resources:
  Python39Job:
    Type: 'AWS::Glue::Job'
    Properties:
      Command:
        Name: pythonshell
        PythonVersion: '3.9'
        ScriptLocation: 's3://DOC-EXAMPLE-BUCKET/scriptname.py'
      MaxRetries: 0
      Name: python-39-job
      Role: RoleName           
        

Scripts can also be pulled from Git repositories. Here I’ll create my Silver ETL job in the Glue Script Editor console. This creates a new Python script in an S3 bucket location of s3://aws-glue-assets-[AWSAccountID]-[Region]/scripts/.

Next, the new job needs an IAM role with appropriate permissions for the AWS services the script interacts with. Other parameters, including maximum DPU, job timeout value and Python version, can also be set. In addition, Glue Data Quality checks are also supported. And, once saved, the Glue job can have a schedule applied.

Testing Job Execution

AWS Glue records data for each job execution and publishes extensive details and logs:

2024 08 09 AWSGlueJobRun

Glue stores details about the job and Python environment, and logs are published and stored in Amazon Cloudwatch.

And so begins the testing! Initially, I was getting one of my own Python boto3 exceptions:

ValueError: No SNS topic returned.

Easy to fix. This IAM policy was based on the same one that my Bronze Lambda function uses. But the Silver ETL script uses different AWS resources so some IAM policy ARNs need to change. Specifically, the Silver ETL job’s IAM role needs to allow:

  • ssm:GetParameter on the required Parameter Store parameters.
  • sns:Publish on the required SNS topics.
  • s3:GetObject on the data-lakehouse-silver/wordpress_api/* objects.

With these changes, the Silver ETL job runs perfectly and creates new objects in the Silver S3 bucket:

2024 08 09 MonitoringTimeline

With the Glue job running and S3 object creation verified successfully, it’s time to validate the data.

Data Integration & Validation

Validating the data involves two processes:

  • Integrating the data into the Glue Data Catalog.
  • Querying the data with Amazon Athena.

There are several ways to update the Glue Data Catalog, and here I’ll create a new Glue Crawler using a similar setup to my Bronze Crawler. This time the crawler is reading objects from the Silver S3 bucket instead of the Bronze one, and the new Glue Data Catalog tables are prefixed with silver- instead of bronze-.

The Silver crawler creates these new tables in the Glue Data Catalog’s wordpress_api database:

2024 08 06 GlueDataCatalog

This gives Athena visibility of the tables, enabling data validation via SQL query execution. Querying wordpress_api.silver-terms shows the removed column and updated strings:

2024 08 06 AthenaSilverTerms

And querying wordpress_api.silver-statistics_pages shows the split dates:

2024 08 06 AthenaSilverStatistics pages

Looks good! Now that everything has been validated, let’s add these steps to the WordPress Data Pipeline.

Step Function Update

The WordPress Data Pipeline Step Function workflow that I started back in March continues to grow. There’s a new job and a second crawler to add to it now!

The Silver crawler is added in the same way as the Bronze one (including the IAM changes) so let’s focus on adding the new Glue Python Shell ETL job.

Adding Glue ETL jobs to a Step Function workflow is well documented The task uses the StartJobRun Glue API action under the hood and has an optimized integration that enables the .sync integration pattern. Enabling this means the Step Functions workflow waits for the StartJobRun request to complete before progressing to the next state.

However, my workflow currently lacks IAM permissions to run the Silver Glue ETL job. So I make a new IAM policy that allows the glue:StartJobRun action on the Silver Glue ETL job and attach it to the workflow’s IAM role:

JSON
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"glue:StartJobRun"
			],
			"Resource": "arn:aws:glue:eu-west-1:[REDACTED]:job/wordpressapi_silver"
		}
	]
}

My Step Function workflow now looks like this:

2024 08 09 stepfunctions graph

Let’s execute the Step Function workflow and check it works.

Step Function Test

Upon execution, everything works as intended. The new StartGlueJob action is triggered and the Glue ETL job is successful:

2024 08 06 GlueJobDetails

But the Step Function doesn’t transition to the next step. In fact it continued running to the point I had to stop it myself after several minutes:

2024 08 06 StepFunctionsStop

So what’s going on? I asked Amazon Q about this behaviour, and in its response were the following points:

  1. Step Functions uses a “sync” integration with AWS Glue, which means it relies on polling the status of the Glue job using the GetJobRun API call.
  2. The polling schedule is designed to be once per minute for the first 10 minutes, and then every 5 minutes thereafter. This is to avoid excessive API calls to Glue.
Amazon Q

Q also linked to this AWS repost answer with further details:

This is an expected behavior in case of .sync integration with AWS Glue. Service integrations that use the .sync pattern require additional IAM permissions where Step Functions will make use of a managed Eventbridge rule to monitor the status of the job. However, AWS Glue does not support Eventbridge integration and thus, Step Functions polls the job status using the GetJobRun API call to fetch the status of the job.

https://repost.aws/questions/QUFFlHcbvIQFe-bS3RAi7TWA/a-glue-job-in-a-step-function-is-taking-so-long-to-continue-the-next-step

This made things clearer. When Step Functions starts a Glue ETL job using a StartGlueJob action with optimized integration, Step Functions determines that job’s status (and thus when to transition to the next action) by calling Glue’s GetJobRun API.

However, my workflow’s IAM role doesn’t have permission to do that! And because Step Functions can’t determine the ETL job’s status, it doesn’t know that the job has finished and the next state transition never happens! Everything stops!

This is resolved by adding the glue:GetJobRun action to the workflow’s IAM policy:

JSON
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"glue:StartJobRun",
				"glue:GetJobRun"
			],
			"Resource": "arn:aws:glue:eu-west-1:REDACTED:job/wordpressapi_silver"
		}
	]
}

This time, the Glue GetJobRun API calls are successful. The Step Functions workflow validates that the ETL job has finished, moves to the next state as intended and ultimately completes successfully:

2024 08 06 ExecutionSuccessFull

Thanks Amazon Q!

Costs

Finally, let’s look at the costs for my Glue Script Editor Silver ETL Job resources.

This graph shows all Glue API costs between 2024-07-31 (first AWS job execution) and 2024-09-09:

2024 08 09 CostExplorerGlue

Of the $0.38:

  • $0.37 is the CrawlerRun API for the two Glue Crawlers I’m running.
  • $0.01 is the Jobrun API for the 15 job runs between 2024-07-31 and 2024-09-09.

So all things considered, very manageable!

Summary

In this post, I created my WordPress data pipeline’s Silver ETL process using Python and the AWS Glue ETL Job Script Editor.

I found the Script Editor jobs very useful. They offer Lambda’s benefits of scalability, managed infrastructure and integration with other AWS services, combined with data-centric libraries and features that make it easier to hit the ground running development-wise. It has clear limitations and could do with some AWS TLC, but it was a good fit here and rivals Lambda for some future ETL processes I have planned.

If this post has been useful then the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~

Categories
Developing & Application Integration

WordPress Bronze Data Orchestration With AWS

In this post, I create my WordPress pipeline’s bronze data orchestration process using AWS Lambda layers and AWS Step Functions.

Table of Contents

Introduction

In recent posts, I’ve written a Python script to extract WordPress API data and automated the script’s invocation with AWS services. This script creates five JSON objects in an S3 bucket at 07:00 each morning.

Now, I want to transform the data from semi-structured raw JSON into a more structured and query-friendly ‘bronze’ format to prepare it for downstream partitioning, cleansing and filtration.

Firstly, I’ll cover the additions and changes to my pipeline architecture. Next, I’ll examine both my new bronze Python function and the changes made to the existing raw function.

Finally, I’ll deploy the bronze script to AWS Lambda and create my WordPress pipeline orchestration process with AWS Step Functions. This process will ensure both Lambdas run in a set order each day.

Let’s start by examining my latest architectural decisions.

Architectural Decisions

In this section, I examine my architectural decisions for the bronze AWS Lambda function and the WordPress pipeline orchestration. Note that these decisions are in addition to my previous ones here and here.

AWS SDK For pandas

AWS SDK For pandas is an open-source Python initiative using the pandas library. It integrates with AWS services including Athena, Glue, Redshift, DynamoDB and S3, offering abstracted functions to execute various data processes.

AWS SDK For pandas used to be called awswrangler until AWS renamed it for clarity. It now exists as AWS SDK For pandas in documentation and awswrangler in code.

AWS Lambda Layers

A Lambda layer is an archive containing code like libraries, dependencies, or custom runtimes. Layers can be both created manually and provided by AWS and third parties. Each Lambda function can include up to five layers.

Layers can be shared between functions, reducing code duplication and package sizes. This reduces storage costs and lets the smaller packages deploy markedly faster. Layers also separate dependencies from function code, supporting decoupling and separation of concerns.

AWS Step Functions

AWS Step Functions is a serverless orchestration service that integrates with other AWS services to build application workflows as a series of event-driven steps. For example, chaining Athena queries and ML model training.

Central to the Step Functions service are the concepts of States and State Machines:

  • States represent single steps or tasks in a workflow, and can be one of several types. The Step Functions Developer Guide has a full list of states.

The AWS Step Functions Developer Guide’s welcome page has more details including workflow types, use cases and a variety of sample projects.

Apache Parquet

Onto the data architecture! Let’s start by choosing a structured file type for the bronze data:

Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.

Databricks: What is Parquet?

There’s a more detailed explanation in the Parquet documentation too. So why choose Parquet over something like CSV? Well:

  • Size: Parquet supports highly efficient compression, so files take up less space and are cheaper to store than CSVs.
  • Performance: Parquet files store metadata about the data they hold. Query engines can use this metadata to find only the data needed, whereas with CSVs the whole file must be read first. This reduces the amount of processed data and enhances query performance.
  • Compatibility: Parquet is an open standard supported by various data processing frameworks including Apache Spark, Apache Hive and Presto. This means that data stored in Parquet format can be read and processed across many platforms and services.

Data Lakehouse

A Data Lakehouse is an emerging data architecture combining the centralized storage of raw data synonymous with Data Lakes with the transactional and analytical processing associated with Data Warehouses.

The result is a unified platform for efficient data management, analytics, and insights. Lakehouses have gained popularity as cloud services increasingly support them, with AWS, Azure and GCP all providing Lakehouse services.

This segues neatly into…

Medallion Architecture

Medallion Architecture is a data design pattern for logically organizing data in a Lakehouse. It aims to improve data quality as it flows through various layers incrementally. Names for these layers vary, tending to be Bronze, Silver, and Gold.

Implementations of the Medallion Architecture also vary. I like this Advancing Analytics video, which maps the Medallion Architecture to their approach. Despite the title it’s not a negative video, instead outlining how the three layers don’t necessarily fit every use case.

I’m using Raw and Bronze layers here because they best fit what I’m doing with my data.

Architectural Updates

In this section, I examine the changes made to my existing architecture.

Amazon S3

I’ve created a new data-lakehouse-bronze s3 bucket in the same region as the data-lakehouse-raw bucket to separate the two data layers.

Why use two buckets instead of one bucket with two prefixes? Well, after much research I’ve not found a right or wrong answer for this. There’s no difference in cost, performance or availability as long as all objects are stored in the same AWS region.

I chose two buckets because I find it easier to manage multiple buckets with flat structures and small bucket policies, as opposed to single buckets with deep structures and large bucket policies.

The truest answer is ‘it depends’, as other factors can come into play like:

  • Data Sovereignty: S3 bucket prefixes exist in the same region as the parent bucket. Regulations like GDPR and CCPA may require using separate buckets in order to isolate data within designated locations.

AWS SNS

I previously had two standard SNS Topics:

  • wordpress-api-raw for Lambda function alerts
  • failure-lambda for Lambda Destination alerts.

Firstly, there’s now an additional failure-stepfunction topic for any state machine failures.

Secondly, I’ve replaced my wordpress-api-raw topic with a data-lakehouse-raw topic to simplify my alerting channels and allow resource reuse. I’ve also created a new data-lakehouse-bronze topic for bronze process alerts.

Why two data topics? Well, different teams and services care about different things. A bronze-level failure may only concern the Data Engineering team as no other teams consume the data. Conversely, a gold-level failure will concern the AI and MI teams as it impacts their models and reports. Having separate SNS topics for each layer type enables granular monitoring controls.

AWS Parameter Store

Finally, Parameter Store needs the new S3 bucket name and SNS ARNs. I’ve replaced the /sns/pipeline/wordpressapi/raw parameter with /sns/data/lakehouse/raw to preserve the name schema.

I’m now storing five parameters:

  • 2x S3 Bucket names (Raw and Bronze)
  • 2x SNS Topic ARNs (Raw and Bronze notifications)
  • WordPress API Endpoints (unchanged)

Architectural Diagram

There are two diagrams this time! Firstly, here is the data_wordpressapi_bronze AWS Lambda function:

Where:

  1. AWS Lambda calls Parameter Store for S3 and SNS parameters. Parameter Store returns these to AWS Lambda.
  2. Lambda function gets raw WordPress JSON data from S3 Raw Bucket.
  3. Lambda function transforms the raw WordPress JSON data to bronze WordPress Parquet data and puts the new object in the S3 Bronze Bucket.

Meanwhile, Lambda is writing to a CloudWatch Log Group throughout its invocation. If there’s a failure, the Lambda function publishes a message to an SNS topic. SNS then delivers this message to the user’s subscribed email address.

Next, this is the AWS Step Functions WordPress bronze orchestration process:

Where:

  1. EventBridge Schedule invokes the State Machine.
  2. State Machine invokes the Raw Lambda function.
  3. State Machine invokes the Bronze Lambda function.

The State Machine also has its own logging and alerting channels.

Python

In this section, I work on my raw and bronze Python scripts for the WordPress pipeline orchestration process.

Raw Script Updates

I try to update my existing resources when I find something pertinent online. My latest find was this Indently video that covers, amongst other things, type annotations:

So how are type annotations different from type hints? Type annotations were released in 2006 and aimed to standardize function parameters and return value annotation. Type hints (released in 2014) then added updated definitions and conventions to enrich type annotations further.

The type hints PEP shows this difference between the two:

When used in a type hint, the expression None is considered equivalent to type(None)

https://peps.python.org/pep-0484/#using-none

So in this function:

Python
def send_email(name: str, message: str) -> None:
  • name: str is an example of type annotation because the parameter name is of type string.
  • -> None is an example of a type hint because although None isn’t a type, it confirms that the function has no output.

So what’s changed in my raw script?

Updated Import & Functions

Let’s open with a new import:

Python
from botocore.client import BaseClient

BaseClient serves as a foundational base class for AWS service clients within botocore – a low-level library providing the core functionality of boto3 (the AWS Python SDK) and the AWS CLI.

I’m using it here to add type annotations to my boto3 clients. For example, send_sns_message already had these annotations:

Python
def send_sns_message(sns_client, topic_arn: str, subject:str, message: str):

I’ve now annotated sns_client with BaseClient to indicate its boto3 relation. I’ve also added a -> None type hint to confirm the function has no output:

Python
def send_sns_message(sns_client: BaseClient, topic_arn: str, subject:str, message: str) -> None:

Elsewhere, I’ve added the BaseClient annotation to get_parameter_from_ssm‘s ssm_client parameter:

Python
def get_parameter_from_ssm(ssm_client: BaseClient, parameter_name: str) -> str:

And put_s3_object‘s s3_client parameter:

Python
def put_s3_object(s3_client: BaseClient, bucket: str, prefix:str, name: str, json_data: str, suffix: str) -> bool:

put_s3_object also has new prefix and suffix parameters. Before this, it was hard-coded to create JSON objects in a wordpress-api S3 prefix:

Python
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket...")
        s3_client.put_object(
            Body = json_data,
            Bucket = bucket,
            Key = f"wordpress-api/{name}.json"
        )

Not any more! The S3 prefix and object suffix can now be changed dynamically:

Python
    try:
        logging.info(f"Attempting to put {name} data in {bucket} bucket's {prefix}/{name} prefix...")
        s3_client.put_object(
            Body = json_data,
            Bucket = bucket,
            Key = f"{prefix}/{name}/{name}.{suffix}"
        )

This improves put_s3_object‘s reusability as I can now pass any prefix and suffix to it during a function call. For example, this call creates a JSON object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'json')

While this creates a CSV object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'csv')

Likewise, this creates a TXT object:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string, 'txt')

I can also set data_source (which I’ll cover shortly) to any S3 prefix, giving total control over where the object is stored.

Updated Variables

Next, some of my variables need to change. My SNS parameter name needs updating from:

Python
# AWS Parameter Store Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_snstopic = '/sns/pipeline/wordpressapi/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

To:

Python
# AWS Parameter Store Names
parametername_s3bucket = '/s3/lakehouse/name/raw'
parametername_snstopic = '/sns/data/lakehouse/raw'
parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

I also need to lay the groundwork for put_s3_object‘s new prefix parameter. I used to have a lambdaname variable that was used in the logs:

Python
# Lambda name for messages
lambdaname = 'data_wordpressapi_raw'

I’ve replaced this with two new variables. data_source records the data’s origin, which matches my S3 prefix naming schema. function_name then adds context to data_source to match my Lambda function naming schema:

Python
# Lambda name for messages
data_source = 'wordpress_api'
function_name = f'data_{data_source}_raw'

data_source is then passed to the put_s3_object function call when creating raw objects:

Python
ok = put_s3_object(client_s3, s3_bucket, data_source, object_name, api_json_string)

While function_name is used in the logs when referring to the Lambda function:

Python
    # Check an S3 bucket has been returned.
    if not s3_bucket_raw:
        message = f"{function_name}: No S3 Raw bucket returned."
        subject = f"{function_name}: Failed"

Updated Script Body

My variables all now have type annotations. They’ve gone from:

Python
    # AWS Parameter Store Names
    parametername_s3bucket = '/s3/lakehouse/name/raw'
    parametername_snstopic = '/sns/data/lakehouse/raw'
    parametername_wordpressapi = '/wordpress/amazonwebshark/api/mysqlendpoints'

    # Lambda name for messages
    data_source = 'wordpress_api'
    function_name = f'data_{data_source}_raw'

    # Counters
    api_call_timeout = 30
    endpoint_count_all = 0
    endpoint_count_failure = 0
    endpoint_count_success = 0

To:

Python
    # AWS Parameter Store Names
    parametername_s3bucket: str = '/s3/lakehouse/name/raw'
    parametername_snstopic: str = '/sns/data/lakehouse/raw'
    parametername_wordpressapi: str = '/wordpress/amazonwebshark/api/mysqlendpoints'

    # Lambda name for messages
    data_source: str = 'wordpress_api'
    function_name: str = f'data_{data_source}_raw'

    # Counters
    api_call_timeout: int = 30
    endpoint_count_all: int = 0
    endpoint_count_failure: int = 0
    endpoint_count_success: int = 0

This is helpful when the variables are passed in from settings files or external services and are not immediately apparent. So a good habit to get into!

Bronze Script

Now let’s talk about the new script, which transforms raw S3 JSON objects into bronze S3 Parquet objects. Both raw and bronze WordPress scripts will then feed into an AWS orchestration workflow.

Reused Raw Functions

The following functions are re-used from the Raw script with no changes:

Get Filename Function

Here, I want to get each S3 path’s object name. The object name has some important uses:

  • Using it instead of the full S3 path makes the logs easier to read and cheaper to store.
  • Using it during bronze S3 object creation ensures consistent naming.

A typical S3 path has the schema s3://bucket/prefix/object.suffix, from which I want object.

This function is a remake of the raw script’s Get Filename function. This time, the source string is an S3 path instead of an API endpoint:

I define a get_objectname_from_s3_path function, which expects a path argument with a string type hint and returns a new string.

Firstly, my name_full variable uses the rsplit method to capture the substring I need, using forward slashes as separators. This converts s3://bucket/prefix/object.suffix to object.suffix.

Next, my name_full_last_period_index variable uses the rfind method to find the last occurrence of the period character in the name_full string.

Finally, my name_partial variable uses slicing to extract a substring from the beginning of the name_full string up to (but not including) the index specified by name_full_last_period_index. This converts object.suffix to object.

If the function cannot return a string, an exception is logged and a blank string is returned instead.

Get Data Function

Next, I want to read data from an S3 JSON object in my Raw bucket and store it in a pandas DataFrame.

Here, I define a get_data_from_s3_object function that returns a pandas DataFrame and expects three arguments:

  • boto3_session: the authenticated session to use with a BaseClient type hint.
  • s3_object: the S3 object path with a string type hint.
  • name: the S3 object name with a string type hint (used for logging).

This function uses AWS SDK For pandas s3.read_json to read the data from the S3 object path using the existing boto3_session authentication.

If data is found then get_data_from_s3_object returns a populated DataFrame. Otherwise, an empty DataFrame is returned instead.

Put Data Function

Finally, I want to convert the DataFrame to Parquet and store it in my bronze S3 bucket.

I define a put_s3_parquet_object function that expects four arguments:

  • df: the pandas DataFrame containing the raw data.
  • name: the S3 object name.
  • s3_object_bronze: the S3 path for the new bronze object
  • session: the authenticated boto3 session to use.

I give string type hints to the name and s3_object_bronze parameters. session gets the same BaseClient hint as before, and df is identified as a pandas DataFrame.

I open a try except block that uses s3.to_parquet with the existing boto3_session to upload the DataFrame data to S3 as a Parquet object. If this operation succeeds, the function returns True. If it fails, a botocore exception is logged and the function returns False.

Imports & Variables

The bronze script has two new imports to examine: awswrangler and pandas:

Python
import logging
import boto3
import botocore
import awswrangler as wr
import pandas as pd
from botocore.client import BaseClient

I’ve used both before. Here, pandas handles my in-memory data storage and awswrangler handles my S3 interactions.

There are also parameter changes. I’ve added Parameter Store names for both the bronze S3 bucket and the SNS topic. I’ve kept the raw S3 bucket parameter as awswrangler needs it for the get_data_from_s3_object function.

Python
parametername_s3bucket_raw: str = '/s3/lakehouse/name/raw'
parametername_s3bucket_bronze: str = '/s3/lakehouse/name/bronze'
parametername_snstopic: str = '/sns/data/lakehouse/bronze'

I’ve also swapped out _raw for _bronze in function_name, and renamed the counters from endpoint_count to object_count to reflect their new function:

Python
    # Lambda name for messages
    data_source: str = 'wordpress_api'
    function_name: str = f'data_{data_source}_bronze'

    # Counters
    object_count_all: int = 0
    object_count_failure: int = 0
    object_count_success: int = 0

Script Body

Most of the bronze script is reused from the raw script. Tasks like logging config, name parsing and validation checks only needed the updated parameters! There are some changes though, as S3 is now my data source and I’m also doing additional tasks.

Firstly, I need to get the raw S3 objects. The AWS SDK For pandas S3 class has a list_objects function which is purpose-built for this:

Python
s3_objects_raw = wr.s3.list_objects(
      path = f's3://{s3_bucket_raw}/{data_source}',
      suffix = 'json',
      boto3_session = session)
  • path is the S3 location to list – in this case the raw S3 bucket’s wordpress_api prefix.
  • suffix filters the list by the specified suffix.
  • boto3_session specifies my existing boto3_session to prevent unnecessary re-authentication.

During the loop, my script checks if the pandas DataFrame returned from get_data_from_s3_object contains data. If it’s empty then the loop ends, otherwise the column and row counts are logged:

Python
if df.empty:
  logging.warning(f"{object_name} DataFrame is empty!")
  endpoint_count_failure += 1
  continue
  
logging.info(f'{object_name} DataFrame has {len(df.columns)} columns and {len(df)} rows.')

Assuming all checks succeed, I want to put a new Parquet object into my bronze S3 bucket. AWS SDK For pandas has an s3.to_parquet function that does this using a pandas DataFrame and an S3 path.

I already have the DataFrame so let’s make the path. This is done by the s3_object_bronze parameter, which joins existing parameters with additional characters. This is then passed to put_s3_parquet_object:

Python
s3_object_bronze = f's3://{s3_bucket_bronze}/{data_source}/{object_name}/{object_name}.parquet'

logging.info(f"Attempting {object_name} S3 Bronze upload...")
ok = put_s3_parquet_object(df, object_name, s3_object_bronze, session)

That’s the bronze script done. Now to deploy it to AWS Lambda.

Lambda

In this section, I configure and deploy my Bronze Lambda function.

Hitting Size Limits

So, remember when I said that I expected my future Lambda deployments to improve? Well, this was the result of my retrying the virtual environment deployment process the Raw Lambda used:

2024 03 08 LambdaError

While my zipped raw function is 19.1 MB, my zipped bronze function is over five times bigger at 101.6 MB! My poorly optimised package wouldn’t cut it this time, so I prepared for some pruning. Until I discovered something…

Using A Layer

There’s a managed AWS SDK for pandas Lambda layer!

2024 03 05 LayerAWSSDKPandas

It can be selected in the Lambda console or programmatically called from this list of AWS SDK for pandas Managed Layer ARNs, which covers:

  • All AWS commercial regions.
  • All Python versions currently supported by Lambda (currently 3.8+)
  • Both Lambda architectures.

Additionally, the Lambda Python 3.12 runtime includes boto3 and botocore. So by using this runtime and the managed layer, I’ve gone from a large deployment package to no deployment package! And because my function is now basically just code, I can view and edit that code in the Lambda console directly.

Lambda Config

My Bronze Lambda function borrows several config settings from the raw one, including:

Where it differs is the IAM setup. I needed additional permissions anyway because this function is reading from two S3 buckets now, but by the time I was done the policy was hard to read, maintain and troubleshoot:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "sns:Publish",
                "s3:ListBucket",
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-raw",
                "arn:aws:s3:::data-lakehouse-bronze",
                "arn:aws:logs:eu-west-1:REDACTED:*",
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-raw",
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "ssm:GetParameter"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:log-group:/aws/lambda/data_wordpressapi_bronze:*",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/bronze",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/bronze"
            ]
        }
    ]
}

So let’s refactor it! The below policy has the same actions, grouped by service and with appropriately named Sids:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchLogGroupActions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:*"
            ]
        },
        {
            "Sid": "CloudWatchLogStreamActions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:eu-west-1:REDACTED:log-group:/aws/lambda/data_wordpressapi_bronze:*"
            ]
        },
        {
            "Sid": "S3BucketActions",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw",
                "arn:aws:s3:::data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "S3ObjectActions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::data-lakehouse-raw/wordpress_api/*",
                "arn:aws:s3:::data-lakehouse-bronze/wordpress_api/*"
            ]
        },
        {
            "Sid": "SNSActions",
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": [
                "arn:aws:sns:eu-west-1:REDACTED:data-lakehouse-bronze"
            ]
        },
        {
            "Sid": "ParameterStoreActions",
            "Effect": "Allow",
            "Action": [
                "ssm:GetParameter"
            ],
            "Resource": [
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/raw",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/s3/lakehouse/name/bronze",
                "arn:aws:ssm:eu-west-1:REDACTED:parameter/sns/data/lakehouse/bronze"
            ]
        }
    ]
}

Much better! This policy is now far easier to read and update.

There’s also a clear distinction between the bucket-level s3:ListBucket operation and the object-level s3:PutObject and s3:GetObject operations now. Getting these wrong can have big consequences, so the clearer the better!

One deployment and test later, and I have some new S3 objects!

[INFO]: WordPress API Bronze process complete: 5 Successful | 0 Failed.

REPORT RequestId: 899d1658-f7de-4e74-8d64-b4f029fe2bec	Duration: 7108.50 ms	Billed Duration: 7109 ms	Memory Size: 250 MB	Max Memory Used: 250 MB	Init Duration: 4747.38 ms

So now I have two Lambda functions with some requirements around them:

  • They need to run sequentially.
  • The Raw Lambda must finish before the Bronze Lambda starts.
  • If the Raw Lambda fails then the Bronze Lambda shouldn’t run at all.

Now that AWS Lambda is creating WordPress raw and bronze objects, it’s time to start thinking about orchestration!

Step Functions & EventBridge

In this section, I create both an AWS Step Functions State Machine and an Amazon EventBridge Schedule for my WordPress bronze orchestration process.

State Machine Requirements

Before writing any code, let’s outline the steps I need the state machine to perform:

  1. data_wordpressapi_raw Lambda function is invoked. If it succeeds then move to the next step. If it fails then send a notification and end the workflow reporting failure.
  2. data_wordpressapi_bronze Lambda function is invoked. If it succeeds then end the workflow reporting success. If it fails then send a notification and end the workflow reporting failure.

With the states defined, it’s time to create the state machine.

State Machine Creation

The following state machine was created using Step Functions Workflow Studio – a low-code visual designer released in 2021, with drag-and-drop functionality that auto-generates code in real-time:

Workflow Studio produced this section’s code and diagrams.

Firstly I create a data_wordpressapi_raw task state to invoke my Raw Lambda. This task uses the lambda:invoke action to invoke my data_wordpressapi_raw function. I set the next state as data_wordpressapi_bronze and add a Catch block that sends all errors to a PublishFailure state (which I’ll define later):

JSON
    "data_wordpressapi_raw": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:eu-west-1:REDACTED:function:data_wordpressapi_raw:$LATEST"
      },
      "Next": "data_wordpressapi_bronze",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.Error",
          "Next": "PublishFailure"
        }
      ],
      "TimeoutSeconds": 120
    }

Note the TimeoutSeconds parameter. All my task states will have 120-second timeouts. These stop the state machine from waiting indefinitely if the task becomes unresponsive, and are recommended best practice. Also note that state machines wait for Lambda invocations to finish by default, so no additional config is needed for this.

Next, I create a data_wordpressapi_bronze task state to invoke my Bronze Lambda. This task uses the lambda:invoke action to invoke my data_wordpressapi_bronze function. I then add a Catch block that sends all errors to a PublishFailure state.

Finally, "End": true designates this state as a terminal state which ends the execution if the task is successful:

JSON
    "data_wordpressapi_bronze": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:eu-west-1:973122011240:function:data_wordpressapi_bronze:$LATEST"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.Error",
          "Next": "PublishFailure"
        }
      ],
      "TimeoutSeconds": 120,
      "End": true
    }

Finally, I create a PublishFailure task state that publishes failure notifications. This task uses the sns:Publish action to publish a simple message to the failure-stepfunction SNS Topic ARN. "End": true marks this task as the other potential way the state machine execution can end:

JSON
    "PublishFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:eu-west-1:REDACTED:failure-stepfunction",
        "Message": "An error occurred in the state machine: { \"error\": \"$.Error\" }"
      },
      "End": true,
      "TimeoutSeconds": 120
    }

While both Lambdas already have SNS alerting, the state machine itself may also fail so the added observability is justified. This Marcia Villalba video was very helpful here:

And that’s everything I need! At this point Wordflow Studio gives me two things – firstly the state machine’s code, which I’ve committed to GitHub. And secondly this handy downloadable diagram:

stepfunctions graph

State Machine Config

It’s now time to think about security and monitoring.

When new state machines are created in the AWS Step Functions console, an IAM Role is created with policies based on the state machine’s resources. The nuances and templates are covered in the Step Functions Developer Guide, so let’s examine my WordPress_Raw_To_Bronze state machine’s auto-generated IAM Role consisting of two policies:

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "xray:PutTraceSegments",
                "xray:PutTelemetryRecords",
                "xray:GetSamplingRules",
                "xray:GetSamplingTargets"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

This supports the AWS X-Ray integration with AWS Step Functions. If X-Ray trancing is never enabled then this policy is unused.

Besides X-Ray tracing, there is also an option to log a state machine’s execution history to CloudWatch Logs. There are three log levels available plus a fourth default choice: OFF. Each state machine retains recent execution history and I’ve got no need to keep that history long-term, so I leave the log retention disabled. Remember – CloudWatch Logs is only free for the first 5GB!

State Machine Testing

There are various ways to test a state machine. There’s a testing and debugging section in the developer guide that goes into further details, the three main options being:

I’ll focus on console testing here.

Both individual states and the entire state machine can be tested in the console. Each state can be tested in isolation (using the TestState API under the hood) with customisable inputs and IAM roles. This is great for checking the state outputs are correct, and that the attached IAM role is sufficient.

The state machine itself can also be tested via on-demand execution. The Execution Details page shows the state machine’s statistics and events, and has great coverage in the developer guide.

During testing, my WordPress_Raw_To_Bronze state machine returned this error:

States.Runtime in step: data_wordpressapi_bronze.

An error occurred while executing the state 'data_wordpressapi_bronze' (entered at the event id #7). Unable to apply Path transformation to null or empty input.

This turned out to be a problem with the OutputPath parameter, which Wordflow Studio enables by default:

2024 03 04 StepFunctionsOutPutPath

I’m not using this setting for anything, so I disabled it to solve this problem.

Eventbridge Schedule

Finally, I want to automate the execution of my state machine. This calls for an EventBridge Schedule!

EventBridge makes this quite simple, using mostly the same process as last time. The Step Functions StartExecution operation is a templated target like Lambda’s Invoke operation, so it’s a case of selecting the WordPress_Raw_To_Bronze state machine from the list and updating the schedule’s IAM role accordingly.

And that’s it! EventBridge now executes the state machine at 07:00 each morning. The state machine then sequentially invokes both Lambda functions and catches any errors.

Costs

In this section, I’ll examine my recent AWS WordPress bronze orchestration process costs.

Let’s start with Step Functions. There are two kinds of Step Function workflow:

  • Standard workflows are charged based on the number of state transitions. These are counted each time a workflow step is executed. The first 4000 transitions each month are free. After that, every 1000 transitions cost $0.025.
  • Express workflows are priced by the number of executions, duration, and memory consumption. The specifics of these criteria, coupled with full details of all charges are on the Step Functions pricing page.

I’m using standard workflows, and as of 26 March I’ve used 118 state transitions. In other words, free! Elsewhere, my costs are broadly on par with previous months. These are my S3 costs from 2024-02-01 to 2024-03-26:

S3 ActionsMonthUsageCost
PUT, COPY, POST, or LIST requests2024-0264,1960.32
PUT, COPY, POST, or LIST requests2024-0317,5660.09
GET and all other requests2024-02101,4620.04
GET and all other requests2024-038,6560.00
GB month of storage used2024-020.1090.00
GB month of storage used2024-030.1610.00

And this is my recent free tier usage from 2024-02-01 to 2024-03-26:

ServiceMonthUsage
EventBridge2024-0231 Invocations
EventBridge2024-0325 Invocations
Lambda2024-02122.563 Second Compute
Lambda2024-0284 Requests
Lambda2024-0382.376 Second Compute
Lambda2024-0358 Requests
Parameter Store2024-0234 API Requests
Parameter Store2024-0325 API Requests
SNS2024-028 Email-JSON Notifications
SNS2024-02438 API Requests
SNS2024-033 Email-JSON Notifications
SNS2024-03205 API Requests

So my only costs are still for storage.

Resources

The following items have been checked into the amazonwebshark GitHub repo for the AWS WordPress bronze orchestration process, available via the button below:

  • Updated data_wordpressapi_raw Python script & requirements.txt file.
  • New data_wordpressapi_bronze Python script & requirements.txt file.
  • WordPress_Raw_To_Bronze state machine JSON.
GitHub-BannerSmall

Summary

In this post, I created my WordPress pipeline’s bronze data orchestration process using AWS Lambda layers and AWS Step Functions.

I’ve wanted to try Step Functions out for a while, and all things considered they’re great! Workflow Studio is easy to use, and the templates and tutorials undoubtedly highlight the value that Step Functions can bring.

Additionally, the integration with both EventBridge Scheduler and other AWS services makes Step Functions a compelling orchestration service for both my ongoing WordPress bronze work and the future projects in my pipeline. This combined with some extra Lambda layers will reduce my future dev and test time.

If this post has been useful then the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~