GCP to Lakehouse
Extract BigQuery datasets into Delta tables while keeping the target operational model consistent with other ContractForge sources.
Connector
Read BigQuery tables and SQL queries through the Spark BigQuery connector, then use ContractForge contracts for quality, transformations, idempotency, governance and Delta write semantics.
Use top-level source.query for curated subsets, joins or filtered windows that belong to the source system.
Use table selection, predicates and query projection to avoid unnecessary BigQuery scan cost.
Apply annotations, operations metadata and access contracts to the Delta table created from BigQuery data.
| Requirement | Details |
|---|---|
| Spark connector | The runtime must have the Spark BigQuery connector available for spark.read.format("bigquery"). |
| Credentials | Use a service account through runtime identity, a credentials file path or connector-supported credential options. |
| BigQuery permissions | The principal needs read permissions on the source tables and job permissions on the billed parent project. |
| Materialization dataset | Query reads can require a materialization project/dataset depending on connector and BigQuery view/query behavior. |
| Location alignment | Materialization dataset location must be compatible with the queried tables. |
| Network | The runtime must reach Google APIs. In locked-down environments, configure egress/private access before ingestion. |
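The requirements above map onto a small set of reader options. The following is a minimal sketch, assuming an existing SparkSession with the connector installed. `bigquery_reader_options` and `read_bigquery_table` are hypothetical helper names, not ContractForge APIs; the option names are the ones used in the contracts on this page.

```python
from typing import Dict, Optional


def bigquery_reader_options(
    parent_project: str,
    credentials_file: Optional[str] = None,
    materialization_dataset: Optional[str] = None,
) -> Dict[str, str]:
    """Build Spark BigQuery connector options (hypothetical helper)."""
    options = {"parentProject": parent_project}
    if credentials_file:
        # Leave this out when the runtime identity already supplies credentials.
        options["credentialsFile"] = credentials_file
    if materialization_dataset:
        # Query and view reads may need a writable materialization dataset.
        options["viewsEnabled"] = "true"
        options["materializationProject"] = parent_project
        options["materializationDataset"] = materialization_dataset
    return options


def read_bigquery_table(spark, table: str, options: Dict[str, str]):
    """Read a table through the connector; `spark` is an existing SparkSession."""
    return spark.read.format("bigquery").options(**options).load(table)
```

Running a small read like this in a notebook is a quick way to confirm the connector, credentials and network path before writing a contract.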
On Databricks serverless, the most stable pattern is to expose BigQuery through Unity Catalog Lakehouse Federation and then read it with the ContractForge table or sql source. Use the direct Spark BigQuery connector only when the runtime supports the connector package, credential materialization and Google API egress required by that connector.
| Pattern | Use when | ContractForge connector |
|---|---|---|
| Lakehouse Federation | The workspace has a Unity Catalog connection and foreign catalog for BigQuery. | table or sql |
| Direct Spark BigQuery connector | The Spark BigQuery connector, credentials and Google API egress are available in the runtime. | bigquery |
| Staged extract | Security policy blocks direct/federated access or the extract is too large for scheduled cross-cloud reads. | Object storage or file connector after upstream export |
source:
  type: connector
  connector: sql
  query: |
    SELECT
      order_id,
      customer_id,
      order_status,
      total_amount,
      updated_at
    FROM bigquery_federated.contractforge_demo.orders
    WHERE updated_at >= TIMESTAMP('2026-01-01')
catalog: main
target_schema: bronze_gcp
target_table: b_bigquery_orders_federated
ctrl_schema: ops
layer: bronze
mode: scd0_overwrite
source_system: bigquery
quality_rules:
  min_rows: 1
  required_columns: [order_id, customer_id, order_status, updated_at]
  not_null: [order_id, customer_id, updated_at]
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "sql",
        "query": """
            SELECT order_id, customer_id, order_status, total_amount, updated_at
            FROM bigquery_federated.contractforge_demo.orders
            WHERE updated_at >= TIMESTAMP('2026-01-01')
        """,
    },
    catalog="main",
    target_schema="bronze_gcp",
    target_table="b_bigquery_orders_federated",
    ctrl_schema="ops",
    layer="bronze",
    mode="scd0_overwrite",
    source_system="bigquery",
    quality_rules={
        "min_rows": 1,
        "required_columns": ["order_id", "customer_id", "order_status", "updated_at"],
        "not_null": ["order_id", "customer_id", "updated_at"],
    },
)
When BigQuery is exposed as a catalog object, BigQuery authentication lives in the Unity Catalog connection. ContractForge then only needs catalog permissions and a normal table or sql contract.
source:
  type: connector
  connector: bigquery
  table: test-big.contractforge_demo.orders
  options:
    parentProject: "{{ secret:gcp/project_id }}"
    credentialsFile: /dbfs/FileStore/contractforge/secrets/bigquery-reader.json
  read:
    source_complete: true
catalog: main
target_schema: bronze_gcp
target_table: b_bigquery_orders
ctrl_schema: ops
layer: bronze
mode: scd0_overwrite
source_system: bigquery
schema_policy: permissive
quality_rules:
  min_rows: 1
  required_columns: [order_id, customer_id, order_status, updated_at]
  not_null: [order_id, customer_id, updated_at]
  unique_key: [order_id]
on_quality_fail: fail
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "bigquery",
        "table": "test-big.contractforge_demo.orders",
        "options": {
            "parentProject": "{{ secret:gcp/project_id }}",
            "credentialsFile": "/dbfs/FileStore/contractforge/secrets/bigquery-reader.json",
        },
        "read": {"source_complete": True},
    },
    catalog="main",
    target_schema="bronze_gcp",
    target_table="b_bigquery_orders",
    ctrl_schema="ops",
    layer="bronze",
    mode="scd0_overwrite",
    source_system="bigquery",
    quality_rules={
        "min_rows": 1,
        "required_columns": ["order_id", "customer_id", "order_status", "updated_at"],
        "not_null": ["order_id", "customer_id", "updated_at"],
        "unique_key": ["order_id"],
    },
)
Prefer top-level source.query for contract readability. Connector-specific generated contracts may also use source.options.query; ContractForge treats both as query reads in source metadata.
source:
  type: connector
  connector: bigquery
  query: |
    SELECT
      order_id,
      customer_id,
      order_status,
      total_amount,
      updated_at
    FROM `test-big.contractforge_demo.orders`
    WHERE updated_at >= TIMESTAMP('2026-01-01')
  options:
    parentProject: "{{ secret:gcp/project_id }}"
    credentialsFile: /dbfs/FileStore/contractforge/secrets/bigquery-reader.json
    viewsEnabled: "true"
    materializationProject: "{{ secret:gcp/project_id }}"
    materializationDataset: contractforge_spark_materialization
catalog: main
target_schema: silver_gcp
target_table: s_bigquery_orders
ctrl_schema: ops
layer: silver
mode: scd1_upsert
merge_keys: [order_id]
watermark_columns: [updated_at]
transform:
  deduplicate:
    keys: [order_id]
    order_by: updated_at DESC NULLS LAST
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "bigquery",
        "query": """
            SELECT order_id, customer_id, order_status, total_amount, updated_at
            FROM `test-big.contractforge_demo.orders`
            WHERE updated_at >= TIMESTAMP('2026-01-01')
        """,
        "options": {
            "parentProject": "{{ secret:gcp/project_id }}",
            "credentialsFile": "/dbfs/FileStore/contractforge/secrets/bigquery-reader.json",
            "viewsEnabled": "true",
            "materializationProject": "{{ secret:gcp/project_id }}",
            "materializationDataset": "contractforge_spark_materialization",
        },
    },
    catalog="main",
    target_schema="silver_gcp",
    target_table="s_bigquery_orders",
    ctrl_schema="ops",
    layer="silver",
    mode="scd1_upsert",
    merge_keys=["order_id"],
    watermark_columns=["updated_at"],
    transform={"deduplicate": {"keys": ["order_id"], "order_by": "updated_at DESC NULLS LAST"}},
)
Query reads and view reads can require viewsEnabled, materializationProject and materializationDataset. Keep the dataset in a compatible BigQuery location and grant the service account the required permissions.
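Because these options are only sometimes required, a pre-flight lint can flag query-based sources that omit them before a run fails inside the connector. The sketch below is one possible check over a source dictionary shaped like the Python examples on this page; `missing_query_options` is a hypothetical helper, not part of ContractForge.

```python
from typing import Any, Dict, List

# Options that query and view reads can require, per the note above.
QUERY_READ_OPTIONS = ("viewsEnabled", "materializationProject", "materializationDataset")


def missing_query_options(source: Dict[str, Any]) -> List[str]:
    """Return materialization options absent from a query-based BigQuery source."""
    options = source.get("options") or {}
    is_query_read = "query" in source or "query" in options
    if source.get("connector") != "bigquery" or not is_query_read:
        return []  # table reads and other connectors are out of scope
    return [name for name in QUERY_READ_OPTIONS if name not in options]
```

Treat the result as a warning rather than a hard failure, since whether each option is needed depends on the connector version and on BigQuery view/query behavior.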
Credential handling is delegated to the Spark BigQuery connector. ContractForge stores the declarative source options and redacts secrets, but it does not replace the connector's authentication model.
| Approach | Use when | Contract option |
|---|---|---|
| Credentials file | The runtime can access a mounted or workspace-staged service-account JSON file. | credentialsFile |
| Runtime identity | The platform provides Google credentials to Spark without an explicit file. | Connector/runtime specific |
| Secret-backed file creation | The service-account JSON is stored as a secret and materialized by setup code before ingestion. | credentialsFile points to the materialized file |
Store sensitive values in the secret manager and pass a runtime path or connector-supported secret reference. ContractForge redacts secrets in the metadata it writes to control tables, but the Spark connector may emit its own messages, which ContractForge does not redact.
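The secret-backed file creation approach needs a setup step that runs before ingestion. A minimal sketch of that step follows, assuming the service-account JSON has already been fetched from the secret manager as a string; `materialize_credentials` is a hypothetical helper and how the secret is fetched is platform specific.

```python
import json
import os


def materialize_credentials(secret_json: str, path: str) -> str:
    """Write a service-account JSON secret to `path` for credentialsFile."""
    json.loads(secret_json)  # fail early if the secret is not valid JSON
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    # O_TRUNC replaces stale content; mode 0o600 applies when the file is created.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(secret_json)
    return path
```

The returned path is what the contract's credentialsFile option should point to. Avoid printing the secret or the parsed JSON in setup code, since that output is outside ContractForge's redaction.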
BigQuery extraction can be filtered by query while ContractForge persists target-side watermark state and handles idempotent writes. Keep the query deterministic and deduplicate before merge.
source:
  type: connector
  connector: bigquery
  query: |
    SELECT *
    FROM `test-big.contractforge_demo.customers`
    WHERE updated_at >= TIMESTAMP('{{ runtime:last_watermark }}')
  options:
    parentProject: "{{ secret:gcp/project_id }}"
    credentialsFile: /dbfs/FileStore/contractforge/secrets/bigquery-reader.json
    materializationProject: "{{ secret:gcp/project_id }}"
    materializationDataset: contractforge_spark_materialization
mode: scd1_hash_diff
hash_keys: [customer_id]
watermark_columns: [updated_at]
dedup_order_expr: updated_at DESC NULLS LAST
quality_rules:
  not_null: [customer_id, updated_at]
  unique_key: [customer_id]
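When the query is assembled in Python instead of through the runtime placeholder, the same filter can be built explicitly. The sketch below is an illustration only: `incremental_query` and `INITIAL_WATERMARK` are hypothetical names, and the first-run lower bound is an assumption, not documented ContractForge behavior.

```python
from datetime import datetime, timezone
from typing import Optional

# Assumed lower bound for the first run, when no watermark is stored yet.
INITIAL_WATERMARK = datetime(1970, 1, 1, tzinfo=timezone.utc)


def incremental_query(table: str, last_watermark: Optional[datetime]) -> str:
    """Build a deterministic updated_at filter; pass timezone-aware datetimes."""
    lower = (last_watermark or INITIAL_WATERMARK).astimezone(timezone.utc)
    literal = lower.strftime("%Y-%m-%d %H:%M:%S+00:00")
    return (
        f"SELECT * FROM `{table}` "
        f"WHERE updated_at >= TIMESTAMP('{literal}')"
    )
```

The inclusive `>=` re-reads rows that sit exactly on the watermark, which is why the contract pairs the filter with deduplication and an idempotent write mode.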
BigQuery supports STRUCT and ARRAY types. Depending on connector behavior and target expectations, keep nested data as columns, flatten selected structs or explode arrays with transform.shape.
transform:
shape:
flatten:
- path: shipping_address
prefix: shipping_
explode:
- path: items
outer: true
prefix: item_
project:
- order_id
- customer_id
- shipping_city
- shipping_country
- item_sku
- item_quantity
- updated_at
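To make the resulting column names concrete, the pure-Python sketch below mimics what a prefixed flatten and an outer explode produce for a single record. It illustrates the shape of the output only: the function names are hypothetical, and the assumption that exploded struct elements are expanded into prefixed columns is inferred from the projected names (item_sku, item_quantity), not from ContractForge internals.

```python
from typing import Any, Dict, Iterator, List


def flatten(row: Dict[str, Any], path: str, prefix: str) -> Dict[str, Any]:
    """Replace the struct at `path` with one prefixed column per field."""
    out = {k: v for k, v in row.items() if k != path}
    for field, value in (row.get(path) or {}).items():
        out[prefix + field] = value
    return out


def explode_outer(row: Dict[str, Any], path: str, prefix: str) -> Iterator[Dict[str, Any]]:
    """Yield one row per array element; keep the parent row if the array is empty."""
    base = {k: v for k, v in row.items() if k != path}
    items: List[Dict[str, Any]] = row.get(path) or []
    if not items:
        yield base  # outer: true keeps parents without items (no item_ columns here)
        return
    for item in items:
        yield {**base, **{prefix + k: v for k, v in item.items()}}


order = {
    "order_id": 1,
    "shipping_address": {"city": "Lisbon", "country": "PT"},
    "items": [{"sku": "A", "quantity": 2}, {"sku": "B", "quantity": 1}],
}
rows = list(explode_outer(flatten(order, "shipping_address", "shipping_"), "items", "item_"))
```

Each element of `rows` carries order_id, shipping_city, shipping_country, item_sku and item_quantity, so one order with two items becomes two target rows. Account for that multiplication when choosing merge keys.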
Set source_complete: true only when the extract really represents the full source slice needed by the write mode.
SELECT
  run_id,
  status,
  target_table,
  source_connector,
  get_json_object(source_metrics_json, '$.source_query') AS source_query,
  rows_read,
  rows_written,
  source_metrics_json
FROM main.ops.ctrl_ingestion_runs
WHERE source_connector = 'bigquery'
ORDER BY started_at_utc DESC;
For query reads, the source metrics should indicate that the effective read used a query, regardless of whether the query was declared at top level or inside connector options.
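The equivalence between the two declaration styles can be expressed as a small normalization step. This is a sketch of the idea, not ContractForge's implementation: `effective_query` and `read_kind` are hypothetical names, and giving the top-level query precedence when both are present is an assumption.

```python
from typing import Any, Dict, Optional


def effective_query(source: Dict[str, Any]) -> Optional[str]:
    """Return the query a source would read, preferring top-level source.query."""
    options = source.get("options") or {}
    return source.get("query") or options.get("query")


def read_kind(source: Dict[str, Any]) -> str:
    """Classify a source as a query read or a table read."""
    return "query" if effective_query(source) else "table"
```

A check like this is useful in contract reviews to confirm that a generated contract using source.options.query and a hand-written one using source.query are audited the same way.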
| Symptom | Likely cause | Action |
|---|---|---|
| Connector class not found | Spark BigQuery connector is not installed. | Install a connector package compatible with the Spark/Scala runtime. |
| Permission denied | Service account lacks BigQuery read/job/materialization permissions. | Grant dataset read permissions and project job permissions. |
| Query/view read fails | Materialization dataset is missing, wrong location or not writable. | Configure viewsEnabled, materializationProject and materializationDataset. |
| Unexpected high cost | Full table scan or too broad query. | Push down projection and predicates; validate bytes processed in BigQuery. |
| MERGE duplicate key error | Source query returns multiple versions per key. | Use transform.deduplicate with deterministic ordering. |
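The last row of the table relies on keeping exactly one version per key. The sketch below shows the selection rule behind an `updated_at DESC NULLS LAST` ordering in plain Python; `latest_per_key` is a hypothetical helper for illustration and is not how ContractForge or Spark executes the deduplication.

```python
from typing import Any, Dict, Iterable, List


def latest_per_key(
    rows: Iterable[Dict[str, Any]], key: str, order_col: str
) -> List[Dict[str, Any]]:
    """Keep one row per key: the greatest order_col wins and NULLs sort last."""
    best: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        current = best.get(row[key])
        if current is None:
            best[row[key]] = row
            continue
        new_val, old_val = row[order_col], current[order_col]
        # DESC NULLS LAST: any non-null value beats NULL; otherwise larger wins.
        if new_val is not None and (old_val is None or new_val > old_val):
            best[row[key]] = row
    return list(best.values())
```

Rows that tie on the ordering column keep whichever version was seen first, which is not deterministic across runs. If ties are possible in the source, add a second ordering column so the surviving row is always the same.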