Copy source tables
Land Snowflake data into Delta using overwrite, append or partition replacement when the source extract is complete.
Connector
Read Snowflake tables and SQL queries through the Spark Snowflake connector while keeping authentication, extraction, transformation, quality and target semantics explicit in the contract.
Combine Snowflake queries with ContractForge watermarks, deduplication and SCD1/SCD2 write modes.
Use annotations and operations contracts to document ownership, PII, criticality and support metadata on the Delta target.
ContractForge validates and records the source contract. The actual Snowflake read is performed by the installed Spark connector.
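The sketch below shows roughly what a direct read through the Spark Snowflake connector looks like outside ContractForge. It is not ContractForge's internal code. It assumes a runtime where `spark.read.format("snowflake")` resolves, and it uses the connector's `dbtable` option for whole tables and its `query` option for SQL extracts. `read_snowflake` is a hypothetical helper name.

```python
from typing import Any, Mapping, Optional


def read_snowflake(
    spark: Any,
    sf_options: Mapping[str, str],
    table: Optional[str] = None,
    query: Optional[str] = None,
) -> Any:
    """Hypothetical helper: read a Snowflake table or SQL query via the Spark connector.

    Exactly one of `table` or `query` must be given, mirroring the choice
    between a table read and a query read in a source contract.
    """
    if (table is None) == (query is None):
        raise ValueError("pass exactly one of table or query")
    # sf_options carries sfURL, sfUser, credentials, sfWarehouse, sfDatabase, sfSchema, sfRole.
    reader = spark.read.format("snowflake").options(**sf_options)
    if table is not None:
        # Whole-table read.
        reader = reader.option("dbtable", table)
    else:
        # SQL pushed down to Snowflake as a query read.
        reader = reader.option("query", query)
    return reader.load()
```

On Databricks, the option values would typically come from `dbutils.secrets.get(scope=..., key=...)` and not from literals.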
| Requirement | Details |
|---|---|
| Spark connector | The runtime must have the Snowflake Spark connector available for spark.read.format("snowflake"). |
| JDBC driver | The Spark connector depends on the Snowflake JDBC driver. If an authentication method works elsewhere but fails from Spark, check that the installed connector and JDBC driver versions are compatible and support that method. |
| Network route | The Databricks runtime must reach *.snowflakecomputing.com or the account PrivateLink endpoint. |
| Warehouse | sfWarehouse must point to a running or auto-resumable warehouse. |
| Role permissions | The role must have warehouse usage, database/schema usage and table/query read privileges. |
| Secrets | Passwords, private keys, PATs and account identifiers should be injected through Databricks secrets or the runtime secret manager. |
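Before the first run, a local sanity check that the resolved options cover these requirements can save a round trip. The sketch below is hypothetical, because ContractForge performs its own contract validation. It assumes only the option names used in this page's examples. It treats `sfPassword` (password or PAT) and `pem_private_key` (key-pair) as alternative credentials. Flagging both as set at once is a conservative choice of this sketch, not a documented connector rule.

```python
from typing import List, Mapping

# Option names taken from the contracts on this page.
REQUIRED_OPTIONS = ("sfURL", "sfUser", "sfWarehouse", "sfDatabase", "sfSchema", "sfRole")
CREDENTIAL_OPTIONS = ("sfPassword", "pem_private_key")


def missing_snowflake_options(options: Mapping[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the basic shape looks complete."""
    problems = [
        f"missing or empty: {name}"
        for name in REQUIRED_OPTIONS
        if not str(options.get(name, "")).strip()
    ]
    present = [name for name in CREDENTIAL_OPTIONS if str(options.get(name, "")).strip()]
    if not present:
        problems.append("no credential: set sfPassword (password/PAT) or pem_private_key")
    elif len(present) > 1:
        # Conservative: make the chosen authentication method unambiguous.
        problems.append("ambiguous credentials: set only one of sfPassword or pem_private_key")
    return problems
```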
Serverless can run Snowflake ingestion when the Snowflake Spark connector is available, credentials are service-oriented and the Snowflake account allows the Databricks egress path. Authentication and network policy should be validated before adding transformations or write-mode complexity.
| Area | Serverless guidance |
|---|---|
| Connector package | The runtime must expose a compatible Spark Snowflake connector and JDBC driver. If package installation is blocked, use a runtime or cluster policy that provides them. |
| Authentication | Use a service user with key-pair/JWT or PAT. Avoid personal users with interactive MFA for scheduled jobs. |
| Network policy | Snowflake may reject valid credentials until a network policy allows the Databricks egress IP, CIDR or PrivateLink path for the service user. |
| Secret storage | Store account, user, role, warehouse, PAT and private key material in Databricks secrets. ContractForge redacts source metadata before writing control tables. |
| Failure diagnosis | Use ctrl_ingestion_errors for the full stack trace and Snowflake login history for authentication/network-policy failure codes. |
A successful token or key configuration is not enough if Snowflake blocks the runtime egress path. Treat Snowflake network policy as a first-class prerequisite for serverless jobs.
source:
  type: connector
  connector: snowflake
  table: CF_CUSTOMERS
  options:
    sfURL: "{{ secret:contractforge-snowflake/sf_account }}.snowflakecomputing.com"
    sfUser: "{{ secret:contractforge-snowflake/sf_user }}"
    sfPassword: "{{ secret:contractforge-snowflake/sf_password }}"
    sfWarehouse: "{{ secret:contractforge-snowflake/sf_warehouse }}"
    sfDatabase: "{{ secret:contractforge-snowflake/sf_database }}"
    sfSchema: "{{ secret:contractforge-snowflake/sf_schema }}"
    sfRole: "{{ secret:contractforge-snowflake/sf_role }}"
  read:
    source_complete: true
catalog: main
target_schema: bronze_ext
target_table: b_snowflake_customers
ctrl_schema: ops
layer: bronze
mode: scd0_overwrite
source_system: snowflake
schema_policy: permissive
quality_rules:
  min_rows: 1
  required_columns: [CUSTOMER_ID, CUSTOMER_CODE, FULL_NAME, UPDATED_AT]
  not_null: [CUSTOMER_ID, CUSTOMER_CODE, UPDATED_AT]
  unique_key: [CUSTOMER_ID]
on_quality_fail: fail
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "snowflake",
        "table": "CF_CUSTOMERS",
        "options": {
            "sfURL": "{{ secret:contractforge-snowflake/sf_account }}.snowflakecomputing.com",
            "sfUser": "{{ secret:contractforge-snowflake/sf_user }}",
            "sfPassword": "{{ secret:contractforge-snowflake/sf_password }}",
            "sfWarehouse": "{{ secret:contractforge-snowflake/sf_warehouse }}",
            "sfDatabase": "{{ secret:contractforge-snowflake/sf_database }}",
            "sfSchema": "{{ secret:contractforge-snowflake/sf_schema }}",
            "sfRole": "{{ secret:contractforge-snowflake/sf_role }}",
        },
        "read": {"source_complete": True},
    },
    catalog="main",
    target_schema="bronze_ext",
    target_table="b_snowflake_customers",
    ctrl_schema="ops",
    layer="bronze",
    mode="scd0_overwrite",
    source_system="snowflake",
    quality_rules={
        "min_rows": 1,
        "required_columns": ["CUSTOMER_ID", "CUSTOMER_CODE", "FULL_NAME", "UPDATED_AT"],
        "not_null": ["CUSTOMER_ID", "CUSTOMER_CODE", "UPDATED_AT"],
        "unique_key": ["CUSTOMER_ID"],
    },
)
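The `quality_rules` block can be read as four checks. The sketch below spells out one plausible interpretation over rows held as Python dicts. The semantics are assumed from the rule names and are not taken from ContractForge's implementation. For example, it assumes `unique_key` means no duplicated key tuples and `required_columns` means the column is present in the data.

```python
from collections import Counter
from typing import Any, List, Mapping, Sequence


def check_quality(rows: Sequence[Mapping[str, Any]], rules: Mapping[str, Any]) -> List[str]:
    """Return human-readable failures for min_rows, required_columns, not_null and unique_key."""
    failures: List[str] = []
    if len(rows) < rules.get("min_rows", 0):
        failures.append(f"min_rows: got {len(rows)}, expected >= {rules['min_rows']}")
    # Columns seen in any row stand in for the DataFrame schema.
    columns = set().union(*(row.keys() for row in rows)) if rows else set()
    for col in rules.get("required_columns", []):
        if col not in columns:
            failures.append(f"required_columns: {col} is missing")
    for col in rules.get("not_null", []):
        nulls = sum(1 for row in rows if row.get(col) is None)
        if nulls:
            failures.append(f"not_null: {col} has {nulls} null value(s)")
    key_cols = rules.get("unique_key", [])
    if key_cols:
        counts = Counter(tuple(row.get(c) for c in key_cols) for row in rows)
        dupes = sum(1 for n in counts.values() if n > 1)
        if dupes:
            failures.append(f"unique_key: {dupes} duplicated key value(s) on {key_cols}")
    return failures
```

In this reading, a non-empty result is a failed quality check. With `on_quality_fail: fail`, that would presumably stop the run before writing.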
Use top-level source.query for reviewed SQL extracts. ContractForge passes it to the Spark connector as a query read and marks source_query=true in source metadata.
source:
  type: connector
  connector: snowflake
  query: |
    SELECT
      ORDER_ID,
      CUSTOMER_ID,
      ORDER_STATUS,
      TOTAL_AMOUNT,
      UPDATED_AT
    FROM CF_ORDERS
    WHERE UPDATED_AT >= TO_TIMESTAMP_NTZ('2026-01-01')
  options:
    sfURL: "{{ secret:contractforge-snowflake/sf_account }}.snowflakecomputing.com"
    sfUser: "{{ secret:contractforge-snowflake/sf_user }}"
    sfPassword: "{{ secret:contractforge-snowflake/sf_password }}"
    sfWarehouse: "{{ secret:contractforge-snowflake/sf_warehouse }}"
    sfDatabase: "{{ secret:contractforge-snowflake/sf_database }}"
    sfSchema: "{{ secret:contractforge-snowflake/sf_schema }}"
    sfRole: "{{ secret:contractforge-snowflake/sf_role }}"
mode: scd1_upsert
merge_keys: [ORDER_ID]
watermark_columns: [UPDATED_AT]
transform:
  deduplicate:
    keys: [ORDER_ID]
    order_by: UPDATED_AT DESC NULLS LAST
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "snowflake",
        "query": """
            SELECT ORDER_ID, CUSTOMER_ID, ORDER_STATUS, TOTAL_AMOUNT, UPDATED_AT
            FROM CF_ORDERS
            WHERE UPDATED_AT >= TO_TIMESTAMP_NTZ('2026-01-01')
        """,
        "options": {
            "sfURL": "{{ secret:contractforge-snowflake/sf_account }}.snowflakecomputing.com",
            "sfUser": "{{ secret:contractforge-snowflake/sf_user }}",
            "sfPassword": "{{ secret:contractforge-snowflake/sf_password }}",
            "sfWarehouse": "{{ secret:contractforge-snowflake/sf_warehouse }}",
            "sfDatabase": "{{ secret:contractforge-snowflake/sf_database }}",
            "sfSchema": "{{ secret:contractforge-snowflake/sf_schema }}",
            "sfRole": "{{ secret:contractforge-snowflake/sf_role }}",
        },
    },
    catalog="main",
    target_schema="silver_sales",
    target_table="s_snowflake_orders",
    ctrl_schema="ops",
    layer="silver",
    mode="scd1_upsert",
    merge_keys=["ORDER_ID"],
    watermark_columns=["UPDATED_AT"],
    transform={"deduplicate": {"keys": ["ORDER_ID"], "order_by": "UPDATED_AT DESC NULLS LAST"}},
)
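The watermark and deduplication settings combine roughly as follows. First, keep only rows newer than the last processed `UPDATED_AT`. Then keep one row per `ORDER_ID`, preferring the latest `UPDATED_AT` with nulls ranked last. The sketch below illustrates that ordering over Python dicts. How ContractForge stores and compares watermarks between runs is an assumption here: a single stored high-water value passed in as the hypothetical `last_watermark` parameter.

```python
from typing import Any, Dict, Hashable, List, Mapping, Optional, Sequence


def incremental_batch(
    rows: Sequence[Mapping[str, Any]],
    key: str = "ORDER_ID",
    watermark_col: str = "UPDATED_AT",
    last_watermark: Optional[Any] = None,
) -> List[Mapping[str, Any]]:
    """Filter rows past the stored watermark, then keep the latest row per key."""
    if last_watermark is not None:
        # Like SQL, a NULL timestamp never compares as newer than the watermark.
        rows = [
            r for r in rows
            if r.get(watermark_col) is not None and r[watermark_col] > last_watermark
        ]
    latest: Dict[Hashable, Mapping[str, Any]] = {}
    for row in rows:
        current = latest.get(row[key])
        ts = row.get(watermark_col)
        if current is None:
            latest[row[key]] = row
        elif ts is not None and (current.get(watermark_col) is None or ts > current[watermark_col]):
            # ORDER BY UPDATED_AT DESC NULLS LAST: any timestamp beats NULL; ties keep the first row.
            latest[row[key]] = row
    return list(latest.values())


def next_watermark(
    rows: Sequence[Mapping[str, Any]],
    watermark_col: str = "UPDATED_AT",
    previous: Optional[Any] = None,
) -> Optional[Any]:
    """New high-water mark: the largest non-null value seen, or the previous one."""
    values = [r[watermark_col] for r in rows if r.get(watermark_col) is not None]
    if previous is not None:
        values.append(previous)
    return max(values) if values else None
```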
| Method | Contract shape | Operational notes |
|---|---|---|
| Password | sfUser + sfPassword | Simple for development. Avoid personal users for scheduled ingestion. |
| Key-pair / JWT | sfAuthenticator: SNOWFLAKE_JWT, pem_private_key, sfUser | Recommended for service users when supported by the installed connector/JDBC stack. |
| Programmatic Access Token | sfUser + PAT value in sfPassword | Use a service user, ROLE_RESTRICTION and Snowflake network policy. Do not add sfAuthenticator=programmatic_access_token unless your connector version explicitly supports it. |
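The three shapes in the table differ only in which credential keys are set. Below is a minimal sketch with a hypothetical `auth_options` helper and hypothetical method names (`password`, `keypair`, `pat`). It returns only the authentication-related keys and deliberately sets no `sfAuthenticator` for PATs, matching the note in the table.

```python
from typing import Dict


def auth_options(method: str, user: str, secret: str) -> Dict[str, str]:
    """Return the credential options for one authentication method from the table.

    `secret` is the password, the PAT value, or the (already normalized) PEM key.
    """
    if method == "password":
        return {"sfUser": user, "sfPassword": secret}
    if method == "keypair":
        return {"sfUser": user, "sfAuthenticator": "SNOWFLAKE_JWT", "pem_private_key": secret}
    if method == "pat":
        # The PAT travels in sfPassword; no sfAuthenticator override.
        return {"sfUser": user, "sfPassword": secret}
    raise ValueError(f"unknown auth method: {method!r}")
```

Merging the result into the shared options, for example `{**base_options, **auth_options("pat", user, token)}`, keeps warehouse, database, schema and role settings independent of the authentication choice.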
Keep the private key in a secret. Depending on how the key is stored, your runtime may need a small setup notebook to normalize newlines before passing it to the connector.
source:
  type: connector
  connector: snowflake
  table: CF_CUSTOMERS
  options:
    sfURL: "{{ secret:contractforge-snowflake/sf_account }}.snowflakecomputing.com"
    sfUser: "{{ secret:contractforge-snowflake/sf_user }}"
    sfAuthenticator: SNOWFLAKE_JWT
    pem_private_key: "{{ secret:contractforge-snowflake/sf_private_key }}"
    sfWarehouse: "{{ secret:contractforge-snowflake/sf_warehouse }}"
    sfDatabase: "{{ secret:contractforge-snowflake/sf_database }}"
    sfSchema: "{{ secret:contractforge-snowflake/sf_schema }}"
    sfRole: "{{ secret:contractforge-snowflake/sf_role }}"
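The key may be stored with literal `\n` sequences or Windows line endings. If so, a setup step can normalize it before it reaches the contract's `pem_private_key` secret. This is a sketch under assumptions: `normalize_pem_private_key` is a hypothetical helper. Snowflake's key-pair examples for the Spark connector pass only the base64 body, with the BEGIN/END lines and newlines removed. Confirm which form your installed connector version expects.

```python
import re


def normalize_pem_private_key(raw: str, body_only: bool = False) -> str:
    """Normalize a PEM private key read from a secret store."""
    # Secrets pasted through a UI or CLI often carry literal "\n" escapes or CRLF endings.
    text = raw.replace("\\n", "\n").replace("\r\n", "\n").strip()
    if not body_only:
        # Full PEM text with exactly one trailing newline.
        return text + "\n"
    # Drop the BEGIN/END armor lines and all whitespace, keeping only the base64 body.
    body = re.sub(r"-----(BEGIN|END) [A-Z ]*PRIVATE KEY-----", "", text)
    return "".join(body.split())
```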
A Snowflake PAT can be supplied as a password-like secret in sfPassword on the Spark connector path. Generate it for the service user and restrict it to the ingestion role.
USE ROLE ACCOUNTADMIN;
ALTER USER IF EXISTS CFINGESTSVC
ADD PROGRAMMATIC ACCESS TOKEN CONTRACTFORGE_DATABRICKS_PAT
ROLE_RESTRICTION = 'CONTRACTFORGE_INGEST_ROLE'
DAYS_TO_EXPIRY = 30;
source:
  type: connector
  connector: snowflake
  table: CF_CUSTOMERS
  options:
    sfURL: "{{ secret:contractforge-snowflake/sf_account }}.snowflakecomputing.com"
    sfUser: "{{ secret:contractforge-snowflake/sf_user }}"
    sfPassword: "{{ secret:contractforge-snowflake/sf_pat }}"
    sfWarehouse: "{{ secret:contractforge-snowflake/sf_warehouse }}"
    sfDatabase: "{{ secret:contractforge-snowflake/sf_database }}"
    sfSchema: "{{ secret:contractforge-snowflake/sf_schema }}"
    sfRole: "{{ secret:contractforge-snowflake/sf_role }}"
Snowflake can require a network policy for PAT authentication. Allow the Databricks egress IP, PrivateLink path or approved CIDR for the service user. If the SQL API accepts the PAT but Spark fails, inspect the installed Snowflake connector/JDBC versions.
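Before changing the policy, it can help to confirm locally that the egress address you observed is covered by the CIDRs you plan to allow. The sketch below uses Python's standard `ipaddress` module, and `covering_cidr` is a hypothetical helper. It only checks allowlist coverage. Snowflake's own evaluation of network policies, network rules and PrivateLink paths remains authoritative.

```python
import ipaddress
from typing import Iterable, Optional


def covering_cidr(egress_ip: str, allowed_cidrs: Iterable[str]) -> Optional[str]:
    """Return the first allowed CIDR that contains egress_ip, or None if none does."""
    ip = ipaddress.ip_address(egress_ip)
    for cidr in allowed_cidrs:
        # strict=False accepts entries such as "203.0.113.25" (treated as a /32).
        network = ipaddress.ip_network(cidr, strict=False)
        # Membership is simply False when IPv4 and IPv6 are mixed.
        if ip in network:
            return cidr
    return None
```

For example, `covering_cidr("203.0.113.25", ["198.51.100.0/24", "203.0.113.0/24"])` returns `"203.0.113.0/24"`. The addresses are documentation ranges, not real Databricks egress IPs.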
Snowflake can expose VARIANT or JSON-like payloads as strings through Spark. Keep the connector focused on extraction and parse or flatten data with transform.shape.
source:
  type: connector
  connector: snowflake
  query: |
    SELECT EVENT_ID, EVENT_TS, EVENT_PAYLOAD
    FROM CF_EVENTS
  options:
    sfURL: "{{ secret:contractforge-snowflake/sf_account }}.snowflakecomputing.com"
    sfUser: "{{ secret:contractforge-snowflake/sf_user }}"
    sfPassword: "{{ secret:contractforge-snowflake/sf_pat }}"
    sfWarehouse: "{{ secret:contractforge-snowflake/sf_warehouse }}"
    sfDatabase: "{{ secret:contractforge-snowflake/sf_database }}"
    sfSchema: "{{ secret:contractforge-snowflake/sf_schema }}"
    sfRole: "{{ secret:contractforge-snowflake/sf_role }}"
transform:
  shape:
    parse_json:
      EVENT_PAYLOAD:
        schema: "STRUCT<device:STRING, severity:STRING, metrics:STRUCT<temperature:DOUBLE,battery:DOUBLE>>"
        target_column: payload
    flatten:
      - path: payload.metrics
        prefix: metric_
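In this contract, `parse_json` turns the `EVENT_PAYLOAD` string into a struct named `payload`. `flatten` then promotes the fields under `payload.metrics` to top-level columns with a `metric_` prefix. The sketch below reproduces that intent on a single row with the standard `json` module. Several details are assumptions here rather than documented ContractForge behavior: the output column names (prefix plus leaf field name), keeping the original column, and turning malformed JSON into a null payload.

```python
import json
from typing import Any, Dict, Mapping, Optional

# Leaf fields of payload.metrics, taken from the contract's schema string.
METRIC_FIELDS = ("temperature", "battery")


def _parse_payload(raw: Optional[str]) -> Optional[Dict[str, Any]]:
    """Parse a JSON object string; malformed or non-object input yields None in this sketch."""
    if not raw:
        return None
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        return None
    return value if isinstance(value, dict) else None


def shape_event(row: Mapping[str, Any]) -> Dict[str, Any]:
    """Add `payload` from EVENT_PAYLOAD, then flatten payload.metrics with a metric_ prefix."""
    out: Dict[str, Any] = dict(row)
    payload = _parse_payload(row.get("EVENT_PAYLOAD"))
    out["payload"] = payload
    metrics = (payload or {}).get("metrics") or {}
    for name in METRIC_FIELDS:
        # Missing fields become None, like a null struct field.
        out[f"metric_{name}"] = metrics.get(name)
    return out
```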
raise_on_failure=True is the default in jobs, so a failed contract fails the Databricks task.
Inspect ctrl_ingestion_runs.source_options_json to confirm that options are redacted and that source_connector=snowflake.
Use ctrl_ingestion_errors for full connector stack traces when Databricks job output is truncated.
Recent Snowflake runs can be reviewed from the run control table:
SELECT
  run_id,
  status,
  target_table,
  source_connector,
  rows_read,
  rows_written,
  quality_status,
  error_message
FROM main.ops.ctrl_ingestion_runs
WHERE source_connector = 'snowflake'
ORDER BY started_at_utc DESC;
| Symptom | Likely cause | Action |
|---|---|---|
| Connection string is invalid | Malformed sfURL or unresolved embedded secret. | Use {{ secret:scope/key }}.snowflakecomputing.com and inspect the redacted source options. |
| Incorrect username or password | Wrong password, PAT or private key, or a token generated for a different user. | Validate the credential with the Snowflake SQL API or the connector outside the ingestion flow. |
| Network policy is required | Snowflake requires the service user to be covered by a network policy before PAT login. | Attach a network policy to the service user that allows the Databricks egress path. |
| Connector class not found | Spark Snowflake connector or JDBC driver is missing. | Install the connector package/JAR on the runtime. |
| Query works in Snowflake but not in Spark | Connector option, warehouse role or SQL dialect edge case. | Start with a minimal query and add options incrementally. |
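For the "Connection string is invalid" row, a small check on the resolved sfURL catches the common mistakes: a secret template that never resolved, a scheme prefix, a duplicated domain, or an empty account identifier. The `sfurl_problems` helper below is hypothetical. It encodes only the `<account>.snowflakecomputing.com` hostname shape used on this page. PrivateLink hostnames such as `<account>.privatelink.snowflakecomputing.com` pass because they end with the same domain.

```python
from typing import List

DOMAIN = ".snowflakecomputing.com"


def sfurl_problems(sf_url: str) -> List[str]:
    """Return likely problems with a resolved sfURL value; empty means it looks well-formed."""
    problems: List[str] = []
    url = sf_url.strip()
    if "{{" in url or "}}" in url:
        problems.append("unresolved secret template")
    if url.lower().startswith(("http://", "https://")):
        # The contracts on this page pass a bare hostname.
        problems.append("scheme prefix present; use the bare hostname")
        url = url.split("://", 1)[1]
    host = url.rstrip("/").lower()
    if not host.endswith(DOMAIN):
        problems.append(f"hostname does not end with {DOMAIN}")
    elif host.count("snowflakecomputing.com") > 1:
        problems.append("domain appears twice (the secret may already contain it)")
    elif host.startswith("."):
        problems.append("empty account identifier (the account secret resolved to nothing)")
    return problems
```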