Running dbt pipelines via Cloud Composer

Introduction

In this post, we will set up dbt with Cloud Composer, a fully managed service from Google built on the open-source Apache Airflow project for authoring, scheduling, and monitoring pipelines, and show how the two can be used together to run transformation data pipelines for analytics.

dbt (data build tool) is a transformation workflow that lets teams collaborate on analytics code and ship it to production quickly.
With dbt, established software engineering best practices can be applied to analytics code, including but not limited to modularity, portability, and CI/CD. Anyone who knows SQL can use it to build production-ready data pipelines.
dbt transforms data inside the data warehouse: you write select statements, and dbt turns them into tables and views.
In this post we will create a simple analytical pipeline that uses the Stack Overflow public BigQuery dataset [5] and transforms it to count how many questions are attached to each Stack Overflow tag. We will then connect the pipeline to a simple Data Studio dashboard.
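
For intuition before diving in: a dbt model is just a SQL file containing a select statement, and dbt wraps it in the DDL needed to materialize it. A minimal sketch (the model, dataset, and source table names here are made up for illustration and are not part of the project below):

-- models/my_model.sql (hypothetical model)
SELECT tag, COUNT(*) AS question_count
FROM `my_project.my_dataset.questions`
GROUP BY tag

-- For a table materialization on BigQuery, dbt run compiles this to roughly:
CREATE OR REPLACE TABLE `my_project.my_dataset.my_model` AS (
  SELECT tag, COUNT(*) AS question_count
  FROM `my_project.my_dataset.questions`
  GROUP BY tag
);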

A typical dbt project

A typical dbt project consists of:

  1. dbt_project.yml: This file tells dbt that the directory is a dbt project. It contains definitions, configurations, and information on how the project is run.
  2. models/: The models directory typically contains SQL files, each with a single select statement, plus subdirectories for nested models. In this case it contains one SQL file, stack_tags.sql, and a nested subdirectory foo, which in turn contains another SQL file, stack_tags_view.sql.
  3. profiles.yml: Contains all the details required to connect to your data warehouse. When you invoke dbt from the command line, dbt parses dbt_project.yml to obtain the profile name, which dbt uses to look up the connection details for your data warehouse.
  4. dbt-bigquery-user-key.json: A service account key for a BigQuery user account.

Configurations and Models

dbt_project.yml

name: 'stackoverflow_stats'
version: '1.0'
profile: 'stackoverflow_stats'

config-version: 2

models:
  stackoverflow_stats:
    +materialized: table
    foo:
      +materialized: view
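
With this configuration, models sitting directly under models/ (here, stack_tags.sql) are materialized as tables, while models in the nested foo directory (stack_tags_view.sql) are materialized as views.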

models/stack_tags.sql

WITH
  count_res AS (
    SELECT
      SPLIT(tags, '|') AS tech
    FROM
      `bigquery-public-data.stackoverflow.posts_questions`
    WHERE
      EXTRACT(YEAR FROM creation_date) > 2020
  ),
  tags_unnested AS (
    SELECT
      splitted_tags,
      COUNT(*) AS tech_count
    FROM
      count_res
    CROSS JOIN
      UNNEST(tech) AS splitted_tags
    GROUP BY
      splitted_tags
    ORDER BY
      tech_count DESC
  )
SELECT
  *
FROM
  tags_unnested
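
The nested model models/foo/stack_tags_view.sql is not shown in the post; a minimal version (our assumption, included only to make the nested-model example concrete) could simply select from the stack_tags model through dbt's ref() function:

-- models/foo/stack_tags_view.sql (hypothetical contents)
SELECT
  splitted_tags,
  tech_count
FROM
  {{ ref('stack_tags') }}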

profiles.yml

stackoverflow_stats: # Should match the profile name in dbt_project.yml
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      keyfile: /home/airflow/gcs/data/profiles/dbt-bigquery-user-key.json # Path to keyfile
      project: rapid-gadget-325605 # Project ID
      dataset: dbt_vips # Dataset name
      threads: 1
      timeout_seconds: 300
      location: US
      priority: interactive
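
Before wiring this into Airflow, the connection can be sanity-checked with a local dbt installation; a sketch, where the local paths are placeholders for wherever you keep the project and the profiles directory:

# Run from inside the dbt project directory (hypothetical local checkout)
cd ~/stackoverflow_stats
dbt debug --profiles-dir ~/stackoverflow_stats/profiles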

Integrating Cloud Composer and dbt


We will follow the configuration shown above when setting up the Cloud Composer environment.

Furthermore, we will install the following PyPI packages in the environment:

werkzeug<1
jsonschema<3.2,>=3.0
airflow-dbt==0.4.0
dbt==0.20.0
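
One way to install them is to put the pins above into a requirements file and update the environment; a sketch, assuming the environment is named composer-dbt-env in us-west1 (inferred from the bucket name, not stated explicitly):

# requirements.txt contains the four pinned packages listed above
gcloud composer environments update composer-dbt-env \
  --location us-west1 \
  --update-pypi-packages-from-file requirements.txt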

The environment's DAGs folder in GCS (gs://us-west1-composer-dbt-env-e351ad79-bucket/dags) holds the Airflow pipeline code (the DAG objects) together with the dbt project files.
The environment also provides an Airflow web UI link that can be used to trigger the Airflow pipelines.

Overall directory structure, including Cloud Composer dependencies, to be uploaded to the DAGs folder in GCS

This is the overall structure. dbtflow.py and dbtbashcmd.py are the Airflow pipelines that orchestrate and execute the dbt run command. dbt run executes the SQL model files against the current target database: dbt connects to the target database and runs the SQL required to materialize all data models according to the specified materialization strategies.
Models run in the order defined by the dependency graph generated during compilation. [4]
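
The bucket screenshot is not reproduced here, but from the paths used in the configuration and DAG code (/home/airflow/gcs/dags maps to the bucket's dags/ folder and /home/airflow/gcs/data to its data/ folder), the layout would look roughly like this:

gs://us-west1-composer-dbt-env-e351ad79-bucket/
  dags/
    dbtflow.py
    dbtbashcmd.py
    dbt_project.yml
    models/
      stack_tags.sql
      foo/
        stack_tags_view.sql
  data/
    profiles/
      profiles.yml
      dbt-bigquery-user-key.json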

Configuration and code for Cloud composer

dbtflow.py

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtTestOperator

default_args = {
    'dir': '/home/airflow/gcs/dags',
    'profiles_dir': '/home/airflow/gcs/data/profiles',
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': ['vipul.mehra@springml.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(dag_id='stack_example', default_args=default_args, schedule_interval='@daily') as dag:

    run_op = DbtRunOperator(
        task_id='run_op',
    )

    test_op = DbtTestOperator(
        task_id='test_op',
        retries=0,  # If the tests fail, we don't want Airflow to run them again
    )

    run_op >> test_op
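
Under the hood, the airflow-dbt operators shell out to the dbt CLI using the dir and profiles_dir values supplied through default_args, so this DAG is roughly equivalent to running the following from /home/airflow/gcs/dags (a sketch of the behaviour, not the operators' literal code):

dbt run --profiles-dir /home/airflow/gcs/data/profiles    # run_op
dbt test --profiles-dir /home/airflow/gcs/data/profiles   # test_op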

dbtbashcmd.py

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator

from datetime import datetime, timedelta

with DAG(dag_id='bash_op_example', schedule_interval=None, start_date=datetime(2020, 1, 1), catchup=False) as dag:

    # Task 1
    warmup_task = DummyOperator(task_id='warmup_task')

    # Task 2
    bash_cmd = "dbt run --project-dir /home/airflow/gcs/dags --profiles-dir /home/airflow/gcs/data/profiles"
    bash_task = BashOperator(task_id='dbt_task', bash_command=bash_cmd)

    warmup_task >> bash_task

Triggering dbt pipeline run through Cloud Composer (Airflow)


The dbt pipeline can be executed through the Airflow web UI, as shown in the figure above. Once either of the DAGs (bash_op_example or stack_example) runs, its code executes dbt run and materializes the views/tables into the BigQuery dataset as specified.
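
A DAG can also be triggered from the command line; a sketch that assumes an Airflow 1.10.x environment named composer-dbt-env in us-west1 (inferred from the bucket name rather than stated in the post; the Airflow sub-command differs on Airflow 2):

gcloud composer environments run composer-dbt-env \
  --location us-west1 \
  trigger_dag -- stack_example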

Our dashboard, which fetches data from those views/tables, then displays the metrics as specified.
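
In Data Studio, the data source can point at the materialized table directly or use a custom query such as the one below (this assumes dbt's default naming, under which the stack_tags model becomes the stack_tags table in the dbt_vips dataset):

SELECT
  splitted_tags,
  tech_count
FROM
  `rapid-gadget-325605.dbt_vips.stack_tags`
ORDER BY
  tech_count DESC
LIMIT 20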

You can watch the video “Integrating DBT pipelines quickly with the Google Cloud Composer” to learn more.