Don’t reinvent the wheel by writing more boilerplate code. Focus on critical business logic and delegate the tricky details to a clever tool.
Introduction
Apache Spark is a powerful framework for building flexible and scalable data processing applications. It offers a relatively simple yet powerful API that provides the capabilities of classic SQL SELECT statements in a clean and flexible way. At the same time, Spark can access all kinds of data sources, such as object stores in the cloud (S3, ABS, …), relational databases via JDBC, and much more via custom connectors (Kafka, HBase, MongoDB, Cassandra, …). This wide range of connectors, coupled with the extensive support for data transformations, supports simple data ingestion, pure data transformation, or any combination such as ETL and ELT. Spark’s flexibility accommodates very different types of workflows to fit your exact needs.
But in the end, a framework alone is not enough for a data processing application. You still need to build an application on top of Apache Spark that contains all the business logic plus a lot of boilerplate code. In addition, some important data-related topics fall outside the scope of Apache Spark, such as robust schema management (including schema evolution and migration strategies) or capturing relevant metrics to measure data quality.
In this situation, a tool at a higher level of abstraction can provide practical solutions, and Flowman is just such a tool. It is completely open source and free of charge, with pre-built packages for various Spark versions (from 2.4 up to 3.3) and environments (pure Spark, Cloudera, AWS EMR, Azure Synapse).
Declarative Approach
Flowman uses a purely declarative approach with simple YAML files to specify all data sources, transformations, and data sinks. This makes the life of a data engineer much easier: no expertise in classical software engineering is required, and they can focus on “data”.
A powerful set of command-line applications then parses these specification files and performs all data transformations with Apache Spark as the workhorse under the hood.
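For example, a single command line is enough to run a project in batch mode. The sketch below uses a made-up project directory; flowexec is Flowman’s batch runner and flowshell its interactive shell, and the -f option for selecting the project directory is assumed to work as in the standard distribution:

# Run the "main" job of the project located in ./flow (hypothetical path)
flowexec -f flow job build main

# Explore relations and mappings interactively during development
flowshell -f flow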
Flowman Entities
Flowman uses several types of entities for modeling data transformations. The main entities are as follows:
- Relations represent physical data sources and sinks. Each relation provides the type (Hive table, JDBC, files, …) together with type-specific properties and an optional schema.
- Mappings represent logic blocks for transforming data. The result of a mapping can be used as the input of other mappings, so you can build a complex graph of transformations to be applied to data. Typically, each graph starts with a special mapping referring to a relation to read in the data as the first step.
- Targets define build targets, which eventually write the results of a mapping back into a relation. In addition to specifying individual targets, you can also bundle multiple targets into jobs to simplify building several logically related targets within a single execution cycle.
In addition to these core entities, Flowman also supports several additional entities, such as for defining unit tests, publishing execution metrics to a monitoring tool, and more. For more information, see the official Flowman documentation.
Simple Example
The following small example gives an insight into how to implement a simple project with Flowman. The goal is to read in some raw data from weather sensors stored in ASCII files, extract some measurements, and save the result as Parquet files.
You can find the full example on GitHub.
1. Variables
Flowman supports global variables that can be used throughout your project. These can be overridden with different values on the command line or via execution profiles.
# File: flow/config/environment.yml
environment:
  - basedir=file:///datawarehouse/weather
  - srcdir=$System.getenv('WEATHER_SRCDIR', 's3a://dimajix-training/data/weather')
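These defaults can be overridden for a single run. A minimal sketch, assuming the -D option of flowexec for setting an environment variable on the command line (the local path is made up):

flowexec -D srcdir=file:///tmp/weather/raw job build main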
2. Relations
First, we need to define the relations, which represent the physical data sources and sinks. Our example requires two relations: one for the raw source data and one for the target data.
# File: flow/model/measurements-raw.yml
relations:
  # Define source relation containing the raw ASCII data
  measurements_raw:
    kind: file
    format: text
    location: "$srcdir/"
    # Define the pattern to be used for partitions
    pattern: "${year}"
    # Define data partitions. Each year is stored in a separate subdirectory
    partitions:
      - name: year
        type: integer
        granularity: 1
        description: "The year when the measurement was made"
    schema:
      # Specify the (single) column via an embedded schema.
      kind: inline
      fields:
        - name: raw_data
          type: string
          description: "Raw measurement data"
# File: flow/model/measurements.yml
relations:
  # Define the target relation containing the extracted measurements stored
  # as Parquet files
  measurements:
    kind: file
    format: parquet
    location: "$basedir/measurements/"
    # Define data partitions. Each year is stored in a separate subdirectory
    partitions:
      - name: year
        type: integer
        granularity: 1
    # We use the inferred schema of the mapping that is written into the relation
    schema:
      kind: mapping
      mapping: measurements_extracted
3. Mappings
Now we need to define two mappings: one for reading the raw data from the input relation, and one for extracting the measurements from the ASCII text.
# File: flow/mapping/measurements.yml
mappings:
  # This mapping refers to the "raw" relation and reads in data from
  # the source in S3
  measurements_raw:
    kind: relation
    relation: measurements_raw
    partitions:
      year: $year

  # Extract multiple columns from the raw measurements data
  # using SQL SUBSTR functions
  measurements_extracted:
    kind: select
    input: measurements_raw
    columns:
      usaf: "CAST(SUBSTR(raw_data,5,6) AS INT)"
      wban: "CAST(SUBSTR(raw_data,11,5) AS INT)"
      date: "TO_DATE(SUBSTR(raw_data,16,8), 'yyyyMMdd')"
      time: "SUBSTR(raw_data,24,4)"
      report_type: "SUBSTR(raw_data,42,5)"
      wind_direction: "CAST(SUBSTR(raw_data,61,3) AS INT)"
      wind_direction_qual: "SUBSTR(raw_data,64,1)"
      wind_observation: "SUBSTR(raw_data,65,1)"
      wind_speed: "CAST(CAST(SUBSTR(raw_data,66,4) AS FLOAT)/10 AS FLOAT)"
      wind_speed_qual: "SUBSTR(raw_data,70,1)"
      air_temperature: "CAST(CAST(SUBSTR(raw_data,88,5) AS FLOAT)/10 AS FLOAT)"
      air_temperature_qual: "SUBSTR(raw_data,93,1)"
Flowman supports many different mapping types, for example for filtering, performing grouped aggregations, joins, and much more. Of course, there is also a generic SQL mapping which allows you to write arbitrary (Spark) SQL code, as sketched below.
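A rough sketch of what such a SQL mapping could look like. The mapping name hot_measurements and the filter condition are made up for illustration; the kind: sql mapping takes its query in the sql attribute:

mappings:
  # Hypothetical mapping that keeps only hot measurements
  hot_measurements:
    kind: sql
    sql: |
      SELECT *
      FROM measurements_extracted
      WHERE air_temperature > 30.0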
4. Targets
The mapping measurements_extracted contains the structured information that we want to store in Parquet files, represented by the measurements relation. We define a build target for writing all records produced by the mapping into the output relation:
# File: flow/target/measurements.yml
targets:
  # Define build target for measurements
  measurements:
    # Again, the target is of type "relation"
    kind: relation
    description: "Write extracted measurements per year"
    # Read records from mapping
    mapping: measurements_extracted
    # ... and write them into the relation "measurements"
    relation: measurements
    # Specify the data partition to be written
    partition:
      year: $year
5. Jobs
Finally, we create a job, which in this simple example only contains the single target defined above. Normally, in more complex projects, you’d bundle multiple related targets to simplify execution.
# File: flow/job/main.yml
jobs:
  # Define the 'main' job, which implicitly is used whenever you build
  # the whole project
  main:
    # Add a parameter for selecting the year to process. This will create an
    # environment variable `$year` which can be accessed from within other
    # entities like mappings, relations, etc.
    parameters:
      - name: year
        type: Integer
        default: 2013
    # List all targets which should be built as part of the `main` job
    targets:
      - measurements
That’s essentially all we need to describe the whole data flow. The project can then be executed with Flowman via
flowexec project build main year=2014
Features
The value of Flowman lies not only in the simplicity of its declarative approach; it also provides additional functionality that would otherwise require more complex code to be written and maintained by a data engineer.
Different Environments (DEV, TEST, PROD,…)
Of course, your ETL application needs to be able to run in different environments, starting with a development environment, then a test environment, and finally a production environment. Flowman solves this either with special profiles or by accessing system environment variables.
# File: flow/config/profiles.yml
profiles:
  dev:
    environment:
      - basedir=s3a://flowman-dev/datawarehouse/weather
  prod:
    environment:
      - basedir=s3a://flowman-prod/datawarehouse/weather
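A profile is then selected when the project is executed. A minimal sketch, assuming the -P option of flowexec for activating a profile:

flowexec -P prod job build main year=2014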
Partial Execution
In many cases, a single Spark batch application produces different outputs (e.g., writing to different Hive tables, writing multiple files with different content, etc.). However, when you change the logic behind a single table, you probably want to rebuild only that table.
Flowman uses the concept of *build targets*, each representing a single output table. Multiple, logically related targets can be bundled into *jobs* so that multiple targets are built in a single invocation of Flowman. Of course, you can instruct Flowman to build only a subset of all targets, which solves the above problem.
flowexec job build main year=2015 --target t1
Automated Testing with Unit Tests
Running meaningful tests in any data application is difficult because the results depend not only on the application logic but also on the data. Nevertheless, unit tests for complex business logic still have a huge value. A solid approach is to mock the input data and compare the actual result with your expectations.
Flowman provides first-class support for unit testing, including mocking any data source or other input for a particular mapping.
# File: flow/test/test-measurements.yml
tests:
  test_measurements:
    environment:
      - year=2013

    overrideMappings:
      measurements_raw:
        # Mocking a data source will pick up the original data schema, and the only
        # thing left to do is providing mocked records
        kind: mock
        records:
          # Provide two records including both the raw data and the partition key
          - year: $year
            raw_data: "042599999963897201301010000I+32335-086979CRN05+004..."
          - year: $year
            raw_data: "013399999963897201301010005I+32335-086979CRN05+004..."

    assertions:
      measurements_extracted:
        kind: sql
        description: "Measurements are extracted correctly"
        tests:
          - query: "SELECT * FROM measurements_extracted"
            expected:
              - [ 999999,63897,2013-01-01,0000,CRN05,124,1,H,0.9,1,10.6,1 ]
              - [ 999999,63897,2013-01-01,0005,CRN05,124,1,H,1.5,1,10.6,1 ]
          - query: "SELECT COUNT(*) FROM measurements_extracted"
            expected: 2
A good requirements engineer can provide examples of input data sets and the expected result. This is a perfect starting point for creating unit tests in which you replace the real data sources (e.g., Hive tables or JDBC data sources) with the sample data sets, run your data transformations, and then compare the result with the given expectation.
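The tests themselves are executed by Flowman without any additional test harness. A minimal sketch, assuming the test subcommand of flowexec, which runs the tests defined in the project:

flowexec test run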
Automatic Schema Management
Another important aspect of a data application is the responsibility to actually create the output tables before writing to them. To manage the schema of a table, Flowman supports either schema inference from a chain of mappings or an explicitly specified schema. An explicit schema helps to detect errors in your logic early on that would otherwise lead to a different (incompatible) schema.
# File: flow/model/measurements.yml
relations:
  # measurements will use an implicit schema defined as the output of a mapping
  measurements:
    kind: file
    format: parquet
    location: "$basedir/measurements/"
    # We use the inferred schema of the mapping that is written into the relation
    schema:
      kind: mapping
      mapping: measurements_extracted
# File: flow/model/stations.yml
relations:
  # stations uses an explicit schema provided as an Avro schema definition file
  stations:
    kind: file
    format: parquet
    location: "$basedir/stations/"
    # Use an explicit schema stored in an external file
    schema:
      kind: avro
      file: "${project.basedir}/schema/stations.avsc"
In addition, the schema is likely to change over time due to new business requirements. Flowman detects a mismatch between the actual physical table and the desired schema and automatically adjusts the table definition by adding, removing, or modifying columns.
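How aggressively Flowman migrates existing tables can be tuned via configuration properties. The sketch below assumes the property names flowman.default.relation.migrationPolicy and flowman.default.relation.migrationStrategy; please verify the exact names and allowed values against the Flowman configuration reference for your version:

# File: conf/default-namespace.yml (excerpt)
config:
  # Assumed property names; check the Flowman configuration reference
  - flowman.default.relation.migrationPolicy=RELAXED
  - flowman.default.relation.migrationStrategy=ALTER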
Observability
Observability has rightly become a hot topic for virtually all applications. Following the DevOps principle of “you build it – you run it,” you want to understand what your application is actually doing. There are two important and complementary approaches to this topic: meaningful logging and execution metrics.
First, Flowman provides meaningful logging output for every run.
Second, Flowman also provides meaningful metrics such as the number of records read or written. In addition, you can also define custom metrics to get more insights from the business point of view. All of these metrics can be published to Prometheus, which in turn can be connected to Grafana for creating dashboards.
The metrics are collected as part of a job, as follows:
# File: flow/job/main.yml
jobs:
  main:
    parameters:
      - name: year
        type: Integer
        default: 2013
    targets:
      - measurements

    # Define metrics to be published while running this job
    metrics:
      labels:
        project: "${project.name}"
      metrics:
        - selector:
            # Select all metrics
            name: ".*"
          labels:
            # Define the labels, which should be attached to each metric
            category: "$category"
            kind: "$kind"
            name: "$name"
The metrics can be published to different sinks, for example Prometheus or a relational database, or they can simply be printed to the console:
# File: conf/default-namespace.yml
metrics:
  # Publish metrics to Prometheus
  - kind: prometheus
    url: $System.getenv('URL_PROMETHEUS_PUSHGW', 'http://pushgateway:9091')
    # Define the labels which should be published
    labels:
      namespace: ${namespace.name}
      project: ${project.name}
      job: ${job.name}
      phase: ${phase}

  # Publish metrics to a SQL database
  - kind: jdbc
    connection:
      kind: jdbc
      driver: "org.mariadb.jdbc.Driver"
      url: "jdbc:mariadb://mariadb/flowman_metrics"
      username: "flowman_metrics"
      password: "flowman_metrics"
    # Define the labels which should be published
    labels:
      project: ${project.name}
      version: ${project.version}
      job: ${job.name}
      phase: ${phase}

  # Print all metrics onto the console
  - kind: console
Extensibility
Although Flowman already comes with a solid set of relations, mappings, targets and more, it can be extended with custom plugins. This, of course, requires a certain level of expertise in Spark and Scala, but it allows organizations to use Flowman for their workflows while keeping their custom logic behind their own doors, without having to open source their code.
Summary
Flowman simplifies the development of complex ETL jobs by providing a simple but powerful declarative interface on top of Apache Spark. This helps data engineers focus on the “data” part. At the same time, Flowman provides advanced features such as automated schema management and unit testing to reduce development effort and increase productivity.
If you like the idea of Flowman, please help us to spread the word and star it on GitHub.