Declarative ETL with Spark and Flowman

Flowman takes an entirely declarative approach on top of Apache Spark to manage your data transformations and data models. The tool uses plain text files in YAML syntax with a custom schema: you describe all data sources, transformations, and data sinks, and Flowman handles the technical details of physically creating the data models and executing the transformations according to your specification.

A simple example

The following code snippets are taken from a small example, which extracts and transforms multiple metrics from a publicly available weather data set. These snippets will give you an impression of the YAML format that Flowman expects for describing data flows.

Flowman organizes data flows in so-called projects, each of which can contain multiple different data flows. Usually all data flows within a single project belong to the same logical data model and are executed as a single batch process. Each project contains at least four YAML entity types — relations, mappings, targets, and at least one job — as shown in the example:
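A project itself is described by a top-level manifest file. The following is only a sketch based on common Flowman conventions — the project name and the module directories are assumptions, not taken from the example:

```yaml
# project.yml (sketch — names and directories are illustrative)
name: weather-example
version: "1.0"
# List of subdirectories containing the YAML entity definitions
modules:
  - model
  - mapping
  - target
  - job
```

Flowman then picks up all YAML files within the listed module directories when the project is loaded.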

Step 1: Relations

  relations:
    # Define a relation called "measurements-raw"
    measurements-raw:
      # The data is stored in files
      kind: file
      # ... as a simple text format
      format: text
      # ... in AWS S3
      location: "s3a://dimajix-training/data/weather/"
      # ... with a pattern for partitions
      pattern: "${year}"
      partitions:
        - name: year
          type: integer
          granularity: 1
      # Define the schema of the data
      schema:
        # The schema is embedded
        kind: embedded
        fields:
          - name: raw_data
            type: string
            description: "Raw measurement data"
Relations describe your data sources and sinks including the physical location and their data schema. Each relation represents a single logical table. The example above defines a data source containing text data stored in S3.
  • Many different storage types are supported, such as files, object stores (S3, ABS, …), and SQL sources.
  • The data schema can also be omitted and will then be inferred automatically. Or it can be stored in an external file to simplify maintenance.
  • Custom relation types can be implemented as plugins.
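As a sketch of the external-schema option mentioned above, a relation can reference a schema file instead of embedding the field definitions. The schema kind and file path below are assumptions for illustration:

```yaml
relations:
  measurements-raw:
    kind: file
    format: text
    location: "s3a://dimajix-training/data/weather/"
    # Instead of an embedded schema, reference an external schema file
    # (assumption: an Avro schema file kept alongside the project)
    schema:
      kind: avro
      file: "schema/measurements-raw.avsc"
```

Keeping the schema in a separate file simplifies maintenance when the same schema is shared by several relations.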

Step 2: Mappings

  mappings:
    # Define a mapping for reading from a relation
    measurements-raw:
      kind: relation
      relation: measurements-raw
      partitions:
        year: $year

    # Define a mapping for extracting values
    measurements-extracted:
      kind: select
      # Specify the input mapping
      input: measurements-raw
      # Specify the columns to be extracted
      columns:
        usaf: "SUBSTR(raw_data,5,6)"
        wban: "SUBSTR(raw_data,11,5)"
        date: "SUBSTR(raw_data,16,8)"
        time: "SUBSTR(raw_data,24,4)"
        report_type: "SUBSTR(raw_data,42,5)"
        wind_direction: "SUBSTR(raw_data,61,3)"
        wind_direction_qual: "SUBSTR(raw_data,64,1)"
        wind_speed: "CAST(SUBSTR(raw_data,66,4) AS FLOAT)/10"
        wind_speed_qual: "SUBSTR(raw_data,70,1)"
        air_temperature: "CAST(SUBSTR(raw_data,88,5) AS FLOAT)/10"
        air_temperature_qual: "SUBSTR(raw_data,93,1)"

Next, you define the data flow, which usually starts with a read mapping and then continues with transformations. In this example we simply extract some metrics from fixed positions within each record of the weather data. Flowman provides a broad range of mappings:

  • Simple transformations are used for filtering, projections and record-wise transformations including the full power of all supported Spark SQL functions.
  • Grouped aggregations and joins provide functionality commonly used for building data marts for reporting purposes.
  • Complex message transformations simplify handling of nested data types, such as those commonly found in JSON and Avro.
  • Even SQL queries can be used for specifying arbitrarily complex transformations.
  • Custom mappings can be implemented as plugins.
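The SQL option mentioned above might look like the following sketch. The mapping name and the aggregation query are illustrative assumptions; the query refers to other mappings by name:

```yaml
mappings:
  # Hypothetical aggregation expressed directly in Spark SQL
  measurements-summary:
    kind: sql
    sql: |
      SELECT
        usaf,
        wban,
        AVG(air_temperature) AS avg_temperature
      FROM `measurements-extracted`
      GROUP BY usaf, wban
```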

Step 3: Execution Targets

  targets:
    # Define a build target called "measurements"
    measurements:
      # It should build a relation
      kind: relation
      # The data is provided by the mapping
      mapping: measurements-extracted
      # ... and the result is written into a relation
      relation: measurements
      # Overwrite a specific partition
      partition:
        year: $year

With relations and mappings described, we create execution targets, which connect the output of a mapping to a target relation.

  • Typically each target refers to a target relation and an input mapping. Flowman will materialize all records from the mapping and write them to the relation.
  • Additional execution target types also provide file copy and SFTP operations.
  • Flowman only builds an execution target when its physical destination is outdated or when execution is forced via a command-line flag.
  • Custom target types can be implemented as plugins.
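The target above writes into a relation named measurements, which is not shown in the snippets. A sketch of such a sink relation might look as follows — the storage format and location are assumptions for illustration:

```yaml
relations:
  measurements:
    kind: file
    # Assumption: Parquet as the processed output format
    format: parquet
    # Hypothetical output location
    location: "s3a://dimajix-training/processed/weather/"
    partitions:
      - name: year
        type: integer
        granularity: 1
```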

Step 4: Jobs

  jobs:
    # The main job of the project
    main:
      # Define optional job parameters
      parameters:
        - name: year
          type: Integer
          default: 2013
      # Provide a list of targets in this job
      targets:
        - measurements

Finally, we create a build job, which may consist of multiple build targets. Flowman executes them in the correct order by resolving any data dependencies between the build targets within a single build job.

Each job can also be parametrized; in this example we use a parameter `year` to specify which data partition to process.

  • Jobs help to bundle related build targets to be executed within a single Flowman invocation.
  • Automatic ordering of build targets ensures that all upstream dependencies are executed before a target is run.
  • Optionally Flowman can store information on individual job runs in a history database to improve observability.

Step 5: Execute

Console with Flowman Shell

Once all declarations are in place, the project can be executed. While Flowman's most important component is the command-line tool for normal batch execution, it also provides an interactive shell for developers, who can inspect the project, analyze intermediate results, and much more.
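A typical invocation might look like the following sketch. The job name, project directory, and command syntax are assumptions based on common Flowman usage; consult the Flowman documentation for the exact flags:

```shell
# Run a job named "main" for a specific year (name and flags are assumptions)
flowexec -f my-project/ job build main year=2013

# Start the interactive Flowman shell for the same project
flowshell -f my-project/
```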