# Appending Nested Columns in dbt Incremental Updates on BigQuery
## Overview
In this blog post, we’ll explore how to append nested columns in BigQuery when performing incremental updates using dbt. This is a common challenge that arises because BigQuery does not support adding nested columns directly through DDL commands. Instead, we’ll leverage the `bq` command-line tool and a Python script to manage schema updates effectively. By the end of this article, you will have a clear understanding of how to handle nested columns in your dbt projects.
## dbt Incremental Update Scenario
Incremental updates in dbt allow you to add new records to your models without reprocessing the entire dataset. This is particularly useful for large datasets where full refreshes can be time-consuming.
For instance, consider a dbt model that aggregates daily active users from `app_data_events`. Here's a simple dbt model snippet from the official docs:
```sql
{{
    config(
        materialized='incremental',
        unique_key='date_day'
    )
}}

select
    date_trunc('day', event_at) as date_day,
    count(distinct user_id) as daily_active_users

from {{ ref('app_data_events') }}

{% if is_incremental() %}

    -- this filter will only be applied on an incremental run
    -- (uses >= to include records arriving later on the same day as the last run of this model)
    where date_day >= (select coalesce(max(date_day), '1900-01-01') from {{ this }})

{% endif %}

group by 1
```
As you can see in the code, when `is_incremental()` is true the data to be loaded is limited to dates that have not been processed yet, which can dramatically reduce the amount of data processed.
Incremental models also support a few types of schema-change handling via the `on_schema_change` configuration:

- `ignore`: Default behavior; newly added columns are simply not propagated to the target table.
- `fail`: Triggers an error message when the source and target schemas diverge.
- `append_new_columns`: Appends new columns to the existing table. Note that this setting does not remove columns from the existing table that are not present in the new data.
- `sync_all_columns`: Adds any new columns to the existing table and removes any columns that are now missing. Note that this is inclusive of data type changes. On BigQuery, changing column types requires a full table scan; be mindful of the trade-offs when implementing.

For example, to append new columns automatically:
```sql
{{
    config(
        materialized='incremental',
        unique_key='date_day',
        on_schema_change='append_new_columns'
    )
}}
```
`append_new_columns` is a good trade-off between flexibility and backward compatibility.
## The on_schema_change Issue on BigQuery
However, BigQuery currently does not allow nested columns to be added directly through DDL, so you will need to follow the steps outlined in this post to update your schema correctly.
This limitation complicates workflows that rely on dbt incremental updates, and engineers often end up looking for workarounds to manage schema changes without breaking their data pipelines.
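For concreteness, suppose the target table has a RECORD column named `payload` and the source starts producing a new nested field `payload.referrer` (both names are hypothetical). In the schema representation used by `bq` and the BigQuery API (shown here as Python literals, matching the `to_api_repr()` format that the script in Option 2 works with), appending the nested column means adding one entry to the RECORD's `fields` list:

```python
# Hypothetical current schema of the target table (API representation).
current_schema = [
    {"name": "date_day", "type": "DATE", "mode": "NULLABLE"},
    {"name": "payload", "type": "RECORD", "mode": "NULLABLE", "fields": [
        {"name": "source", "type": "STRING", "mode": "NULLABLE"},
    ]},
]

# Desired schema: the same RECORD with one extra nested field appended at the end.
desired_schema = [
    {"name": "date_day", "type": "DATE", "mode": "NULLABLE"},
    {"name": "payload", "type": "RECORD", "mode": "NULLABLE", "fields": [
        {"name": "source", "type": "STRING", "mode": "NULLABLE"},
        {"name": "referrer", "type": "STRING", "mode": "NULLABLE"},  # the new nested column
    ]},
]
```

There is no DDL statement that performs this change directly, which is why the workarounds below go through the schema-update API instead.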
## Solutions (Workarounds)
### Option 1. Manual Update with the bq Command

If you prefer a simple, manual approach, as described in the GCP documentation, you can update your schema using the `bq` command-line tool. Here's how:
1. Retrieve the Current Schema

   Run the following command to fetch the current schema of your table:

   ```bash
   bq show --schema --format=prettyjson mydataset.mytable > /tmp/myschema.json
   ```
2. Edit the Schema File

   Open the `/tmp/myschema.json` file and append the new nested column. (The example schema shown earlier illustrates what the appended field looks like; a small Python sketch of this edit follows these steps.)
3. Update the Table Schema

   Apply the updated schema with:

   ```bash
   bq update mydataset.mytable /tmp/myschema.json
   ```
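If you would rather script step 2 than edit the JSON by hand, here is a minimal sketch. It assumes the table has a RECORD column named `payload` and that you want to append a nested `referrer` field, as in the earlier example; both names are hypothetical.

```python
import json

SCHEMA_PATH = "/tmp/myschema.json"

# Hypothetical new nested field to append inside the existing `payload` RECORD.
NEW_NESTED_FIELD = {"name": "referrer", "type": "STRING", "mode": "NULLABLE"}

with open(SCHEMA_PATH) as f:
    schema = json.load(f)

for field in schema:
    if field["name"] == "payload" and field.get("type") == "RECORD":
        subfields = field.setdefault("fields", [])
        # Append only if the nested field is not already present.
        if all(sub["name"] != NEW_NESTED_FIELD["name"] for sub in subfields):
            subfields.append(NEW_NESTED_FIELD)

with open(SCHEMA_PATH, "w") as f:
    json.dump(schema, f, indent=2)
```

After writing the file, apply it with the same `bq update` command shown in step 3.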
### Option 2. With a Script (Python or Anything Else)

For more efficiency, you can use the following Python script to automate schema updates. Here's a step-by-step guide:
```python
import json
import argparse

from google.auth import impersonated_credentials, default
from google.cloud import bigquery


def diff_schemas(old_schema, new_schema, prefix=""):
    diffs = []
    old_fields = {field["name"]: field for field in old_schema}
    new_fields = {field["name"]: field for field in new_schema}
    for name, old_field in old_fields.items():
        full_field = f"{prefix}{name}"
        if name not in new_fields:
            diffs.append(f"Field removed: {full_field}")
        else:
            new_field = new_fields[name]
            if old_field.get("type") != new_field.get("type"):
                diffs.append(f"Type changed for field {full_field}: {old_field.get('type')} -> {new_field.get('type')}")
            if old_field.get("mode") != new_field.get("mode"):
                diffs.append(f"Mode changed for field {full_field}: {old_field.get('mode')} -> {new_field.get('mode')}")
            if old_field.get("type") == "RECORD":
                old_nested = old_field.get("fields", [])
                new_nested = new_field.get("fields", [])
                diffs.extend(diff_schemas(old_nested, new_nested, prefix=full_field + "."))
    for name, new_field in new_fields.items():
        full_field = f"{prefix}{name}"
        if name not in old_fields:
            diffs.append(f"Field added: {full_field}")
    return diffs


def get_bigquery_client(project, impersonate_service_account=None):
    """Get a BigQuery client, optionally using impersonated credentials.

    Args:
        project (str): GCP project ID.
        impersonate_service_account (str, optional): Service account email to impersonate.

    Returns:
        bigquery.Client: A BigQuery client.
    """
    if impersonate_service_account:
        # Use the default credentials to create impersonated credentials.
        source_credentials, _ = default()
        target_credentials = impersonated_credentials.Credentials(
            source_credentials=source_credentials,
            target_principal=impersonate_service_account,
            target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
            lifetime=3600,
        )
        return bigquery.Client(project=project, credentials=target_credentials)
    else:
        return bigquery.Client(project=project)


def load_current_schema(
    project: str, dataset_id: str, table_id: str, schema_json: str, impersonate_service_account: str = ""
):
    """Load the current schema of a BigQuery table and save it to a JSON file.

    Args:
        project (str): GCP project ID.
        dataset_id (str): BigQuery dataset ID.
        table_id (str): BigQuery table ID.
        schema_json (str): Path to the JSON file where the schema will be saved.
    """
    # Initialize BigQuery client.
    client = get_bigquery_client(project, impersonate_service_account)
    # Get the table.
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)
    # Convert current schema (list of SchemaField objects) into a list of dictionaries.
    current_schema = [field.to_api_repr() for field in table.schema]
    with open(schema_json, "w") as f:
        json.dump(current_schema, f, indent=2)


def update_table_schema(
    project: str,
    dataset_id: str,
    table_id: str,
    schema_json: str,
    dryrun: bool = True,
    impersonate_service_account: str = "",
):
    """Update table schema with the specified JSON file.

    Args:
        project (str): GCP project ID.
        dataset_id (str): BigQuery dataset ID.
        table_id (str): BigQuery table ID.
        schema_json (str): Path to the JSON file with the schema definition.
        dryrun (bool): Dry run (do not update the schema).

    Returns:
        None
    """
    # Initialize BigQuery client.
    client = get_bigquery_client(project, impersonate_service_account)
    # Get the table.
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)
    # Convert current schema (list of SchemaField objects) into a list of dictionaries.
    current_schema = [field.to_api_repr() for field in table.schema]
    # Load the complete schema from the JSON file.
    with open(schema_json, "r") as f:
        new_schema_dict = json.load(f)
    # Convert new schema dictionaries to SchemaField objects.
    new_schema = [bigquery.SchemaField.from_api_repr(field) for field in new_schema_dict]

    added_fields = diff_schemas(current_schema, new_schema_dict)
    if not added_fields:
        print(f"Schema is up-to-date for table {project}.{dataset_id}.{table_id}")
        return
    if dryrun:
        # Identify new fields that are not in the current schema.
        print(f"Fields to be added: {added_fields}")
    else:
        # Assign the new schema to the table and update it.
        table.schema = new_schema
        client.update_table(table, ["schema"])
        print(f"Successfully updated schema for table {dataset_id}.{table_id}")
    return


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Update a BigQuery table schema by adding nested fields using a patch JSON file."
    )
    parser.add_argument("--impersonate_service_account", help="Service account email to impersonate")
    parser.add_argument("--project", required=True, help="GCP project ID")
    parser.add_argument("--dataset", required=True, help="BigQuery dataset ID")
    parser.add_argument("--table", required=True, help="BigQuery table ID")
    parser.add_argument(
        "--action", required=True, choices=["update", "load"], default="load", help="Action to perform (default: load)"
    )
    parser.add_argument("--schema_json", required=True, help="Path to the JSON file with the schema definition")
    parser.add_argument(
        "--no-dryrun", action="store_false", dest="dryrun", help="Perform actual update (default is dry run)"
    )
    args = parser.parse_args()

    if args.action == "load":
        load_current_schema(args.project, args.dataset, args.table, args.schema_json, args.impersonate_service_account)
    elif args.action == "update":
        update_table_schema(
            args.project, args.dataset, args.table, args.schema_json, args.dryrun, args.impersonate_service_account
        )
    else:
        raise ValueError("Invalid action. Use 'update' or 'load'.")
```
This script allows you to load the current schema, update it with new fields, and apply the changes. Key functions include:

- `load_current_schema()`: Loads the current schema from a specified BigQuery table and saves it to a JSON file.
- `update_table_schema()`: Updates the table schema based on a JSON file.
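If you prefer to call these helpers from another Python process rather than through the CLI, usage might look like the sketch below, assuming the script is saved as `manage_schema.py` (as in the CLI examples that follow) and using placeholder project, dataset, and table names:

```python
# Hypothetical programmatic usage of the helpers above.
from manage_schema import load_current_schema, update_table_schema

# 1. Dump the live schema so the new nested field can be appended to the JSON file.
load_current_schema("my-project", "my_dataset", "daily_active_users", "tmp/schema.json")

# 2. After editing tmp/schema.json, preview the diff (dry run), then apply it.
update_table_schema("my-project", "my_dataset", "daily_active_users", "tmp/schema.json", dryrun=True)
update_table_schema("my-project", "my_dataset", "daily_active_users", "tmp/schema.json", dryrun=False)
```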
#### Usage Instructions
1. Load the Current Schema

   To load the current schema, run:

   ```bash
   python manage_schema.py --project <project> --dataset <dataset> --table <table> --action load --schema_json tmp/schema.json
   ```

2. Update the Schema File

   Add your new nested column to the `tmp/schema.json` file.

3. Dry Run to Check Changes

   Before applying the changes, perform a dry run:

   ```bash
   python manage_schema.py --project <project> --dataset <dataset> --table <table> --action update --schema_json tmp/schema.json
   ```

4. Apply the Schema Changes

   Finally, if everything looks good, execute the command to update the schema:

   ```bash
   python manage_schema.py --project <project> --dataset <dataset> --table <table> --action update --schema_json tmp/schema.json --no-dryrun
   ```
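Based on the script's print statements, the dry run and the actual update for the hypothetical `payload.referrer` field from earlier would produce output roughly like this:

```text
Fields to be added: ['Field added: payload.referrer']
Successfully updated schema for table <dataset>.<table>
```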
### Option 3. dbt Python Models

If you want to integrate this process into your dbt workflow, you can leverage a dbt Python model that updates the schema automatically during incremental processing, allowing for more seamless updates within your dbt environment. A rough sketch follows.
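As a rough sketch only (not an officially documented pattern): a dbt Python model can patch the target table's schema before returning its DataFrame. The sketch assumes the dbt-bigquery adapter, that the `google-cloud-bigquery` package is importable in the Python model's runtime, and that the full desired schema is available to the model; every name below is hypothetical.

```python
# models/daily_active_users.py -- a rough, hypothetical sketch only.
from google.cloud import bigquery


def _patch_schema(project, dataset, table, desired_schema):
    """Apply the full desired schema (existing fields plus appended nested fields).

    BigQuery accepts this schema update as long as the change is purely additive.
    """
    client = bigquery.Client(project=project)
    tbl = client.get_table(f"{project}.{dataset}.{table}")
    tbl.schema = [bigquery.SchemaField.from_api_repr(f) for f in desired_schema]
    client.update_table(tbl, ["schema"])


def model(dbt, session):
    dbt.config(materialized="incremental", unique_key="date_day")

    # Full target schema, including the newly appended nested field (all names hypothetical).
    desired_schema = [
        {"name": "date_day", "type": "DATE"},
        {"name": "daily_active_users", "type": "INTEGER"},
        {"name": "payload", "type": "RECORD", "fields": [
            {"name": "source", "type": "STRING"},
            {"name": "referrer", "type": "STRING"},  # the new nested column
        ]},
    ]

    if dbt.is_incremental:
        # Patch the existing target table before dbt merges new rows into it.
        _patch_schema("my-project", "my_dataset", "daily_active_users", desired_schema)

    df = dbt.ref("app_data_events")
    # ... reproduce the aggregation logic from the SQL model here ...
    return df
```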
## Apply Process
- Manual: run the `bq` command or the script by hand whenever a schema change is necessary.
- Airflow or any other dbt executor: apply the schema change as part of the dbt execution process (with a dbt Python model, a preceding task, etc.).
- GitHub Actions: apply schema changes as part of CI/CD (using the script or the `bq` command).

You can schedule the script with tools like Airflow or GitHub Actions so that schema updates run automatically as part of your data pipeline; automating this step saves time and reduces the risk of human error. A minimal Airflow sketch follows.
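For example, an Airflow DAG could run the schema patch right before the dbt run. This is only a sketch: the DAG id, paths, and model selector are hypothetical, and it assumes the script above is deployed as `manage_schema.py` on the worker.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="daily_active_users",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Append any new (nested) columns to the target table first.
    patch_schema = BashOperator(
        task_id="patch_schema",
        bash_command=(
            "python /opt/pipeline/manage_schema.py "
            "--project my-project --dataset my_dataset --table daily_active_users "
            "--action update --schema_json /opt/pipeline/schemas/daily_active_users.json --no-dryrun"
        ),
    )

    # Then run the incremental dbt model as usual.
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="cd /opt/pipeline/dbt_project && dbt run --select daily_active_users",
    )

    patch_schema >> dbt_run
```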
## Summary
In this article, we covered workarounds for appending nested columns in BigQuery when using dbt for incremental updates. Because of BigQuery's limitations, we need to rely on the `bq` command or a Python script for schema updates. By following the steps outlined above, you can effectively manage your BigQuery schemas and keep your dbt models up to date.
For further reading and to dive deeper into the technical details, you can refer to the following resources: