Dataform CI/CD pipeline on Google Cloud

I had been looking forward to Dataform becoming part of Google Cloud, and the merge into GCP took longer than I had hoped and expected. In the second half of 2022, Dataform was (finally) integrated into BigQuery. This whole article is about Dataform as part of Google Cloud, not the initial (now legacy) Dataform.

Dataform offers a UI to initialize a project, write queries, verify syntax, compile, and execute. It's possible to connect a GitHub repository and keep all the code there. The connection to GitHub or Bitbucket adds extra functionality like creating pull requests and merging branches.
In my projects, I usually use the dev branch for playing/development, staging for tests/pre-production deployments, and the main branch for production. I like to have things as configurable and automated as possible, so I usually execute Dataform workflows in two ways:

1. automatically as a deployment via Cloud Build upon branch update/merge (especially for main/staging branch)

2. as a part of a data pipeline, for example after new data is loaded into BigQuery.

Dataform workflow execution consists of 2 steps:

1. compiling - converting Dataform sqlx files into raw BigQuery SQL queries

2. invoking workflow - executing SQL scripts

So when you want to execute a complete Dataform workflow, you need to compile it and then invoke it. An important part of workflow execution is providing execution variables such as the Git branch to execute from, the GCP project, and the BigQuery dataset, plus some extra flags like full refresh or concrete tags if you use them.
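To make those two steps concrete, here is a minimal sketch of the same calls made directly against the Dataform REST API (using google-auth and requests; the project, repository, branch, and dataset values are just placeholders). Every approach described below boils down to these two requests:

import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholder values, adjust to your setup
project, location, repo = 'my-gcp-project', 'us-central1', 'df-test'
repository = f'projects/{project}/locations/{location}/repositories/{repo}'
base_url = f'https://dataform.googleapis.com/v1beta1/{repository}'

credentials, _ = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
session = AuthorizedSession(credentials)

# Step 1: compile the given Git branch into raw BigQuery SQL
compilation = session.post(f'{base_url}/compilationResults', json={
    'gitCommitish': 'main',
    'codeCompilationConfig': {'defaultSchema': 'dataform_stage'},
}).json()

# Step 2: invoke the workflow based on the compilation result
invocation = session.post(f'{base_url}/workflowInvocations', json={
    'compilationResult': compilation['name'],
}).json()

# The invocation can then be polled via
# GET https://dataform.googleapis.com/v1beta1/<invocation name> until it leaves the RUNNING state.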

In this article, I want to describe several ways Dataform workflows can be executed outside of the Google Cloud console. The complete code is here.

Executing Dataform workflow in Cloud Workflows

At the moment (January 2023), interaction with Dataform is done through the API or client libraries; there is no integration with the Cloud SDK, which would be more convenient to use in Cloud Build steps. The official documentation offers examples for Airflow (Cloud Composer) and Cloud Workflows. I personally like the Cloud Workflows option since it's relatively simple, easy to set up, and serverless. For my use case, I've added input parameters and a step that checks whether the Dataform workflow execution failed.

This is what the Cloud Workflow looks like:

main:
    params: [args]
    steps:
    - init:
        assign:
          - repository: ${"projects/" + args.gcp_project + "/locations/us-central1/repositories/" + args.repository}
          - default_schema: ${args.dataset}
          - branch: ${args.branch}
          - full_refresh: ${default(map.get(args, "full_refresh"), false)}
    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: ${branch}
                codeCompilationConfig:
                    defaultSchema: ${default_schema}
        result: compilationResult
    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    fullyRefreshIncrementalTablesEnabled: ${full_refresh}
        result: workflowInvocation
    - getInvocationResult:
        call: http.get
        args:
            url:  ${"https://dataform.googleapis.com/v1beta1/" + workflowInvocation.body.name}
            auth:
                type: OAuth2
        result: invocationResult
    - waitForResult:
        call: sys.sleep
        args:
            seconds: 10
        next: checkInvocationResult
    - checkInvocationResult:
        switch:
            - condition: ${invocationResult.body.state == "RUNNING"}
              next: getInvocationResult
            - condition: ${invocationResult.body.state == "SUCCEEDED"}
              next: end
            - condition: ${invocationResult.body.state == "CANCELLED" or invocationResult.body.state == "FAILED" or invocationResult.body.state == "CANCELING"}
              steps:
                - raiseException:
                    raise: ${"Error while running workflow " +  invocationResult.body.name + " " + invocationResult.body.state}

There are a few steps here, all executed via the REST API: compiling the code, creating a workflow invocation based on the compilation result, and polling the invocation until it finishes (raising an error if it ends in a failed state).
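Note that before Cloud Build or Python can call it, the Cloud Workflow itself has to be deployed once, for example with gcloud workflows deploy (or Terraform).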

Executing Dataform workflow in Cloud Build

When using Cloud Build, we execute the Cloud Workflow from above via the Google Cloud SDK, providing the input variables in JSON format. So there is basically one step:

steps:
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk:slim'
    entrypoint: 'gcloud'
    args: [ 'beta', 'workflows', 'run', '${_WORKFLOW_NAME}',  '--data',
            '{"repository": "${_REPOSITORY}", "dataset": "${_DATASET}", "branch": "${_BRANCH}", "full_refresh": "${_FULL_REFRESH}", "gcp_project": "$PROJECT_ID"}',
            '--call-log-level', 'log-all-calls'
    ]

Everything is parameterized, i.e. injected from the Cloud Build trigger settings:

_WORKFLOW_NAME - the name of the Cloud Workflow that executes Dataform workflow

_REPOSITORY - the name of the Dataform repository on Google Cloud

_DATASET - BigQuery dataset that will be used

_BRANCH - name of the Git branch to use for deployment

_FULL_REFRESH - true/false, i.e. whether to do a full refresh

This is how the Cloud Build trigger looks:

[Cloud Build trigger screenshot: the substitution variables above set in the trigger settings]

Executing Cloud Workflow in Python

In some cases, I execute the Cloud Workflow (and thus the Dataform workflow) straight from Python.

This is sample code to execute a Cloud Workflow in Python, adjusted for my use case (most of the code is taken from the official Cloud Workflows documentation):

import time
import json
import logging
from google.cloud import workflows
from google.cloud.workflows import executions

workflows_client = workflows.WorkflowsClient()
execution_client = executions.ExecutionsClient()


def execute_workflow(gcp_project: str, location: str, workflow_name: str, input_data: dict):
    """Executes a Workflow
    :param gcp_project - name of the GCP Project
    :param location - location of the Cloud Workflow, i.e. us-central1
    :param workflow_name - name of the workflow
    :param input_data - dictionary of input data for the workflow that will be passed as JSON
    """
    parent = workflows_client.workflow_path(gcp_project, location, workflow_name)

    response = execution_client.create_execution(
        request={'parent': parent, 'execution': {'argument': json.dumps(input_data)}}, )
    execution_finished = False
    backoff_delay = 1
    logging.info('Poll every second for result...')
    while not execution_finished:
        execution = execution_client.get_execution(request={"name": response.name})
        execution_finished = execution.state != executions.Execution.State.ACTIVE

        if not execution_finished:
            logging.info('- Waiting for results...')
            time.sleep(backoff_delay)
            backoff_delay *= 2
        else:
            logging.info(f'Execution finished with state: {execution.state.name}')
            logging.info(execution.result)
            return execution.result
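One thing to note: the function above returns execution.result no matter how the execution finished. Since the Cloud Workflow raises when the Dataform invocation fails, you may want the Python caller to fail as well; a hedged tweak of the final branch could look like this:

        else:
            logging.info(f'Execution finished with state: {execution.state.name}')
            # The Cloud Workflow raises when the Dataform invocation fails, so anything
            # other than SUCCEEDED should be surfaced to the caller
            if execution.state != executions.Execution.State.SUCCEEDED:
                raise RuntimeError(f'Workflow execution failed: {execution.error}')
            return execution.result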

 

And this is what a sample execution could look like:

gcp_project = 'my-gcp-project'
workflow_name = 'dataform-pipeline'
location = 'us-central1'

input_data = {
        'full_refresh': False,
        'branch': 'main',
        'dataset': 'dataform_stage',
        'repository': 'df-test',
        'gcp_project': gcp_project
}
execute_workflow(gcp_project, location, workflow_name, input_data)

Executing Dataform workflow in Python

When I initially did the development, I also created a Python script to execute the Dataform workflow directly, so I'm adding it here for completeness.

import logging
import time

from google.cloud import dataform_v1beta1

df_client = dataform_v1beta1.DataformClient()


def execute_workflow(repo_uri: str, compilation_result: str):
    """Run workflow based on the compilation"""
    request = dataform_v1beta1.CreateWorkflowInvocationRequest(
        parent=repo_uri,
        workflow_invocation=dataform_v1beta1.types.WorkflowInvocation(
            compilation_result=compilation_result
        )
    )

    response = df_client.create_workflow_invocation(request=request)
    name = response.name
    logging.info(f'created workflow invocation {name}')
    return name


def compile_workflow(repo_uri: str, gcp_project: str, bq_dataset: str, branch: str):
    """Compiles the code"""
    request = dataform_v1beta1.CreateCompilationResultRequest(
        parent=repo_uri,
        compilation_result=dataform_v1beta1.types.CompilationResult(
            git_commitish=branch,
            code_compilation_config=dataform_v1beta1.types.CompilationResult.CodeCompilationConfig(
                default_database=gcp_project,
                default_schema=bq_dataset,
            )
        )
    )
    response = df_client.create_compilation_result(request=request)
    name = response.name
    logging.info(f'compiled workflow {name}')
    return name


def get_workflow_state(workflow_invocation_id: str):
    """Checks the status of a workflow invocation"""
    while True:
        request = dataform_v1beta1.GetWorkflowInvocationRequest(
            name=workflow_invocation_id
        )
        response = df_client.get_workflow_invocation(request)
        state = response.state.name
        logging.info(f'workflow state: {state}')
        if state == 'RUNNING':
            time.sleep(10)
        elif state in ('FAILED', 'CANCELING', 'CANCELLED'):
            raise Exception(f'Error while running workflow {workflow_invocation_id}')
        elif state == 'SUCCEEDED':
            return


def run_workflow(gcp_project: str, location: str, repo_name: str, bq_dataset: str, branch: str):
    """Runs complete workflow, i.e. compile and invoke"""

    repo_uri = f'projects/{gcp_project}/locations/{location}/repositories/{repo_name}'
    compilation_result = compile_workflow(repo_uri, gcp_project, bq_dataset, branch)
    workflow_invocation_name = execute_workflow(repo_uri, compilation_result)
    get_workflow_state(workflow_invocation_name)


if __name__ == '__main__':
    gcp_project = ''
    location = 'us-central1'
    repo_name = 'df-test'
    bq_dataset = 'dataform_tutorial'
    branch = 'main'

    run_workflow(gcp_project, location, repo_name, bq_dataset, branch)
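The invocation above executes everything that was compiled. If you also want the extra flags mentioned at the beginning (full refresh, concrete tags), they go into the invocation config. A hedged sketch, reusing df_client from the script above (the snake_case fields mirror the fullyRefreshIncrementalTablesEnabled / includedTags fields used in the Cloud Workflow's REST body):

def execute_workflow_with_config(repo_uri: str, compilation_result: str,
                                 full_refresh: bool = False, tags: list = None):
    """Like execute_workflow, but can force a full refresh and limit the run to given tags"""
    request = dataform_v1beta1.CreateWorkflowInvocationRequest(
        parent=repo_uri,
        workflow_invocation=dataform_v1beta1.types.WorkflowInvocation(
            compilation_result=compilation_result,
            invocation_config={
                'fully_refresh_incremental_tables_enabled': full_refresh,
                'included_tags': tags or [],
            },
        )
    )
    response = df_client.create_workflow_invocation(request=request)
    return response.name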

I'm looking forward to further Dataform enhancements on Google Cloud.
