Hung's Notebook

DEZ Week 2 - Kestra

Table of Contents


Week 1 was about SQL and two DevOps components that help set up the environment for data engineering projects. In week 2, we moved on to orchestration with Kestra, a relatively new framework.

Why Orchestration?

This is a data engineering zoomcamp, so everything taught should be viewed from the perspective of a data engineer.

What do data engineers do in their daily job? Most of the time it is building data pipelines, which involves extracting data from sources, transforming it, and loading it into storage for analysis.

At the start, the volume of data is small. You can use a single on-premise server and develop the scripts and the environment there for all company pipelines. And to run your pipeline script at a set interval, a cron entry suffices.

As the volume of data grows while the required time to insights shortens, companies have moved their data to the cloud. Your data engineering architecture is put together from basic cloud services such as VMs or simple storage. This creates new problems such as "How do we make sure the production environment is always consistent?" and "How do we create and destroy cloud services quickly without going through web consoles one by one?" that lead to new standards of practice (Docker and Terraform).

Similar motivation applies to the pipelines. There may be hundreds of workflows, so we want a better UI to view them. A workflow can involve dozens of steps, so we want a better language to define them. Instead of running only on a schedule, we also want workflows to be triggerable by certain conditions. We want to break workflows into distinct steps, with the ability to checkpoint data so that recovery from failure is faster. Adding these capabilities to cron, we arrive at orchestration frameworks to handle our pipelines.

To explain the concepts, I want to jump directly into the homework and use it as the running example.

Doing the homework with Kestra

Homework folder: https://github.com/HangenYuu/data-engineering-zoomcamp/tree/main/02-workflow-orchestration

Setting up Kestra

Kestra has 3 main deployment components:

This can be implemented simply as two Docker containers, one for PostgreSQL and one for the Kestra application, which is what I used for the homework (image from the Kestra documentation).

Setting up Kestra was easy with the 🐳Docker Compose script provided by Kestra. To reduce storage usage on my GitHub Codespace, I changed only two things: deleted the volume for PostgreSQL since I won't load any data there, and pointed Kestra's app data to the machine's /tmp/ since it doesn't count toward the 32GB limit of the instance 😉.

For cloud deployment:

Setting up credentials

To set up and ingest data into cloud services from sources outside the cloud, we need credentials with access to the right cloud services. These include Google Cloud Storage as the data lake for parking raw files, and BigQuery as the data warehouse for production data¹.

State (credentials, environment variables) in Kestra is stored in the app's internal KV store, so the first workflow needs to set up these values in the KV store.

id: 04_gcp_kv
namespace: zoomcamp

tasks:
  - id: gcp_project_id
    type: io.kestra.plugin.core.kv.Set
    key: GCP_PROJECT_ID
    kvType: STRING
    value: dezoomcamp-416702
  - ...

Each workflow is defined in a YAML file, identified via its namespace and id. The namespace is a logical grouping of flows, helping to segregate secrets, environment variables, and plugins between namespaces. Each workflow has a sequence of tasks. Here, each task sets the value of one environment variable on execution².

Kestra-GCP-KV.png

Setting up GCP

I set up the GCP resources with Terraform to make use of the knowledge from the first module. The resources can also be set up with Kestra via a workflow (ifExists must be set to SKIP so that existing resources are not overwritten and no errors are raised). The workflow must be defined in the same namespace to make use of the KV values set up earlier.

id: 05_gcp_setup
namespace: zoomcamp

tasks:
  - id: create_gcs_bucket
    type: io.kestra.plugin.gcp.gcs.CreateBucket
    ifExists: SKIP
    storageClass: STANDARD
    name: "{{kv('GCP_BUCKET_NAME')}}"

  - id: create_bq_dataset
    type: io.kestra.plugin.gcp.bigquery.CreateDataset
    name: "{{kv('GCP_DATASET')}}"
    ifExists: SKIP

pluginDefaults:
  - type: io.kestra.plugin.gcp
    values:
      serviceAccount: "{{kv('GCP_CREDS')}}"
      projectId: "{{kv('GCP_PROJECT_ID')}}"
      location: "{{kv('GCP_LOCATION')}}"
      bucket: "{{kv('GCP_BUCKET_NAME')}}"

The resources will be created after a successful execution (author's own image).

GCP Setup

Main workflow

The main workflow is based on the one used in the tutorial.

id: 06_gcp_taxi_scheduled
namespace: zoomcamp
description: |
  Best to add a label `backfill:true` from the UI to track executions created via a backfill.
  CSV data used here comes from: https://github.com/DataTalksClub/nyc-tlc-data/releases

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: [yellow, green]
    defaults: green

variables:
  file: "{{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy-MM')}}.csv"
  gcs_file: "gs://{{kv('GCP_BUCKET_NAME')}}/{{vars.file}}"
  table: "{{kv('GCP_DATASET')}}.{{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy_MM')}}"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ (trigger.date | date('yyyy-MM')) ~ '.csv']}}"

triggers:
  - id: green_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"
    inputs:
      taxi: green

  - id: yellow_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 1 * *"
    inputs:
      taxi: yellow

The preamble sets up the inputs (taxi type), the variables derived from them (file name, GCS path, BigQuery table name, and extracted data path), and the monthly schedule triggers for each taxi type.

Next is the tasks section.

tasks:
  - id: set_label
    type: io.kestra.plugin.core.execution.Labels
    labels:
      file: "{{render(vars.file)}}"
      taxi: "{{inputs.taxi}}"

  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}

  - id: upload_to_gcs
    type: io.kestra.plugin.gcp.gcs.Upload
    from: "{{render(vars.data)}}"
    to: "{{render(vars.gcs_file)}}"

  - id: if_yellow_taxi
    type: io.kestra.plugin.core.flow.If
    condition: "{{inputs.taxi == 'yellow'}}"
    then:
      - id: bq_yellow_tripdata
        type: io.kestra.plugin.gcp.bigquery.Query
        sql: ...
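The extract task above is just a download-and-decompress step. A minimal Python equivalent, as a sketch only (the function name is hypothetical; the real flow shells out to wget | gunzip), could look like:

```python
import gzip
import shutil
import urllib.request

def extract(url: str, dest: str) -> str:
    """Download a gzipped CSV from url and decompress it to dest,
    mirroring the `wget -qO- ... | gunzip > file.csv` pipeline."""
    with urllib.request.urlopen(url) as resp:
        # gzip.open accepts a file object, so we can stream the
        # response through the decompressor without buffering it all.
        with gzip.open(resp, "rb") as gz, open(dest, "wb") as out:
            shutil.copyfileobj(gz, out)
    return dest
```

The streaming matters for the larger monthly files: nothing is held fully in memory, just as the shell pipeline passes bytes straight from wget to gunzip.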

Workflows in orchestration frameworks are conceptualized as directed acyclic graphs (DAGs). The term is thrown around a lot, but it just means that:

  1. There is a set of tasks (a script, a shell command, etc.) whose dependencies point in one direction only.
  2. A task is not executed until all tasks it depends on have finished.
  3. Task dependencies do not form a loop, i.e., the execution has an end.
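These three properties can be demonstrated with Python's standard-library graphlib. The dependency map below is hypothetical: set_label, extract, and upload_to_gcs are the task ids from the flow, while bq_load stands in for the BigQuery query task.

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Hypothetical dependency map: task -> set of tasks it depends on.
deps = {
    "set_label": set(),
    "extract": {"set_label"},
    "upload_to_gcs": {"extract"},
    "bq_load": {"upload_to_gcs"},
}

# static_order() yields every task only after all of its dependencies;
# if the dependencies formed a loop, it would raise graphlib.CycleError
# instead of producing an order.
order = list(TopologicalSorter(deps).static_order())
```

An orchestrator does essentially this, plus running each task as it becomes ready and persisting the results in between.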

It's quite similar to a control-flow graph in program execution (image from the rustc-dev-guide repo of rust-lang).

mir_cfg

In Kestra, the workflow is defined in a YAML file, with tasks defined sequentially in the tasks section. Each task in the workflow has a defined type, which abstracts what the step does. For example, the io.kestra.plugin.gcp.gcs.Upload type in the YAML will invoke this method, which encodes the file from the source, uploads it to the destination in chunks of 10MB, and returns the URI for the next step. The type system also introduces programmatic elements into the workflow, such as io.kestra.plugin.core.flow.If for branching and io.kestra.plugin.core.flow.ForEach for looping.
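As a rough illustration of what chunked upload means (this is not the plugin's actual code; upload_in_chunks and write_chunk are hypothetical stand-ins):

```python
def upload_in_chunks(src_path, write_chunk, chunk_size=10 * 1024 * 1024):
    """Read src_path in fixed-size chunks and pass each to write_chunk,
    the way a resumable upload sends a file to object storage piece by
    piece. write_chunk stands in for the storage client's write call.
    The default chunk_size matches the 10MB mentioned above.
    Returns the total number of bytes sent."""
    sent = 0
    with open(src_path, "rb") as f:
        while chunk := f.read(chunk_size):
            write_chunk(chunk)
            sent += len(chunk)
    return sent
```

Uploading in fixed chunks is what lets a client retry or resume a large transfer without restarting from byte zero.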

In this case, the workflow branches on the taxi input, so that the CSV file is appended to the correct BigQuery table.

Doing the homework

Question 6 is not related to the data; the answer can be found in the documentation.

Question 2 can also be inferred from the workflow definition.

Questions 1, 3, 4, and 5 require us to ingest the Green and Yellow taxi data for the year 2020, and then March 2021 for the Yellow taxi.

To do this, we can run a backfill for the workflow over the period 2020-01-01 00:00:00 to 2020-12-31 23:59:59, assuming we start from scratch.
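To sanity-check what that backfill will do, here is a small Python sketch of the executions the green schedule ("0 9 1 * *", i.e. 09:00 on day 1 of every month) produces over that window, and the file name vars.file renders to for each one:

```python
from datetime import datetime

# Backfill window for the 2020 data.
start = datetime(2020, 1, 1, 0, 0, 0)
end = datetime(2020, 12, 31, 23, 59, 59)

# "0 9 1 * *" fires at 09:00 on the first of each month, so the
# backfill replays one execution per month of 2020.
runs = [datetime(2020, m, 1, 9, 0) for m in range(1, 13)]
runs = [d for d in runs if start <= d <= end]

# Each execution's trigger.date renders vars.file to one monthly CSV.
files = [f"green_tripdata_{d:%Y-%m}.csv" for d in runs]
```

Twelve executions per taxi type, one per month, which is exactly what shows up in the executions list after the backfill finishes.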

Backfill

Backfill-Conditions

backfill run

backfill run 2

After running successfully (after a few trials 😅), we can see the files on GCS and the tables in BigQuery.

GCS backfill

BQ backfill

From GCS, we can get the answer for Question 1.

After ingesting the whole year 2020 and before ingesting Yellow March 2021, we can get the answers for Questions 3 & 4 from the "DETAILS" tab of the main tables yellow_tripdata and green_tripdata.

After ingesting Yellow March 2021, the answer to Question 5 can be read from the "DETAILS" tab of the partition yellow_tripdata_2021_03, as shown in the picture above.

Next week: Data Warehouse with BigQuery

The ETL pipeline for this week is incomplete - there's no transformation yet. So far, we have only learnt tools to set up workflows, and partly how to extract and load data. In the next week and the workshop, we will learn more about data ingestion with dlthub, a data integration tool for Python, and about BigQuery, THE cloud data warehouse from Google.

Unless specified otherwise, images are the author's own.


  1. Though the production data is not created in this workflow yet; the raw data is ingested into BigQuery directly.

  2. The credential was entered directly into the KV store since its content cannot be included in the YAML file.

#dataeng #post #study