Contract Reference
Ingestion contract.
The ingestion contract is the executable definition of a table pipeline: source, target, write mode, schema policy, quality rules, transformations, execution controls and runtime behavior.
Complete example
_metadata:
  contract_version: 1.0.0
source:
  type: connector
  connector: postgres
  options:
    url: "{{ secret:crm/postgres_url }}"
    dbtable: public.customers
    driver: org.postgresql.Driver
  read:
    partition_column: customer_id
    lower_bound: 1
    upper_bound: 1000000
    num_partitions: 8
    fetchsize: 10000
target:
  catalog: main
  schema: crm_curated
  table: s_customers
layer: silver
mode: scd1_hash_diff
hash_keys: [customer_id]
dedup_order_expr: updated_at DESC NULLS LAST
schema_policy: additive_only
quality_rules:
  not_null: [customer_id]
  unique_key: [customer_id]
  expressions:
    - name: valid_status
      expression: status IN ('active', 'inactive')
      severity: quarantine
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "postgres",
        "options": {
            "url": "{{ secret:crm/postgres_url }}",
            "dbtable": "public.customers",
            "driver": "org.postgresql.Driver",
        },
        "read": {
            "partition_column": "customer_id",
            "lower_bound": 1,
            "upper_bound": 1000000,
            "num_partitions": 8,
            "fetchsize": 10000,
        },
    },
    catalog="main",
    target_schema="crm_curated",
    target_table="s_customers",
    layer="silver",
    mode="scd1_hash_diff",
    hash_keys=["customer_id"],
    dedup_order_expr="updated_at DESC NULLS LAST",
    schema_policy="additive_only",
    quality_rules={
        "not_null": ["customer_id"],
        "unique_key": ["customer_id"],
        "expressions": [
            {
                "name": "valid_status",
                "expression": "status IN ('active', 'inactive')",
                "severity": "quarantine",
            }
        ],
    },
)
Top-level fields
| Field | Type | Default | Use |
|---|---|---|---|
| _metadata | object | {} | Contract metadata removed before execution and kept for audit. |
| source | string \| object | required | Source table name, Python DataFrame, connector spec or Auto Loader spec. |
| target | object | optional | Canonical target object. Prefer this over separate catalog/schema/table fields in YAML. |
| target.catalog | string | main | Target catalog. |
| target.schema | string | target_schema or layer | Physical target schema. |
| target.table | string | required | Target table name. |
| target_table | string | required if target.table is absent | Direct target table name. |
| catalog | string | main | Direct target catalog. |
| target_schema | string \| null | null | Direct physical target schema. If absent, layer is used as schema. |
| layer | string | bronze | Logical layer. It is not restricted to bronze/silver/gold. |
| mode | enum | scd0_append | Write mode. |
| preset / presets | string \| list[string] | unset | Preset names applied before plan validation. |
| applied_presets | list[string] | [] | Recorded preset names, usually populated by the preset engine. |
| source_system | string | default | Source system label written to technical columns and control tables. |
| ctrl_schema | string | ops | Control table schema. |
| notebook_name | string | unknown | Execution context label. |
| description, owner, domain, tags, sla | strings/list | null/[] | Lightweight metadata. Prefer annotations/operations for catalog and operational metadata. |
| runtime_parameters | object | {} | Free-form runtime values recorded with the run. |
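
The defaults in the table above imply a fallback order when the physical target name is resolved. The sketch below illustrates that order in plain Python. `resolve_target` is a hypothetical helper, not part of the framework, and the precedence of `target.catalog` over the direct `catalog` field is an assumption based on the note that the `target` object is the canonical form.

```python
from typing import Any, Mapping


def resolve_target(contract: Mapping[str, Any]) -> str:
    """Return catalog.schema.table using the documented fallbacks (illustrative only)."""
    target = contract.get("target") or {}

    # Assumption: the canonical target object wins over the direct field.
    catalog = target.get("catalog") or contract.get("catalog") or "main"

    # layer defaults to bronze and doubles as the schema when none is given.
    layer = contract.get("layer") or "bronze"
    schema = target.get("schema") or contract.get("target_schema") or layer

    # target_table is only required when target.table is absent.
    table = target.get("table") or contract.get("target_table")
    if not table:
        raise ValueError("either target.table or target_table is required")

    return f"{catalog}.{schema}.{table}"


explicit = resolve_target(
    {"target": {"schema": "crm_curated", "table": "s_customers"}, "layer": "silver"}
)
fallback = resolve_target({"target_table": "events"})
print(explicit)
print(fallback)
```

With only `target_table` set, the schema falls back to the default layer, which is why leaving out both `target.schema` and `target_schema` places the table in a schema named after the layer.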
Source object fields
| Field | Type | Default | Use |
|---|---|---|---|
| source.type | connector \| autoloader | required | Source resolver type. |
| source.connector | string | required for connector | Native or registered connector name. |
| source.name | string \| null | null | Logical source name for metadata. |
| source.provider | string \| null | null | Object storage provider: adls, azure_blob, gcs, s3. |
| source.format | string \| null | null | File, HTTP or connector response format. |
| source.path | string \| null | null | Filesystem path, object storage path, HTTP URL or source location. |
| source.account_url | string \| null | null | Azure Blob account URL. |
| source.container | string \| null | null | Azure Blob/ADLS container. |
| source.table | string \| null | null | Catalog/JDBC/external connector table. |
| source.query | string \| null | null | SQL query for SQL, JDBC or external Spark connectors. |
| source.schema | string | unset | Alias for source.read.schema. |
| source.options | object | {} | Spark/connector options. |
| source.read | object | {} | Framework-aware read controls. |
| source.request | object | {} | HTTP request settings. |
| source.auth | object | {} | Authentication settings. |
| source.pagination | object | {} | REST pagination settings. |
| source.response | object | {} | REST/HTTP response parsing settings. |
| source.incremental | object | {} | Connector-side incremental settings. |
| source.limits | object | {} | Timeout, retry, rate and byte limits. |
Connector namespaces
| Namespace | Fields |
|---|---|
| source.read | schema, source_complete, full_snapshot, recursiveFileLookup, pathGlobFilter, file_regex, file_regex_scope, file_regex_max_listed, file_regex_recursive, JDBC partition_column/lower_bound/upper_bound/num_partitions/fetchsize, JSON materialization staging_path/json_options, Auto Loader schema_location/checkpoint_location/schema_hints/include_existing_files/max_files_per_trigger. |
| source.request | url, method, headers, params, json, body. |
| source.auth | type, token, header, value/key, username/user, password, OAuth token_url/client_id/client_secret/scope, AWS access_key_id/secret_access_key/session_token/region/credential_provider, Azure sas_token. |
| source.pagination | type, max_pages, page_param, start_page, offset_param, limit_param, page_size, next_cursor_path, cursor_param. |
| source.response | mode, raw_column, records_path, format, encoding. |
| source.incremental | watermark_column, initial_value, JDBC predicate, REST watermark_param/watermark_header/watermark_body_field. |
| source.limits | timeout_seconds, retry_attempts, retry_backoff_seconds, rate_limit_per_minute, max_pages, max_page_bytes, max_total_bytes. |
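
The JDBC fields under `source.read` resemble Spark's JDBC partitioning options (`partitionColumn`, `lowerBound`, `upperBound`, `numPartitions`). Assuming they are passed through to Spark, which this reference does not state, the bounds only decide how the key range is split into parallel reads; they do not filter rows. The simplified sketch below shows the idea with a hypothetical `partition_predicates` helper. Spark's own boundary arithmetic differs in detail, so the exact split points here are illustrative.

```python
from typing import List


def partition_predicates(
    column: str, lower_bound: int, upper_bound: int, num_partitions: int
) -> List[str]:
    """Split [lower_bound, upper_bound) into even ranges, one WHERE clause each."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")

    stride = max((upper_bound - lower_bound) // num_partitions, 1)
    bounds = [lower_bound + stride * i for i in range(1, num_partitions)]

    predicates = []
    for i in range(num_partitions):
        low = bounds[i - 1] if i > 0 else None
        high = bounds[i] if i < num_partitions - 1 else None
        if low is None and high is None:
            predicates.append("1 = 1")  # single partition reads everything
        elif low is None:
            # The first range is open below and also picks up NULL keys.
            predicates.append(f"{column} < {high} OR {column} IS NULL")
        elif high is None:
            # The last range is open above, so rows past upper_bound are still read.
            predicates.append(f"{column} >= {low}")
        else:
            predicates.append(f"{column} >= {low} AND {column} < {high}")
    return predicates


for clause in partition_predicates("customer_id", 0, 100, 4):
    print(clause)
```

Because the first and last ranges are open-ended, bounds that are far from the real key distribution produce skewed partitions rather than missing rows.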
Selection, transform and filtering
| Field | Type | Default | Use |
|---|---|---|---|
| select_columns | string \| list[string] | [] | Keep only selected source columns. |
| column_mapping | object[string,string] | {} | Rename source columns before framework technical columns are added. |
| filter_expression | string \| null | null | Spark SQL filter. Do not combine with full snapshot soft delete. |
| watermark_columns | string \| list[string] | [] | Incremental filter/state columns. |
| shape | object \| null | null | Direct shape namespace. Prefer transform.shape. |
| transform.shape | object \| null | null | Declarative parse, flatten, array and projection operations. See Transformations. |
| transform.deduplicate.keys | string \| list[string] | required when deduplicate exists | Deduplication keys. |
| transform.deduplicate.order_by | string | required when deduplicate exists | SQL order fragment such as updated_at DESC NULLS LAST. |
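
To make the effect of `transform.deduplicate` concrete, the sketch below reproduces the semantics of keys plus an `updated_at DESC NULLS LAST` ordering on plain Python dictionaries. It is a conceptual model only: the framework presumably runs this in Spark, and the `deduplicate` and `_rank` helpers are hypothetical names introduced for illustration.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def _rank(row: Row, column: str) -> Tuple[bool, Any]:
    """Sort key for DESC NULLS LAST: any non-null value outranks NULL."""
    value = row.get(column)
    return (value is not None, value if value is not None else "")


def deduplicate(rows: List[Row], keys: Sequence[str], order_column: str) -> List[Row]:
    """Keep one row per key: the one with the highest order_column value."""
    best: Dict[Tuple[Any, ...], Row] = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        current = best.get(key)
        if current is None or _rank(row, order_column) > _rank(current, order_column):
            best[key] = row
    return list(best.values())


rows = [
    {"customer_id": 1, "updated_at": "2024-01-01", "status": "inactive"},
    {"customer_id": 1, "updated_at": "2024-03-01", "status": "active"},
    {"customer_id": 1, "updated_at": None, "status": "active"},
    {"customer_id": 2, "updated_at": None, "status": "active"},
]
deduped = deduplicate(rows, keys=["customer_id"], order_column="updated_at")
for row in deduped:
    print(row)
```

A row with a NULL ordering value survives only when it is the sole candidate for its key, which is the reason for spelling out `NULLS LAST` in the order fragment.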
Write and optimization
| Field | Type | Default | Use |
|---|---|---|---|
| merge_keys | string \| list[string] | [] | Keys for upsert, SCD2 and snapshot modes. |
| hash_keys | string \| list[string] | [] | Keys for scd1_hash_diff. |
| hash_exclude_columns | string \| list[string] | [] | Columns excluded from row hash. |
| custom_keys | object | {} | Mode/custom writer key groups. |
| dedup_order_expr | string \| null | null | Deterministic ordering for source/target deduplication. |
| partition_column | string \| null | null | Target partition column. |
| partition_value | string \| null | null | Single partition value; requires partition_column. |
| merge_strategy | enum | delta | delta, delta_by_partition or replace_partitions. |
| merge_partition_column | string \| null | null | Partition column for delta_by_partition. |
| replace_partitions_source_complete | boolean | false | Evidence required for partition replacement. |
| cluster_columns | string \| list[string] | [] | Liquid/table clustering columns where supported. |
| zorder_columns | string \| list[string] | [] | ZORDER columns where supported. |
| optimize_after_write | boolean | false | Run post-write optimization. |
| delta_properties | object[string,string] | {} | Delta table properties. |
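
The interaction between `hash_keys` and `hash_exclude_columns` in `scd1_hash_diff` can be pictured as follows: rows are matched on the keys, and a row counts as changed only when the hash of its remaining columns differs. This is a minimal sketch of that idea. The hash function (SHA-256 over sorted JSON) and the helper names `row_hash` and `classify` are assumptions, not the framework's implementation.

```python
import hashlib
import json
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def row_hash(row: Row, exclude: Sequence[str] = ()) -> str:
    """Hash every column except the excluded ones, in a stable column order."""
    payload = {k: v for k, v in row.items() if k not in exclude}
    encoded = json.dumps(payload, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def classify(
    source_rows: List[Row],
    target_rows: List[Row],
    hash_keys: Sequence[str],
    exclude: Sequence[str] = (),
) -> Dict[str, List[Row]]:
    """Split source rows into inserts, updates and unchanged rows."""
    existing = {
        tuple(r[k] for k in hash_keys): row_hash(r, exclude) for r in target_rows
    }
    result: Dict[str, List[Row]] = {"insert": [], "update": [], "unchanged": []}
    for row in source_rows:
        key = tuple(row[k] for k in hash_keys)
        if key not in existing:
            result["insert"].append(row)
        elif existing[key] != row_hash(row, exclude):
            result["update"].append(row)
        else:
            result["unchanged"].append(row)
    return result


target = [
    {"customer_id": 1, "status": "active", "loaded_at": "t0"},
    {"customer_id": 2, "status": "active", "loaded_at": "t0"},
]
source = [
    {"customer_id": 1, "status": "active", "loaded_at": "t1"},
    {"customer_id": 2, "status": "inactive", "loaded_at": "t1"},
    {"customer_id": 3, "status": "active", "loaded_at": "t1"},
]
plan = classify(source, target, hash_keys=["customer_id"], exclude=["loaded_at"])
print({action: [r["customer_id"] for r in rows] for action, rows in plan.items()})
```

Excluding a volatile column such as a load timestamp keeps otherwise identical rows from being rewritten on every run.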
Schema, quality and runtime
| Field | Type | Default | Use |
|---|---|---|---|
| schema_policy | permissive \| additive_only \| strict | permissive | Target schema handling. |
| allow_type_widening | boolean | false | Allow supported widening changes. |
| quality_rules.required_columns | string \| list[string] | [] | Required schema columns. |
| quality_rules.not_null | string \| list[string] | [] | Row-level not-null checks. |
| quality_rules.unique_key | string \| list[string] | [] | Set-level uniqueness check. |
| quality_rules.accepted_values | object[string,list] | {} | Allowed values by column. |
| quality_rules.min_rows | integer | unset | Minimum row count. |
| quality_rules.max_null_ratio | object[string,number] | {} | Maximum null ratio by column. |
| quality_rules.expressions | list[object] | [] | SQL boolean rules with name, expression, severity and optional message. |
| quality_rules.custom | object | {} | Registered custom quality rules. |
| on_quality_fail | fail \| warn \| quarantine | fail | Global legacy action for built-in quality failures. |
| scd2_change_columns | string \| list[string] | [] | SCD2 comparison columns. |
| scd2_effective_from_column | string \| null | null | SCD2 effective-from source column. |
| fix_encoding, encoding, encoding_columns | boolean/string/list | false/Windows-1252/[] | Optional encoding repair. |
| dry_run | boolean | false | Validate without committing writes/control side effects. |
| explain_mode, explain_format | boolean/enum | false/formatted | Capture Spark explain plans. |
| openlineage_enabled, openlineage_namespace, openlineage_producer | boolean/string/string | false/null/contractforge | OpenLineage event metadata. |
| use_cache | boolean | true | Allow framework caching. |
| lock_enabled | boolean | false | Acquire framework lock. |
| idempotency_key, idempotency_policy | string/enum | null/always_run | Idempotent execution policy. |
| retry_attempts, retry_backoff_seconds | integer \| null | global config | Retry behavior override. |
| parent_run_id, run_group_id | string \| null | null | Parent/group IDs for stream or window child runs. |
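
The built-in `quality_rules` entries mix row-level checks (`not_null`, `accepted_values`) with set-level checks (`unique_key`, `min_rows`, `max_null_ratio`). The sketch below evaluates a subset of them over plain Python rows to show what each rule measures. `evaluate_quality` is a hypothetical helper, and treating NULL as acceptable for `accepted_values` is an assumption, since this reference does not specify it.

```python
from collections import Counter
from typing import Any, Dict, List, Union

Row = Dict[str, Any]


def _as_list(value: Union[str, List[str], None]) -> List[str]:
    """Fields typed string | list[string] accept either form."""
    if value is None:
        return []
    return [value] if isinstance(value, str) else list(value)


def evaluate_quality(rows: List[Row], rules: Dict[str, Any]) -> List[str]:
    """Return one message per failed rule (illustrative subset of the rules)."""
    failures: List[str] = []

    min_rows = rules.get("min_rows")
    if min_rows is not None and len(rows) < min_rows:
        failures.append(f"min_rows: {len(rows)} < {min_rows}")

    for column in _as_list(rules.get("not_null")):
        nulls = sum(1 for r in rows if r.get(column) is None)
        if nulls:
            failures.append(f"not_null({column}): {nulls} null row(s)")

    key_columns = _as_list(rules.get("unique_key"))
    if key_columns:
        counts = Counter(tuple(r.get(c) for c in key_columns) for r in rows)
        duplicated = sum(1 for n in counts.values() if n > 1)
        if duplicated:
            failures.append(f"unique_key({', '.join(key_columns)}): {duplicated} duplicated key(s)")

    for column, allowed in rules.get("accepted_values", {}).items():
        # Assumption: NULL is left to not_null and is not flagged here.
        bad = sum(1 for r in rows if r.get(column) is not None and r[column] not in allowed)
        if bad:
            failures.append(f"accepted_values({column}): {bad} invalid row(s)")

    for column, max_ratio in rules.get("max_null_ratio", {}).items():
        ratio = sum(1 for r in rows if r.get(column) is None) / len(rows) if rows else 0.0
        if ratio > max_ratio:
            failures.append(f"max_null_ratio({column}): {ratio:.2f} > {max_ratio}")

    return failures


sample = [
    {"customer_id": 1, "status": "active", "email": None},
    {"customer_id": 1, "status": "closed", "email": None},
    {"customer_id": None, "status": "inactive", "email": "a@example.com"},
]
failures = evaluate_quality(
    sample,
    {
        "not_null": ["customer_id"],
        "unique_key": "customer_id",
        "accepted_values": {"status": ["active", "inactive"]},
        "max_null_ratio": {"email": 0.5},
        "min_rows": 1,
    },
)
for message in failures:
    print(message)
```

What happens after a failure (abort, warning or quarantine) is governed by `on_quality_fail` and by the per-rule `severity` on expressions, which this sketch does not model.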
Execution window and catchup
| Field | Type | Default | Use |
|---|---|---|---|
| execution.window.column | string | required | Column filtered by each explicit window. |
| execution.window.windows[].start, end, label | string/string/string | required/required/null | Explicit window boundaries and label. |
| execution.window.start, end, every | string | unset | Generated windows such as every 1 day. |
| execution.window.stop_on_failure | boolean | true | Stop later windows after a failure. |
| execution.catchup.enabled | boolean | false | Generate catchup windows from saved state/watermark. |
| execution.catchup.column, start, end, every, stop_on_failure | mixed | null/null/null/null/true | Catchup window generation settings. |
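
Generated windows (`start`, `end`, `every`) can be thought of as a range cut into consecutive slices, each of which becomes one filtered run. The following sketch shows one plausible expansion. The accepted `every` syntax, the half-open `[start, end)` boundaries and the label format are all assumptions, and `parse_every` and `generate_windows` are hypothetical helper names.

```python
import re
from datetime import datetime, timedelta
from typing import Dict, List

_UNITS = {"minute": "minutes", "hour": "hours", "day": "days", "week": "weeks"}


def parse_every(every: str) -> timedelta:
    """Parse strings such as '1 day' or '6 hours' (assumed syntax)."""
    match = re.fullmatch(r"\s*([1-9]\d*)\s*(minute|hour|day|week)s?\s*", every)
    if not match:
        raise ValueError(f"unsupported interval: {every!r}")
    return timedelta(**{_UNITS[match.group(2)]: int(match.group(1))})


def generate_windows(start: str, end: str, every: str) -> List[Dict[str, str]]:
    """Cut [start, end) into consecutive windows of length `every`."""
    step = parse_every(every)
    cursor = datetime.fromisoformat(start)
    stop = datetime.fromisoformat(end)
    windows = []
    while cursor < stop:
        upper = min(cursor + step, stop)  # the last window may be shorter
        windows.append(
            {
                "start": cursor.isoformat(),
                "end": upper.isoformat(),
                "label": cursor.date().isoformat(),
            }
        )
        cursor = upper
    return windows


windows = generate_windows("2024-01-01", "2024-01-04", "1 day")
for window in windows:
    print(window)
```

Each generated entry has the same shape as an explicit `execution.window.windows[]` item, so the same per-window filter on `execution.window.column` would apply to both forms.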