Ingestion contract.
The ingestion contract is the executable definition of a table pipeline: source, target, write mode, schema policy, quality rules, transformations, execution controls and runtime behavior.
Complete example
YAML:

    _metadata:
      contract_version: 1.0.0
    source:
      type: postgres
      system: crm_postgres
      options:
        url: "{{ secret:crm/postgres_url }}"
        dbtable: public.customers
        driver: org.postgresql.Driver
      read:
        partition_column: customer_id
        lower_bound: 1
        upper_bound: 1000000
        num_partitions: 8
        fetchsize: 10000
    target:
      catalog: main
      schema: crm_curated
      table: s_customers
    layer: silver
    mode: hash_diff_upsert
    merge_keys: [customer_id]
    hash_keys: [email, status, credit_limit]
    transform:
      cast:
        credit_limit: decimal(18,2)
        updated_at: timestamp
      standardize:
        email:
          trim: true
          lower: true
          empty_as_null: true
        status:
          trim: true
          lower: true
      derive:
        updated_date: to_date(updated_at)
      deduplicate:
        keys: [customer_id]
        order_by:
          - column: updated_at
            direction: desc
            nulls: last
    schema_policy: additive_only
    quality_rules:
      not_null: [customer_id]
      unique_key: [customer_id]
      expressions:
        - name: valid_status
          expression: status IN ('active', 'inactive')
          severity: quarantine

Python:

    from contractforge_core.contracts import validate_contract
    from contractforge_databricks import render_databricks_contract

    # Validate the contract, passed here as a plain dictionary.
    contract = validate_contract({
        "source": {"type": "postgres", "system": "crm_postgres", "table": "public.customers"},
        "target": {"catalog": "main", "schema": "crm_curated", "table": "s_customers"},
        "layer": "silver",
        "mode": "hash_diff_upsert",
        "merge_keys": ["customer_id"],
        "hash_keys": ["email", "status", "credit_limit"],
        "schema_policy": "additive_only",
    })

    # Render the Databricks artifacts from the validated contract.
    artifacts = render_databricks_contract(contract)

Top-level fields
| Field | Type | Default | Use |
|---|---|---|---|
| _metadata | object | | Contract metadata removed before execution and kept for audit. |
| source | object | required | Source intent, connector/reference, table/query/path and source metadata. |
| target | object | required | Canonical target object. |
| target.catalog | string | main | Target catalog. |
| target.schema | string | layer | Physical target schema. |
| target.table | string | required | Target table name. |
| layer | string | bronze | Logical layer. It is not restricted to bronze/silver/gold. |
| mode | enum | append | Write mode. |
| preset / presets | string \| list[string] | unset | Preset names applied before plan validation. |
| applied_presets | list[string] | [] | Recorded preset names, usually populated by the preset engine. |
| description, owner, domain, tags, sla | strings/list | null/[] | Lightweight metadata. Prefer annotations/operations for catalog and operational metadata. |
| runtime_parameters | object | | Free-form runtime values recorded with the run. |
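
The defaults in this table can be read as a small resolution step. The sketch below is a minimal illustration, not ContractForge code: `apply_top_level_defaults` is a hypothetical helper, and it assumes that the `layer` default listed for `target.schema` means "fall back to the value of `layer`".

```python
from copy import deepcopy


def apply_top_level_defaults(contract: dict) -> dict:
    """Hypothetical helper: fill the defaults listed in the top-level table."""
    if "source" not in contract:
        raise ValueError("source is required")
    resolved = deepcopy(contract)
    resolved.setdefault("layer", "bronze")
    resolved.setdefault("mode", "append")
    resolved.setdefault("applied_presets", [])
    target = resolved.get("target")
    if not isinstance(target, dict) or "table" not in target:
        raise ValueError("target.table is required")
    target.setdefault("catalog", "main")
    # Assumption: the "layer" default for target.schema means the layer value.
    target.setdefault("schema", resolved["layer"])
    return resolved


minimal = {"source": {"type": "postgres"}, "target": {"table": "s_customers"}}
resolved = apply_top_level_defaults(minimal)
print(resolved["target"])
print(resolved["layer"], resolved["mode"])
```

The input dictionary is copied first, so the contract as written stays available for audit next to the resolved version.
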
Source object fields
| Field | Type | Default | Use |
|---|---|---|---|
| source.type | string | required | Portable, bounded, connection or adapter-specific source type. |
| source.connector | string | optional | Connector identifier for type: connector compatibility. |
| source.connection_path | string \| null | null | Reusable connection YAML path when source.type: connection. |
| source.system | string \| null | null | Source system label written to evidence. |
| source.name | string \| null | null | Logical source name for metadata. |
| source.provider | string \| null | null | Object storage provider: adls, azure_blob, gcs, s3. |
| source.format | string \| null | null | File, HTTP or connector response format. |
| source.path | string \| null | null | Filesystem path, object storage path, HTTP URL or source location. |
| source.account_url | string \| null | null | Azure Blob account URL. |
| source.container | string \| null | null | Azure Blob/ADLS container. |
| source.table | string \| null | null | Catalog/JDBC/external connector table. |
| source.query | string \| null | null | SQL query for SQL, JDBC or external Spark connectors. |
| source.schema | string | unset | Alias for source.read.schema. |
| source.options | object | | Spark/connector options. |
| source.read | object | | Framework-aware read controls. |
| source.request | object | | HTTP request settings. |
| source.auth | object | | Authentication settings. |
| source.pagination | object | | REST pagination settings. |
| source.response | object | | REST/HTTP response parsing settings. |
| source.incremental | object | | Connector-side incremental settings. |
| source.limits | object | | Timeout, retry, rate and byte limits. |
Connector namespaces
| Namespace | Fields |
|---|---|
| source.read | schema, source_complete, full_snapshot, recursiveFileLookup, pathGlobFilter, file_regex, file_regex_scope, file_regex_max_listed, file_regex_recursive, JDBC partition_column/lower_bound/upper_bound/num_partitions/fetchsize, JSON materialization staging_path/json_options, Auto Loader schema_location/checkpoint_location/schema_hints/include_existing_files/max_files_per_trigger. |
| source.request | url, method, headers, params, json, body. |
| source.auth | type, token, header, value/key, username/user, password, OAuth token_url/client_id/client_secret/scope, AWS access_key_id/secret_access_key/session_token/region/credential_provider, Azure sas_token. |
| source.pagination | type, max_pages, page_param, start_page, offset_param, limit_param, page_size, next_cursor_path, cursor_param. |
| source.response | mode, raw_column, records_path, format, encoding. |
| source.incremental | watermark_column, initial_value, JDBC predicate, REST watermark_param/watermark_header/watermark_body_field. |
| source.limits | timeout_seconds, retry_attempts, retry_backoff_seconds, rate_limit_per_minute, max_pages, max_page_bytes, max_total_bytes. |
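
To show how the `source.pagination` and `source.response` fields could interact, here is a rough sketch of cursor pagination over an in-memory fake endpoint. Everything beyond the field names is an assumption: the `"cursor"` pagination type value, the dotted-path syntax for `records_path` and `next_cursor_path`, and the helpers `get_path`, `iter_cursor_pages` and `fake_fetch` are illustrative and not part of ContractForge.

```python
from typing import Any, Callable, Iterator


def get_path(payload: Any, dotted_path: str) -> Any:
    """Follow a dotted path such as 'meta.next' through nested dicts."""
    current = payload
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def iter_cursor_pages(fetch: Callable[[dict], dict], pagination: dict, response: dict) -> Iterator[dict]:
    """Yield records page by page until the cursor runs out or max_pages is hit."""
    cursor = None
    for _ in range(pagination["max_pages"]):
        params = {} if cursor is None else {pagination["cursor_param"]: cursor}
        payload = fetch(params)
        for record in get_path(payload, response["records_path"]) or []:
            yield record
        cursor = get_path(payload, pagination["next_cursor_path"])
        if not cursor:
            break


# Fake two-page API standing in for a real HTTP call.
PAGES = {
    None: {"data": [{"id": 1}, {"id": 2}], "meta": {"next": "c2"}},
    "c2": {"data": [{"id": 3}], "meta": {"next": None}},
}


def fake_fetch(params: dict) -> dict:
    return PAGES[params.get("cursor")]


pagination = {"type": "cursor", "max_pages": 10, "cursor_param": "cursor", "next_cursor_path": "meta.next"}
response = {"records_path": "data"}
records = list(iter_cursor_pages(fake_fetch, pagination, response))
print(records)
```

Bounding the loop by `max_pages` keeps the read finite even if an endpoint keeps returning a cursor.
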
Selection, transform and filtering
| Field | Type | Default | Use |
|---|---|---|---|
| select_columns | string \| list[string] | [] | Keep only selected source columns. |
| column_mapping | object[string,string] | | Rename source columns before framework technical columns are added. |
| filter_expression | string \| null | null | Spark SQL filter. Do not combine with full snapshot soft delete. |
| watermark_columns | string \| list[string] | [] | Incremental filter/state columns. |
| shape | object \| null | null | Direct shape namespace. Prefer transform.shape. |
| transform.shape | object \| null | null | Declarative parse, flatten, array and projection operations. See Transformations. |
| transform.cast | object[string,string] | | Cast existing columns to Spark SQL types after shape. |
| transform.standardize | object | | Trim, case-normalize, whitespace-normalize and nullify empty strings on declared columns. |
| transform.derive | object[string,string] | | Create or replace columns with Spark SQL expressions. |
| transform.deduplicate.keys | string \| list[string] | required when deduplicate exists | Deduplication keys. |
| transform.deduplicate.order_by | string \| list[object] | required when deduplicate exists | Deterministic order. Prefer structured items with column, direction and optional nulls. |
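
The intent of `transform.standardize` and `transform.deduplicate` can be illustrated in plain Python on a list of dictionaries. This is a sketch under assumptions, not the Spark implementation: `standardize_value` and `deduplicate` are hypothetical helpers, standardization is applied before deduplication only because the table lists them in that order, and `nulls` falls back to `last` when omitted.

```python
from typing import Optional


def standardize_value(value: Optional[str], rules: dict) -> Optional[str]:
    """Apply trim, lower and empty_as_null to one string value."""
    if value is None:
        return None
    if rules.get("trim"):
        value = value.strip()
    if rules.get("lower"):
        value = value.lower()
    if rules.get("empty_as_null") and value == "":
        return None
    return value


def deduplicate(rows: list, keys: list, order_by: list) -> list:
    """Keep the first row per key after applying the structured order_by."""
    ordered = list(rows)
    # Stable sorts applied from the last order_by item to the first.
    for item in reversed(order_by):
        column = item["column"]
        descending = item.get("direction", "asc") == "desc"
        nulls_last = item.get("nulls", "last") == "last"  # assumed fallback
        present = [r for r in ordered if r.get(column) is not None]
        missing = [r for r in ordered if r.get(column) is None]
        present.sort(key=lambda r: r[column], reverse=descending)
        ordered = present + missing if nulls_last else missing + present
    seen, result = set(), []
    for row in ordered:
        identity = tuple(row[k] for k in keys)
        if identity not in seen:
            seen.add(identity)
            result.append(row)
    return result


rows = [
    {"customer_id": 1, "email": "  A@Example.com ", "updated_at": "2024-01-01"},
    {"customer_id": 1, "email": "a@example.com", "updated_at": "2024-03-01"},
    {"customer_id": 2, "email": "   ", "updated_at": None},
]
email_rules = {"trim": True, "lower": True, "empty_as_null": True}
cleaned = [dict(r, email=standardize_value(r["email"], email_rules)) for r in rows]
latest = deduplicate(
    cleaned,
    keys=["customer_id"],
    order_by=[{"column": "updated_at", "direction": "desc", "nulls": "last"}],
)
print(latest)
```

With `direction: desc` and `nulls: last`, the newest non-null `updated_at` wins for each `customer_id`, which is why the structured form of `order_by` gives a deterministic result.
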
Write and optimization
| Field | Type | Default | Use |
|---|---|---|---|
| merge_keys | string \| list[string] | [] | Keys for upsert, historical and snapshot modes. |
| hash_strategy | explicit \| all_columns_except | explicit | Hash input strategy for hash_diff_upsert. Use explicit with hash_keys for governed tables; use all_columns_except for wide tables. |
| hash_keys | string \| list[string] | [] | Content columns for hash_diff_upsert when hash_strategy: explicit. These are not row identity keys. |
| hash_exclude_columns | string \| list[string] | [] | Additional columns excluded from row hash. ContractForge-generated columns are excluded automatically. |
| custom_keys | object | | Mode/custom writer key groups. |
| dedup_order_expr | string \| null | null | Deterministic ordering for source/target deduplication. |
| partition_column | string \| null | null | Target partition column. |
| partition_value | string \| null | null | Single partition value; requires partition_column. |
| merge_strategy | enum | delta | delta, delta_by_partition or replace_partitions. |
| merge_partition_column | string \| null | null | Partition column for delta_by_partition. |
| replace_partitions_source_complete | boolean | false | Evidence required for partition replacement. |
| extensions.databricks.cluster_columns | string \| list[string] | [] | Databricks clustering columns. |
| extensions.databricks.partition_columns | string \| list[string] | [] | Databricks target partition columns. |
| extensions.databricks.delta_properties | object[string,string] | | Delta table properties. |
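
The split between `merge_keys` (row identity) and `hash_keys` (row content) is easier to see in a small example. The sketch below is an illustration of the general hash-diff idea, not ContractForge's writer: `row_hash` and `classify` are hypothetical helpers, and SHA-256 with a separator-joined string is an assumed hashing scheme.

```python
import hashlib

MERGE_KEYS = ["customer_id"]
HASH_KEYS = ["email", "status", "credit_limit"]


def row_hash(row: dict, hash_keys: list) -> str:
    """Hash only the content columns; None is encoded distinctly from ''."""
    parts = ["\x00" if row.get(c) is None else str(row[c]) for c in hash_keys]
    return hashlib.sha256("\x1f".join(parts).encode("utf-8")).hexdigest()


def classify(source_rows: list, target_hashes: dict, merge_keys: list, hash_keys: list) -> dict:
    """Decide insert, update or skip per identity key."""
    actions = {}
    for row in source_rows:
        identity = tuple(row[k] for k in merge_keys)
        if identity not in target_hashes:
            actions[identity] = "insert"
        elif target_hashes[identity] != row_hash(row, hash_keys):
            actions[identity] = "update"
        else:
            actions[identity] = "skip"
    return actions


existing = [
    {"customer_id": 1, "email": "a@x.com", "status": "active", "credit_limit": "100.00"},
    {"customer_id": 2, "email": "b@x.com", "status": "active", "credit_limit": "50.00"},
]
target_hashes = {(r["customer_id"],): row_hash(r, HASH_KEYS) for r in existing}

incoming = [
    # Same content, newer updated_at: updated_at is not a hash key.
    {"customer_id": 1, "email": "a@x.com", "status": "active", "credit_limit": "100.00", "updated_at": "2024-05-01"},
    # Status changed.
    {"customer_id": 2, "email": "b@x.com", "status": "inactive", "credit_limit": "50.00", "updated_at": "2024-05-01"},
    # New identity.
    {"customer_id": 3, "email": "c@x.com", "status": "active", "credit_limit": "10.00", "updated_at": "2024-05-01"},
]
actions = classify(incoming, target_hashes, MERGE_KEYS, HASH_KEYS)
print(actions)
```

Because `updated_at` is left out of the hash input, a row whose only change is its timestamp is skipped rather than rewritten.
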
Schema, quality and runtime
| Field | Type | Default | Use |
|---|---|---|---|
| schema_policy | permissive \| additive_only \| strict | permissive | Target schema handling. |
| allow_type_widening | boolean | false | Allow supported widening changes. |
| quality_rules.required_columns | string \| list[string] | [] | Required schema columns. |
| quality_rules.not_null | string \| list[string] | [] | Row-level not-null checks. |
| quality_rules.unique_key | string \| list[string] | [] | Set-level uniqueness check. |
| quality_rules.accepted_values | object[string,list] | | Allowed values by column. |
| quality_rules.min_rows | integer | unset | Minimum row count. |
| quality_rules.max_null_ratio | object[string,number] | | Maximum null ratio by column. |
| quality_rules.expressions | list[object] | [] | SQL boolean rules with name, expression, severity and optional message. |
| quality_rules.custom | object | | Registered custom quality rules. |
| on_quality_fail | fail \| warn \| quarantine | fail | Global legacy action for built-in quality failures. |
| scd2_change_columns | string \| list[string] | [] | Historical comparison columns. |
| scd2_effective_from_column | string \| null | null | Historical effective-from source column. |
| fix_encoding, encoding, encoding_columns | boolean/string/list | false/Windows-1252/[] | Optional encoding repair. |
| dry_run | boolean | false | Validate without committing writes/control side effects. |
| explain_mode, explain_format | boolean/enum | false/formatted | Capture Spark explain plans. |
| openlineage_enabled, openlineage_namespace, openlineage_producer | boolean/string/string | false/null/contractforge | OpenLineage event metadata. |
| use_cache | boolean | true | Allow framework caching. |
| lock_enabled | boolean | false | Acquire framework lock. |
| idempotency_key, idempotency_policy | string/enum | null/always_run | Idempotent execution policy. |
| retry_attempts, retry_backoff_seconds | integer \| null | global config | Retry behavior override. |
| parent_run_id, run_group_id | string \| null | null | Parent/group IDs for stream or window child runs. |
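
As a rough picture of what the built-in `quality_rules` check, the sketch below evaluates a few of them over in-memory rows. It is a simplified illustration, not the framework's engine: `check_quality` is a hypothetical helper, nulls are assumed to pass `accepted_values`, and SQL `expressions` and `custom` rules are left out.

```python
def check_quality(rows: list, rules: dict) -> list:
    """Return the names of the rules that fail for the given rows."""
    failures = []
    if "min_rows" in rules and len(rows) < rules["min_rows"]:
        failures.append("min_rows")
    for column in rules.get("not_null", []):
        if any(r.get(column) is None for r in rows):
            failures.append(f"not_null:{column}")
    unique_key = rules.get("unique_key", [])
    if unique_key:
        identities = [tuple(r.get(c) for c in unique_key) for r in rows]
        if len(identities) != len(set(identities)):
            failures.append("unique_key")
    for column, allowed in rules.get("accepted_values", {}).items():
        # Assumption: null values are not treated as violations here.
        if any(r.get(column) is not None and r[column] not in allowed for r in rows):
            failures.append(f"accepted_values:{column}")
    for column, max_ratio in rules.get("max_null_ratio", {}).items():
        nulls = sum(1 for r in rows if r.get(column) is None)
        ratio = nulls / len(rows) if rows else 0.0
        if ratio > max_ratio:
            failures.append(f"max_null_ratio:{column}")
    return failures


rows = [
    {"customer_id": 1, "status": "active", "email": "a@example.com"},
    {"customer_id": 2, "status": "inactive", "email": None},
    {"customer_id": 3, "status": "deleted", "email": None},
    {"customer_id": 4, "status": "active", "email": "d@example.com"},
]
rules = {
    "not_null": ["customer_id"],
    "unique_key": ["customer_id"],
    "accepted_values": {"status": ["active", "inactive"]},
    "min_rows": 1,
    "max_null_ratio": {"email": 0.25},
}
failures = check_quality(rows, rules)
print(failures)
```

In this sample, `status` contains a value outside the accepted list and half of the `email` values are null, so two rules fail while the key checks pass.
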
Execution window and catchup
| Field | Type | Default | Use |
|---|---|---|---|
| execution.window.column | string | required | Column filtered by each explicit window. |
| execution.window.windows[].start, end, label | string/string/string | required/required/null | Explicit window boundaries and label. |
| execution.window.start, end, every | string | unset | Generated windows such as every 1 day. |
| execution.window.stop_on_failure | boolean | true | Stop later windows after a failure. |
| execution.catchup.enabled | boolean | false | Generate catchup windows from saved state/watermark. |
| execution.catchup.column, start, end, every, stop_on_failure | mixed | null/null/null/null/true | Catchup window generation settings. |
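
Generated windows and `stop_on_failure` can be pictured with a short sketch. The helpers `generate_windows` and `run_windows` are hypothetical, the interval is passed as a `timedelta` instead of parsing a string such as `1 day`, and half-open windows (`start` inclusive, `end` exclusive) with date labels are assumptions rather than documented behavior.

```python
from datetime import datetime, timedelta


def generate_windows(start: datetime, end: datetime, every: timedelta) -> list:
    """Split [start, end) into consecutive windows of length `every`."""
    if every <= timedelta(0):
        raise ValueError("every must be positive")
    windows = []
    cursor = start
    while cursor < end:
        upper = min(cursor + every, end)
        windows.append({
            "start": cursor.isoformat(),
            "end": upper.isoformat(),
            "label": cursor.date().isoformat(),
        })
        cursor = upper
    return windows


def run_windows(windows: list, run_one, stop_on_failure: bool = True) -> list:
    """Run each window in order; optionally stop after the first failure."""
    results = []
    for window in windows:
        try:
            run_one(window)
            results.append((window["label"], "ok"))
        except Exception:
            results.append((window["label"], "failed"))
            if stop_on_failure:
                break
    return results


def flaky_run(window: dict) -> None:
    # Simulated child run that fails for one specific day.
    if window["label"] == "2024-01-02":
        raise RuntimeError("simulated failure")


windows = generate_windows(datetime(2024, 1, 1), datetime(2024, 1, 4), timedelta(days=1))
stopped = run_windows(windows, flaky_run, stop_on_failure=True)
continued = run_windows(windows, flaky_run, stop_on_failure=False)
print(stopped)
print(continued)
```

With `stop_on_failure` left at its default of true, the third window never runs after the second one fails; setting it to false lets later windows proceed.
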