ETL development life-cycle with Dataflow
by Rishika Idnani and Olek Gorajek
Introduction
This blog post is for all the data practitioners who are interested in learning about the standardization and automation of data engineering projects at Netflix.
In our first Dataflow post, we introduced Dataflow as the data project management toolbox at Netflix and described data pipeline asset management in detail. Following that, our next Dataflow post provided an inside look at the sample workflows feature, which offers an easy way to bootstrap new workflows in one of several programming languages.
In this post, we will explore the benefits of the Dataflow recommended approach to testing and deployment. This pattern, also mentioned in our initial blog post, provides a robust framework for managing and optimizing data pipelines. We’ll show how a team of data engineers can work simultaneously on the same set of ETL code, keep their changes isolated and easily testable, and merge them with the production environment without much effort.
“Git Solution”
Some of us may remember times when we used to push (or save) our ETL code into whatever version control, or network drive system, existed in the company. It could have been as simple as saving some flavor of SQL into an editor window and calling it something like bob_etl_v1.sql, in order to keep all your previous versions intact and to make sure everyone who sees the name prefix will not override your code. Those days are luckily long gone, but proper version control and other software engineering practices have only in recent years been entering the world of data engineering.
And there is one tool that has been by far the most widely used in this effort: git. With a little help from git, anyone is free to develop code comfortably in their local branches, and after they make sure it has no syntax, logic, or performance issues, they can safely merge it with the production code… or we wish it were that easy.
The fact of the matter is that it’s not always simple to take the ETL code you are working on and test it safely in an isolated environment, or to take your transformation logic, especially when written in SQL, and run it as if it were running in production mode. The common problem here is that the SQL logic is often parametrized, the parameters are sometimes dynamically generated, and the output must be isolated from the current production environment. Preparing a well designed test pipeline and its environment can often be such a hassle that sometimes Data Engineers simply cross their fingers and test-in-prod.
Obviously there is nothing revolutionary about source control tools, or SQL templatization for that matter, but let’s have a look at the effect they can have when combined with a solid CI/CD framework. Here is a short list of related features, provided in Dataflow out-of-the-box, which we’ll be talking about in this post:
- branch driven configuration
- unit testing framework (with dataset mocking)
- integration testing framework
- CI/CD customization hooks
Branch Driven Configuration
This is the core element of the setup to make your Dataflow project easy to develop and test. Let’s have a look at an example Dataflow configuration in some random repository. The file we want to show you is called dataflow.yaml and it typically sits in the root of a repository:
~/random-dataflow-repo$ cat dataflow.yaml
[...]
defaults:
  workflows:
    scheduler: sandbox
    variables:
      TARGET_DB: ${dataflow.username}
branches:
  staging:
    workflows:
      scheduler: production
      variables:
        TARGET_DB: staging
  main:
    workflows:
      scheduler: production
      variables:
        TARGET_DB: prod
Now let’s review the above config file. Its content defines the following pattern:
- If you are operating in a git branch called staging then:
  → Dataflow will make sure to deploy your workflows to the scheduler¹ production cluster,
  → and it will set your TARGET_DB workflow variable to staging, so all the output tables are created (and accessed) in that database
- On the other hand, if you are running in a git branch called main then:
  → Dataflow will deploy your workflows to the scheduler production cluster,
  → and set the TARGET_DB workflow variable to prod
- And finally… if you are in a git branch that is not recognized or registered in the config, Dataflow will use the default settings, which means:
  → it will deploy your workflows to the scheduler sandbox cluster,
  → and set the TARGET_DB workflow variable to the value of your own username.
Of course, the above settings do not perfectly isolate every user, or even the same user running the same pipelines with different adjustments, but it is not hard to extend this configuration to a point that is sufficient for whatever use case you need. There is simply no point in making it too complex from the start, especially if, say, some Dataflow managed repository has only a few workflows in it and is accessed only by a handful of users.
It is also important to note that by far the most common case for the above main branch configuration is to be applied inside a CI/CD framework only, while the default settings are usually applicable in a local environment where someone is building and testing their code.
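To make this pattern more concrete, below is a minimal sketch, in plain Python and not Dataflow’s actual implementation, of how branch driven settings can be resolved: detect the current git branch, then overlay that branch’s workflow settings on top of the defaults from dataflow.yaml. The function names and the username substitution are illustrative assumptions only.

import subprocess


def current_git_branch() -> str:
    """Return the name of the git branch the repository is currently on."""
    out = subprocess.run(
        ["git", "rev-parse", "--abbrev-ref", "HEAD"],
        capture_output=True, text=True, check=True,
    )
    return out.stdout.strip()


def effective_workflow_settings(config: dict, branch: str, username: str) -> dict:
    """Overlay the branch-specific workflow settings on top of the defaults."""
    settings = dict(config.get("defaults", {}).get("workflows", {}))
    settings.update(config.get("branches", {}).get(branch, {}).get("workflows", {}))
    # Resolve the one templated value used in the example config above.
    variables = dict(settings.get("variables", {}))
    if variables.get("TARGET_DB") == "${dataflow.username}":
        variables["TARGET_DB"] = username
    settings["variables"] = variables
    return settings

# For an unregistered feature branch this yields scheduler=sandbox and
# TARGET_DB=<username>, matching the default behavior described above.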
Pull-Request Life Cycle
The pull-request (PR) life cycle is the key concept for enabling a safe and sound experience when working on your ETL code. You are not required to follow it, even when using Dataflow, but when you do, your data engineering work will become more enjoyable, predictable, and reproducible.
Let’s have a look at this diagram:
Almost any data engineering project can use the above pattern to safely manage their code and data artifacts. The main issue is that it can be hard to set this up if your environment is a clean slate and you don’t already have a DBT-like platform to start with. Here is where Dataflow comes in handy. Let’s have a look at how its functionality maps to the above PR life cycle.
Testing with Dataflow
If you set up your ETL project using Dataflow, you can benefit from all of these testing methodologies. It’s important to mention that the Dataflow team did not build all of these features from scratch; rather, it combined existing tools and libraries into a coherent framework that makes data pipeline work (at Netflix) more efficient and less frustrating.
Let’s review the different styles of testing in the context of branch driven configuration. As you read about them we recommend going back to the above diagram to better understand how each testing style fits inside the development cycle.
Unit Tests
In the past, verifying SQL syntax was a cumbersome process. You had to copy your code into a SQL editor, replace all parameters with appropriate values, and prepend an EXPLAIN [PLAN] statement. This method allowed you to check the syntax of your code, but if you then needed to make changes, you faced the risk of introducing new syntax errors, necessitating a repeat of the entire process.
Motivation
Since running the entire pipeline (with embedded SQL) in production, in a sandbox, or in a SQL editor can be very time consuming, unit tests offer a way to quickly identify obvious errors. This approach makes the development process more efficient and minimizes the risk of disruptions in your ETL code.
Isolated PySpark
At the risk of stating the obvious, let’s clarify a few things about unit testing. Unit testing should not be performed using an actual Spark session with access to the production catalog, because that would inevitably lead to issues on production data at some point.
Hence, for the purpose of unit testing, under the Dataflow initiative, we developed an internal library to help with this issue. It is a Python module that provides classes similar in interface to unittest.TestCase, but based on a version of PySpark identical to Netflix’s internal Spark, with its full UDF library included. This module, called dataflow.unittest, is insulated from production tables, making it ideal for unit testing in a Spark environment without affecting any live data.
The above Dataflow unit test framework starts by checking for the presence of this specialized PySpark version. If a regular PySpark version is found instead, Dataflow throws a warning advising against using it for unit testing due to the risk of accidental access to production databases.
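As an illustration only, a guard of that kind could look roughly like the sketch below; the NETFLIX_ISOLATED_BUILD marker is a made-up attribute, not how dataflow.unittest actually detects its isolated PySpark build.

import warnings


def ensure_isolated_pyspark() -> None:
    """Warn if the installed PySpark does not look like an isolated build (hypothetical check)."""
    try:
        import pyspark
    except ImportError:
        raise RuntimeError("PySpark is required to run Dataflow unit tests")

    # Hypothetical marker; the real library identifies its own build differently.
    if not getattr(pyspark, "NETFLIX_ISOLATED_BUILD", False):
        warnings.warn(
            "A regular PySpark build was found. Unit tests may accidentally reach "
            "production databases; install the isolated build instead."
        )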
Example
Let’s revisit an example from our previous blog, where we compute the top hundred movies/shows on a daily basis. Below is a refined reference to the SQL script that accomplishes this task and writes the results to a target table:
-- Step 1: Aggregate view hours by title and country
WITH STEP_1 AS (
    SELECT
        title_id,
        country_code,
        SUM(view_hours) AS view_hours
    FROM schema.playback
    WHERE playback_date = CURRENT_DATE
    GROUP BY
        title_id,
        country_code
),

-- Step 2: Rank all titles from most watched to least in every country
STEP_2 AS (
    SELECT
        title_id,
        country_code,
        view_hours,
        RANK() OVER (
            PARTITION BY country_code
            ORDER BY view_hours DESC
        ) AS title_rank
    FROM STEP_1
),

-- Step 3: Filter all titles to the top 100
STEP_3 AS (
    SELECT
        title_id,
        country_code,
        view_hours,
        title_rank
    FROM STEP_2
    WHERE title_rank <= 100
)

-- Insert the final results into the target table
INSERT INTO ${TARGET_DB}.dataflow_results
SELECT
    title_id,
    country_code,
    title_rank,
    view_hours,
    CURRENT_DATE AS date
FROM STEP_3;
To ensure comprehensive unit testing and avoid any gaps, we need to test the SQL in this pipeline. This thorough testing guarantees full coverage and ensures that each part of the SQL logic in top100.sql is functioning correctly and reliably.
Additionally, here is a quick reference to the file structure that includes the unit test itself and the SQL file to test:
top100
├── pipeline-definition.yaml
├── ddl
│   └── dataflow_targettable.sql
└── src
    ├── mocks
    │   ├── dataflow_top100_expected_sample.yaml
    │   └── schema.playback.yaml
    └── top100.sql
In the example unit test, we will:
- prepare the source and target tables that participate in our workflow.
- then, test the SQL code using the run_sql_from_file method, which executes it in a local Netflix-specialized Spark environment.
- and finally, compare the results against the mock data.
Below is an example of a unit test class that accomplishes this:
from dataflow.unittest import TestSparkSession


class TestSparkSQLWrite(TestSparkSession):

    def test_write(self):
        # prepare test arguments
        args = {"TARGET_DB": "foo", "TARGET_TABLE": ...}

        # prepare source table: schema.playback
        self.create_table_from_yaml("./mocks/schema.playback.yaml")

        # prepare target table
        self.create_table_from_ddl(
            "../ddl/dataflow_top100_expected_sample.sql", args
        )

        # run the job
        self.run_sql_from_file("./top100.sql", variables=args)

        # evaluate results
        self.compare_table_with_yaml(
            args["TARGET_TABLE"], "./mocks/dataflow_top100_expected_sample.yaml"
        )
Mock Datasets
Dataflow unit tests require sample data to ensure the accuracy of the transformations. Data Engineers at Netflix frequently generate sample data for their test cases. These sample data sets are a small subset of the actual data, and they can be created and stored in a YAML file using the Dataflow mock feature. This feature has been discussed in our previous blog post, but here is a quick command line output for reference:
$ dataflow mock save schema.playback
Sample data from schema.playback dataset successfully written to ./schema.playback.yaml!
In the above example, some sample data from the schema.playback table is generated and saved in a YAML file. Once this mock data is prepared, we can use it to construct a mock table as part of our unit test code. A mock dataset is necessary for every input table of your test code, but you don’t need to prepare one for the output/target table. When the unit test code runs and the compare_table_with_yaml method does not find the table to compare against, it will create one for you. And when it does, it will also fail the test, so you know what happened and have a chance to review it before merging the code.
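The bootstrap-then-fail behavior can be pictured with a small sketch; this is an assumption about the pattern, not the real compare_table_with_yaml implementation, and it uses plain PyYAML for brevity.

import os

import yaml


def compare_table_with_yaml(result_rows: list, yaml_path: str) -> None:
    """Compare computed rows with an expected YAML mock; bootstrap the mock if it is missing."""
    if not os.path.exists(yaml_path):
        # Materialize the current results as the expected mock so they can be reviewed.
        with open(yaml_path, "w") as f:
            yaml.safe_dump(result_rows, f)
        raise AssertionError(
            f"Expected mock {yaml_path} did not exist and was created from the current "
            "results; review it and re-run the test before merging."
        )
    with open(yaml_path) as f:
        expected_rows = yaml.safe_load(f)
    assert result_rows == expected_rows, "results differ from the expected mock"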
In conclusion, unit testing your pipelines can ensure that your SQL code has comprehensive coverage. Unit tests are a cornerstone of good software practices, and they are just as important in ETL pipelines.
Integration Tests
Unit tests are cool and fun to write. They essentially allow for quick and frequent feedback during the development cycle. Still, there is no chance that unit tests will tell you that your workflow definition is correct or that you connected the workflow steps properly. This is where integration testing comes in. For the purpose of this article we define integration testing as execution of one or more workflows in a non-production environment.
Every one of us can probably relate to the “fascinating” world of testing your ETL code. You change some transformation, you save, you kick off the workflow, and you go for a coffee (or tea), unless you like to watch the paint dry, as described by Max Beauchemin in his blog post titled The Downfall of the Data Engineer. And before you saved those changes, you hopefully remembered to change the output of your code to a temporary target, otherwise you are going to have a bad time and may need to restore some tables.
This is where we need to recall the Dataflow branch driven configuration. With the simple ability to automatically switch your target tables to a different name or a different database depending on the runtime environment, integration testing becomes less error-prone and much easier to reproduce.
To illustrate this pattern, we’ll continue using the example pipeline from the previous chapter, which is also the example pipeline that comes with Dataflow.
First let’s review the components that will be playing a role here. Let’s assume we have an existing workflow running in production called dataflow_sparksql_sample.workflow. This workflow runs on a daily basis and every day creates a partition of data in a table called dataflow_sparksql_sample. Let’s have a look at the partial definition:
Trigger:
  tz: US/Pacific

Variables:
  TARGET_TABLE: dataflow_top100_expected_sample
  TARGET_DB: ${DATAFLOW.TARGET_DB}

Workflow:
  id: dataflow_top100_expected_sample.workflow.${DATAFLOW.BRANCH}
  jobs:
    - job:
        id: ddl
        type: Spark
        spark:
          script: $S3{./ddl/dataflow_top100_expected_sample.sql}
          parameters:
            TARGET_DB: ${DATAFLOW.TARGET_DB}
    - job:
        id: write
        type: Spark
        spark:
          script: $S3{./src/top100.sql}
          parameters:
            SOURCE_DB: prod
            TARGET_DB: ${DATAFLOW.TARGET_DB}
            DATE: ${CURRENT_DATE}
Notice that some variables listed in the above workflow are managed by Dataflow. These variables must be defined in the Dataflow config file. For example, based on the above workflow definition, we should make sure that the DATAFLOW.TARGET_DB variable is defined.
An example dataflow.yaml config file may look like this:
defaults:
  workflows:
    variables:
      TARGET_DB: ${dataflow.username}_db
branches:
  staging:
    workflows:
      variables:
        TARGET_DB: stg
  main:
    workflows:
      variables:
        TARGET_DB: prod
Now, if anyone clones the repository with the above code and needs to make a modification to the SQL transformation, they can simply follow these general steps:
- Make some change to the transformation code and DDL, if necessary. Adjust the unit tests and make sure they pass (see previous chapter).
- Run the workflow locally in a safe user namespace using:
  dataflow project run ./dataflow_top100_sample.workflow.yaml
  As long as the above command is executed in a custom git branch, the target database for the output table will be replaced with <username>_db.
- Commit your code, make a PR, have it reviewed, and merge it to the staging branch. The Dataflow managed CI/CD will automatically test the PR and, upon merging, will kick off another run of the workflow in the staging context, which means the data will be written to the target table in the stg database, as per the Dataflow config settings.
That’s it. Once the same pull-request makes it to the main branch, the production workflow will be automatically updated, and there is high certainty that everything will work as expected after going through the multiple stages of testing. Notice that no adjustments are necessary for the purpose of testing the updated transformation on real data, and that a single Dataflow command safely deploys and executes your workflow depending on the git context it is running in. This makes the development cycle much safer and its executions easily repeatable.
But we all know that code testing is one thing and that the input data can surprise us even when we think we considered all the edge cases. And that’s where data audits come in…
Data Audits
Data audits are essential for ensuring that the final dataset is fit for consumption. While unit tests and integration tests cover technical aspects, data audits validate that the data is accurate and aligns with business expectations. Each business use case has specific data requirements, such as non-nullable columns, defined value ranges, and expected daily row counts. Data audits verify that incoming data meets these criteria, confirming its consistency and reliability according to business standards.
To achieve this, another Netflix team has created DataAuditor, a Python library designed to ensure data quality by evaluating data in tables or columns within our data warehouse. A typical audit process involves selecting data, evaluating it against predefined checks, and producing a binary result (pass/fail). DataAuditor checks can be integrated into any workflow definition, and workflows can be halted or alerts generated in case of data audit failures. Since DataAuditor jobs take table names and queries as parameters, they can be easily adjusted in the scope of the Dataflow branch configuration.
DataAuditor offers a suite of canned audits for common data quality checks, requiring minimal code and configuration in the workflow. Some of these canned audits leverage the Iceberg __partitions metadata table, enabling fast execution with minimal overhead and without the need to scan the full dataset. Below are some examples of these canned audits:
- Column should not have nulls
- Column should be unique on primary key
- Column should have values in range
- Column should have values matching regex
Let’s dive into how these audits can be applied to our example dataset of top movies/shows to ensure the data is accurate and robust.
For reference, this is the result of the example described in detail in the previous blog post:
sql> SELECT * FROM foo.dataflow_sample_results
     WHERE date = 20220101 and country_code = 'US'
     ORDER BY title_rank LIMIT 5;

 title_id | country_code | title_rank | view_hours | date
----------+--------------+------------+------------+----------
 11111111 | US           | 1          | 123        | 20220101
 44444444 | US           | 2          | 111        | 20220101
 33333333 | US           | 3          | 98         | 20220101
 55555555 | US           | 4          | 55         | 20220101
 22222222 | US           | 5          | 11         | 20220101
In this example, we want to make sure that the final data product satisfies the following requirements:
- Country code should be a non-null column
- The primary key should be title_id and country_code (meaning there should be no duplicate rows for this combination)
- The number of unique values in country_code should be 195 (assumption: 195 countries in total)
- Every country should have a top 10, meaning this table should have 195 * 10 = 1,950 records (assuming each country has at least 10 titles watched)
- The partitioning column of the table should be dateint
Example data audits
data_auditor:
  audits:
    - function: table_should_be_unique_on_primary_key
      blocking: true
      params:
        table: ${dataflow.TARGET_DB}.dataflow_sample_results
    - function: columns_should_not_have_nulls
      blocking: true
      params:
        table: ${dataflow.TARGET_DB}.dataflow_sample_results
        columns: [country_code]
    - function: table_should_have_dateint_partitions
      params:
        table: ${dataflow.TARGET_DB}.dataflow_sample_results
    - function: query_should_return_country_count_within_range
      params:
        query_name: Total Country Count for Top10
        targets: ${TABLE_PATH}
        query: >
          SELECT
            COUNT(distinct country_code) AS total_country_count
          FROM ${dataflow.TARGET_DB}.dataflow_sample_results
          WHERE date = ${TODAY_DATE}
        lower_bound: 195
        upper_bound: 195
    - function: query_should_return_number_rows_within_range
      params:
        query_name: Total Row Count for Top10
        targets: ${dataflow.TARGET_DB}.dataflow_sample_results
        query: >
          SELECT
            COUNT(*) AS total_row_count
          FROM ${dataflow.TARGET_DB}.dataflow_sample_results
          WHERE date = ${TODAY_DATE}
        lower_bound: 1950
        upper_bound: 1950
DataAuditor empowers everyone to seamlessly integrate data checks into their workflows, enabling early detection and resolution of data issues. This proactive approach not only enhances the quality of data products but also fosters trust and confidence in the insights derived from them. And the best thing about DataAuditor is that you can use it both for the production part of the datasets you own, as well as for the consumption steps of the datasets you read from — in case you want to protect your transformations from some undesired cases of the input data.
Project Variables
In case the above framework does not provide enough flexibility in the testing or deployment of your project, Dataflow features two more knobs to make your CI/CD workflows do exactly what you’d like them to. These two knobs are project variables and custom hooks.
It is not uncommon for workflow definitions to require some values passed from the environment or from the user. Dataflow makes it possible with the help of project variables. There are three general types of variables that folks can reference in their Dataflow managed workflows:
- system variables
- workflow asset variables
- custom variables
System variables are provided by Dataflow out-of-the-box and can be used in workflow definitions without any additional setup. There are not too many of these, since their scope is limited. Here are a few examples:
- ${dataflow.username} returns the current local username
- ${dataflow.branch} returns the current local git branch name
- ${dataflow.commit_hash} returns the current git commit hash
Workflow asset variables make it possible to reference the unique location of the asset built by Dataflow, whether from the current or from any other Dataflow project. Here are some examples:
- ${dataflow.jar.<namespace>} returns the location of the latest jar from the Dataflow project under some namespace.
- ${dataflow.egg.<namespace>} returns the location of the latest egg from the Dataflow project under some namespace.
- ${dataflow.jar.<namespace>.<build>} returns the location of a specific build of some jar from the Dataflow project under some namespace.
- ${dataflow.egg.<namespace>.<build>} returns the location of a specific build of some egg from the Dataflow project under some namespace.
Custom variables are the most interesting ones. They can be defined at the project scope and then redefined for any arbitrary git branch, which enables a deeper CI/CD integration inside your code. Let’s say you want to define a variable ${dataflow.my_db} and then adjust its value depending on the execution branch. Here is how you could define it in your Dataflow config:
defaults:
  workflows:
    variables:
      my_db: default_db
branches:
  staging:
    workflows:
      variables:
        my_db: dev_db
  main:
    workflows:
      variables:
        my_db: prod_db
For debugging purposes, as Dataflow renders your workflows, whether for testing or for deployment, it can show how these variables are replaced and with what values.
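As a rough illustration, assuming the dataflow.yaml layout shown above and not Dataflow’s real renderer, the substitution could work along these lines:

import re


def render_workflow(text: str, branch: str, config: dict) -> str:
    """Replace ${dataflow.<name>} tokens using branch overrides, falling back to defaults."""
    defaults = config.get("defaults", {}).get("workflows", {}).get("variables", {})
    overrides = config.get("branches", {}).get(branch, {}).get("workflows", {}).get("variables", {})
    variables = {**defaults, **overrides}

    def substitute(match: re.Match) -> str:
        # Leave unknown tokens untouched so later stages can resolve them.
        return str(variables.get(match.group(1), match.group(0)))

    return re.sub(r"\$\{dataflow\.(\w+)\}", substitute, text)

# Rendering "INSERT INTO ${dataflow.my_db}.results" on the staging branch with the
# config above would produce "INSERT INTO dev_db.results".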
Custom Hooks
If the Dataflow project variables are not sufficient to make the CI/CD of your repository do exactly what you need there is one more knob to make that happen. It is called “custom hooks” and it allows you to seamlessly plug in a script either before or after execution of any standard Dataflow command. Let’s have a look at an example.
Let’s say you’d like to add some logic to the standard dataflow project test command. Of course, if this is generally needed logic, you are encouraged to open a pull-request to the Dataflow repo with a contribution. But if it is something you need for your project only, then you could simply add the following scripts to your repository:
- PROJECT_ROOT/dataflow-hooks/project/before-test.sh
- PROJECT_ROOT/dataflow-hooks/project/after-test.sh
The directory and file naming convention allows Dataflow to find and run these files without any configuration. Notice the path elements above: the Dataflow command for which those scripts are invoked is embedded in the path. For the purposes of this example, let’s say that these scripts simply echo “Hello world!” and “Bye-bye!” respectively.
With the above scripts in place, this is what your dataflow project test command would look like:
$ dataflow project test
Dataflow (<version>)
Running custom script <PROJECT_DIR>/dataflow-hooks/project/before-test.sh ...
Hello world!
# output of the actual "dataflow project test" command
Running custom script <PROJECT_DIR>/dataflow-hooks/project/after-test.sh ...
Bye-bye!
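Under the hood, the lookup boils down to a simple path convention. The sketch below is an assumption about how such a convention could be implemented, not Dataflow’s actual code:

import os
import subprocess


def run_hook(project_root: str, group: str, command: str, stage: str) -> None:
    """Run an optional before/after hook script if the project provides one."""
    script = os.path.join(project_root, "dataflow-hooks", group, f"{stage}-{command}.sh")
    if os.path.isfile(script):
        print(f"Running custom script {script} ...")
        subprocess.run(["bash", script], check=True)

# e.g. run_hook(".", "project", "test", "before") would run dataflow-hooks/project/before-test.sh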
One could say that these custom hooks are unnecessary. After all you could probably already adjust your CI/CD jobs to execute any custom scripts before or after the Dataflow command. And that’s true… but remember that one of the main goals of Dataflow is standardization, and if someone who is not familiar with your projects comes in and attempts to run a Dataflow command, they will automatically have your custom logic executed if it is plugged in as a Dataflow hook.
Conclusions
Dataflow provides a robust testing framework inside the Netflix data pipeline ecosystem. This is especially valuable for Spark SQL code, which used to be hard to unit test. All these test features, whether for unit testing, integration testing, or data audits, come in the form of Dataflow commands or Python libraries, which makes them easy to set up, easy to run, and leaves no excuse not to instrument all your ETL workflows with robust tests. And the best part is that, once created, all these tests will run automatically during standard Dataflow command calls or during the CI/CD workflows, allowing for automated checking of code changes made by folks who may be unaware of the whole setup.
Credits
As always we’d like to thank many engineers for contributing to the Dataflow project and the features we described in this post. The slick Dataflow development environment would not be possible without help from the following folks:
- John Zhuge for building and maintaining the Netflix isolated PySpark library.
- Max McFarland for creating the original Dataflow unittest module.
- Samuel Redai for building the original Dataflow branch driven configuration.
- Stephen Huenneke for the last few years of making sure that DataAuditor is a dependable component of data engineering infrastructure at Netflix.
- And many others who contributed to the Netflix data engineering tools ecosystem.
[1]: Dataflow’s current scheduler of choice is Maestro, Netflix’s home-grown platform that has recently been open sourced.