Data Pipeline Validation

Datatest can be used to validate data as it flows through a data pipeline. This is useful in a production environment because the data coming into a pipeline can change in unexpected ways: an upstream provider could alter its format, the quality of a data source could degrade over time, previously unseen errors or missing data could appear, and so on.

Well-structured pipelines are made of discrete, independent steps where the output of one step becomes the input of the next. In the simplest case, the steps can be functions. And in a pipeline framework, the steps are often a type of “task” or “job” object.

A simple pipeline could look something like the following:

...

def pipeline(file_path):
    data = load_from_source(file_path)  # STEP 1

    data = operation_one(data)          # STEP 2

    data = operation_two(data)          # STEP 3

    save_to_destination(data)           # STEP 4
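
The step functions themselves are placeholders. As a point of reference, here is a minimal sketch of what they might look like, assuming pandas DataFrames and the hypothetical column names used in the examples that follow:

import pandas as pd

def load_from_source(file_path):
    # Hypothetical loader: read a CSV into a DataFrame.
    return pd.read_csv(file_path)

def operation_one(data):
    # Hypothetical transform: merge the name columns into one.
    data['full_name'] = data['first_name'] + ' ' + data['last_name']
    return data[['user_id', 'full_name']]

def operation_two(data):
    # Hypothetical transform: drop duplicate user records.
    return data.drop_duplicates(subset='user_id')

def save_to_destination(data):
    # Hypothetical writer: persist the result.
    data.to_csv('output.csv', index=False)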

You can simply add calls to validate() between the existing steps:

...

def pipeline(file_path):
    data = load_from_source(file_path)  # STEP 1

    validate(data.columns, ['user_id', 'first_name', 'last_name'])

    data = operation_one(data)          # STEP 2

    validate(data.columns, ['user_id', 'full_name'])
    validate(data, (int, str))

    data = operation_two(data)          # STEP 3

    validate.unique(data['user_id'])

    save_to_destination(data)           # STEP 4
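
When a check fails, validate() raises a ValidationError describing each difference it found. Here is a sketch of how a failure might be reported before re-raising, assuming datatest’s ValidationError and its differences attribute:

import pandas as pd
from datatest import validate, ValidationError

data = pd.DataFrame({'user_id': [1], 'first_name': ['Ada'], 'last_name': ['Lovelace']})

try:
    validate(data.columns, ['user_id', 'full_name'])
except ValidationError as err:
    # Each difference describes one mismatch,
    # e.g. Missing('full_name') or Extra('first_name').
    for difference in err.differences:
        print(difference)
    raise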

In a more sophisticated pipeline framework, you could go further and define tasks dedicated specifically to validation.
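
For instance, a dedicated validation step could be a plain pass-through function that checks its input and returns it unchanged, which a task-based framework could schedule like any other step. A hypothetical sketch, not tied to any particular framework:

from datatest import validate

def validate_users(data):
    # Hypothetical validation-only task: check the data, then
    # return it unchanged as input for the next step.
    validate(data.columns, ['user_id', 'full_name'])
    validate.unique(data['user_id'])
    return data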

Tip

When possible, it’s best to call validate() once for a whole batch of data rather than once for each element. Validating a batch is more efficient, and failures provide more context when fixing data issues or defining appropriate acceptances.

# Efficient:
validate(data, int)

for x in data:
    myfunc(x)

# Inefficient:
for x in data:
    validate(x, int)
    myfunc(x)
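
A batch call also fails more usefully: validate(data, int) raises a single ValidationError that lists every non-integer element, while the per-element loop stops at the first bad value.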

Toggle Validation On/Off Using __debug__

Sometimes it’s useful to perform comprehensive validation while debugging but to disable it for production runs. You can use Python’s __debug__ constant to toggle validation on or off as needed:

...

def pipeline(file_path):
    data = load_from_source(file_path)  # STEP 1

    if __debug__:
        validate(data.columns, ['user_id', 'first_name', 'last_name'])

    data = operation_one(data)          # STEP 2

    if __debug__:
        validate(data.columns, ['user_id', 'full_name'])
        validate(data, (int, str))

    data = operation_two(data)          # STEP 3

    if __debug__:
        validate.unique(data['user_id'])

    save_to_destination(data)           # STEP 4

Validation On

In the example above, you can run the pipeline with validation by running Python in unoptimized mode. In unoptimized mode, __debug__ is True, so the if __debug__ blocks run (and assert statements are executed normally). Unoptimized mode is the default when invoking Python:

python simple_pipeline.py

Validation Off

To run the example above without validation, run Python in optimized mode. In optimized mode, __debug__ is False, so the if __debug__ blocks are skipped (as are assert statements). You can enable optimized mode with the -O command line option:

python -O simple_pipeline.py
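
Optimized mode can also be enabled by setting the PYTHONOPTIMIZE environment variable, which is convenient when you don’t control the interpreter’s command line directly:

PYTHONOPTIMIZE=1 python simple_pipeline.py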

Validate a Sample from a Larger Data Set

Another option for dealing with large data sets is to validate a small sample of the data. Doing this provides basic sanity checking in a production pipeline, but it can also allow invalid data to pass unnoticed. You must decide whether this trade-off is appropriate for your specific use case.

DataFrame Example

With Pandas, you can use the DataFrame.sample() method to get a random sample of items for validation:

...

def pipeline(file_path):
    data = load_from_source(file_path)  # STEP 1

    validate(data.columns, ['user_id', 'first_name', 'last_name'])

    data = operation_one(data)          # STEP 2

    sample = data.sample(n=100)
    validate(sample.columns, ['user_id', 'full_name'])
    validate(sample, (int, str))

    data = operation_two(data)          # STEP 3

    sample = data.sample(n=100)
    validate.unique(sample['user_id'])

    save_to_destination(data)           # STEP 4
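
One caveat: DataFrame.sample() raises a ValueError when n is greater than the number of rows (unless replace=True is given), so if a data set can be small you may want to cap the sample size:

sample = data.sample(n=min(100, len(data)))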

Iterator Example

Sometimes you will need to work with exhaustible iterators of unknown size, which may yield more data than you can load into memory at any one time. Using Python’s itertools module, you can take a sample of the data for validation and then reconstruct the iterator to continue with data processing:

import itertools

...
...

def pipeline(file_path):
    iterator = load_from_source(file_path)  # STEP 1

    iterator = operation_one(iterator)      # STEP 2

    sample = list(itertools.islice(iterator, 100))
    validate(sample, (int, str))
    iterator = itertools.chain(sample, iterator)

    iterator = operation_two(iterator)      # STEP 3

    sample = list(itertools.islice(iterator, 100))
    validate.unique(item[0] for item in sample)
    iterator = itertools.chain(sample, iterator)

    save_to_destination(iterator)           # STEP 4

Tip

If you need to perform multiple sampling operations on exhaustible iterators, you may want to define a helper function for doing so:

import itertools

def get_sample(iterable, n=100):
    iterator = iter(iterable)
    sample = list(itertools.islice(iterator, n))
    iterator = itertools.chain(sample, iterator)
    return sample, iterator

...

Calling this function returns a tuple that contains a sample and the reconstructed iterator:

...

sample, iterator = get_sample(iterator)
validate(sample, (int, str))

...

Important

As previously noted, validating samples of a larger data set should be done with care. If the sample is not representative of the data set as a whole, some validations could fail even when the data is good and some validations could pass even when the data is bad.

Some of the examples above call validate.unique(), but this check only covers the data included in the sample; duplicates in the remaining data set could pass unnoticed. This may be acceptable in some situations but not in others.
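
If uniqueness must hold across the entire data set, one alternative to sampling is to check every key as the data streams past. A hypothetical sketch using a plain set instead of datatest (it keeps every key in memory, which is workable when the keys are much smaller than the records):

def assert_unique_keys(iterator):
    # Stream items through unchanged while tracking user_id values
    # (assumed to be the first field of each item); fail on the
    # first duplicate encountered.
    seen = set()
    for item in iterator:
        user_id = item[0]
        if user_id in seen:
            raise ValueError(f'duplicate user_id: {user_id!r}')
        seen.add(user_id)
        yield item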

Testing Pipeline Code Itself

The pipeline validation discussed in this document is not a replacement for proper testing of the pipeline’s code base itself. Pipeline code should be treated with the same care and attention as any other software project.
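
For example, individual steps can be unit tested against small fixtures. A hypothetical pytest-style sketch, assuming operation_one can be imported from the pipeline module:

import pandas as pd
from datatest import validate

from simple_pipeline import operation_one  # hypothetical module

def test_operation_one():
    df = pd.DataFrame({
        'user_id': [1, 2],
        'first_name': ['Ada', 'Alan'],
        'last_name': ['Lovelace', 'Turing'],
    })
    result = operation_one(df)
    validate(result.columns, ['user_id', 'full_name'])
    validate(result['full_name'], str)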