Flowman — A Declarative ETL Framework for Apache Spark


Don’t reinvent the wheel by writing more boilerplate code. Focus on critical business logic and delegate the tricky details to a clever tool.

Introduction

Apache Spark is a powerful framework for building flexible and scalable data processing applications. Its API is relatively simple yet offers the full capabilities of classic SQL SELECT statements. At the same time, Spark can access all kinds of data sources, such as object stores in the cloud (S3, ABS, …), relational databases via JDBC, and much more via custom connectors (Kafka, HBase, MongoDB, Cassandra, …). This wide range of connectors, coupled with extensive support for data transformations, covers simple data ingestion, pure data transformation, and any combination of the two, such as ETL and ELT. Spark’s flexibility supports very different types of workflows to fit your exact needs.

But in the end, a framework for a data processing application is not enough. You still need to build an application on top of Apache Spark that includes all the business logic and lots of boilerplate code. In addition, some important data-related topics fall outside the scope of Apache Spark, such as robust schema management, including schema evolution and migration strategies, or capturing relevant metrics to measure data quality.

In this situation, a tool at a higher level of abstraction can provide practical solutions, and Flowman is just such a tool. It is completely open source and free of charge, with pre-built packages for various Spark versions (from 2.4 through 3.3) and environments (pure Spark, Cloudera, AWS EMR, Azure Synapse).

Declarative Approach

Flowman uses a purely declarative approach with simple YAML files to specify all data sources, transformations, and data sinks. This makes the life of a data engineer much easier, as no expertise in classical software engineering is required and they can focus on the “data”.

A powerful set of command-line applications then parses these specification files and performs all data transformations with Apache Spark as the workhorse under the hood.

Flowman Entities

Flowman uses several types of entities for modeling data transformations. The main entities are as follows:

  • Relations represent physical data sources and sinks. Each relation provides the type (Hive table, JDBC, files, …) together with type-specific properties and an optional schema.
  • Mappings represent logic blocks for transforming data. The result of a mapping can be used as the input of other mappings, so you can build a complex graph of transformations to be applied to data. Typically, each graph starts with a special mapping referring to a relation to read in the data as the first step.
  • Targets are finally used to define build targets, which eventually write the results of a mapping into a relation again. In addition to specifying individual targets, you can also bundle multiple targets into jobs, in order to simplify building multiple logically related targets within a single execution cycle.

In addition to these core entities, Flowman also supports several additional entities, such as for defining unit tests, publishing execution metrics to a monitoring tool, and more. For more information, see the official Flowman documentation.
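To make the relationship between relations, mappings, and targets more concrete, the following Python sketch models the chain as a small dependency graph and derives an order in which the nodes could be evaluated. It only illustrates the concept and is not Flowman's actual implementation; the `dependencies` dictionary and the `kind:name` naming scheme are assumptions made for this sketch, with entity names borrowed from the weather example in this article.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical dependency graph: each key depends on the entries in its set.
dependencies = {
    "mapping:measurements_raw": {"relation:measurements_raw"},
    "mapping:measurements_extracted": {"mapping:measurements_raw"},
    "target:measurements": {"mapping:measurements_extracted"},
}

# static_order() yields a node only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because the graph is a simple chain, the source relation comes first and the build target comes last.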

Simple Example

The following small example shows how to implement a simple project with Flowman. The goal is to read in some raw data from weather sensors stored in ASCII files, extract some measurements and save the result as Parquet files.

You can find the full example on GitHub.

1. Variables

Flowman supports global variables that you can reference throughout your project. Their values can be overridden on the command line or via execution profiles.

# File: flow/config/environment.yml
environment:
  - basedir=file:///datawarehouse/weather
  - srcdir=$System.getenv('WEATHER_SRCDIR', 's3a://dimajix-training/data/weather')
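The `srcdir` entry above reads as "use the `WEATHER_SRCDIR` environment variable if it is set, otherwise fall back to the given S3 location". A rough Python analogue of that lookup is sketched below, assuming the second argument acts as a fallback default; the helper `getenv_with_default` and the `file:///tmp/weather` value are hypothetical and exist only for this illustration.

```python
import os


def getenv_with_default(name, default, env=None):
    """Return the value of environment variable `name`, or `default` if unset."""
    env = os.environ if env is None else env
    return env.get(name, default)


fallback = "s3a://dimajix-training/data/weather"

# Dictionaries stand in for the process environment to keep the sketch deterministic.
unset = getenv_with_default("WEATHER_SRCDIR", fallback, env={})
overridden = getenv_with_default(
    "WEATHER_SRCDIR", fallback, env={"WEATHER_SRCDIR": "file:///tmp/weather"}
)
print(unset)
print(overridden)
```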

2. Relations

First, we need to define the relations, which represent the physical data sources and sinks. Our example requires two relations: one for the raw source data and one for the target data.

# File: flow/model/measurements-raw.yml
relations:
  # Define source relation containing the raw ASCII data
  measurements_raw:
    kind: file
    format: text
    location: "$srcdir/"
    # Define the pattern to be used for partitions
    pattern: "${year}"
    # Define data partitions. Each year is stored in a separate subdirectory
    partitions:
      - name: year
        type: integer
        granularity: 1
        description: "The year when the measurement was made"
    schema:
      # Specify the (single) column via an embedded schema.
      kind: inline
      fields:
        - name: raw_data
          type: string
          description: "Raw measurement data"

# File: flow/model/measurements.yml
relations:
  # Define the target relation containing the extracted measurements stored
  # as Parquet files
  measurements:
    kind: file
    format: parquet
    location: "$basedir/measurements/"
    # Define data partitions. Each year is stored in a separate subdirectory
    partitions:
      - name: year
        type: integer
        granularity: 1
    # We use the inferred schema of the mapping that is written into the relation
    schema:
      kind: mapping
      mapping: measurements_extracted

3. Mappings

Now we need to define two mappings: One for reading the raw data from the input relation, and one for extracting the measurements from the ASCII text.

# File: flow/mapping/measurements.yml
mappings:
  # This mapping refers to the "raw" relation and reads in data from
  # the source in S3
  measurements_raw:
    kind: relation
    relation: measurements_raw
    partitions:
      year: $year

  # Extract multiple columns from the raw measurements data 
  # using SQL SUBSTR functions
  measurements_extracted:
    kind: select
    input: measurements_raw
    columns:
      usaf: "CAST(SUBSTR(raw_data,5,6) AS INT)"
      wban: "CAST(SUBSTR(raw_data,11,5) AS INT)"
      date: "TO_DATE(SUBSTR(raw_data,16,8), 'yyyyMMdd')"
      time: "SUBSTR(raw_data,24,4)"
      report_type: "SUBSTR(raw_data,42,5)"
      wind_direction: "CAST(SUBSTR(raw_data,61,3) AS INT)"
      wind_direction_qual: "SUBSTR(raw_data,64,1)"
      wind_observation: "SUBSTR(raw_data,65,1)"
      wind_speed: "CAST(CAST(SUBSTR(raw_data,66,4) AS FLOAT)/10 AS FLOAT)"
      wind_speed_qual: "SUBSTR(raw_data,70,1)"
      air_temperature: "CAST(CAST(SUBSTR(raw_data,88,5) AS FLOAT)/10 AS FLOAT)"
      air_temperature_qual: "SUBSTR(raw_data,93,1)"

Flowman supports many different mapping types, for example for filtering, performing grouped aggregations, joins and many more. Of course, there is also a generic SQL mapping which allows you to write arbitrary (Spark) SQL code.
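The fixed-width extraction in `measurements_extracted` relies on SQL `SUBSTR`, which counts positions from 1. The following Python sketch mimics the first few columns to show what the offsets select. It is an illustration only, not how Flowman or Spark evaluates the expressions; the helper names `substr` and `extract` are made up for this sketch, and the sample string is just the visible prefix of a mocked record used in the unit test later in this article, so only columns that fall inside that prefix are extracted.

```python
from datetime import date, datetime


def substr(text, start, length):
    """SQL-style SUBSTR: `start` is 1-based, `length` is a character count."""
    return text[start - 1:start - 1 + length]


def extract(raw):
    """Extract the leading columns of one raw weather record."""
    return {
        "usaf": int(substr(raw, 5, 6)),
        "wban": int(substr(raw, 11, 5)),
        "date": datetime.strptime(substr(raw, 16, 8), "%Y%m%d").date(),
        "time": substr(raw, 24, 4),
        "report_type": substr(raw, 42, 5),
    }


# Visible prefix of a mocked record; the remainder is elided in the article.
record = extract("042599999963897201301010000I+32335-086979CRN05")
print(record)
```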

4. Targets

The mapping measurements_extracted contains the structured information that we want to store in Parquet files, represented by the measurements relation. We define a build target for writing all records produced by the mapping into the output relation:

# File: flow/target/measurements.yml
targets:
  # Define build target for measurements
  measurements:
    # Again, the target is of type "relation"
    kind: relation
    description: "Write extracted measurements per year"
    # Read records from mapping
    mapping: measurements_extracted
    # ... and write them into the relation "measurements"
    relation: measurements
    # Specify the data partition to be written
    partition:
      year: $year

5. Jobs

Finally, we create a job, which in this simple example only contains the single target defined above. Normally, in more complex projects, you’d bundle multiple related targets to simplify execution.

# File: flow/job/main.yml
jobs:
  # Define the 'main' job, which implicitly is used whenever you build 
  # the whole project
  main:
    # Add a parameter for selecting the year to process. This will create an
    # environment variable `$year` which can be accessed from within other 
    # entities like mappings, relations, etc.
    parameters:
      - name: year
        type: Integer
        default: 2013
    # List all targets which should be built as part of the `main` job
    targets:
      - measurements

That’s essentially all we need to describe the whole data flow. The project can then be executed with Flowman via

flowexec project build main year=2014

Features

The value of Flowman lies not only in the simplicity of its declarative approach but also in additional functionality that a data engineer would otherwise have to implement and maintain in more complex code.

Different Environments (DEV, TEST, PROD,…)

Of course, your ETL application needs to be able to run in different environments, starting with the development environment, a test environment and finally a production environment. Flowman solves this problem either by special profiles or by accessing system environment variables.

# File: flow/config/profiles.yml
profiles:
  dev:
    environment:
      - basedir=s3a://flowman-dev/datawarehouse/weather

  prod:
    environment:
      - basedir=s3a://flowman-prod/datawarehouse/weather
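Conceptually, a profile is a layer of variable values placed on top of the project defaults. The Python sketch below illustrates this layering with the `basedir` values from this article. The function `resolve_environment` is hypothetical, and the precedence shown (explicit overrides win over the profile, which wins over the project defaults) is an assumption made for illustration rather than a statement about Flowman's internals.

```python
def resolve_environment(defaults, profile=None, overrides=None):
    """Merge variable layers; later layers win (assumed precedence)."""
    resolved = dict(defaults)
    resolved.update(profile or {})
    resolved.update(overrides or {})
    return resolved


defaults = {"basedir": "file:///datawarehouse/weather"}
profiles = {
    "dev": {"basedir": "s3a://flowman-dev/datawarehouse/weather"},
    "prod": {"basedir": "s3a://flowman-prod/datawarehouse/weather"},
}

local_env = resolve_environment(defaults)
dev_env = resolve_environment(defaults, profiles["dev"])
print(local_env["basedir"])
print(dev_env["basedir"])
```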

Partial Execution

In many cases, a single Spark batch application produces different outputs (e.g., writing to different Hive tables, writing multiple files with different content, etc.). However, once you need to customize the logic of a single table, you probably want to recreate only that table.

Flowman uses the concept of *build targets*, each representing a single output table. Multiple, logically related targets can be bundled into *jobs* so that multiple targets are built in a single invocation of Flowman. Of course, you can instruct Flowman to build only a subset of all targets, which solves the above problem.

flowexec job build main year=2015 --target t1
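The idea behind building only a subset can be sketched in a few lines of Python. This is a conceptual illustration, not Flowman's implementation; `select_targets` is a hypothetical helper, and `t2` and `t3` are made-up target names next to the `t1` from the command above.

```python
def select_targets(job_targets, requested=None):
    """Return the targets to build in job order; all of them if none are requested."""
    if not requested:
        return list(job_targets)
    unknown = set(requested) - set(job_targets)
    if unknown:
        raise ValueError(f"Unknown targets: {sorted(unknown)}")
    return [target for target in job_targets if target in requested]


job_targets = ["t1", "t2", "t3"]  # hypothetical targets bundled in one job

print(select_targets(job_targets))          # full build
print(select_targets(job_targets, ["t1"]))  # partial build
```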

Automated Testing with Unit Tests

Running meaningful tests in any data application is difficult because the results depend not only on the application logic but also on the data. Nevertheless, unit tests for complex business logic still have a huge value. A solid approach is to mock the input data and compare the actual result with your expectations.

Flowman provides first-class support for unit testing, including mocking any data source or other input for a particular mapping.

# File: flow/test/test-measurements.yml
tests:
  test_measurements:
    environment:
      - year=2013

    overrideMappings:
      measurements_raw:
        # Mocking a data source will pick up the original data schema, and the only
        # thing left to do is providing mocked records
        kind: mock
        records:
          # Provide two records including both the raw data and the partition key
          - year: $year
            raw_data: "042599999963897201301010000I+32335-086979CRN05+004..."
          - year: $year
            raw_data: "013399999963897201301010005I+32335-086979CRN05+004..."

    assertions:
      measurements_extracted:
        kind: sql
        description: "Measurements are extracted correctly"
        tests:
          - query: "SELECT * FROM measurements_extracted"
            expected:
              - [ 999999,63897,2013-01-01,0000,CRN05,124,1,H,0.9,1,10.6,1 ]
              - [ 999999,63897,2013-01-01,0005,CRN05,124,1,H,1.5,1,10.6,1 ]
          - query: "SELECT COUNT(*) FROM measurements_extracted"
            expected: 2

A good requirements engineer can provide examples of input data sets and the expected result. This is a perfect starting point for creating unit tests in which you replace the real data sources (e.g., Hive tables or JDBC data sources) with the sample data sets, run your data transformations, and then compare the result with the given expectation.
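The same pattern (mock the input, run the transformation, compare with the expectation) can be shown outside of Flowman in plain Python. Everything in this sketch is hypothetical: the transformation `keep_valid_wind_speeds`, the sample records, and the assumption that a quality flag of "1" marks a valid measurement are invented purely to illustrate the testing approach.

```python
def keep_valid_wind_speeds(records):
    """Hypothetical transformation: keep records whose quality flag is '1'."""
    return [r for r in records if r["wind_speed_qual"] == "1"]


# Mocked input replaces the real data source.
mocked_input = [
    {"usaf": 999999, "wind_speed": 0.9, "wind_speed_qual": "1"},
    {"usaf": 999999, "wind_speed": 99.9, "wind_speed_qual": "9"},
]

# Expected result as provided by a (hypothetical) requirements document.
expected = [{"usaf": 999999, "wind_speed": 0.9, "wind_speed_qual": "1"}]

actual = keep_valid_wind_speeds(mocked_input)
assert actual == expected, f"unexpected result: {actual}"
```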

Automatic Schema Management

Another important aspect of a data application is the responsibility to actually create the output tables before writing to them. To manage the schema of a table, Flowman supports either schema inference from a chain of mappings or explicit specification of a schema. An explicit schema helps to detect early on any errors in your logic that would lead to a different (incompatible) schema.

# File: flow/model/measurements.yml
relations:
  # measurements will use an implicit schema defined as the output of a mapping
  measurements:
    kind: file
    format: parquet
    location: "$basedir/measurements/"
    # We use the inferred schema of the mapping that is written into the relation
    schema:
      kind: mapping
      mapping: measurements_extracted

# File: flow/model/stations.yml
relations:
  # stations uses an explicit schema provided as an Avro schema definition file
  stations:
    kind: file
    format: parquet
    location: "$basedir/stations/"
    # Use an explicit schema stored in an external file
    schema:
      kind: avro
      file: "${project.basedir}/schema/stations.avsc"

In addition, the schema is likely to change over time due to new business requirements. Flowman detects a schema mismatch between the actual physical table and the desired schema and automatically adjusts the table definition by adding, removing, or modifying columns.
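The detection step boils down to comparing two schemas column by column. The Python sketch below shows one simple way to compute such a difference. It is a conceptual illustration under simplifying assumptions (flat schemas represented as column-to-type dictionaries) and not Flowman's migration logic; `diff_schema` and the sample schemas are hypothetical.

```python
def diff_schema(current, desired):
    """Compare two {column: type} mappings and list the required changes."""
    shared = set(current) & set(desired)
    return {
        "add": sorted(set(desired) - set(current)),
        "drop": sorted(set(current) - set(desired)),
        "modify": sorted(c for c in shared if current[c] != desired[c]),
    }


# Hypothetical schemas of the physical table and of the new specification.
current = {"usaf": "int", "wban": "int", "air_temperature": "int"}
desired = {
    "usaf": "int",
    "wban": "int",
    "air_temperature": "float",
    "wind_speed": "float",
}

changes = diff_schema(current, desired)
print(changes)
```

In this example, `wind_speed` would have to be added and the type of `air_temperature` would have to be changed, while no column needs to be dropped.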

Observability

Observability has rightly become a hot topic for virtually all applications. Following the DevOps principle of “you build it – you run it,” you want to understand what your application is actually doing. There are two important and complementary approaches to this topic: meaningful logging and execution metrics.

First, Flowman provides meaningful logging output:

[Figure: Console with Flowman Shell]

Second, Flowman also provides meaningful metrics such as the number of records read or written. In addition, you can also define custom metrics to get more insights from the business point of view. All of these metrics can be published to Prometheus, which in turn can be connected to Grafana for creating dashboards.

The metrics are collected as part of a job, as follows:

# File: flow/job/main.yml
jobs:
  main:
    parameters:
      - name: year
        type: Integer
        default: 2013
    targets:
      - measurements

    # Define metrics to be published while running this job
    metrics:
      labels:
        project: "${project.name}"
      metrics:
        - selector:
            # Select all metrics
            name: ".*"
          labels:
            # Define the labels, which should be attached to each metric
            category: "$category"
            kind: "$kind"
            name: "$name"
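One way to picture the selector and label templates above is as a filter followed by a relabeling step. The Python sketch below mimics that idea with a regular expression and `string.Template`. It is only an approximation for illustration: whether the selector must match the full metric name, the metric name `target_records`, and the helper `relabel` are all assumptions of this sketch, not documented Flowman behavior.

```python
import re
from string import Template


def relabel(metric, selector, label_templates):
    """Return new labels for `metric`, or None if the selector does not match."""
    if not re.fullmatch(selector, metric["name"]):
        return None
    return {
        key: Template(template).substitute(metric["labels"])
        for key, template in label_templates.items()
    }


# Hypothetical metric as it might be collected during a job run.
metric = {
    "name": "target_records",
    "labels": {"category": "target", "kind": "relation", "name": "measurements"},
}
templates = {"category": "$category", "kind": "$kind", "name": "$name"}

published = relabel(metric, ".*", templates)
skipped = relabel(metric, "job_.*", templates)
print(published)
print(skipped)
```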

The metrics can be published to different sinks, for example, Prometheus, a relational database or simply printed onto the console:

# File: conf/default-namespace.yml
metrics:
  # Publish metrics to Prometheus
  - kind: prometheus
    url: $System.getenv('URL_PROMETHEUS_PUSHGW', 'http://pushgateway:9091')
    # Define the labels which should be published
    labels:
      namespace: ${namespace.name}
      project: ${project.name}
      job: ${job.name}
      phase: ${phase}
  # Publish metrics to SQL database
  - kind: jdbc
    connection:
      kind: jdbc
      driver: "org.mariadb.jdbc.Driver"
      url: "jdbc:mariadb://mariadb/flowman_metrics"
      username: "flowman_metrics"
      password: "flowman_metrics"
    # Define the labels which should be published
    labels:
      project: ${project.name}
      version: ${project.version}
      job: ${job.name}
      phase: ${phase}
  # Print all metrics onto the console
  - kind: console

Extensibility

Although Flowman already comes with a solid set of relations, mappings, targets and more, it can be extended with custom plugins. This, of course, requires a certain level of expertise in Spark and Scala, but it allows organizations to use Flowman for their workflows while keeping their custom logic in-house, without having to open source their code.

Summary

Flowman simplifies the development of complex ETL jobs by providing a simple but powerful declarative interface on top of Apache Spark. This helps data engineers focus on the “data” part. At the same time, Flowman provides advanced features such as automated schema management and unit testing to reduce development effort and increase productivity.

If you like the idea of Flowman, please help us to spread the word and star it on GitHub.
