
Python Data Validation And Observability As Code With Pydantic

In this post, I use the Pydantic Python library to create data validation and observability processes for my Project Wolfie iTunes data.

Introduction

Data validation is a crucial component of any data project. It ensures that data is accurate, consistent and reliable. It verifies that data meets set criteria and rules to maintain its quality, and stops erroneous or unreliable information from entering downstream systems. I’ve written about it, scripted it and talked about it.

Validation will be a crucial aspect of Project Wolfie. It is an ongoing process that should occur from data ingestion to exposure, and should be automated wherever possible. Thankfully, most data processes within Project Wolfie are (and will be) built using Python, which provides several libraries to simplify data validation. These include Pandera, Great Expectations and the focus of this post – Pydantic (specifically, version 2).

Firstly, I’ll explore the purpose and benefits of Pydantic. Next, I’ll import some iTunes data and use it to explore key Pydantic validation concepts. Finally, I’ll explore how Pydantic handles observability and test its findings. The complete code will be in a GitHub repo.

Let’s begin!

Introducing Pydantic

This section introduces Pydantic and examines some of its benefits.

About Pydantic

Pydantic is an open-source data validation Python library. It uses established Python notation and constructs to define data structures, types and constraints. These can then validate the provided data, generating clear error messages when issues occur.

Pydantic is a widely used tool for managing application settings, validating API requests and responses, and streamlining data transfer between Python objects and formats like JSON. By integrating both existing and custom elements, it offers a powerful and Pythonic method for ensuring data quality and consistency within projects. This makes data handling in Python more reliable and reduces the likelihood of errors through its intuitive definition and validation processes.

Pydantic Benefits

Pydantic’s benefits are thoroughly documented, and the ones I want to highlight here are:

Intuitive: Pydantic’s use of type hints, functions and classes fits well with my current Python skill level, so I can focus on learning Pydantic without also having to explore unfamiliar Python concepts.

Fast: Pydantic’s core validation logic is written in Rust, which enables rapid development, testing, and validation. This speed has contributed towards…

Well-Supported: Pydantic has extensive community use and support from organisations like Anthropic, Netflix and OpenAI, as well as popular Python libraries like Airflow, FastAPI and LangChain. It also has extensive AWS Lambda support via user-configurable artefacts and the community-managed Powertools for AWS Lambda (Python)‘s Parser utility.

Preparation

Before I can start using Pydantic, I need some data. This section examines the data I am using and how I prepare it for Pydantic.

iTunes Data

Firstly, let’s extract some data from iTunes. I create iTunes Export files using the iTunes > Export Playlist command. Apple has documented this, but WikiHow’s documentation is more illustrative. The export file type choices are…interesting. The one closest to matching my needs is the txt format, although the files are technically tab-separated files (TSVs).

iTunes Exports contain many metadata columns. I’m not including them all here (after all, this is a Pydantic post not an iTunes one), but I will be using the following subset (using my existing metadata definitions):

Metadata Type  Column Name   Data Type  Purpose
-------------  ------------  ---------  ------------------------------
Technical      Album         String     Track key as Camelot Notation*
Technical      Location      String     Track file path
Technical      Track Number  Integer    Track BPM*
Descriptive    Artist        String     Track artist(s)
Descriptive    Genre         String     Track genre
Descriptive    Name          String     Track name and mix
Descriptive    Work          String     Publishing record label
Descriptive    Year          Integer    Track release year
Interaction    My Rating     Integer    Track personal rating

Note that the starred Album and Track Number columns have purposes that differ from the column names. The reasons for this are…not ideal.

  • Track Number contains BPM data because, although iTunes does have a BPM column, it isn’t included in the exports. And the exports can’t be customised! To include BPMs in an export, I had to repurpose an existing column.

Great. But that’s not as bad as…

  • Album contains musical keys, as iTunes doesn’t even have a key column, despite MP3s having a native Initial Key metadata field! Approaches to dealing with this vary – I chose to use another donor column. I’ll explain Camelot Notations later on.

That’s enough about the iTunes data for now – I’ll go into more detail in future Project Wolfie posts. Now let’s focus on getting this data into memory for Python.

Data Capture

Next, let’s get the iTunes data into memory. Starting with a familiar library…

pandas

I’ll be using pandas to ingest the iTunes data. This is a well-established and widely supported module. It also has its own data validation functions and will assist with issues like handling spaces in column names.

While iTunes files aren’t CSVs, the pandas read_csv function can still read their data into a DataFrame. It needs some help though – the delimiter parameter must be set to \t so that pandas recognises the tabs as field separators.

So let’s read the iTunes metadata into memory and…

Python
df = pd.read_csv(csv_path, delimiter='\t')

>> UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte

Oh. pandas can’t read the file. The error says it’s trying the utf-8 codec, so the export must be using something else. Fortunately, there’s another Python library that can help!

charset_normalizer

charset_normalizer is an open-source encoding detector. It determines the encoding of a file or text and records the result. It’s related to the older chardet library but is faster, has a more permissive MIT license and supports more encodings.

Here, I’m using charset_normalizer.detect in a detect_file_encoding function to detect the export’s codec:

Python
def detect_file_encoding(file_path: Path) -> str:
    with open(file_path, 'rb') as file:
        raw_data = file.read()
        
    detection_result = charset_normalizer.detect(raw_data)
    return detection_result['encoding'] or 'utf-8'

In which:

  • I define a detect_file_encoding function that expects a filepath and returns a string.
  • detect_file_encoding opens the file, reads the data and stores it as raw_data.
  • charset_normalizer detects raw_data‘s codec and stores this as detection_result.
  • detect_file_encoding returns either the successfully detected codec, or the common utf-8 codec if the attempt fails.

I can then pass the export’s filepath to the detect_file_encoding function, capture the results as encoding and pass this as a parameter to pandas.read_csv:

Python
encoding = detect_file_encoding(csv_path)
    
df = pd.read_csv(csv_path, encoding=encoding, delimiter='\t')

>> Loaded 4407 rows

There’s one more action to take before moving on. Some columns contain spaces. This will become a problem as spaces are not allowed in Python identifiers!

As the data is now in a pandas DataFrame, I can use pandas.DataFrame.rename to remove these spaces:

Python
df = df.rename(columns={
        'Track Number': 'TrackNumber',
        'My Rating': 'MyRating'
    })

The metadata is now ready for Pydantic.

Installing Pydantic

Finally, let’s install Pydantic. This process is fully documented. My preferred method is via pip install in a local virtual environment:

Plaintext
pip install pydantic

And then importing Pydantic into my script:

Python
import pydantic

Now I can start using Pydantic.

Pydantic Data Models

In this section, I tell Pydantic about my data model and the types of data it should expect for validation.

Introducing BaseModel

At the core of Pydantic is the BaseModel class – used for defining data models. Every Pydantic model inherits from it, and by doing so gains features like type enforcement, automatic data parsing and built-in validation.

By subclassing BaseModel, a schema for the data is defined using standard Python type hints. Pydantic uses these hints to validate and convert input data automatically.

Let’s explore BaseModel by creating a new Track class.

Creating A Track Class

Pydantic supports standard Python types like str and int. This reduces Pydantic’s learning curve and simplifies integration into existing Python processes.

Here are the very beginnings of my Track data model. I have a new Track class inheriting from Pydantic’s BaseModel, and a Name field with string data type:

Python
class Track(BaseModel):
    Name: str

Next, I add a Year field with integer data type:

Python
class Track(BaseModel):
    Name: str
    Year: int

And so on for each field I want to validate with Pydantic:

Python
class Track(BaseModel):
    Name: str
    Artist: str
    Album: str
    Work: str
    Genre: str
    TrackNumber: int
    Year: int
    MyRating: int
    Location: str

Now, if any field is missing or has the wrong type, Pydantic will raise a ValidationError. But there’s far more to Pydantic data types than this…
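To sketch how this behaves (using a trimmed two-field version of the model, with hypothetical values), note that Pydantic both coerces compatible input and rejects incomplete input:

```python
from pydantic import BaseModel, ValidationError

class Track(BaseModel):
    Name: str
    Year: int

# Valid input: the string "2004" is coerced to the integer 2004
track = Track(Name="Getting Away (Original Mix)", Year="2004")
print(track.Year)  # 2004

# A missing field raises a ValidationError
try:
    Track(Name="Shapes (Oliver Smith Remix)")
except ValidationError as e:
    print(e.error_count())  # 1 (Year is missing)
```
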

Defining Special Data Types

Where no standard type fits or where validation rules are more complex to express, Pydantic offers further type coverage through types such as FilePath, DirectoryPath and HttpUrl.

One of my Track fields will immediately benefit from this:

Python
class Track(BaseModel):
    Location: str

Currently, my Location field validation is highly permissive. It will accept any string. I can improve this using Pydantic’s FilePath data type:

Python
class Track(BaseModel):
    Location: FilePath

Now, Pydantic will check that the given location is a path that exists and links to a valid file. No custom code; no for loops – the FilePath type handles everything for me.
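A quick sketch of this behaviour, using a temporary file for the valid case and a hypothetical nonexistent path for the invalid one:

```python
import tempfile
from pathlib import Path
from pydantic import BaseModel, FilePath, ValidationError

class Track(BaseModel):
    Location: FilePath

# A file that exists on disk passes validation
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
    existing = Path(tmp.name)
track = Track(Location=existing)
print(track.Location.suffix)  # .mp3

# A nonexistent path fails without any custom code
try:
    Track(Location=Path("no/such/file.mp3"))
except ValidationError:
    print("rejected")
```
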

So I now have data type validation in my Pydantic data model. What else can I have?

Pydantic Built-In Validation

This section explores the native data validation features of Pydantic, including field annotation and constraints.

Introducing Field

In Pydantic models, data attributes are typically defined using Python type hints. The Field function enables further customisation like constraints, schema metadata and default values.

While type hints define what kind of data is allowed, Field defines how that data should behave, what happens if it’s missing and how it should be documented. It adds clarity to models and helps Pydantic enforce stricter rules.

Let’s run through some examples.

Custom Schema Metadata

One of the challenges in creating data pipelines is that the data fields can sometimes be unclear or difficult to explain. This can cause confusion and delay when building ETLs, examining repos and interacting with code.

Field helps here by adding custom fields to annotate data within Pydantic classes. Examples include description:

Python
class Track(BaseModel):
    Name: str = Field(
        description="Track's name and mix.")

And examples:

Python
class Track(BaseModel):
    Name: str = Field(
        description="Track's name and mix.",
        examples=["Track Title (Original Mix)", "Track Title (Extended Mix)"])

Using these throughout my Track class simplifies the code and reduces context switching:

Python
class Track(BaseModel):
    Name: str = Field(
        description="Track's name and mix.",
        examples=["Track Title (Original Mix)", "Track Title (Extended Mix)"])
    
    Artist: str = Field(
        description="The artist(s) of the track.",
        examples=["Above & Beyond", "Armin van Buuren"])
    
    Album: str = Field(
        description="Track's Camelot Notation indicating the key.",
        examples=["01A-Abm", "02B-GbM"])
    
    Work: str = Field(
        description="The record label that published the track.",
        examples=["Armada Music", "Anjunabeats"])
    
    Genre: str = Field(
        description="Track's musical genre.",
        examples=["Trance", "Progressive House"])
    
    TrackNumber: int = Field(
        description="Track's BPM (Beats Per Minute).",
        examples=[130, 140])
    
    Year: int = Field(
        description="Track's release year.",
        examples=[1998, 2004])
    
    MyRating: int = Field(
        description="Personal Rating.  Stars expressed as 0, 20, 40, 60, 80, or 100",
        examples=[60, 80])
    
    Location: FilePath = Field(
        description="Track's Location on the filesystem.",
        examples=[r"C:\Users\User\Music\iTunes\TranquilityBase-GettingAway-OriginalMix.mp3"])

This is especially useful for Album and TrackNumber given their unique properties.

Field Constraints

Field can also constrain the data that a class accepts. This includes string constraints:

  • max_length: Maximum length of the string.
  • min_length: Minimum length of the string.
  • pattern: A regular expression that the string must match.

and numeric constraints:

  • ge & le – greater than or equal to/less than or equal to
  • gt & lt – greater/less than
  • multiple_of – multiple of a given number

Constraints can also be combined as needed. For example, iTunes exports record MyRating values in increments of 20, where 1 star is 20 and 2 stars are 40, rising to the maximum 5 stars being 100.

I can express this within the Track class as:

Python
class Track(BaseModel):
    MyRating: int = Field(
        description="Personal Rating.  Stars expressed as 0, 20, 40, 60, 80, or 100",
        examples=[60, 80],
        ge=20,
        le=100,
        multiple_of=20)

Here, MyRating must be greater than or equal to 20 (ge=20), less than or equal to 100 (le=100), and must be a multiple of 20 (multiple_of=20).
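A minimal sketch of these constraints in action, keeping only the constraint arguments for brevity:

```python
from pydantic import BaseModel, Field, ValidationError

class Track(BaseModel):
    MyRating: int = Field(ge=20, le=100, multiple_of=20)

print(Track(MyRating=60).MyRating)  # 60: a valid three-star rating

# 0 is below ge, 50 is not a multiple of 20, 120 is above le
for bad in (0, 50, 120):
    try:
        Track(MyRating=bad)
        print(bad, "accepted")
    except ValidationError:
        print(bad, "rejected")
```
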

I can also parameterise these constraints using variables instead of hard-coded values:

Python
ITUNES_RATING_RAW_LOWEST = 20
ITUNES_RATING_RAW_HIGHEST = 100

class Track(BaseModel):
    MyRating: int = Field(
        description="Personal Rating.  Stars expressed as 0, 20, 40, 60, 80, or 100",
        examples=[60, 80],
        ge=ITUNES_RATING_RAW_LOWEST,
        le=ITUNES_RATING_RAW_HIGHEST,
        multiple_of=20)

This property lets me use Pydantic with other Python libraries. Here, my Year validation checks for years greater than or equal to 1970 and less than or equal to the current year (using the datetime library):

Python
YEAR_EARLIEST = 1970
YEAR_CURRENT = datetime.datetime.now().year

class Track(BaseModel):
    Year: int = Field(
        description="Track's release year.",
        examples=[1998, 2004],
        ge=YEAR_EARLIEST,
        le=YEAR_CURRENT)

No track in the collection should exist beyond the current year – this constraint will now update itself as time passes.

Having applied other constraints, my Track class looks like this:

Python
class Track(BaseModel):
    """Pydantic model for validating iTunes track metadata."""
    
    Name: str = Field(
        description="Track's name and mix type.",
        examples=["Track Title (Original Mix)", "Track Title (Extended Mix)"])
    
    Artist: str = Field(
        description="The artist(s) of the track.",
        examples=["Above & Beyond", "Armin van Buuren"])
    
    Album: str = Field(
        description="Track's Camelot Notation indicating the key.",
        examples=["01A-Abm", "02B-GbM"])
    
    Work: str = Field(
        description="The record label that published the track.",
        examples=["Armada Music", "Anjunabeats"])
    
    Genre: str = Field(
        description="Track's musical genre.",
        examples=["Trance", "Progressive House"])
    
    TrackNumber: int = Field(
        description="Track's BPM (Beats Per Minute).",
        examples=[130, 140],
        ge=BPM_LOWEST,
        le=BPM_HIGHEST)
    
    Year: int = Field(
        description="Track's release year.",
        examples=[1998, 2004],
        ge=YEAR_EARLIEST,
        le=YEAR_CURRENT)
    
    MyRating: int = Field(
        description="Personal Rating. Stars expressed as 0, 20, 40, 60, 80, or 100",
        examples=[60, 80],
        ge=ITUNES_RATING_RAW_LOWEST,
        le=ITUNES_RATING_RAW_HIGHEST,
        multiple_of=20)
    
    Location: FilePath = Field(
        description="Track's Location on the filesystem.",
        examples=[r"C:\Users\User\Music\iTunes\AboveAndBeyond-AloneTonight-OriginalMix.mp3"])

This is already very helpful. Next, let’s examine my custom requirements.

Pydantic Custom Validation

This section discusses how to create custom data validation using Pydantic. I will outline what the requirements are, and then examine how these validations are defined and implemented.

Introducing Decorators

In Python, decorators modify or enhance the behaviour of functions or methods without changing their actual code. Decorators are usually written using the @ symbol followed by the decorator name, just above the function definition:

Python
@my_decorator
def my_function():
    ...

For example, consider this logger_decorator function:

Python
def logger_decorator(func):
    def wrapper():
        print(f"Running {func.__name__}...")
        func()  # Execute the supplied function
        print("Done!")
    return wrapper

This function takes another function (func) as an argument, printing a message before and after execution. If the logger_decorator function is then used as a decorator when running this greet function:

Python
@logger_decorator
def greet():
    print("Hello, world!")

greet()

Python will add the logging behaviour of logger_decorator without modifying greet:

Python
Running greet...
Hello, world!
Done!

Introducing Field Validators

In addition to the built-in data validation capabilities of Pydantic, custom validators with more specific rules can be defined for individual fields using Field Validators. These use the field_validator() decorator, and are declared as class methods within a class inheriting from Pydantic’s BaseModel.

Here’s a basic example using my Track model:

Python
class Track(BaseModel):
    Name: str = Field(
        description="Track's name and mix.",
        examples=["Track Title (Original Mix)", "Track Title (Extended Mix)"]
    )

    @field_validator("Name")
    @classmethod
    def validate_name(cls, value):
        # custom validation logic here
        return value

Where:

  • @field_validator("Name") tells Pydantic to use the function to validate the Name field.
  • @classmethod lets the validator access the Track class (cls).
  • The validator executes the validate_name function with the field value (in this case Name) as input, performs the checks and must either:
    • return the validated value, or
    • raise a ValueError or TypeError if validation fails.

Let’s see this in action.

Null Checks

Firstly, let’s perform a common data validation check by identifying empty fields. I have two variants of this – one for strings and another for numbers.

The first – validate_non_empty_string – uses pandas.isna to catch missing values and strip() to catch empty strings. This field validator applies to the Artist, Work and Genre columns:

Python
    @field_validator("Artist", "Work", "Genre")
    @classmethod
    def validate_non_empty_string(cls, value, info):
        """Validate that a string field is not empty."""
        if pd.isna(value) or str(value).strip() == "":
            raise ValueError(f"{info.field_name} must not be null or empty")
        return value

The second – validate_non_null_numeric – checks the TrackNumber, Year and MyRating numeric columns for empty values using pandas.isna:

Python
    @field_validator("TrackNumber", "Year", "MyRating", mode="before")
    @classmethod
    def validate_non_null_numeric(cls, value, info):
        """Validate that a numeric field is not null."""
        if pd.isna(value):
            raise ValueError(f"{info.field_name} must not be null")
        return value

Also, it uses Pydantic’s before validator (mode="before"), ensuring the data validation happens before Pydantic coerces types. This catches edge cases like "" or "NaN" before they become None or float("nan") values.
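Here is a runnable sketch of this behaviour, substituting the standard library’s math.isnan for pandas.isna so the example is dependency-free:

```python
import math
from pydantic import BaseModel, field_validator, ValidationError

class Track(BaseModel):
    Year: int

    @field_validator("Year", mode="before")
    @classmethod
    def validate_non_null_numeric(cls, value):
        # Runs before int coercion, so the raw NaN float is still visible here
        if isinstance(value, float) and math.isnan(value):
            raise ValueError("Year must not be null")
        return value

print(Track(Year=1998).Year)  # valid values pass straight through

try:
    Track(Year=float("nan"))
except ValidationError as e:
    print("must not be null" in str(e))  # the custom message surfaces in the report
```
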

Character Check

Now let’s create a validator for something a little more challenging to define. All tracks in my collection follow a Track Name (Mix) schema. This can take many forms:

  • Original track: Getting Away (Original Mix)
  • Remixed track: Shapes (Oliver Smith Remix)
  • Updated remixed track: Distant Planet (Menno de Jong Interpretation) (2020 Remaster)
  • …and many more variants.

But generally, there should be at least one instance of text enclosed by parentheses. However, some tracks have no remixer and are released with just a title:

  • Getting Away
  • Shapes
  • Distant Planet

This not only looks untidy (eww!), but also breaks some of my downstream automation that expects the Track Name (Mix) schema. So any track without a remixer gets (Original Mix) added to the Name field upon download:

  • Getting Away (Original Mix)
  • Shapes (Original Mix)
  • Distant Planet (Original Mix)

Expressing this is possible with RegEx, but I can make a more straightforward and more understandable check with a field validator:

Python
    @field_validator("Name")
    @classmethod
    def validate_name(cls, value):
        if pd.isna(value) or str(value).strip() == "":
            raise ValueError("Name must not be null or empty")
        
        value_str = str(value)
        if '(' not in value_str:
            raise ValueError("Name must contain an opening parenthesis '('")
        if ')' not in value_str:
            raise ValueError("Name must contain a closing parenthesis ')'")
        return value

This validator checks that the value isn’t empty and then performs additional checks for parentheses. This could be one check, but having it as two checks improves log readability (insert foreshadowing – Ed). I could also have added Name to the validate_non_empty_string validation, but this way I have all my Name checks in the same place.

Parameterised Checks

Like constraints, field validators can also be parameterised. Let’s examine Album.

As iTunes exports can’t be customised, I use Album for a track’s Camelot Notation. These are based on the Camelot Wheel, Mixed In Key’s representation of the Circle Of Fifths. DJs generally favour Camelot Notation as it is simpler than traditional music notation for human understanding and application sorting.

Importantly, there are only twenty-four possible notations. For example:

  • 1A (A-Flat Minor)
  • 6A (G Minor)
  • 6B (B-Flat Major)
  • 10A (B Minor)

So let’s capture these values in a CAMELOT_NOTATIONS set:

Python
CAMELOT_NOTATIONS = {
    '01A-Abm', '01B-BM', '02A-Ebm', '02B-GbM', '03A-Bbm', '03B-DbM',
    '04A-Fm', '04B-AbM', '05A-Cm', '05B-EbM', '06A-Gm', '06B-BbM',
    '07A-Dm', '07B-FM', '08A-Am', '08B-CM', '09A-Em', '09B-GM',
    '10A-Bm', '10B-DM', '11A-Gbm', '11B-AM', '12A-Dbm', '12B-EM'
}

(Note the leading zeros. Without them, iTunes sorts the Album column as (10, 11, 12, 1, 2, 3…) – you can imagine how I felt about that – Ed)
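A quick illustration of why the padding matters: text sorting compares characters left to right, so without the zeros the two-digit notations sort before 1A:

```python
# Unpadded notations sort lexicographically: "10A" < "1A" because '0' < 'A'
print(sorted(["1A", "2A", "3A", "10A", "11A", "12A"]))
# ['10A', '11A', '12A', '1A', '2A', '3A']

# Leading zeros restore the intended numeric order
print(sorted(["01A", "02A", "03A", "10A", "11A", "12A"]))
# ['01A', '02A', '03A', '10A', '11A', '12A']
```
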

Next, I write an Album field validator that checks whether the given value is in the CAMELOT_NOTATIONS set:

Python
    @field_validator("Album")
    @classmethod
    def validate_album(cls, value):
        if pd.isna(value) or str(value).strip() == "":
            raise ValueError("Album must not be null or empty")
        
        if str(value) not in CAMELOT_NOTATIONS:
            raise ValueError(f"Album must be a valid Camelot notation: {value} is not in the valid list")
        return value

Pydantic now fails any value not found in the CAMELOT_NOTATIONS set.
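Here is the validator in a self-contained sketch, trimmed to just the Album field and tested against a hypothetical invalid value:

```python
from pydantic import BaseModel, field_validator, ValidationError

CAMELOT_NOTATIONS = {
    '01A-Abm', '01B-BM', '02A-Ebm', '02B-GbM', '03A-Bbm', '03B-DbM',
    '04A-Fm', '04B-AbM', '05A-Cm', '05B-EbM', '06A-Gm', '06B-BbM',
    '07A-Dm', '07B-FM', '08A-Am', '08B-CM', '09A-Em', '09B-GM',
    '10A-Bm', '10B-DM', '11A-Gbm', '11B-AM', '12A-Dbm', '12B-EM'
}

class Track(BaseModel):
    Album: str

    @field_validator("Album")
    @classmethod
    def validate_album(cls, value):
        if value not in CAMELOT_NOTATIONS:
            raise ValueError(f"Album must be a valid Camelot notation: {value}")
        return value

print(Track(Album="06A-Gm").Album)  # passes: a valid notation

try:
    Track(Album="13A-Xm")  # fails: not one of the twenty-four
except ValidationError:
    print("rejected")
```
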

Now I have my validation needs fully covered. What observability does Pydantic give me over these data validation checks?

Pydantic Observability

In this section, I assess and adjust the default Pydantic observability abilities to ensure my data validation is accurately recorded.

Default Output

Pydantic automatically generates data validation error messages if validation fails. These detailed messages provide a structured overview of the issues encountered, including:

  • The index of the failing input (e.g., a DataFrame row number).
  • The model class where the error occurred.
  • The field name that failed validation.
  • A human-readable explanation of the issue.
  • The offending input value and its type.
  • A direct link to relevant documentation for further guidance.

Here’s an example of Pydantic’s output when a string field receives a NaN value:

Plaintext
Row 2353: 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

In this example:

  • Row 2353 indicates the problematic input row.
  • Track is the Pydantic model where validation failed.
  • Work is the failing field.
  • Pydantic detects that the input is nan (a float) and not a valid string.
  • Pydantic provides a URL to the string_type documentation.

Here’s another example, this time for a MyRating error:

Plaintext
Row 3040: 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

In this case, a field validator raised a ValueError because MyRating must not be null.

Pydantic’s error reporting is clear and actionable, making it suitable for debugging and systemic data validation tasks. However, for larger datasets or more user-friendly outputs (such as reports or UI feedback), further customisation is helpful, such as…

Terminal Output Customisation

As good as Pydantic’s default output is, it’s not that human-readable. For example, in this Terminal output I have no idea which tracks are on rows 2353, 2495 and 3040:

Plaintext
Row 2353: 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
    
Row 2495: 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
    
Row 3040: 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

While I can find this out, it would be better to know at a glance. Fortunately, I can improve this when capturing the errors by appending the artist and name to each row of the errors object:

Python
except (ValidationError, ValueError) as e:
    artist = row['Artist'] if not pd.isna(row['Artist']) else "Unknown Artist"
    name = row['Name'] if not pd.isna(row['Name']) else "Unknown Name"
    errors.append((index, artist, name, str(e)))
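For context, this except clause sits inside a row-by-row validation loop. A minimal, self-contained sketch of such a loop (with a toy two-column DataFrame and a trimmed Track model, not the post’s full code) might look like:

```python
import pandas as pd
from pydantic import BaseModel, ValidationError

class Track(BaseModel):
    Artist: str
    Name: str

df = pd.DataFrame([
    {"Artist": "Above & Beyond", "Name": "Alone Tonight (Original Mix)"},
    {"Artist": float("nan"), "Name": "Shapes (Oliver Smith Remix)"},
])

errors = []
validated = 0

for index, row in df.iterrows():
    try:
        Track(**row.to_dict())
        validated += 1
    except (ValidationError, ValueError) as e:
        # Fall back to placeholders when the identifying fields are themselves missing
        artist = row['Artist'] if not pd.isna(row['Artist']) else "Unknown Artist"
        name = row['Name'] if not pd.isna(row['Name']) else "Unknown Name"
        errors.append((index, artist, name, str(e)))

print(f"Validated {validated} rows, found {len(errors)} errors")
```
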

Now, Artist and Name are added to each row:

Plaintext
Row 2353: Ben Stone - Mercure (Extended Mix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
    
Row 2495: DJ Hell - My Definition Of House Music (Resistance D Remix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
    
Row 3040: York - Reachers Of Civilisation (In Search Of Sunrise Mix): 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

This makes it far easier to find the problematic files in my collection. As long as there aren’t many findings…

Creating An Error File

There are three main problems with Pydantic printing all data validation errors in the Terminal:

  • They don’t persist outside of the Terminal session.
  • The Terminal isn’t that easy to read when it’s full of text.
  • The Terminal may run out of space if there are a large number of errors.

So let’s capture the errors in a file instead. This write_error_report function generates a text-based error report from validation failures, saving it in a logs subfolder adjacent to the input file:

Python
def write_error_report(
    csv_path: Path, 
    field_error_details: Dict[str, List[str]],  
    sorted_fields: List[str]
) -> Path:

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    logs_dir = csv_path.parent / "logs"
    logs_dir.mkdir(exist_ok=True)
    
    error_output_path = logs_dir / f"{timestamp}-PydanticErrors-{csv_path.stem}.txt"
    
    with open(error_output_path, 'w', encoding='utf-8') as f:
        f.write(f"Validation Error Report - {timestamp}\n")
        f.write("=" * 80 + "\n")

        for field in sorted_fields:
            messages = field_error_details.get(field, [])
            if messages:
                f.write(f"\n{field} Errors ({len(messages)}):\n")
                f.write("-" * 80 + "\n")
                for message in messages:
                    f.write(message + "\n\n")
    
    return error_output_path

Firstly, it constructs a timestamped filename using the original file’s stem (e.g., 20250529-142304-PydanticErrors-data.txt) and the logs subfolder, creating the latter if it doesn’t exist:

Python
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    logs_dir = csv_path.parent / "logs"
    logs_dir.mkdir(exist_ok=True)
    
    error_output_path = logs_dir / f"{timestamp}-PydanticErrors-{csv_path.stem}.txt"

Next, Python orders the errors by the sorted_fields input, displays error counts per field and formats each error message with clear section dividers. A structured report listing all validation errors by field is saved in the logs subfolder:

Python
    with open(error_output_path, 'w', encoding='utf-8') as f:
        f.write(f"Validation Error Report - {timestamp}\n")
        f.write("=" * 80 + "\n")

        for field in sorted_fields:
            messages = field_error_details.get(field, [])
            if messages:
                f.write(f"\n{field} Errors ({len(messages)}):\n")
                f.write("-" * 80 + "\n")
                for message in messages:
                    f.write(message + "\n\n")

Finally, the filesystem path of the generated report is returned:

Python
    return error_output_path

When executed, the Terminal tells me the error file path:

Plaintext
Detailed error log written to: 20250513-133743-PydanticErrors-iTunes-Elec-Dance-Club-Main.txt

And stores the findings in a local txt file, grouped by error type for simpler readability:

Plaintext
Validation Error Report - 20250513-133743
================================================================================

MyRating Errors (5):
--------------------------------------------------------------------------------
Row 3040: York - Reachers Of Civilisation (In Search Of Sunrise Mix): 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

Work Errors (22):
--------------------------------------------------------------------------------
Row 223: Dave Angel - Artech (Original Mix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

Adding A Terminal Summary

Finally, I created a Terminal summary of Pydantic’s findings:

Python
print("\nValidation Summary:\n")
sorted_fields = sorted(Track.model_fields.keys())

for field in sorted_fields:
    count = error_analysis['counts'].get(field, 0)
    print(f"{field} findings: {count}")

This shows feedback after each execution:

Plaintext
Validation Summary:

Album findings: 0
Artist findings: 0
Genre findings: 0
Location findings: 0
MyRating findings: 5
Name findings: 1
TrackNumber findings: 0
Work findings: 22
Year findings: 0

Now, let’s ensure everything works properly!

Testing Pydantic

In this section, I test that my Pydantic data validation and observability processes are working correctly using iTunes export files and pytest unit tests.

Recent File Test

The first test used a recent export from the end of April 2025. Here is the Terminal output:

Plaintext
Processing file: iTunes-Elec-Dance-Club-Main-2025-04-28.txt
Reading iTunes-Elec-Dance-Club-Main-2025-04-28.txt with detected encoding UTF-16
Loaded 4407 rows
Validated 4379 rows
Found 28 errors!

Validation Summary for iTunes-Elec-Dance-Club-Main-2025-04-28.txt:
Album errors: 0
Artist errors: 0
Genre errors: 0
Location errors: 0
MyRating errors: 5
Name errors: 1
TrackNumber errors: 0
Work errors: 22
Year errors: 0

Detailed error log written to: 20250521-164324-PydanticErrors-iTunes-Elec-Dance-Club-Main-2025-04-28.txt

Good first impressions – the 4407 row count matches the export file, the summary is shown in the Terminal and an error log is created. So what’s in the log?

Firstly, five tracks have no MyRating values. For example:

Plaintext
MyRating Errors (5):
--------------------------------------------------------------------------------
Row 558: Reel People Feat Angela Johnson - Can't Stop (Michael Gray Instrumental Remix): 1 validation error for Track
MyRating
  Value error, MyRating must not be null [type=value_error, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

This is correct, as this export was created when I added some new tracks to my collection.

Next, one track has a Name issue:

Plaintext
Name Errors (1):
--------------------------------------------------------------------------------
Row 1292: The Prodigy - Firestarter (Original Mix}: 1 validation error for Track
Name
  Value error, Name must contain a closing parenthesis ')' [type=value_error, input_value='Firestarter (Original Mix}', input_type=str]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

This one confused me at first, until I looked at the error more closely and realised the closing character is wrong: } is used instead of )! This is why my validate_name field validator has separate checks for each character – it makes the results easier to understand!
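
For context, here's a minimal sketch of how a validator with one check per character might look (an illustration with a stand-in model name – the full validator is in the repo):

```python
from pydantic import BaseModel, ValidationError, field_validator


class TrackName(BaseModel):
    Name: str

    @field_validator("Name")
    @classmethod
    def validate_name(cls, value: str) -> str:
        # One check per character makes the failing character obvious in the report
        if "(" not in value:
            raise ValueError("Name must contain an opening parenthesis '('")
        if ")" not in value:
            raise ValueError("Name must contain a closing parenthesis ')'")
        return value
```

With this structure, Firestarter (Original Mix} trips only the closing-parenthesis check, so the report points straight at the bad character.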

Finally, twenty-two tracks are missing record label metadata in Work:

Plaintext
Work Errors (22):
--------------------------------------------------------------------------------
Row 223: Dave Angel - Artech (Original Mix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

This means some tracks are missing full metadata. This won’t break any downstream processes as I have no reliance on this field. That said, it’s good to know about this in case my future needs change.
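
For what it's worth, if I ever wanted to tolerate these gaps rather than flag them, a before-mode validator could coerce pandas' nan placeholders to None. This is a hypothetical relaxation, not what my script does:

```python
import math
from typing import Optional

from pydantic import BaseModel, field_validator


class TrackWork(BaseModel):
    Work: Optional[str] = None

    @field_validator("Work", mode="before")
    @classmethod
    def nan_to_none(cls, value):
        # pandas represents missing strings as float nan, which fails str validation
        if isinstance(value, float) and math.isnan(value):
            return None
        return value
```

The trade-off is observability: the nans would validate silently instead of appearing in the error report.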

Older File Test

The next test uses an older file from March 2025. Let’s see what the Terminal says this time…

Plaintext
Processing file: iTunes-AllTunesMaster-2025-03-01.txt      
Reading iTunes-AllTunesMaster-2025-03-01.txt with detected encoding UTF-16
Loaded 4381 rows
Validated 0 rows
Found 4381 errors!

Validation Summary for iTunes-AllTunesMaster-2025-03-01.txt:

Album errors: 0
Artist errors: 0
Genre errors: 0
Location errors: 4381
MyRating errors: 0
Name errors: 1
TrackNumber errors: 2
Work errors: 17
Year errors: 0

Detailed error log written to: 20250521-164322-PydanticErrors-iTunes-AllTunesMaster-2025-03-01.txt

There are fewer rows here – 4381 vs 4407. This is correct, as my collection was smaller in March. But no rows were validated successfully!

I don’t have to go far to find out why:

Plaintext
Location Errors (4381):
--------------------------------------------------------------------------------
Row 0: Ariel - A9 (Original Mix): 1 validation error for Track
Location
  Path does not point to a file [type=path_not_file, input_value='C:\\Users\\User\\Folder...riel-A9-OriginalMix.mp3', input_type=str]

All the location checks failed. But this is actually a successful test!

In the time between these two exports, I reorganised my music collection. As a result, the file paths in this export no longer exist. Remember – the Location field uses the FilePath data type, which checks that the given paths exist and link to valid files. And these don’t!
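
As a reminder of how little code this check needs, a field typed as FilePath is all it takes – Pydantic validates existence automatically. A minimal sketch, not the full Track model:

```python
import tempfile

from pydantic import BaseModel, FilePath, ValidationError


class TrackLocation(BaseModel):
    Location: FilePath  # must point to an existing file, or validation fails

# An existing file validates successfully
with tempfile.NamedTemporaryFile(suffix=".mp3") as audio_file:
    TrackLocation(Location=audio_file.name)

# A moved or deleted file raises a path_not_file error
try:
    TrackLocation(Location="C:\\Users\\User\\OldFolder\\Ariel-A9-OriginalMix.mp3")
except ValidationError as error:
    assert error.errors()[0]["type"] == "path_not_file"
```
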

The Name results are the same as the first test. This has been around for a while apparently…

Plaintext
Name Errors (1):
--------------------------------------------------------------------------------
Row 1292: The Prodigy - Firestarter (Original Mix}: 1 validation error for Track
Name
  Value error, Name must contain a closing parenthesis ')' [type=value_error, input_value='Firestarter (Original Mix}', input_type=str]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

There are also TrackNumber errors in this export:

Plaintext
TrackNumber Errors (2):
--------------------------------------------------------------------------------
Row 485: Andrew Bayer Feat Alison May - Brick (Original Mix): 2 validation errors for Track
TrackNumber
  Input should be greater than or equal to 100 [type=greater_than_equal, input_value=90, input_type=int]
    For further information visit https://errors.pydantic.dev/2.11/v/greater_than_equal

Two tracks have BPM values (stored in the TrackNumber field) below the set range. Both files were moved during my reorganisation, but were included in this export at the time, so they still fail this validation check.
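
The range check itself is just a constrained field. A sketch with illustrative bounds – the lower limit of 100 matches the error above, while the upper limit here is an assumption, as my actual limits live in the decoupled parameters file:

```python
from pydantic import BaseModel, Field, ValidationError


class TrackBpm(BaseModel):
    # I store BPM in iTunes' TrackNumber field; the le bound here is illustrative
    TrackNumber: int = Field(ge=100, le=200)

TrackBpm(TrackNumber=130)          # within range

try:
    TrackBpm(TrackNumber=90)       # below the lower bound, as in the log above
except ValidationError as error:
    assert error.errors()[0]["type"] == "greater_than_equal"
```
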

Finally, the Work errors are the same as the first test (although more have crept in since!):

Plaintext
Work Errors (17):
--------------------------------------------------------------------------------
Row 223: Dave Angel - Artech (Original Mix): 1 validation error for Track
Work
  Input should be a valid string [type=string_type, input_value=nan, input_type=float]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type

Ultimately, both tests match expectations!

Unit Tests With Amazon Q

Finally, I wanted to include some unit tests for this project. Unit testing is always a good idea, especially in this context where I can verify function outputs and error generation without needing to create numerous test files.

I figured this was a good opportunity to test Amazon Q Developer and see what it came up with. I gave it a fairly basic prompt, using the @workspace context to allow Q access to my project’s entire workspace as context for its responses:

Plaintext
@workspace write unit tests for this script using pytest

I tend to use pytest for my Python testing, as I find it simpler and more flexible than Python’s standard unittest library.

Q promptly provided several reasonable tests in response. This initiated a half-hour exchange between us focused on calibrating the existing tests and creating new ones. To be fair to Q, my initial prompt was quite basic and could have been much more detailed.

Amongst Q’s tests was this one testing an empty Artist field:

Python
    @patch('pathlib.Path.exists')
    def test_empty_artist(self, mock_exists):
        """Test that an empty artist fails validation."""
        # Mock file existence check
        mock_exists.return_value = True
        
        invalid_track_data = {
            "Name": "Test Track (Original Mix)",
            "Artist": "",  # Empty artist
            "Album": "01A-Abm",
            "Work": "Test Label",
            "Genre": "Trance",
            "TrackNumber": 130,
            "Year": 2020,
            "MyRating": 80,
            "Location": "C:\\Music\\test_track.mp3"
        }

This one, checking an invalid Camelot Notation:

Python
    @patch('pathlib.Path.exists')
    def test_invalid_album_not_camelot(self, mock_exists):
        """Test that an invalid Camelot notation fails validation."""
        # Mock file existence check
        mock_exists.return_value = True
        
        invalid_track_data = {
            "Name": "Test Track (Original Mix)",
            "Artist": "Test Artist",
            "Album": "Invalid Key",  # Not a valid Camelot notation
            "Work": "Test Label",
            "Genre": "Trance",
            "TrackNumber": 130,
            "Year": 2020,
            "MyRating": 80,
            "Location": "C:\\Music\\test_track.mp3"
        }
        
        with pytest.raises(ValueError, match="Album must be a valid Camelot notation"):
            Track(**invalid_track_data)
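
For the curious, the Camelot check itself can be as simple as a pattern test. A rough sketch assuming the zero-padded NNX-Key format seen in the test data ("01A-Abm") – my real script validates against a notation list in its parameters file:

```python
import re

# Camelot wheel positions run 01-12, each with an A (minor) or B (major) variant
CAMELOT_PATTERN = re.compile(r"(0[1-9]|1[0-2])[AB]-\S+")


def is_camelot(album: str) -> bool:
    return bool(CAMELOT_PATTERN.fullmatch(album))


assert is_camelot("01A-Abm")
assert not is_camelot("Invalid Key")
```
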

And this one, checking what happens with an incomplete DataFrame:

Python
    @patch('wolfie_exportvalidator_itunes.detect_file_encoding')
    @patch('pandas.read_csv')
    def test_load_itunes_data_missing_columns(self, mock_read_csv, mock_detect_encoding):
        """Test loading iTunes data with missing columns."""
        # Setup mocks
        mock_detect_encoding.return_value = 'utf-8'
        mock_df = pd.DataFrame({
            'Name': ['Test Track (Original Mix)'],
            'Artist': ['Test Artist'],
            # Missing required columns
        })
        mock_read_csv.return_value = mock_df
        
        # Call function and verify it raises an error
        with pytest.raises(ValueError, match="Missing expected columns"):
            load_itunes_data(Path('dummy_path.txt'))

I’ll include the whole test suite in my GitHub repo. Let’s conclude with pytest’s output:

Plaintext
collected 41 items                                                                                                                                        

tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_valid_track PASSED                                                                [  2%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_valid_track_boundary_values PASSED                                                [  4%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_name_no_parentheses PASSED                                                [  7%]
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_empty_name PASSED                                                                 [  9%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_empty_artist PASSED                                                               [ 12%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_empty_work PASSED                                                                 [ 14%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_empty_genre PASSED                                                                [ 17%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_album_not_camelot PASSED                                                  [ 19%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_valid_camelot_notations PASSED                                                    [ 21%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_bpm_range_high PASSED                                                     [ 24%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_bpm_range_low PASSED                                                      [ 26%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_year_range_early PASSED                                                   [ 29%]
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_year_range_future PASSED                                                  [ 31%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_rating_not_multiple PASSED                                                [ 34%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_rating_too_low PASSED                                                     [ 36%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_invalid_rating_too_high PASSED                                                    [ 39%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_null_track_number PASSED                                                          [ 41%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_null_year PASSED                                                                  [ 43%] 
tests/test_wolfie_exportvalidator_itunes.py::TestTrackModel::test_null_rating PASSED                                                                [ 46%]
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_detect_file_encoding PASSED                                                   [ 48%] 
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_detect_file_encoding_latin1 PASSED                                            [ 51%] 
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_detect_file_encoding_no_result PASSED                                         [ 53%] 
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_load_itunes_data_success PASSED                                               [ 56%]
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_load_itunes_data_missing_columns PASSED                                       [ 58%] 
tests/test_wolfie_exportvalidator_itunes.py::TestFileOperations::test_load_itunes_data_empty_dataframe PASSED                                       [ 60%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_validate_tracks_all_valid PASSED                                                  [ 63%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_validate_tracks_with_errors PASSED                                                [ 65%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_duplicate_location PASSED                                                         [ 68%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_analyze_errors PASSED                                                             [ 70%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_analyze_errors_with_general_error PASSED                                          [ 73%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_write_error_report PASSED                                                         [ 75%]
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_process_file_with_errors PASSED                                                   [ 78%] 
tests/test_wolfie_exportvalidator_itunes.py::TestValidation::test_process_file_no_errors PASSED                                                     [ 80%]
tests/test_wolfie_exportvalidator_itunes.py::TestParamsData::test_bpm_range_valid PASSED                                                            [ 82%] 
tests/test_wolfie_exportvalidator_itunes.py::TestParamsData::test_year_range_valid PASSED                                                           [ 85%] 
tests/test_wolfie_exportvalidator_itunes.py::TestParamsData::test_rating_range_valid PASSED                                                         [ 87%] 
tests/test_wolfie_exportvalidator_itunes.py::TestParamsData::test_camelot_notations_valid PASSED                                                    [ 90%] 
tests/test_wolfie_exportvalidator_itunes.py::TestMain::test_main_with_files PASSED                                                                  [ 92%] 
tests/test_wolfie_exportvalidator_itunes.py::TestMain::test_main_no_files PASSED                                                                    [ 95%] 
tests/test_wolfie_exportvalidator_itunes.py::TestMain::test_main_with_exception PASSED                                                              [ 97%]
tests/test_wolfie_exportvalidator_itunes.py::TestMain::test_main_with_critical_exception PASSED                                                     [100%] 

=================================================================== 41 passed in 0.20s =================================================================== 

I had a very positive experience overall! Working with Amazon Q allowed me to write the tests more quickly than I could have done on my own. We would have been even faster if I had put more thought into my initial prompt. Additionally, since Q Developer offers a generous free tier, it didn’t cost me anything.

GitHub Repo

I have committed my Pydantic data validation script, test suite and documentation in the repo below:

[GitHub repository link]

Note that the parameters are decoupled from the Pydantic script. This will allow me to reuse some parameters across future validation scripts and has enabled me to exclude the system parameters from the repository.

Summary

In this post, I used the Pydantic Python library to create data validation and observability processes for my Project Wolfie iTunes data.

I found Pydantic very impressive! Its simplicity, functionality and interoperability make it an attractive addition to Python data pipelines, and its strong community support keeps Pydantic relevant and current. Additionally, Pydantic’s presence in FastAPI, PydanticAI and a managed AWS Lambda layer enables rapid integration and seamless deployment. I see many applications for it within Project Wolfie.

There’s lots more to Pydantic – this Pixegami video is a great walkthrough of Pydantic in action:

If this post has been useful then the button below has links for contact, socials, projects and sessions:

[Contact links button]

Thanks for reading ~~^~~


Gold Layer PySpark ETL With AWS Glue Studio

In this post, I create my WordPress data pipeline’s Gold ETL process using PySpark and the AWS Glue Studio visual interface.

Table of Contents

Introduction

Time to finish my WordPress AWS data pipeline! Here it is so far:

[Architecture diagram: an EventBridge Schedule starts a Step Functions state machine that invokes the Lambda Raw and Bronze functions, runs the Glue Bronze Crawler and the Glue Silver ETL job, publishes failure messages to an SNS Topic and writes to CloudWatch Logs throughout.]

In the Medallion Lakehouse Architecture, this covers both the Bronze and Silver layers that handle raw and processed data respectively. Now I’ll start aggregating my WordPress data for reporting and analytics. For this, I’ll use AWS Glue Studio.

Firstly, I’ll explore Glue Studio and its features. Next, I’ll architect and build an ETL job using Glue Studio’s visual editor while examining some of Glue’s behaviours. Finally, I’ll update my WordPress Data Pipeline Step Functions workflow and examine costs.

Let’s begin with Glue Studio.

AWS Glue Studio

This section introduces Glue Studio and examines Apache Spark.

AWS Glue Studio

AWS Glue Studio is a serverless tool designed for data-centric tasks like automating data preparation, orchestrating data quality checks and creating ETL jobs. It integrates with other AWS services, and also interacts with data from sources like RDS, Redshift and S3. It is ideal for simplifying data transformation and integration processes. The AWS documentation contains full details of Glue Studio’s features.

Under the hood, Glue Studio uses PySpark, the Python API for Apache Spark. Workflows can be created both as code and via Glue Studio’s visual interface. Glue Studio supports Git version control systems for change management, and integrates several observability tools including AWS IAM for security and Amazon CloudWatch for logging. Additionally, Glue also has its own monitoring and orchestration tools.

But wait – Spark? PySpark? What?!

Apache Spark

Apache Spark is an open-source framework designed to process large-scale data quickly. Spark enables distributed computing, allowing tasks to be performed across multiple machines for faster and more efficient data processing. It became a top-level Apache project in 2014.

Known for its speed, Spark processes data in memory, significantly reducing the need for slower disk operations associated with older systems. Spark is commonly used for big data analytics, machine learning and real-time data processing in industries that handle massive datasets.

PySpark

PySpark is a Python interface for Apache Spark. It allows operations to be distributed across clusters of machines while maintaining the accessibility and ease of Python. PySpark’s combination of Python’s simplicity and Spark’s power makes it a practical, accessible solution for handling extensive datasets in a fast and scalable way.

Glue Studio’s visual interface automatically writes PySpark code in real time. For example, this boilerplate Python script is created with each new Glue PySpark job:

Python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

For those curious, this DataEng video provides a technical explanation of each import:

So that’s the basics of AWS Glue Studio. Now let’s see what the solution looks like.

Architecture

This section examines my proposed solution’s architecture. Much of this architecture is similar to both the Bronze and Silver layers. I’ll examine the new Gold Glue PySpark ETL job first, followed by the updated WordPress data pipeline Step Functions workflow.

Glue Gold ETL Job

Firstly, this is the Gold Glue PySpark ETL job:

While updating CloudWatch Logs throughout:

  1. Gold Glue ETL job extracts data from wordpress-api Silver S3 objects and then performs PySpark transformations.
  2. Gold Glue PySpark ETL job loads the transformed data into Gold S3 bucket as Parquet objects.

Step Function Workflow

Next, the updated Step Function workflow:

While updating the workflow’s CloudWatch Log Group throughout:

  1. An EventBridge Schedule executes the Step Functions workflow. Lambda Raw function is invoked.
    • Invocation Fails: Publish SNS message. Workflow then ends.
    • Invocation Succeeds: Invoke Lambda Bronze function.
  2. Lambda Bronze function is invoked.
    • Invocation Fails: Publish SNS message. Workflow then ends.
    • Invocation Succeeds: Run Glue Bronze Crawler.
  3. Glue Bronze Crawler runs.
    • Run Fails: Publish SNS message. Workflow then ends.
    • Run Succeeds: Update Glue Data Catalog. Run Glue Silver ETL job.
  4. Glue Silver ETL job runs.
    • Run Fails: Publish SNS message. Workflow then ends.
    • Run Succeeds: Run Glue Silver Data Quality Checks.
  5. Glue Silver Data Quality Checks run.
    • Run Fails: Publish SNS message. Workflow then ends.
    • Run Succeeds: Run Glue Silver Crawler.
  6. Glue Silver Crawler runs.
    • Run Fails: Publish SNS message. Workflow then ends.
    • Run Succeeds: Update Glue Data Catalog. Run Glue Gold ETL job.
  7. Glue Gold PySpark ETL job runs.
    • Run Fails: Publish SNS message. Workflow then ends.
    • Run Succeeds: Run Glue Gold Crawler.
  8. Glue Gold Crawler runs.
    • Run Fails: Publish SNS message. Workflow then ends.
    • Run Succeeds: Update Glue Data Catalog. Workflow then ends.

Additionally, an SNS message is published if the Step Functions workflow fails.

Gold ETL Job

In this section, I create my Gold Glue PySpark ETL job. Firstly, I’ll define the job’s requirements. Next, I’ll build the job in Glue Studio, and finally I’ll examine Glue’s inbuilt monitoring.

Requirements

Let’s begin by understanding the Gold Layer. Databricks defines it as curated, business-level data:

Data in the Gold layer of the lakehouse is typically organised in consumption-ready “project-specific” databases. The Gold layer is for reporting and uses more de-normalised and read-optimised data models with fewer joins. The final layer of data transformations and data quality rules are applied here.

https://www.databricks.com/glossary/medallion-architecture

The concept of a gold layer is nothing new. Other names include aggregated, enriched and consumption layers. The idea is the same in all cases – producing refined and aggregated datasets that are easily consumable by analytics tools, machine learning models and production applications.

This Gold ETL job will produce an aggregation of both the posts and statistics_pages Silver datasets. The Gold dataset will contain view statistics and post creation data, limited to blog posts.

This will involve:

  • Joining the Silver datasets.
  • Removing unneeded columns to reduce the Gold dataset’s size.
  • Renaming columns to improve the Gold dataset’s legibility.
  • Filtering the Gold dataset to remove unneeded data.
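
Before building this visually, the four steps can be sketched in plain Python with toy data (ignoring Spark's distributed execution entirely):

```python
posts = [
    {"ID": 1, "post_title": "Hello", "post_type": "post", "post_modified": "x"},
    {"ID": 2, "post_title": "About", "post_type": "page", "post_modified": "y"},
]
statistics_pages = [
    {"id": 1, "count": 100},
    {"id": 2, "count": 5},
]

# 1. Join the datasets on their ID columns
joined = [{**p, **s} for p in posts for s in statistics_pages if p["ID"] == s["id"]]

# 2 & 3. Drop unneeded columns and rename the rest for legibility
slim = [
    {"post_ID": r["ID"], "post_title": r["post_title"],
     "post_type": r["post_type"], "statistics_count": r["count"]}
    for r in joined
]

# 4. Filter down to blog posts only
gold = [r for r in slim if r["post_type"] == "post"]

assert gold == [
    {"post_ID": 1, "post_title": "Hello", "post_type": "post", "statistics_count": 100}
]
```

The Glue job does exactly this, just with DynamicFrames and at scale.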

So let’s get started!

Job Creation

This section splits the Gold Glue PySpark ETL job creation process into separate steps for each part.

Sources

Firstly, let’s define the data sources. There are two sources, both of which are folders in the data-lakehouse-silver S3 bucket:

  • wordpress_api/posts/
  • wordpress_api/statistics_pages/

Each source needs a separate node specifying the S3 path and data format. This example shows the Silver posts dataset, where the wordpress_api/posts/ S3 path is selected:

[Screenshot: Source node configuration in Glue Studio]

Finally, this is the Source node’s PySpark code for both posts and statistics_pages:

Python
# Script generated for node S3 Silver statistics_pages
S3Silverstatistics_pages_node1724058965930 = glueContext.create_dynamic_frame.from_options(
  format_options={}, 
  connection_type="s3", 
  format="parquet", 
  connection_options={
    "paths": ["s3://data-lakehouse-silver/wordpress_api/statistics_pages/"], 
    "recurse": True
    },
  transformation_ctx="S3Silverstatistics_pages_node1724058965930"
 )

# Script generated for node S3 Silver posts
S3Silverposts_node1724058915313 = glueContext.create_dynamic_frame.from_options(
  format_options={}, 
  connection_type="s3", 
  format="parquet", 
  connection_options={
    "paths": ["s3://data-lakehouse-silver/wordpress_api/posts/"], 
    "recurse": True
    },
  transformation_ctx="S3Silverposts_node1724058915313"
 )

Join Transformation

From AWS:

The Join transform allows you to combine two datasets into one. You specify the key names in the schema of each dataset to compare.

https://docs.aws.amazon.com/glue/latest/dg/transforms-configure-join.html

This node essentially creates a SQL join using columns from the selected sources. Here, I’ve inner joined posts.ID to statistics_pages.id:

[Screenshot: Join node configuration in Glue Studio]

Rows from the Silver datasets that match the join condition are merged into a new row in an output DynamicFrame that will ultimately become the Gold dataset. This frame includes all columns from both Silver datasets.

The ETL visual now shows two source nodes linked to the Join node:

[Screenshot: ETL visual showing both source nodes linked to the Join node]

Finally, this is the Join node’s PySpark code:

Python
# Script generated for node Join
Join_node1724059035756 = Join.apply(
  frame1=S3Silverposts_node1724058915313,
  frame2=S3Silverstatistics_pages_node1724058965930,
  keys1=["ID"],
  keys2=["id"],
  transformation_ctx="Join_node1724059035756"
  )

Change Schema Transformation

Now it’s time to do some cleaning!

From AWS:

Change Schema transform remaps the source data property keys into the desired configured for the target data. In a Change Schema transform node, you can:

  • Change the name of multiple data property keys.
  • Change the data type of the data property keys, if the new data type is supported and there is a transformation path between the two data types.
  • Choose a subset of data property keys by indicating which data property keys you want to drop.

https://docs.aws.amazon.com/glue/latest/dg/transforms-configure-applymapping.html

Firstly, I set the Join node as the Change Schema node’s parent to update the ETL visual:

[Screenshot: ETL visual showing the Join node linked to the Change Schema node]

Following the join, the Gold dataset can be simplified and optimised. Here’s an example of what the Change Schema node looks like in action:

[Screenshot: Change Schema node configuration in Glue Studio]

Here:

  • Source Key shows the current column name.
  • Target Key handles column name changes.
  • Data Type sets the data type.
  • Ticking a Drop box removes that column from the output DynamicFrame.

I’ve listed my changes below. Bold items appear in the example.

Firstly, these columns are dropped due to duplication or redundancy:

posts:

  • posts.post_modified
  • post_modified_day
  • post_modified_month
  • post_modified_todate
  • post_modified_year

statistics_pages:

  • date_todate
  • id
  • type
  • uri

Additionally, these columns are renamed to add context:

posts:

  • post_date_todate to post_date

statistics_pages:

  • page_id to statistics_id
  • date to statistics_date
  • date_year to statistics_date_year
  • date_month to statistics_date_month
  • date_day to statistics_date_day

Finally, this is the Change Schema node’s PySpark code:

Python
# Script generated for node Change Schema
ChangeSchema_node1724059144495 = ApplyMapping.apply(
  frame=Join_node1724059035756, 
  mappings=[
    ("ID", "bigint", "post_ID", "long"), 
    ("post_title", "string", "post_title", "string"), 
    ("post_status", "string", "post_status", "string"), 
    ("post_parent", "bigint", "post_parent", "long"), 
    ("post_type", "string", "post_type", "string"), 
    ("post_date_todate", "timestamp", "post_date", "timestamp"), 
    ("post_date_year", "bigint", "post_date_year", "long"), 
    ("post_date_month", "bigint", "post_date_month", "long"), 
    ("post_date_day", "bigint", "post_date_day", "long"), 
    ("page_id", "bigint", "statistics_id", "long"), 
    ("date", "timestamp", "statistics_date", "timestamp"), 
    ("count", "bigint", "statistics_count", "long"), 
    ("date_year", "bigint", "statistics_date_year", "long"), 
    ("date_month", "bigint", "statistics_date_month", "long"), 
    ("date_day", "bigint", "statistics_date_day", "long")
    ], 
  transformation_ctx="ChangeSchema_node1724059144495"
  )

Filter Transformation

The joined, cleaned dataset contains data about all amazonwebshark content. I only want the posts data, so next I’ll filter everything else out.

From AWS:

Use the Filter transform to create a new dataset by filtering records from the input dataset based on a regular expression. Rows that don’t satisfy the filter condition are removed from the output.

https://docs.aws.amazon.com/glue/latest/dg/transforms-filter.html

Firstly, I set the Change Schema node as the Filter node’s parent to update the ETL visual:

[Screenshot: ETL visual showing the Change Schema node linked to the Filter node]

Next, I set the filter conditions. I only need one condition here – keep all dataset rows where post_type matches post:

[Screenshot: Filter node configuration in Glue Studio]

Finally, this is the Filter node’s PySpark code:

Python
# Script generated for node Filter
Filter_node1724060106174 = Filter.apply(
  frame=ChangeSchema_node1724059144495, 
  f=lambda row: (bool(re.match("post", row["post_type"]))),
 transformation_ctx="Filter_node1724060106174"
 )
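
One subtlety worth knowing: re.match anchors at the start of the string rather than requiring an exact match, so the generated predicate is really a prefix test. That's fine for my data, but it would also keep hypothetical values like post_tag:

```python
import re


def keep_row(post_type: str) -> bool:
    # Mirrors the Glue-generated lambda: bool(re.match("post", row["post_type"]))
    return bool(re.match("post", post_type))


assert keep_row("post") is True
assert keep_row("page") is False
assert keep_row("post_tag") is True  # a prefix match, not an exact one
```

For an exact match, the regular expression would need an end anchor, such as post$.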

Target

Finally, I must choose a target location for my Gold dataset.

Target uses the same interface as the Source node. This time, a Gold S3 bucket folder path wordpress_api/statistics_postname/ is specified. Everything else is the same as Source. The Target node offers significant versatility, detailed in the AWS target node documentation.

In summary, this is the Target node’s PySpark code:

Python
# Script generated for node S3 Gold
S3Gold_node1724060393283 = glueContext.write_dynamic_frame.from_options(
  frame=Filter_node1724060106174, 
  connection_type="s3", 
  format="glueparquet", 
  connection_options={
    "path": "s3://data-lakehouse-gold/wordpress_api/statistics_postname/", 
    "partitionKeys": []
    },
 format_options={"compression": "snappy"}, 
 transformation_ctx="S3Gold_node1724060393283"
 )

And here’s the full ETL visual:

2024 10 25 AWSGlueStudioDAGFinal

The full Glue job PySpark script is available in this post’s GitHub repo.

Job Properties

Next, I’ll examine some of my Glue job’s properties. This section only covers some key properties as there are loads. For a fuller view, please review the AWS Job Property documentation.

Additional properties like bookmarks, quality checks, scheduling and version control are also available. I’ve written about quality checks before, and the other properties could all be posts in themselves. For now, let’s move on to execution.

Job Execution

Each PySpark Glue job has several logging sources that are aggregated into the job’s Run tab. The summary shows properties including job status, durations and DPU capacity:

2024 10 25 AWSGlueStudioRunsLowerDetails

Each job can then be viewed in further detail, with insights spanning resource utilisation, query plans and node configuration.

These resources are increasingly useful as Glue jobs scale, as this level of detail is essential when optimising and troubleshooting big data processes.

Ok, so my job is configured and running successfully. Now let’s review the outputs.

Glue Outputs & Behaviours

This section examines the outputs of my Gold Glue PySpark ETL job and the behaviours influencing them.

For clarity, this is not a case of finding and fixing errors. Rather, this is an exploration of how a Glue PySpark job’s output can differ from expectations. Coming in, I was more familiar with using pandas for ETL and initially found these behaviours confusing. So I wrote this section with that in mind, as it may help others in similar positions down the road.

Firstly I’ll demonstrate a behaviour. Next, I’ll explain why it happens. Finally, I’ll examine if it can be changed. Although, just because something can be done doesn’t mean that it should be.

Run 1: Multiple Objects

Previously, the Bronze and Silver layers ultimately produced single objects for each dataset. Conversely, my Gold PySpark job creates four objects with the same RunID:

2024 10 29 TestingObjectsFour

Ok – that’s unexpected. What’s more, if I run the job again then I get another four files with a new RunID. So that’s eight in total:

2024 10 29 TestingObjectsEight

There are two behaviours here that differ from the previous layers:

  • Each run produces multiple objects instead of one.
  • Each run creates new objects instead of replacing existing ones.

Let’s examine the multiple objects first.

What’s Happening?

This occurs due to data partitioning.

As mentioned earlier, AWS Glue uses Apache Spark. Spark enables distributed computing by breaking down data into smaller parts. The presence of multiple objects is a direct outcome of this partitioning approach, offering benefits such as:

  • Parallel Processing: With data spread across multiple files, Spark workers can access different parts of the dataset simultaneously instead of fighting for a single object. This approach balances the workload and accelerates both read and write operations.
  • Fault Tolerance: If a write operation fails, only the impacted object needs reprocessing rather than the entire dataset. This design enhances resilience and reduces the risk of complete data loss.
  • Memory Management: Each Spark worker processes only its assigned data partition rather than the full dataset. This improves data loading efficiency and helps prevent memory exhaustion.
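To make the mechanics concrete, here's a toy sketch in plain Python (not Spark) of how one logical dataset becomes several output objects: rows are split into partitions and each partition is written independently, which is essentially what each Spark worker does:

```python
import json
import tempfile
from pathlib import Path

def write_partitioned(rows, n_partitions, out_dir):
    """Split rows round-robin into n partitions; write one file per partition."""
    partitions = [rows[i::n_partitions] for i in range(n_partitions)]
    paths = []
    for i, part in enumerate(partitions):
        # Spark uses similar part-numbered object names for its output
        path = Path(out_dir) / f"part-{i:05d}.json"
        path.write_text(json.dumps(part))
        paths.append(path)
    return paths

with tempfile.TemporaryDirectory() as tmp:
    rows = [{"post_id": i} for i in range(100)]
    files = write_partitioned(rows, 4, tmp)
    print(len(files))  # 4 objects for one logical dataset, like the Glue run
```

The union of the four files is still the complete dataset; only the physical layout differs.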

Can I Change It?

I couldn’t find a way to change this behaviour within Glue Studio. Glue is very capable of deriving partitions, so this isn’t surprising.

While it can be done, this involves manually changing the autogenerated PySpark script. Glue allows this at the cost of disabling the job’s visual design features:

Unlocking the job script will convert your job from visual mode to script-only mode. This action cannot be undone. To keep a copy of the visual-mode job, clone the job on the Jobs page of Glue Studio.

The change itself uses Spark's coalesce method (via a DataFrame conversion) to control the number of partitions. This involves:

  • An additional import:
Python
from awsglue.dynamicframe import DynamicFrame
  • Converting the DynamicFrame to a Spark DataFrame with toDF(), then applying coalesce(n) to set the partition count. Here, coalesce(1) forces the output into a single object:
Python
single_file_df = Filter_node1724060106174.toDF().coalesce(1)
  • Converting the DataFrame back to a DynamicFrame so the write node can consume it:
Python
single_file_dyf = DynamicFrame.fromDF(single_file_df, glueContext, "single_file_dyf")

With the Target node's frame parameter pointed at single_file_dyf, the Glue job now produces a single Parquet object.

This should be used with care. Too many partitions can slow queries by requiring more reads than necessary. Too few can hinder Spark’s workload distribution abilities. Here, a single object removes that distribution entirely, sacrificing a key Spark benefit.

Run 2: Objects Not Replaced

Ok, let’s keep coalesce(1) in place because it makes this example easier. Running this job variant creates a single object:

2024 10 29 TestingObjectsOne

Running it again produces a second object with a new RunID:

2024 10 29 TestingObjectsTwo

Why isn’t the first object being replaced?

What’s Happening?

There are good reasons for this. Here’s why a replace function isn’t built in:

  • Spark Architecture: Spark processes data in parallel, with each task running separately. With this setup, replacing a single piece of data in an object is challenging. So instead, Spark jobs either create entirely new objects or replace data partitions.
  • S3 Architecture: S3 stores data as objects rather than files, so it doesn’t have folder-level replacements like a typical file system. When S3 ‘replaces’ an object, it actually creates a new version of the object with the same name and removes the old one.
  • Data Management Features: Writing new objects for each job run enables features like versioning, time travel and incremental processing with formats like Apache Iceberg and Delta Lake. It also avoids issues like access conflicts and deadlocks, since existing data remains unchanged while new data is written.

Can I Change It?

So…yes. Creating a boto3 S3 client and running a conditional delete during the job would achieve the desired effect:

Python
import boto3

# Define S3 bucket and prefix for output path
output_bucket = "data-lakehouse-gold"
output_prefix = "wordpress_api/statistics_postname/"

# Initialise S3 client and list existing objects in the output path
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket=output_bucket, Prefix=output_prefix)

# Check if there are any objects and delete them
if 'Contents' in response:
    for obj in response['Contents']:
        s3.delete_object(Bucket=output_bucket, Key=obj['Key'])

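One caveat with that snippet: list_objects_v2 returns at most 1,000 keys per call. If the prefix could ever hold more, a paginated version is safer. Here's a sketch wrapping the same logic in a reusable function (boto3 is imported lazily so the function can be defined without AWS dependencies present):

```python
def delete_prefix(bucket: str, prefix: str) -> int:
    """Delete every object under an S3 prefix, handling more than 1,000 keys.

    list_objects_v2 returns at most 1,000 keys per call, so a paginator is
    used to walk the full listing. Returns the number of objects deleted.
    """
    import boto3  # lazy import: the sketch stays definable without boto3 installed

    s3 = boto3.client("s3")
    deleted = 0
    for page in s3.get_paginator("list_objects_v2").paginate(
        Bucket=bucket, Prefix=prefix
    ):
        for obj in page.get("Contents", []):
            s3.delete_object(Bucket=bucket, Key=obj["Key"])
            deleted += 1
    return deleted
```

For large prefixes, batching removals with delete_objects (up to 1,000 keys per request) would cut the API call count further.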
But, at this point, is this really a Spark use case anymore? For an ETL job requiring object replacement, I would initially lean towards using a Glue Python Shell job or the AWS SDK for pandas Lambda layer because:

  • Fewer cloud resources would be used, making the job cheaper than a PySpark job.
  • Fewer Python imports would be needed, reducing the script size and dependencies.
  • With appropriate settings, Lambda may run the script faster than Glue.

Suitability should always be a key consideration with cloud architectures. Taking time to choose the right service saves a lot of headaches later on.

Step Functions Update

This section integrates the Gold resources into my existing WordPress Data Pipeline Step Function workflow.

The Gold workflow update is similar to the Silver one. Firstly, I need a new Glue: StartJobRun action running the Gold Glue PySpark ETL job:

JSON
{
  "JobName": "WordPress_Gold_statisticspagespostsjoin"
}

Also, a new Glue: StartCrawler action running the Gold crawler:

JSON
{
  "Name": "wordpress-gold"
}

Here is how my Step Function workflow looks with these changes:

stepfunctions graph

The workflow’s IAM role needs new allow permissions too. Firstly, glue:StartJobRun and glue:GetJobRun on the WordPress_Gold_statisticspagespostsjoin Glue job:

JSON
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"glue:StartJobRun",
				"glue:GetJobRun"
			],
			"Resource": [
				"arn:aws:glue:eu-west-1:REDACTED:job/WordPress_Gold_statisticspagespostsjoin"
			]
		}
	]
}

(glue:GetJobRun lets the workflow check the job’s progress – Ed)

Next, glue:StartCrawler on the wordpress-gold crawler:

JSON
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"glue:StartCrawler"
			],
			"Resource": [
				"arn:aws:glue:eu-west-1:REDACTED:crawler/wordpress-gold"
			]
		}
	]
}

With these permissions, the workflow executes successfully:

2024 10 29 StepFunctionResultsGraph

I can get further details from the workflow’s Table view. This includes task durations, resource log links and a visual timeline of each state:

2024 10 29 StepFunctionResultsTable

Further Step Functions console details are in this 2022 Ben Smith AWS post.

Cost Analysis

This section examines my costs for the updated Step Function workflow.

Here, my Cost Explorer chart runs from 04 November to 14 November. It is grouped by API Operation and excludes tax.

2024 11 15 CostsGold

My main costs are from Glue’s Jobrun and CrawlerRun operations. My Glue jobs now cost around $0.17 a day to run. This has increased from last time’s $0.09, but that’s to be expected as I’m running two Glue jobs now.

My crawlers now cost $0.06 a day, averaging $0.02 for each of the Bronze, Silver and Gold crawlers. The purple blip is for Glue Interactive Sessions – I have something coming up on those. Beyond that, I’m paying for some S3 PutObject calls and everything else is within the free tier.

Note that on 06 November, it…broke. A failed call to the WordPress API brought the whole workflow down:

stepfunctions graph error

This proves my error handling works though! A forced stop and graceful failure is preferable to having data in an unknown state, especially in a production environment!

Summary

In this post, I created my WordPress data pipeline’s Gold ETL process using PySpark and the AWS Glue Studio visual interface.

I found Glue Studio to be highly user-friendly. It enhances job observability with comprehensive monitoring tools, and makes PySpark script creation significantly easier through its visual editor. Additionally, it integrates smoothly with other Glue features and the broader AWS ecosystem, offering extensive and intuitive customisation options.

This wraps up the WordPress AWS Data Pipeline project. This series aimed to demonstrate how different AWS services can work together to build efficient and cost-effective data pipelines. Through it, I’ve gained new insights and have several fresh ideas to explore!

If this post has been useful then the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~

Categories
Data & Analytics

Bespoke Veracity Checks With AWS Glue Data Quality

In this post, I use AWS Glue Data Quality checks and rulesets to apply bespoke veracity checks to my WordPress datasets.


Introduction

In my last post, I used the AWS Glue ETL Job Script Editor to write a Silver ETL Python script. Within that script and the ones prior, there are various validation checks.

If these checks all pass, then datasets are created in various S3 buckets. But before I use these datasets for reporting and analytics, I should check their quality first.

In my Building And Automating Serverless Auto-Scaling Data Pipelines In AWS session, I talk about the Four V’s of Big Data. One of these Vs is Veracity – the measure of data’s truthfulness, accuracy and quality. AWS Glue offers veracity checks with AWS Glue Data Quality.

Launched in 2023, AWS Glue Data Quality measures and monitors data quality and veracity. It is built on top of an open-source framework, and provides a managed, serverless experience with machine learning augmentation.

Firstly, I’ll examine AWS Glue Data Quality and some of its features. Then I’ll use it to recommend rules for some of my Silver layer data objects, and then customise those recommendations as needed. Next, I’ll create and test a Glue Data Quality job using those rules. Finally, I’ll examine my Glue Data Quality costs.

AWS Glue Data Quality

This section introduces AWS Glue Data Quality and examines some of its features.

What Is Glue Data Quality?

From AWS:

AWS Glue Data Quality evaluates and monitors the quality of your data based on rules that you define. This makes it easy to identify the data that needs action. You can then monitor and evaluate changes to your datasets as they evolve over time.

https://docs.aws.amazon.com/glue/latest/dg/data-quality-gs-studio.html

AWS Glue Data Quality features include:

  • Creating & recommending sets of data quality rules.
  • Running the data quality rules as on-demand and scheduled jobs.
  • Monitoring and reporting against the data quality results.

So how do data quality rules work?

Glue Data Quality Rules

From AWS:

AWS Glue Data Quality currently supports 18 built-in rule types under four categories:

  1. Consistency rules check if data across different columns agrees by looking at column correlations.
  2. Accuracy rules check if record counts meet a set threshold and if columns are not empty, match certain patterns, have valid data types, and have valid values.
  3. Integrity rules check if duplicates exist in a dataset.
  4. Completeness rules check if data in your datasets do not have missing values.
https://aws.amazon.com/glue/faqs/

AWS Glue Data Quality rules are defined using Data Quality Definition Language (DQDL). DQDL uses a Rules list containing comma-separated rules in square brackets.

For example, this DQDL rule checks for missing values in customer-id and unique values in order-id:

Plaintext
Rules = [
   IsComplete "customer-id",
   IsUnique "order-id"
]

AWS maintains a DQDL reference that advises about syntax, structure, expressions, and rule types. Now all of this can be a lot to take in, so a good way of getting started is to get AWS to do some of the heavy lifting…

Glue Data Quality Rule Recommendations

Getting to grips with new features can be daunting. To help out, AWS Glue Data Quality can analyse data in the Glue Data Catalog. This process uses machine learning to identify and recommend rules for the analysed data. These rules can then be used and changed as needed.

Glue Data Quality recommendations are generated using Amazon’s Deequ open-source framework, which is tested on Amazon’s own petabyte-scale datasets. AWS has documented the recommendation generation process, and has released supporting videos like:

So that’s enough theory – let’s build something!

Ruleset Creation

In this section, I create a Glue Data Quality veracity ruleset for my silver-statistics_pages dataset by generating and customising Glue’s recommendations.

Generating Recommendations

Firstly, I told Glue Data Quality to scan the dataset and recommend some rules. Two minutes later, Glue returned these:

Plaintext
Rules = [
    RowCount between 4452 and 17810,
    IsComplete "page_id",
    StandardDeviation "page_id" between 2444.94 and 2702.3,
    Uniqueness "page_id" > 0.95,
    ColumnValues "page_id" <= 8925,
    IsComplete "uri",
    ColumnLength "uri" <= 190,
    IsComplete "type",
    ColumnValues "type" in ["post","home","page","category","post_tag","archive","author","search"],
    ColumnValues "type" in ["post","home"] with threshold >= 0.94,
    ColumnLength "type" between 3 and 9,
    IsComplete "date",
    IsComplete "count",
    ColumnValues "count" in ["1","2","3","4","5","6"] with threshold >= 0.9,
    StandardDeviation "count" between 3.89 and 4.3,
    ColumnValues "count" <= 93,
    IsComplete "id",
    ColumnValues "id" in ["92","11","281","7","1143","1902","770","1217","721","1660","2169","589","371","67","484","4","898","0","691","2029","1606","2686","1020","2643","2993","1400","30","167","2394"] with threshold >= 0.89,
    StandardDeviation "id" between 820.3 and 906.65,
    ColumnValues "id" <= 3532,
    IsComplete "date_todate",
    IsComplete "date_year",
    ColumnValues "date_year" in ["2023","2024","2022"],
    ColumnValues "date_year" between 2021 and 2025,
    IsComplete "date_month",
    ColumnValues "date_month" in ["6","7","5","4","3","8","2","1","11","12","10","9"],
    ColumnValues "date_month" in ["6","7","5","4","3","8","2","1","11","12","10"] with threshold >= 0.94,
    StandardDeviation "date_month" between 3.09 and 3.41,
    ColumnValues "date_month" <= 12,
    IsComplete "date_day",
    ColumnValues "date_day" in ["13","7","12","8","6","3","19","20","17","4","9","14","1","16","2","11","5","15","10","26","21","25","24","18","27","22","28","30","23","29","31"],
    ColumnValues "date_day" in ["13","7","12","8","6","3","19","20","17","4","9","14","1","16","2","11","5","15","10","26","21","25","24","18","27","22","28","30"] with threshold >= 0.91,
    StandardDeviation "date_day" between 8.3 and 9.18,
    ColumnValues "date_day" <= 31
]

A lot is going on here, so let’s dig a little deeper.

Recommendations Analysis

As with many machine learning processes, some human validation of the results is wise before moving forward.

While Glue Data Quality can predict rules based on its ML model and the data supplied, I have years of familiarity with the data and can intuit likely future trends and patterns. As Glue currently lacks this intuition, some recommendations are more useful than others. Let’s examine some of them and I’ll elaborate.

Firstly, these recommendations are totally fine:

Plaintext
IsComplete "page_id",
IsComplete "uri",
IsComplete "date",
IsComplete "count",

IsComplete checks whether all of the values in a column are complete with no NULL values present. This is completely reasonable and should apply to all columns in the silver-statistics_pages data. An easy win.

However, some recommendations need work:

Plaintext
ColumnValues "date_year" in ["2023","2024","2022"],
ColumnValues "date_year" between 2021 and 2025,

ColumnValues runs an expression against the values in a column. These rules (which are both checking the same thing as DQDL’s BETWEEN is exclusive) state that:

date_year must be 2022, 2023 or 2024

This is fine for now, as 2024 is the current year and the first statistics are from 2022. But a post published next year will cause this rule to fail. And not because of incorrect data – because of incorrect rule configuration. Hello false positives!

Finally, some suggestions are outright wrong. For example:

Plaintext
ColumnValues "page_id" <= 8925,

This rule checks that the page_id column doesn’t exceed 8925. But page_id is a primary key! It auto-increments with every new row! So this rule will fail almost immediately, making it completely unsuitable.

Ok so let’s fix them!

Recommendations Modifications

Firstly, let’s fix the date_year rule by replacing the range with a minimum value:

Plaintext
ColumnValues "date_year" >= 2021,
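If an upper bound is still wanted (say, to catch accidentally future-dated rows), another option is templating the rule at deployment time so the ceiling tracks the current year. A plain-Python sketch of the idea — my own suggestion, not a built-in Glue feature:

```python
from datetime import date

def date_year_rule(first_year: int = 2021) -> str:
    """Build a DQDL date_year rule whose ceiling tracks the current year.

    DQDL's between excludes its boundary values, so each end is widened by
    one to admit first_year through to the current year inclusive.
    """
    return (
        f'ColumnValues "date_year" '
        f"between {first_year - 1} and {date.today().year + 1}"
    )

print(date_year_rule())
```

Regenerating the ruleset on each deployment keeps the bound current without manual edits.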

Now let’s fix the page_id rule. This column is a primary key in the WordPress MySQL database, so every value should be unique. Therefore the ruleset should check page_id for uniqueness.

As it turns out I’m spoilt for choice here! There are (at least) three relevant rules I can use:

Plaintext
IsUnique "page_id",
IsPrimaryKey "page_id",
Uniqueness "page_id" = 1.0,

Let’s examine them:

  • IsUnique checks whether all of the values in a column are unique. Exactly what I’m after.
  • IsPrimaryKey goes a step further, verifying that a column contains a primary key by checking if all of the values in the column are unique and complete (non-null).
  • Finally, Uniqueness checks the percentage of unique values in a column against a given expression. In my example, "page_id" = 1.0 states that each page_id column value must be 100% unique.

So why not use them all? Well, besides that being overkill there is a cost implication. Like many Glue services, AWS Glue Data Quality is billed by job duration (per DPU hour). If I keep all three rules then I’m doing the same check three times. This is wasteful and creates unnecessary costs.

Here, the IsPrimaryKey check most closely matches the source column (itself a primary key) so I’ll use that.

Elsewhere, I’m simplifying date_month and date_day. While these are correct:

Plaintext
ColumnValues "date_month" in ["6","7","5","4","3","8","2","1","11","12","10","9"],
ColumnValues "date_day" in ["13","7","12","8","6","3","19","20","17","4","9","14","1","16","2","11","5","15","10","26","21","25","24","18","27","22","28","30","23","29","31"],

It’s far simpler to read as:

Plaintext
ColumnValues "date_month" between 0 and 13,
ColumnValues "date_day" between 0 and 32,
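Because DQDL's between excludes its boundary values, between 0 and 13 admits exactly 1 to 12, and between 0 and 32 admits 1 to 31. A quick Python sanity check of that equivalence:

```python
# DQDL: ColumnValues "date_month" between 0 and 13  ->  0 < value < 13
valid_months = {m for m in range(-5, 50) if 0 < m < 13}
assert valid_months == set(range(1, 13))

# DQDL: ColumnValues "date_day" between 0 and 32  ->  0 < value < 32
valid_days = {d for d in range(-5, 50) if 0 < d < 32}
assert valid_days == set(range(1, 32))
print("both ranges match the explicit value lists")
```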

Finally, I did some housekeeping to reduce the ruleset’s duration:

  • Removed all the duplicate checks. IsComplete was fine for most.
  • ColumnLength checks are gone as the WordPress database already enforces character limits.
  • StandardDeviation checks are also gone as they don’t add any value here.

Now let’s use these suggestions as a starting point for my own ruleset.

Customising A Ruleset

In addition to the above rules and changes, the following rules have been added to the silver-statistics_pages ruleset:

ColumnCount checks the dataset’s column count against a given expression. This checks there are ten columns in silver-statistics_pages:

Plaintext
ColumnCount = 10

RowCount checks a dataset’s row count against a given expression. This checks there are more than zero rows in silver-statistics_pages:

Plaintext
RowCount > 0

RowCountMatch checks the ratio of the primary dataset’s row count and a reference dataset’s row count against the given expression. This checks that the row count of silver-statistics_pages and bronze-statistics_pages are the same (100%):

Plaintext
RowCountMatch "wordpress_api.bronze-statistics_pages" = 1.0

ReferentialIntegrity checks to what extent the values of a set of columns in the primary dataset are a subset of the values of a set of columns in a reference dataset. This checks that each silver-statistics_pages ID value is present in silver-posts:

Plaintext
ReferentialIntegrity "id" "wordpress_api.silver-posts.id" = 1.0

Finally, here is my finished silver-statistics_pages ruleset:

Plaintext
# silver-statistics_pages data quality rules

Rules = [
    # all data
    ColumnCount = 10,
    RowCount > 0,
    RowCountMatch "wordpress_api.bronze-statistics_pages" = 1.0,
    
    # page_id
    IsPrimaryKey "page_id",
    
    # uri
    IsComplete "uri",
    
    # type
    IsComplete "type",
    
    # date
    IsComplete "date",
    
    # count
    IsComplete "count",
    ColumnValues "count" between 0 and 1000,
    
    # id
    IsComplete "id",
    ReferentialIntegrity "id" "wordpress_api.silver-posts.id" = 1.0,
    
    # date_todate
    IsComplete "date_todate",
    ColumnValues "date_todate" <= now(),
    
    # date_year
    IsComplete "date_year",
    ColumnValues "date_year" >= 2021,
    
    # date_month
    IsComplete "date_month",
    ColumnValues "date_month" between 0 and 13,
    
    # date_day
    IsComplete "date_day",
    ColumnValues "date_day" between 0 and 32

]

Once a ruleset is created, it can be edited, cloned and run. So let’s test it out!

Ruleset Testing

In this section, I test my Glue Data Quality veracity ruleset and act on its findings. But first I need to get it running…

Job Test: Data Fetch Fail

Running my ruleset for the first time, it didn’t take long for a problem to appear:

Plaintext
Exception in User Class: java.lang.RuntimeException : 
Failed to fetch data. 
Please check the logs in CloudWatch to get more details.

Uh oh. Guess it’s time to check CloudWatch. This wasn’t an easy task the first time round!

Glue Data Quality generates two new log groups:

  • aws-glue/data-quality/error
  • aws-glue/data-quality/output

And each Data Quality job run creates five log streams:

2024 08 23 LogStream

But there’s no clear hint of where to start! So I dived in and started reading the error logs. After some time, it turned out I actually needed the output logs. Oh well.

And in an output log stream’s massive stack trace:

2024 08 23 Stacktrace
This isn’t even half of it – Ed

Was my problem:

Plaintext
Caused by:
java.io.FileNotFoundException:
No such file or directory
's3://data-lakehouse-bronze/wordpress_api/statistics_pages/statistics_pages.parquet'

No such directory? Well, there definitely is! Sounds like a permissions issue. What gives?

So, remember my RowCountMatch check? It’s trying to compare the silver-statistics_pages row count to the bronze-statistics_pages row count. Like most AWS services, AWS Glue uses an IAM role to interact with AWS resources – in this case the Bronze and Silver Lakehouse S3 buckets.

So let’s check:

  • Can the Glue Data Quality check’s IAM role read from the Silver Lakehouse S3 bucket? Yup!
  • Can it read from the Bronze one? Ah…

Adding s3:GetObject for the bronze S3 path to the Glue Data Quality check’s IAM role fixed this error. Now the job runs and returns results!

Job Test: Constraint Not Met

Next up, I got an interesting message from my ColumnValues "count" rule:

Plaintext
ColumnValues "count" between 0 and 1

Value: 93.0 does not meet the constraint requirement!

That’s…a lot! Then I realised I’d set the rule conditions to between 0 and 1 instead of between 0 and 1000. Oops…

Then I got a confusing result from my ReferentialIntegrity "id" "wordpress_api.silver-posts.id" = 1.0 rule:

Plaintext
ReferentialIntegrity "id" "wordpress_api.silver-posts.id" = 1.0 

Value: 0.9763982102908277 does not meet the constraint requirement.

As a reminder, ReferentialIntegrity checks to what extent the values of a set of columns in the primary dataset are a subset of the values of a set of columns in a reference dataset. And because "wordpress_api.silver-statistics_pages.id" values are based entirely on "wordpress_api.silver-posts.id" values, they should be a perfect match!

Time to investigate. I launched Athena and put this query together:

SQL
SELECT 
sp.id AS stats_id
,p.id AS post_id
FROM "wordpress_api"."silver-statistics_pages" AS sp
LEFT JOIN "wordpress_api"."silver-posts" AS p 
ON sp.id = p.id

And the results quickly highlighted a problem:

2024 08 23 AthenaResults

Here, the LEFT JOIN retrieves all silver-statistics_pages IDs and each row’s matching ID from silver-posts. The empty spaces represent NULLs, where no matching silver-posts ID was found. So what’s going on? What is stats_id zero in silver-statistics_pages?

Reviewing the silver-statistics_pages uri column shows that ID zero is amazonwebshark’s home page. As the WordPress posts table doesn’t record anything about the home page, the statistics_pages table can’t link to anything in posts. So ID zero is used to prevent missing data.
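The number the rule reports is just this ratio: the fraction of primary-dataset IDs found in the reference dataset. A toy sketch of that set arithmetic, using hypothetical IDs:

```python
def referential_integrity(primary: list, reference: list) -> float:
    """Fraction of primary values that exist in the reference set."""
    ref = set(reference)
    return sum(1 for v in primary if v in ref) / len(primary)

stats_ids = [0, 4, 7, 11, 92]   # hypothetical silver-statistics_pages ids
post_ids = [4, 7, 11, 92]       # hypothetical silver-posts ids
print(referential_integrity(stats_ids, post_ids))  # 0.8 - id 0 has no match
```

A single unmatched ID like the home page's zero drags the score below 1.0 — exactly the 0.976 result above at full scale.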

Knowing this, how can I update the rule? In June 2024 AWS added DQDL WHERE clause support, so I tried to add a “where statistics_pages ID isn’t zero” condition. But in testing the editor either didn’t run the check properly or rejected my syntax entirely. So eventually I settled for changing the check’s threshold from = 1.0 to >= 0.9. Maybe something to revisit in a few months.

Run History & Reporting

So now all my rules are working, what benefits do I get? Firstly, AWS Glue shows the job’s run history including status, result and start/stop times:

2024 08 23 DQHistory

Each run is expandable, showing details like duration, overall score and each check’s output. Results are also downloadable – in testing this gave me an unreadable file but adding a JSON suffix let me view the contents:

JSON
{
	"ResultId": "dqresult-e76896fe1ab1dd3436cf12b719da726416d4e64e",
	"Score": 0.95,
	"DataSource": {
		"GlueTable": {
			"DatabaseName": "wordpress_api",
			"TableName": "silver-statistics_pages",
			"CatalogId": "973122011240"
		}
	},
	"RulesetName": "silver-statistics_pages",
	"StartedOn": "2024-08-22T17:23:20.468Z",
	"CompletedOn": "2024-08-22T17:23:45.680Z",
	"RulesetEvaluationRunId": "dqrun-a94651ef8547f426cb977c9451c39061c68aefbd",
	"RuleResults": [
		{
			"Name": "Rule_1",
			"Description": "ColumnCount = 10",
			"Result": "PASS",
			"EvaluatedMetrics": {
				"Dataset.*.ColumnCount": 10
			}
		},
		{
			"Name": "Rule_2",
			"Description": "RowCount > 0",
			"Result": "PASS",
			"EvaluatedMetrics": {
				"Dataset.*.RowCount": 8940
			}
		},
		{
			"Name": "Rule_3",
			"Description": "RowCountMatch \"wordpress_api.bronze-statistics_pages\" = 1.0",
			"Result": "PASS",
			"EvaluatedMetrics": {
				"Dataset.wordpress_api.bronze-statistics_pages.RowCountMatch": 1
			}
		},
		{
			"Name": "Rule_4",
			"Description": "IsPrimaryKey \"page_id\"",
			"Result": "PASS",
			"EvaluatedMetrics": {
				"Column.page_id.Completeness": 1,
				"Column.page_id.Uniqueness": 1
			}
		},

[REDACTED Rules 5 to 19 for space - Ed]

		{
			"Name": "Rule_20",
			"Description": "ColumnValues \"date_day\" between 0 and 32",
			"Result": "PASS",
			"EvaluatedMetrics": {
				"Column.date_day.Maximum": 31,
				"Column.date_day.Minimum": 1
			}
		}
	]
}
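Once downloaded, these result files are straightforward to post-process. Here's a sketch that reduces a result payload to a one-line summary, counting passes and failures from RuleResults (field names as in the example above):

```python
import json

def summarise(result: dict) -> dict:
    """Reduce a Glue Data Quality result payload to a one-line summary."""
    rules = result.get("RuleResults", [])
    passed = sum(1 for r in rules if r.get("Result") == "PASS")
    return {
        "ruleset": result.get("RulesetName"),
        "score": result.get("Score"),
        "passed": passed,
        "failed": len(rules) - passed,
    }

# Trimmed-down sample payload in the shape shown above
sample = {
    "RulesetName": "silver-statistics_pages",
    "Score": 0.95,
    "RuleResults": [
        {"Name": "Rule_1", "Result": "PASS"},
        {"Name": "Rule_2", "Result": "FAIL"},
    ],
}
print(summarise(sample))
```

Feeding these summaries into a dashboard or alert is one low-effort route to ongoing quality reporting.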

Finally, there’s a snapshot chart showing the results trend of the last ten runs:

2024 08 23 DQSnapshot

Although not downloadable, this can still be screen-grabbed and used to certify the data’s quality to stakeholders. Additionally, AWS has documented a visualisation solution using Lambda, S3 and Athena.

Additional Data Quality Ruleset

With the silver-statistics_pages ruleset testing complete, I added a second dataset check before I moved on. This ruleset is applied to silver-posts.

The checks are very similar to silver-statistics_pages in terms of rules and criteria. So in the interests of space I’ve committed it to my GitHub repo.

Now, let’s add my Glue Data Quality checks into my WordPress pipeline.

Ruleset Orchestration

In this section, I integrate my Glue Data Quality veracity checks into my existing WordPress Data Pipeline Step Function workflow.

Step Function Integration

As a quick reminder, here’s how the Step Function workflow currently looks:

2024 08 09 stepfunctions graph

This workflow controls the ingestion, validation, crawling and ETL processes associated with my WordPress API data. I’ll insert the quality checks between the Silver ETL job and the Silver crawler.

AWS Step Functions runs Glue Data Quality checks using the StartDataQualityRulesetEvaluationRun task. This task uses an AWS SDK integration, calling the StartDataQualityRulesetEvaluationRun API with the following parameters:

  • Data source (AWS Glue table) associated with the run.
  • IAM role to run the checks with.
  • Ruleset(s) to run.

Optional parameters are also available. In the case of my silver-statistics_pages ruleset, the API parameters are as follows:

JSON
{
  "DataSource": {
    "GlueTable": {
      "DatabaseName": "wordpress_api",
      "TableName": "silver-statistics_pages"
    }
  },
  "Role": "Glue-S3ReadOnly",
  "RulesetNames": [
    "silver-statistics_pages"
  ]
}
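These parameters map one-to-one onto boto3's Glue client, so the same run can also be started from Python. A sketch using the start_data_quality_ruleset_evaluation_run call (boto3 imported lazily; the role and names are placeholders matching the JSON above):

```python
def start_quality_run(database: str, table: str, role: str, ruleset: str) -> str:
    """Start a Glue Data Quality ruleset evaluation run; returns its run ID."""
    import boto3  # lazy import: the sketch stays definable without boto3 installed

    glue = boto3.client("glue")
    response = glue.start_data_quality_ruleset_evaluation_run(
        DataSource={"GlueTable": {"DatabaseName": database, "TableName": table}},
        Role=role,
        RulesetNames=[ruleset],
    )
    return response["RunId"]
```

Example call: start_quality_run("wordpress_api", "silver-statistics_pages", "Glue-S3ReadOnly", "silver-statistics_pages").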

Because the TableName parameter is different for the silver-posts checks, each check needs a separate action. However, I can place both actions in a Parallel state so they run simultaneously, shortening the workflow’s overall execution time.

Here is how my Step Function workflow looks with these changes:

stepfunctions graph

This workflow is executed by an EventBridge Schedule running daily at 07:00.

Step Function Testing

Testing time! My workflow needs new IAM permissions to perform its new tasks. These are:

  • glue:StartDataQualityRulesetEvaluationRun

This lets the workflow start the silver-statistics_pages and silver-posts Data Quality jobs.

  • iam:PassRole

A Glue Data Quality job must assume an IAM role to access AWS resources successfully. Without iam:PassRole, the workflow can't pass that role to Glue and the check fails.

  • glue:GetTable

The workflow must access the Glue Data Catalog while running, requiring glue:GetTable on the desired region’s Data Catalog ARN to get the required metadata.
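Combined into a single policy statement list, these permissions look something like the sketch below. The account ID, region and resource scoping are placeholders; a real policy should be scoped down to the specific role and tables involved:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "glue:StartDataQualityRulesetEvaluationRun",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::ACCOUNT_ID:role/Glue-S3ReadOnly"
    },
    {
      "Effect": "Allow",
      "Action": "glue:GetTable",
      "Resource": [
        "arn:aws:glue:REGION:ACCOUNT_ID:catalog",
        "arn:aws:glue:REGION:ACCOUNT_ID:database/wordpress_api",
        "arn:aws:glue:REGION:ACCOUNT_ID:table/wordpress_api/*"
      ]
    }
  ]
}
```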

With these updates, the workflow executes successfully:

2024 08 28 SFExec

During the parallel state, both Data Quality jobs successfully start and finish within milliseconds of each other instead of running sequentially:

2024 08 28 SFResults

Cost Analysis

In this section, I examine my costs for the updated Step Function workflow.

This Cost Explorer chart runs from 16 August to the end of August. It is grouped by API Operation and excludes some APIs that aren’t part of this workload.

2024 09 01 Costs

Some notes:

  • I was experimenting with Glue Data Quality from 19 August to 22 August. This period generates the highest Glue Jobrun costs – $0.23 on the 20th and $0.27 on the 22nd.
  • The silver-statistics_pages ruleset was added to the Step Function workflow on the 26th. The silver-posts ruleset was then added on the 27th.
  • The CrawlerRun daily costs are usually $0.04, with some experiments generating higher costs.

My main costs are from Glue's Jobrun and CrawlerRun operations, which was expected. Each ruleset costs around $0.09 a day to run, while each crawler continues to cost $0.02 a day. Beyond that, I'm paying for some S3 PutObject calls, and everything else is within the free tier.

Separately, AWS has tested Data Quality rulesets of varying complexity. Their accrued costs ranged from $0.18 for the least complex to $0.54 for the most complex, so my costs are on par with theirs!

Summary

In this post, I used AWS Glue Data Quality checks and rulesets to apply bespoke veracity checks to my WordPress datasets.

I think AWS Glue Data Quality is a very effective veracity tool. The simple syntax, quick execution and deep AWS integration offer a good solution to a core Data Engineering issue. It’s great that datasets can be compared with other datasets in the Glue Data Catalog, and the baked-in reporting and visuals make Glue’s findings immediately accessible to both technical engineers and non-technical stakeholders. I look forward to seeing what future Glue Data Quality releases will offer!

If this post has been useful then the button below has links for contact, socials, projects and sessions:

SharkLinkButton 1

Thanks for reading ~~^~~