
Ingestion contract

The ingestion contract is the executable definition of a table pipeline: source, target, write mode, schema policy, quality rules, transformations, execution controls and runtime behavior.

Complete example

```yaml
_metadata:
  contract_version: 1.0.0

source:
  type: postgres
  system: crm_postgres
  options:
    url: "{{ secret:crm/postgres_url }}"
    dbtable: public.customers
    driver: org.postgresql.Driver
  read:
    partition_column: customer_id
    lower_bound: 1
    upper_bound: 1000000
    num_partitions: 8
    fetchsize: 10000

target:
  catalog: main
  schema: crm_curated
  table: s_customers

layer: silver
mode: hash_diff_upsert
merge_keys: [customer_id]
hash_keys: [email, status, credit_limit]
transform:
  cast:
    credit_limit: decimal(18,2)
    updated_at: timestamp
  standardize:
    email:
      trim: true
      lower: true
      empty_as_null: true
    status:
      trim: true
      lower: true
  derive:
    updated_date: to_date(updated_at)
  deduplicate:
    keys: [customer_id]
    order_by:
      - column: updated_at
        direction: desc
        nulls: last
schema_policy: additive_only
quality_rules:
  not_null: [customer_id]
  unique_key: [customer_id]
  expressions:
    - name: valid_status
      expression: status IN ('active', 'inactive')
      severity: quarantine
```
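The `standardize` block in the example chains three string clean-ups. The plain-Python sketch below shows what `trim`, `lower` and `empty_as_null` are assumed to do to a single value. It has the following limits:

- The function name `standardize_value` is hypothetical.
- The step order (trim, then lower, then empty-to-null) is an assumption.
- ContractForge applies these rules to Spark columns, not to Python strings.

```python
from typing import Optional


def standardize_value(
    value: Optional[str],
    trim: bool = False,
    lower: bool = False,
    empty_as_null: bool = False,
) -> Optional[str]:
    """Hypothetical sketch of transform.standardize options applied to one value."""
    if value is None:
        return None
    if trim:
        value = value.strip()  # drop leading and trailing whitespace
    if lower:
        value = value.lower()  # case-normalize
    if empty_as_null and value == "":
        return None  # a string that is empty after trimming becomes null
    return value


# Options copied from the example's transform.standardize block.
EMAIL_RULES = {"trim": True, "lower": True, "empty_as_null": True}
STATUS_RULES = {"trim": True, "lower": True}

print(standardize_value("  Ana@Example.COM ", **EMAIL_RULES))  # ana@example.com
print(standardize_value("   ", **EMAIL_RULES))                 # None
print(standardize_value(" Active", **STATUS_RULES))            # active
```

Assume transforms run before quality rules. Then `status` is already lower-cased when `valid_status` is evaluated, so the expression only has to list lowercase values.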

Top-level fields

| Field | Type | Default | Use |
| --- | --- | --- | --- |
| `_metadata` | object | | Contract metadata; removed before execution and kept for audit. |
| `source` | object | required | Source intent, connector or reference, table/query/path, and source metadata. |
| `target` | object | required | Canonical target object. |
| `target.catalog` | string | `main` | Target catalog. |
| `target.schema` | string | value of `layer` | Physical target schema. |
| `target.table` | string | required | Target table name. |
| `layer` | string | `bronze` | Logical layer. It is not restricted to bronze, silver or gold. |
| `mode` | enum | `append` | Write mode. |
| `preset` / `presets` | string \| list[string] | unset | Preset names applied before plan validation. |
| `applied_presets` | list[string] | `[]` | Recorded preset names, usually populated by the preset engine. |
| `description`, `owner`, `domain`, `tags`, `sla` | string / list | `null` / `[]` | Lightweight metadata. Prefer `annotations`/`operations` for catalog and operational metadata. |
| `runtime_parameters` | object | | Free-form runtime values recorded with the run. |
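The defaults in this table chain together:

- An omitted `layer` becomes `bronze`.
- An omitted `target.schema` takes the layer name.
- An omitted `target.catalog` becomes `main`.

The minimal sketch below resolves the fully qualified target name that way. It assumes the parsed contract is a plain dictionary. The helper `resolve_target` is hypothetical and is not ContractForge's loader.

```python
from typing import Any, Dict


def resolve_target(contract: Dict[str, Any]) -> str:
    """Hypothetical helper: apply the documented defaults, return catalog.schema.table."""
    layer = contract.get("layer", "bronze")        # layer defaults to bronze
    target = contract.get("target") or {}
    if "table" not in target:
        raise ValueError("target.table is required")
    catalog = target.get("catalog", "main")        # catalog defaults to main
    schema = target.get("schema", layer)           # schema defaults to the layer name
    return f"{catalog}.{schema}.{target['table']}"


print(resolve_target({"layer": "silver", "target": {"table": "s_customers"}}))
# main.silver.s_customers
print(resolve_target({"layer": "silver", "target": {"schema": "crm_curated", "table": "s_customers"}}))
# main.crm_curated.s_customers
```

The complete example sets `schema: crm_curated` explicitly, so its target resolves to `main.crm_curated.s_customers` rather than `main.silver.s_customers`.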

Source object fields

| Field | Type | Default | Use |
| --- | --- | --- | --- |
| `source.type` | string | required | Portable, bounded, connection or adapter-specific source type. |
| `source.connector` | string | optional | Connector identifier for `type: connector` compatibility. |
| `source.connection_path` | string \| null | `null` | Reusable connection YAML path when `source.type: connection`. |
| `source.system` | string \| null | `null` | Source system label written to evidence. |
| `source.name` | string \| null | `null` | Logical source name for metadata. |
| `source.provider` | string \| null | `null` | Object storage provider: `adls`, `azure_blob`, `gcs`, `s3`. |
| `source.format` | string \| null | `null` | File, HTTP or connector response format. |
| `source.path` | string \| null | `null` | Filesystem path, object storage path, HTTP URL or source location. |
| `source.account_url` | string \| null | `null` | Azure Blob account URL. |
| `source.container` | string \| null | `null` | Azure Blob/ADLS container. |
| `source.table` | string \| null | `null` | Catalog, JDBC or external connector table. |
| `source.query` | string \| null | `null` | SQL query for SQL, JDBC or external Spark connectors. |
| `source.schema` | string | unset | Alias for `source.read.schema`. |
| `source.options` | object | | Spark/connector options. |
| `source.read` | object | | Framework-aware read controls. |
| `source.request` | object | | HTTP request settings. |
| `source.auth` | object | | Authentication settings. |
| `source.pagination` | object | | REST pagination settings. |
| `source.response` | object | | REST/HTTP response parsing settings. |
| `source.incremental` | object | | Connector-side incremental settings. |
| `source.limits` | object | | Timeout, retry, rate and byte limits. |
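`source.schema` is only an alias for `source.read.schema`, so a loader can fold it into the `read` namespace before validation. The sketch below does that and also checks `source.provider` against the four documented providers. Three parts of it are assumptions made for illustration:

- the helper name `normalize_source`;
- the choice to reject conflicting alias and canonical values;
- the `type` value used in the usage line.

```python
import copy
from typing import Any, Dict

# Providers listed for source.provider in the table above.
OBJECT_STORAGE_PROVIDERS = {"adls", "azure_blob", "gcs", "s3"}


def normalize_source(source: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical sketch: fold the source.schema alias into source.read.schema."""
    if "type" not in source:
        raise ValueError("source.type is required")
    result = copy.deepcopy(source)  # leave the caller's dictionary untouched
    read = result.setdefault("read", {})
    alias = result.pop("schema", None)
    if alias is not None:
        existing = read.get("schema")
        if existing is not None and existing != alias:
            # Assumption: a conflicting alias and canonical value is an error.
            raise ValueError("source.schema conflicts with source.read.schema")
        read["schema"] = alias
    provider = result.get("provider")
    if provider is not None and provider not in OBJECT_STORAGE_PROVIDERS:
        raise ValueError(f"unsupported source.provider: {provider!r}")
    return result


# "object_storage" is a hypothetical type value used only for this illustration.
print(normalize_source({"type": "object_storage", "provider": "s3", "schema": "id INT"}))
```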

Connector namespaces

| Namespace | Fields |
| --- | --- |
| `source.read` | `schema`, `source_complete`, `full_snapshot`, `recursiveFileLookup`, `pathGlobFilter`, `file_regex`, `file_regex_scope`, `file_regex_max_listed`, `file_regex_recursive`; JDBC: `partition_column`, `lower_bound`, `upper_bound`, `num_partitions`, `fetchsize`; JSON materialization: `staging_path`, `json_options`; Auto Loader: `schema_location`, `checkpoint_location`, `schema_hints`, `include_existing_files`, `max_files_per_trigger`. |
| `source.request` | `url`, `method`, `headers`, `params`, `json`, `body`. |
| `source.auth` | `type`, `token`, `header`, `value`/`key`, `username`/`user`, `password`; OAuth: `token_url`, `client_id`, `client_secret`, `scope`; AWS: `access_key_id`, `secret_access_key`, `session_token`, `region`, `credential_provider`; Azure: `sas_token`. |
| `source.pagination` | `type`, `max_pages`, `page_param`, `start_page`, `offset_param`, `limit_param`, `page_size`, `next_cursor_path`, `cursor_param`. |
| `source.response` | `mode`, `raw_column`, `records_path`, `format`, `encoding`. |
| `source.incremental` | `watermark_column`, `initial_value`; JDBC: `predicate`; REST: `watermark_param`, `watermark_header`, `watermark_body_field`. |
| `source.limits` | `timeout_seconds`, `retry_attempts`, `retry_backoff_seconds`, `rate_limit_per_minute`, `max_pages`, `max_page_bytes`, `max_total_bytes`. |
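The JDBC read controls correspond by name to Spark's JDBC options `partitionColumn`, `lowerBound`, `upperBound`, `numPartitions` and `fetchsize`. This sketch assumes ContractForge passes them through to Spark. In Spark, the bounds only set the partition stride; they do not filter rows:

- The first partition also reads values below its boundary, plus NULLs.
- The last partition is open-ended.

The simplified equal-stride sketch below prints the per-partition WHERE clauses for the example's `customer_id` settings. Recent Spark versions add extra rounding and alignment to the stride, so exact boundaries can differ. The function name is hypothetical.

```python
from typing import List


def jdbc_partition_predicates(
    column: str, lower_bound: int, upper_bound: int, num_partitions: int
) -> List[str]:
    """Simplified equal-stride split in the style of Spark's JDBC partitioning.

    Assumes non-negative integer bounds (Python // floors, Scala Long division truncates).
    """
    if lower_bound > upper_bound:
        raise ValueError("lower_bound must not exceed upper_bound")
    if num_partitions <= 1:
        return ["1=1"]  # a single partition reads the whole table without a filter
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower is None:
            # First partition: everything below the first boundary, plus NULLs.
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            # Last partition: open-ended, so rows above upper_bound are still read.
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for predicate in jdbc_partition_predicates("customer_id", 1, 1_000_000, 8):
    print(predicate)
```

`upper_bound: 1000000` does not stop rows with larger `customer_id` values from being read. Those rows land in the last partition.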

Selection, transform and filtering

| Field | Type | Default | Use |
| --- | --- | --- | --- |
| `select_columns` | string \| list[string] | `[]` | Keep only the selected source columns. |
| `column_mapping` | object[string,string] | | Rename source columns before framework technical columns are added. |
| `filter_expression` | string \| null | `null` | Spark SQL filter. Do not combine with full snapshot soft delete. |
| `watermark_columns` | string \| list[string] | `[]` | Incremental filter/state columns. |
| `shape` | object \| null | `null` | Top-level shape namespace. Prefer `transform.shape`. |
| `transform.shape` | object \| null | `null` | Declarative parse, flatten, array and projection operations. See Transformations. |
| `transform.cast` | object[string,string] | | Cast existing columns to Spark SQL types after shape. |
| `transform.standardize` | object | | Trim, case-normalize, whitespace-normalize and nullify empty strings on declared columns. |
| `transform.derive` | object[string,string] | | Create or replace columns with Spark SQL expressions. |
| `transform.deduplicate.keys` | string \| list[string] | required if `deduplicate` is set | Deduplication keys. |
| `transform.deduplicate.order_by` | string \| list[object] | required if `deduplicate` is set | Deterministic order. Prefer structured items with `column`, `direction` and optional `nulls`. |
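`transform.deduplicate` keeps one row per key, chosen by a deterministic order. The plain-Python sketch below mirrors the example's settings: `keys: [customer_id]`, with `order_by` on `updated_at` descending and nulls last. In Spark the same result is usually produced with `row_number()` over a window partitioned by the keys and ordered by `updated_at DESC NULLS LAST`.

The function names and the list-of-dicts input are illustrative assumptions. Exact ties here keep the first row seen. Spark does not guarantee which row wins a tie, which is why the order must be deterministic.

```python
from datetime import datetime
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def _is_preferred(candidate: Row, current: Row, order_column: str) -> bool:
    """True if candidate ranks before current under `order_column DESC NULLS LAST`."""
    cand, curr = candidate.get(order_column), current.get(order_column)
    if cand is None:
        return False  # NULLs sort last, so they never displace a kept row
    if curr is None:
        return True   # any non-null value beats a kept NULL
    return cand > curr  # strictly greater: exact ties keep the earlier row


def deduplicate(rows: List[Row], keys: Sequence[str], order_column: str) -> List[Row]:
    """Hypothetical sketch: keep the top-ranked row for each key tuple."""
    best: Dict[Tuple[Any, ...], Row] = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        current = best.get(key)
        if current is None or _is_preferred(row, current, order_column):
            best[key] = row
    return list(best.values())


rows = [
    {"customer_id": 1, "email": "old@example.com", "updated_at": datetime(2024, 1, 1)},
    {"customer_id": 1, "email": "new@example.com", "updated_at": datetime(2024, 3, 1)},
    {"customer_id": 1, "email": "unknown@example.com", "updated_at": None},
    {"customer_id": 2, "email": "b@example.com", "updated_at": None},
]
for row in deduplicate(rows, ["customer_id"], "updated_at"):
    print(row["customer_id"], row["email"])
```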

Write and optimization

| Field | Type | Default | Use |
| --- | --- | --- | --- |
| `merge_keys` | string \| list[string] | `[]` | Keys for upsert, historical and snapshot modes. |
| `hash_strategy` | explicit \| all_columns_except | `explicit` | Hash input strategy for `hash_diff_upsert`. Use `explicit` with `hash_keys` for governed tables; use `all_columns_except` for wide tables. |
| `hash_keys` | string \| list[string] | `[]` | Content columns hashed by `hash_diff_upsert` when `hash_strategy: explicit`. These are not row identity keys; row identity comes from `merge_keys`. |
| `hash_exclude_columns` | string \| list[string] | `[]` | Additional columns excluded from the row hash. ContractForge-generated columns are excluded automatically. |
| `custom_keys` | object | | Key groups for modes and custom writers. |
| `dedup_order_expr` | string \| null | `null` | Deterministic ordering for source/target deduplication. |
| `partition_column` | string \| null | `null` | Target partition column. |
| `partition_value` | string \| null | `null` | Single partition value; requires `partition_column`. |
| `merge_strategy` | enum | `delta` | `delta`, `delta_by_partition` or `replace_partitions`. |
| `merge_partition_column` | string \| null | `null` | Partition column for `delta_by_partition`. |
| `replace_partitions_source_complete` | boolean | `false` | Source-completeness evidence required for partition replacement. |
| `extensions.databricks.cluster_columns` | string \| list[string] | `[]` | Databricks clustering columns. |
| `extensions.databricks.partition_columns` | string \| list[string] | `[]` | Databricks target partition columns. |
| `extensions.databricks.delta_properties` | object[string,string] | | Delta table properties. |
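With `mode: hash_diff_upsert`, rows are matched on `merge_keys` and compared on a hash of content columns, so unchanged rows can be skipped. The sketch below illustrates both `hash_strategy` options and classifies each source row as insert, update or unchanged. It is not ContractForge's hashing or exclusion logic. The following are all assumptions:

- SHA-256 over `repr()` values joined with a unit separator;
- the `_cf_` prefix standing in for framework-generated columns;
- every function name.

```python
import hashlib
from typing import Any, Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, Any]
GENERATED_PREFIX = "_cf_"  # hypothetical marker for framework-generated columns


def hash_columns(columns: Sequence[str], strategy: str,
                 hash_keys: Sequence[str] = (), exclude: Iterable[str] = ()) -> List[str]:
    """Pick the content columns that feed the row hash."""
    if strategy == "explicit":
        if not hash_keys:
            raise ValueError("hash_strategy: explicit needs hash_keys")
        return list(hash_keys)
    if strategy == "all_columns_except":
        skipped = set(exclude)
        return sorted(c for c in columns
                      if c not in skipped and not c.startswith(GENERATED_PREFIX))
    raise ValueError(f"unknown hash_strategy: {strategy}")


def row_hash(row: Row, columns: Sequence[str]) -> str:
    # repr() keeps None distinct from the string "None"; \x1f separates values.
    payload = "\x1f".join(repr(row.get(c)) for c in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def plan_upsert(source: List[Row], target_hashes: Dict[Tuple[Any, ...], str],
                merge_keys: Sequence[str], columns: Sequence[str]) -> Dict[str, List[Row]]:
    """Classify source rows by comparing their hash with the stored target hash."""
    plan: Dict[str, List[Row]] = {"insert": [], "update": [], "unchanged": []}
    for row in source:
        key = tuple(row[k] for k in merge_keys)
        old_hash = target_hashes.get(key)
        if old_hash is None:
            plan["insert"].append(row)
        elif old_hash != row_hash(row, columns):
            plan["update"].append(row)
        else:
            plan["unchanged"].append(row)
    return plan


cols = hash_columns(["customer_id", "email", "status", "credit_limit"], "explicit",
                    hash_keys=["email", "status", "credit_limit"])
target_hashes = {
    (1,): row_hash({"email": "a@example.com", "status": "active", "credit_limit": "100.00"}, cols),
    (3,): row_hash({"email": "c@example.com", "status": "inactive", "credit_limit": "0.00"}, cols),
}
source = [
    {"customer_id": 1, "email": "a@example.com", "status": "inactive", "credit_limit": "100.00"},
    {"customer_id": 2, "email": "b@example.com", "status": "active", "credit_limit": "50.00"},
    {"customer_id": 3, "email": "c@example.com", "status": "inactive", "credit_limit": "0.00",
     "_cf_loaded_at": "2024-01-01"},
]
plan = plan_upsert(source, target_hashes, ["customer_id"], cols)
print({action: [r["customer_id"] for r in rows] for action, rows in plan.items()})
```

With `explicit`, only the columns in `hash_keys` can trigger an update. With `all_columns_except`, any new non-excluded column starts to affect the hash, which suits wide tables but makes updates more sensitive to schema changes.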

Schema, quality and runtime

| Field | Type | Default | Use |
| --- | --- | --- | --- |
| `schema_policy` | permissive \| additive_only \| strict | `permissive` | Target schema handling. |
| `allow_type_widening` | boolean | `false` | Allow supported type-widening changes. |
| `quality_rules.required_columns` | string \| list[string] | `[]` | Columns that must exist in the schema. |
| `quality_rules.not_null` | string \| list[string] | `[]` | Row-level not-null checks. |
| `quality_rules.unique_key` | string \| list[string] | `[]` | Set-level uniqueness check. |
| `quality_rules.accepted_values` | object[string,list] | | Allowed values by column. |
| `quality_rules.min_rows` | integer | unset | Minimum row count. |
| `quality_rules.max_null_ratio` | object[string,number] | | Maximum null ratio by column. |
| `quality_rules.expressions` | list[object] | `[]` | SQL boolean rules with `name`, `expression`, `severity` and optional `message`. |
| `quality_rules.custom` | object | | Registered custom quality rules. |
| `on_quality_fail` | fail \| warn \| quarantine | `fail` | Global legacy action for built-in quality failures. |
| `scd2_change_columns` | string \| list[string] | `[]` | Historical comparison columns. |
| `scd2_effective_from_column` | string \| null | `null` | Historical effective-from source column. |
| `fix_encoding`, `encoding`, `encoding_columns` | boolean / string / list | `false` / `Windows-1252` / `[]` | Optional encoding repair. |
| `dry_run` | boolean | `false` | Validate without committing writes or control side effects. |
| `explain_mode`, `explain_format` | boolean / enum | `false` / `formatted` | Capture Spark explain plans. |
| `openlineage_enabled`, `openlineage_namespace`, `openlineage_producer` | boolean / string / string | `false` / `null` / `contractforge` | OpenLineage event metadata. |
| `use_cache` | boolean | `true` | Allow framework caching. |
| `lock_enabled` | boolean | `false` | Acquire a framework lock for the run. |
| `idempotency_key`, `idempotency_policy` | string / enum | `null` / `always_run` | Idempotent execution policy. |
| `retry_attempts`, `retry_backoff_seconds` | integer \| null | global config | Retry behavior override. |
| `parent_run_id`, `run_group_id` | string \| null | `null` | Parent and group IDs for stream or window child runs. |
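The built-in `quality_rules` checks are either row-level (`not_null`, `accepted_values`) or set-level (`unique_key`, `min_rows`, `max_null_ratio`). The minimal sketch below evaluates these five over in-memory rows. It has the following limits:

- It returns failure messages rather than applying `on_quality_fail` or an expression's `severity`.
- It treats NULLs as passing `accepted_values`, which is an assumption.
- It does not cover `expressions` or `custom`.

The function name and message format are hypothetical.

```python
from collections import Counter
from typing import Any, Dict, List

Row = Dict[str, Any]


def _as_list(value: Any) -> List[str]:
    """Rules accept a single column name or a list of names."""
    return [value] if isinstance(value, str) else list(value or [])


def check_quality(rows: List[Row], rules: Dict[str, Any]) -> List[str]:
    """Hypothetical sketch of a few built-in quality checks; returns failure messages."""
    failures: List[str] = []
    total = len(rows)

    min_rows = rules.get("min_rows")
    if min_rows is not None and total < min_rows:
        failures.append(f"min_rows: expected >= {min_rows}, got {total}")

    for column in _as_list(rules.get("not_null")):
        nulls = sum(1 for r in rows if r.get(column) is None)
        if nulls:
            failures.append(f"not_null[{column}]: {nulls} null row(s)")

    key_columns = _as_list(rules.get("unique_key"))
    if key_columns:
        counts = Counter(tuple(r.get(c) for c in key_columns) for r in rows)
        duplicated = sum(1 for n in counts.values() if n > 1)
        if duplicated:
            failures.append(f"unique_key[{', '.join(key_columns)}]: {duplicated} duplicated key(s)")

    for column, allowed in (rules.get("accepted_values") or {}).items():
        # Assumption: NULLs are left to not_null and do not fail accepted_values.
        bad = {r.get(column) for r in rows
               if r.get(column) is not None and r.get(column) not in allowed}
        if bad:
            failures.append(f"accepted_values[{column}]: unexpected {sorted(bad)}")

    for column, max_ratio in (rules.get("max_null_ratio") or {}).items():
        ratio = sum(1 for r in rows if r.get(column) is None) / total if total else 0.0
        if ratio > max_ratio:
            failures.append(f"max_null_ratio[{column}]: {ratio:.2f} > {max_ratio}")

    return failures


rows = [
    {"customer_id": 1, "status": "active", "email": "a@example.com"},
    {"customer_id": 1, "status": "paused", "email": None},
    {"customer_id": None, "status": "inactive", "email": None},
]
rules = {
    "min_rows": 1,
    "not_null": ["customer_id"],
    "unique_key": "customer_id",
    "accepted_values": {"status": ["active", "inactive"]},
    "max_null_ratio": {"email": 0.5},
}
for failure in check_quality(rows, rules):
    print(failure)
```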

Execution window and catchup

| Field | Type | Default | Use |
| --- | --- | --- | --- |
| `execution.window.column` | string | required | Column filtered by each explicit window. |
| `execution.window.windows[].start`, `end`, `label` | string / string / string | required / required / `null` | Explicit window boundaries and label. |
| `execution.window.start`, `end`, `every` | string | unset | Generated windows, such as every 1 day. |
| `execution.window.stop_on_failure` | boolean | `true` | Stop later windows after a failure. |
| `execution.catchup.enabled` | boolean | `false` | Generate catchup windows from saved state/watermark. |
| `execution.catchup.column`, `start`, `end`, `every`, `stop_on_failure` | mixed | `null` / `null` / `null` / `null` / `true` | Catchup window generation settings. |
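The minimal sketch below shows how generated windows and `stop_on_failure` interact. It rests on these assumptions:

- `start` and `end` are dates.
- `every` is already parsed into a `timedelta`, not the string form "1 day".
- Windows are half-open, `[start, end)`, so each run filters `column >= start AND column < end`.
- The last window is clipped to `end`.

Half-open bounds, clipping and both function names are illustrative and are not taken from ContractForge.

```python
from datetime import date, timedelta
from typing import Callable, List, Tuple

Window = Tuple[date, date]


def generate_windows(start: date, end: date, every: timedelta) -> List[Window]:
    """Split [start, end) into consecutive half-open windows of size `every`."""
    if every <= timedelta(0):
        raise ValueError("every must be positive")
    windows: List[Window] = []
    current = start
    while current < end:
        upper = min(current + every, end)  # clip the last window to end
        windows.append((current, upper))
        current = upper
    return windows


def run_windows(windows: List[Window], run_one: Callable[[Window], bool],
                stop_on_failure: bool = True) -> List[Tuple[Window, str]]:
    """Run windows in order; after a failure, skip the rest if stop_on_failure is set."""
    results: List[Tuple[Window, str]] = []
    failed = False
    for window in windows:
        if failed and stop_on_failure:
            results.append((window, "skipped"))
            continue
        ok = run_one(window)  # e.g. one child run filtered on this window
        results.append((window, "succeeded" if ok else "failed"))
        failed = failed or not ok
    return results


windows = generate_windows(date(2024, 1, 1), date(2024, 1, 4), timedelta(days=1))
# Pretend the window starting on 2024-01-02 fails.
for (lo, hi), status in run_windows(windows, lambda w: w[0] != date(2024, 1, 2)):
    print(lo, hi, status)
```

With the default `stop_on_failure: true`, the window after the failed one is reported as skipped rather than run. Setting it to false lets the remaining windows proceed independently.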