I looked forward to Dataform becoming part of Google Cloud. Merging into GCP took longer than I hoped and expected. In the second half of 2022, Dataform was (finally) integrated as a part of BigQuery. This whole article is about Dataform as a part of Google Cloud, not the initial (now legacy) Dataform.
There are two situations in which I usually want to execute a Dataform workflow:
1. automatically, as a deployment via Cloud Build upon a branch update/merge (especially for the main/staging branch)
2. as a part of a data pipeline, for example after new data is loaded into BigQuery.
Dataform workflow execution consists of 2 steps:
1. compiling - converting Dataform sqlx files into raw BigQuery SQL queries
2. invoking workflow - executing SQL scripts
So when you want to execute a complete Dataform workflow, you need to compile it and then invoke it. You also need to provide execution variables, such as the Git branch to execute from, the GCP project, and the BigQuery dataset, plus optional flags such as full refresh or specific tags if you use them.
In this article, I want to describe several ways in which Dataform workflows can be executed outside of the Google Cloud Console. The complete code is here.
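To make the execution variables more concrete, here is a minimal sketch of the two request bodies that the compile and invoke steps could send to the Dataform REST API. The helper names `build_compilation_body` and `build_invocation_body` are hypothetical; the field names follow my understanding of the v1beta1 API, and `includedTags` is only set when tags are passed.

```python
from typing import List, Optional


def build_compilation_body(branch: str, gcp_project: str, dataset: str) -> dict:
    """Request body for the compile step (hypothetical helper)."""
    return {
        "gitCommitish": branch,  # Git branch (or commit/tag) to compile
        "codeCompilationConfig": {
            "defaultDatabase": gcp_project,  # GCP project used by default
            "defaultSchema": dataset,        # BigQuery dataset used by default
        },
    }


def build_invocation_body(compilation_result: str,
                          full_refresh: bool = False,
                          tags: Optional[List[str]] = None) -> dict:
    """Request body for the invoke step (hypothetical helper)."""
    config = {"fullyRefreshIncrementalTablesEnabled": full_refresh}
    if tags:
        # Assumption: restrict the run to actions carrying these tags
        config["includedTags"] = list(tags)
    return {"compilationResult": compilation_result, "invocationConfig": config}
```

The compile step only needs to know where the code lives and which defaults to apply, while the flags that change what actually runs (full refresh, tags) belong to the invocation.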
Executing Dataform workflow in Cloud Workflows
At the moment (January 2023), interaction with Dataform is done through the API or client libraries; there is no integration with the Cloud SDK, which would be more suitable for Cloud Build steps. The official documentation offers examples for Airflow (Cloud Composer) and Cloud Workflows. I personally like Cloud Workflows since it's relatively simple, easy to set up, and serverless. For my use case, I've added input parameters and a workflow step that checks whether the Dataform workflow execution ended with an error.
This is what the Cloud Workflow looks like:
main:
    params: [args]
    steps:
    - init:
        assign:
          - repository: ${"projects/" + args.gcp_project + "/locations/us-central1/repositories/" + args.repository}
          - default_schema: ${args.dataset}
          - branch: ${args.branch}
          - full_refresh: ${default(map.get(args, "full_refresh"), false)}
    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: ${branch}
                codeCompilationConfig:
                    defaultSchema: ${default_schema}
        result: compilationResult
    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    fullyRefreshIncrementalTablesEnabled: ${full_refresh}
        result: workflowInvocation
    - getInvocationResult:
        call: http.get
        args:
            url:  ${"https://dataform.googleapis.com/v1beta1/" + workflowInvocation.body.name}
            auth:
                type: OAuth2
        result: invocationResult
    - waitForResult:
        call: sys.sleep
        args:
            seconds: 10
        next: checkInvocationResult
    - checkInvocationResult:
        switch:
            - condition: ${invocationResult.body.state == "RUNNING"}
              next: getInvocationResult
            - condition: ${invocationResult.body.state == "SUCCEEDED"}
              next: end
            - condition: ${invocationResult.body.state == "CANCELLED" or invocationResult.body.state == "FAILED" or invocationResult.body.state == "CANCELING"}
              steps:
                - raiseException:
                    raise: ${"Error while running workflow " +  invocationResult.body.name + " " + invocationResult.body.state}
There are a couple of steps here executed via REST API: compiling the code, creating an invocation based on the compilation, and checking the invocation result.
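The decision made in the `checkInvocationResult` step can be sketched in Python as a small function that maps an invocation state to the next action. The function name and the returned labels are hypothetical. Note one deliberate difference: this sketch keeps polling on any state it does not recognize, whereas the workflow above only loops on `RUNNING`.

```python
# States that the workflow above treats as an error
FAILURE_STATES = {"FAILED", "CANCELLED", "CANCELING"}


def next_action(state: str) -> str:
    """Return what to do after reading a workflow invocation state."""
    if state == "SUCCEEDED":
        return "done"   # stop polling, the run finished successfully
    if state in FAILURE_STATES:
        return "fail"   # stop polling and raise an error
    return "poll"       # RUNNING (or an unknown state): wait and check again
```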
When using Cloud Build, we execute the Cloud Workflow above with the Google Cloud SDK and provide the input variables in JSON format. So there is basically one step:
steps:
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk:slim'
    entrypoint: 'gcloud'
    args: [ 'beta', 'workflows', 'run', '${_WORKFLOW_NAME}',  '--data',
            '{"repository": "${_REPOSITORY}", "dataset": "${_DATASET}", "branch": "${_BRANCH}", "full_refresh": ${_FULL_REFRESH}, "gcp_project": "$PROJECT_ID"}',
            '--call-log-level', 'log-all-calls'
    ]
Everything is parametrized, i.e. injected from Cloud Build trigger settings.
_WORKFLOW_NAME - the name of the Cloud Workflow that executes Dataform workflow
_REPOSITORY - the name of the Dataform repository on Google Cloud
_DATASET - BigQuery dataset that will be used
_BRANCH - name of the Git branch to use for deployment
_FULL_REFRESH - true/false, i.e. whether to do a full refresh
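As far as I know, Cloud Build substitution values are always strings, so `_FULL_REFRESH` arrives as the text `true` or `false`. The following is a minimal sketch, with hypothetical helper names, of how the JSON passed via `--data` could be built in Python so that `full_refresh` ends up as a real JSON boolean rather than a string.

```python
import json


def to_bool(value) -> bool:
    """Interpret a substitution value such as 'true' or 'false' as a bool."""
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in ("true", "1", "yes")


def build_workflow_data(repository: str, dataset: str, branch: str,
                        gcp_project: str, full_refresh="false") -> str:
    """Build the JSON string with the input arguments for the Cloud Workflow."""
    payload = {
        "repository": repository,
        "dataset": dataset,
        "branch": branch,
        "full_refresh": to_bool(full_refresh),  # serialized as true/false
        "gcp_project": gcp_project,
    }
    return json.dumps(payload)
```

For example, `build_workflow_data("df-test", "dataform_stage", "main", "my-gcp-project", "true")` produces a payload whose `full_refresh` value is the JSON literal `true`.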
This is what the Cloud Build trigger looks like:
In some cases, I am executing Cloud Workflow (Dataform workflow) straight from Python.
This is the sample code to execute Cloud Workflow in Python, adjusted for my use case (most of the code taken from official Cloud Workflows documentation):
import time
import json
import logging
from google.cloud import workflows
from google.cloud.workflows import executions
workflows_client = workflows.WorkflowsClient()
execution_client = executions.ExecutionsClient()
def execute_workflow(gcp_project: str, location: str, workflow_name: str, input_data: dict):
    """Executes a Workflow
    :param gcp_project - name of the GCP Project
    :param location - location of the Cloud Workflow, i.e. us-central1
    :param workflow_name - name of the workflow
    :param input_data - dictionary of input data for the workflow that will be passed as JSON
    """
    parent = workflows_client.workflow_path(gcp_project, location, workflow_name)
    response = execution_client.create_execution(
        request={'parent': parent, 'execution': {'argument': json.dumps(input_data)}}, )
    execution_finished = False
    backoff_delay = 1
    logging.info('Polling for result with exponential backoff...')
    while not execution_finished:
        execution = execution_client.get_execution(request={"name": response.name})
        execution_finished = execution.state != executions.Execution.State.ACTIVE
        if not execution_finished:
            logging.info('- Waiting for results...')
            time.sleep(backoff_delay)
            backoff_delay *= 2
        else:
            logging.info(f'Execution finished with state: {execution.state.name}')
            logging.info(execution.result)
            return execution.result
This is what a sample execution could look like:
gcp_project = 'my-gcp-project'
workflow_name = 'dataform-pipeline'
location = 'us-central1'
input_data = {
        'full_refresh': False,
        'branch': 'main',
        'dataset': 'dataform_stage',
        'repository': 'df-test',
        'gcp_project': gcp_project
}
execute_workflow(gcp_project, location, workflow_name, input_data)
When I initially did the development, I also created a Python script that executes the Dataform workflow directly, so I'm adding it here for completeness.
import logging
import time
from google.cloud import dataform_v1beta1
df_client = dataform_v1beta1.DataformClient()
def execute_workflow(repo_uri: str, compilation_result: str):
    """Run workflow based on the compilation"""
    request = dataform_v1beta1.CreateWorkflowInvocationRequest(
        parent=repo_uri,
        workflow_invocation=dataform_v1beta1.types.WorkflowInvocation(
            compilation_result=compilation_result
        )
    )
    response = df_client.create_workflow_invocation(request=request)
    name = response.name
    logging.info(f'created workflow invocation {name}')
    return name
def compile_workflow(repo_uri: str, gcp_project, bq_dataset: str, branch: str):
    """Compiles the code"""
    request = dataform_v1beta1.CreateCompilationResultRequest(
        parent=repo_uri,
        compilation_result=dataform_v1beta1.types.CompilationResult(
            git_commitish=branch,
            code_compilation_config=dataform_v1beta1.types.CompilationResult.CodeCompilationConfig(
                default_database=gcp_project,
                default_schema=bq_dataset,
            )
        )
    )
    response = df_client.create_compilation_result(request=request)
    name = response.name
    logging.info(f'compiled workflow {name}')
    return name
def get_workflow_state(workflow_invocation_id: str):
    """Checks the status of a workflow invocation"""
    while True:
        request = dataform_v1beta1.GetWorkflowInvocationRequest(
            name=workflow_invocation_id
        )
        response = df_client.get_workflow_invocation(request=request)
        state = response.state.name
        logging.info(f'workflow state: {state}')
        if state == 'SUCCEEDED':
            return
        if state in ('FAILED', 'CANCELING', 'CANCELLED'):
            raise Exception(f'Error while running workflow {workflow_invocation_id}')
        # RUNNING or any other non-terminal state: wait before polling again
        time.sleep(10)
def run_workflow(gcp_project: str, location: str, repo_name: str, bq_dataset: str, branch: str):
    """Runs complete workflow, i.e. compile and invoke"""
    repo_uri = f'projects/{gcp_project}/locations/{location}/repositories/{repo_name}'
    compilation_result = compile_workflow(repo_uri, gcp_project, bq_dataset, branch)
    workflow_invocation_name = execute_workflow(repo_uri, compilation_result)
    get_workflow_state(workflow_invocation_name)
if __name__ == '__main__':
    gcp_project = ''
    location = 'us-central1'
    repo_name = 'df-test'
    bq_dataset = 'dataform_tutorial'
    branch = 'main'
    run_workflow(gcp_project, location, repo_name, bq_dataset, branch)
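The polling loops in the Python scripts above wait indefinitely, and the exponential backoff in the Cloud Workflows client example doubles without an upper limit. As a possible refinement, here is a minimal sketch of capped backoff with an overall timeout. The names `backoff_delays` and `poll_until`, as well as the default values, are my own assumptions and not part of any Google library; `check` stands for any function that returns `True` once the execution has finished.

```python
import time
from typing import Callable, Iterator


def backoff_delays(initial: float = 1.0, factor: float = 2.0,
                   max_delay: float = 60.0) -> Iterator[float]:
    """Yield exponentially growing delays, capped at max_delay."""
    delay = initial
    while True:
        yield min(delay, max_delay)
        delay *= factor


def poll_until(check: Callable[[], bool], timeout: float = 3600.0,
               initial: float = 1.0, max_delay: float = 60.0,
               sleep: Callable[[float], None] = time.sleep,
               clock: Callable[[], float] = time.monotonic) -> bool:
    """Call check() until it returns True or the timeout would be exceeded.

    Returns True if check() succeeded, False if the timeout was hit.
    """
    deadline = clock() + timeout
    for delay in backoff_delays(initial, 2.0, max_delay):
        if check():
            return True
        if clock() + delay > deadline:
            return False  # the next wait would go past the deadline
        sleep(delay)
    return False  # not reached, backoff_delays() never ends
```

The `sleep` and `clock` parameters exist only so that the function can be tried out without real waiting; in normal use the defaults are fine.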
I'm looking forward to further Dataform enhancements on Google Cloud.