CLI

contractforge validate path/to/table.ingestion.yaml
contractforge validate-project contracts/
contractforge validate-bundle contracts/silver/s_customers
contractforge connectors list
contractforge connectors show rest_api jdbc s3
contractforge connectors doctor rest_api jdbc s3
contractforge templates list
contractforge templates wizard
contractforge maintenance ctrl-retention --catalog main --ctrl-schema ops
contractforge maintenance cost-report --catalog main --ctrl-schema ops
| Command | Use | Typical stage |
|---|---|---|
| validate | Validates one ingestion contract file. | Pull request and local authoring. |
| validate-project | Walks a contracts directory and validates all supported contract files. | CI for a repository of contracts. |
| validate-bundle | Loads split contract files for the same table and validates cross-file consistency. | CI before deployment. |
| connectors list | Lists built-in and registered source connectors. | Discovery and onboarding. |
| connectors show | Displays connector-specific contract shape and runtime notes. | Authoring a source contract. |
| connectors doctor | Performs static capability diagnostics without opening network connections. | Runtime preparation and review. |
| templates list | Lists packaged contract templates. | Starting new tables. |
| templates wizard | Generates a contract scaffold from guided prompts. | Onboarding and standardization. |
| maintenance ctrl-retention | Applies control-table retention. | Scheduled operations. |
| maintenance cost-report | Builds an estimated runtime cost report from control-table durations and user-provided rates. | FinOps reporting. |
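
For CI, the validation commands can be chained from a small Python wrapper. The sketch below is one way to do that, not part of ContractForge: the helper names `build_validation_commands` and `run_validation` are hypothetical, and it assumes the CLI reports a validation failure through a non-zero exit code.

```python
import subprocess


def build_validation_commands(contracts_dir, bundles):
    """Return the contractforge commands a CI job might run, in order."""
    commands = [["contractforge", "validate-project", contracts_dir]]
    for bundle in bundles:
        commands.append(["contractforge", "validate-bundle", bundle])
    return commands


def run_validation(contracts_dir, bundles, runner=subprocess.run):
    """Run each command and stop at the first non-zero exit code.

    Assumption: the CLI signals a validation failure with a non-zero
    exit code. The runner parameter exists only so the function can be
    exercised without the CLI installed.
    """
    for command in build_validation_commands(contracts_dir, bundles):
        completed = runner(command)
        if completed.returncode != 0:
            return completed.returncode
    return 0
```

A CI step could call `run_validation("contracts/", ["contracts/silver/s_customers"])` and pass the returned code to `sys.exit`.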

Python API

| Function | Use | Notes |
|---|---|---|
| ingest() | Direct Python invocation with kwargs, connector dictionaries or DataFrame source. | Notebook-friendly API. Builds an immutable plan and executes it. |
| ingest_plan() | Execute a prepared IngestionPlan. | Use when the application constructs or validates plans before execution. |
| build_plan_from_kwargs() | Normalize kwargs into IngestionPlan. | Useful for tools that need validation without execution. |
| load_contract_bundle() | Load split YAML contracts. | Reads *.ingestion.yaml, *.annotations.yaml, *.operations.yaml and *.access.yaml. |
| ingest_bundle() | Execute an ingestion bundle with governance context. | Recommended entry point for contract repositories. |
| apply_annotations_bundle() | Apply annotations without re-running ingestion. | Useful when metadata changes independently from data movement. |
| apply_access_bundle() | Apply access contract through the dedicated governance path. | Use from a principal with security permissions. |
| governance_check() | Validate governance contracts and report drift/status. | Use in CI or privileged deployment workflows. |
| register_source_resolver() | Add custom source connectors. | Use for organization-specific source systems. |
| register_write_mode() | Add custom write modes when built-in modes are not enough. | Keep write-mode extensions rare and tested. |
| register_quality_rule() | Add custom quality rules. | Use after built-in expressions are not enough. |
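
The split between building a plan and executing it can be illustrated with a frozen dataclass. This is a conceptual sketch only: `SketchPlan`, `build_sketch_plan` and `execute_sketch_plan` are hypothetical stand-ins, not the real `IngestionPlan`, `build_plan_from_kwargs()` or `ingest_plan()`, whose signatures are not shown here. The default mode and the rule that upsert needs merge keys are assumptions made for the example.

```python
from dataclasses import dataclass, FrozenInstanceError
from typing import Tuple


# Hypothetical stand-in; not ContractForge's IngestionPlan class.
@dataclass(frozen=True)
class SketchPlan:
    target_table: str
    mode: str
    merge_keys: Tuple[str, ...] = ()


def build_sketch_plan(**kwargs):
    """Normalize loose kwargs into a frozen plan, validating on the way."""
    mode = kwargs.get("mode", "append")  # assumed default, for illustration
    merge_keys = tuple(kwargs.get("merge_keys", ()))
    # Assumed rule: an upsert cannot be planned without merge keys.
    if mode == "upsert" and not merge_keys:
        raise ValueError("upsert requires merge_keys")
    return SketchPlan(kwargs["target_table"], mode, merge_keys)


def execute_sketch_plan(plan):
    """Placeholder for execution; a real runtime would write data here."""
    return {"target_table": plan.target_table, "mode": plan.mode, "status": "planned"}


plan = build_sketch_plan(
    target_table="s_customers", mode="upsert", merge_keys=["customer_id"]
)
try:
    plan.mode = "append"  # frozen dataclasses reject mutation
except FrozenInstanceError:
    print("plan is immutable")
```

Because validation happens in the build step, a tool can call the builder alone to check inputs and never reach execution.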

Runtime options

| Option | Default | Use |
|---|---|---|
| raise_on_failure | true | Raise ContractForgeExecutionError after failed runs are logged. |
| dry_run | false | Validate contract, source shape and plan without writing target/control side effects. |
| explain_mode | false | Capture Spark explain plans. Prefer in development or CI because it can add overhead. |
| openlineage_enabled | config-dependent | Persist OpenLineage-compatible events for forwarding or audit. |

Common contract fields

| Field | Type | Meaning |
|---|---|---|
| source | object/DataFrame/string | Connector, table, SQL, path, API or DataFrame source definition. |
| target | object | Physical catalog/schema/table. target.schema can differ from logical layer. |
| catalog, target_schema, target_table | string | Python-friendly equivalents of the target block. |
| layer | string | Logical classification for presets, observability and conventions. It no longer has to match the physical schema. |
| mode | enum | Write semantics: append, overwrite, upsert, hash diff, SCD2 or snapshot. |
| select_columns | list | Keep only selected source columns before transformations. |
| column_mapping | map | Rename source columns before technical columns are added. |
| transform | object | Structural normalization and deduplication before schema/quality/write. |
| filter_expression | string | Spark SQL filter applied to the source. Do not use with complete snapshot soft delete. |
| watermark_columns | list | Incremental filter columns persisted in state. |
| merge_keys, hash_keys | list | Business keys used by MERGE and hash-diff modes. |
| dedup_order_expr | string | SQL ORDER BY fragment used to choose the latest row per key. |
| cluster_columns, zorder_columns | list | Physical optimization hints for Databricks/Delta layout. |
| delta_properties | map | Delta table properties, such as CDF and retention settings. |
| schema_policy | enum | Controls target schema handling and additive evolution. |
| allow_type_widening | boolean | Allows safe type widening when schema policy permits it. |
| quality_rules | object | Rules with fail, warn, quarantine or abort semantics. |
| on_quality_fail | enum | Legacy/global quality behavior when rule-level severity is not enough. |
| execution | object | Backfill, explicit windows and catchup configuration. |
| idempotency_key, idempotency_policy | string | Duplicate-run handling. |
| retry_attempts, retry_backoff_seconds | integer | Plan-level retry policy for transient retryable failures. |
| annotations, operations, access | object | Inline governance contracts for Python-driven usage. |
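
To make the effect of select_columns, column_mapping and key-based deduplication concrete, here is a pure-Python sketch over a list of dictionaries. It is not how ContractForge runs these steps (the runtime works on Spark DataFrames). The helper `prepare_rows` is hypothetical, the step order is an assumption drawn from the field descriptions, and a plain `order_column` comparison stands in for a `dedup_order_expr` SQL fragment.

```python
def prepare_rows(rows, select_columns, column_mapping, merge_keys, order_column):
    """Select, rename, then keep the latest row per key (illustration only)."""
    # 1. select_columns: keep only the listed source columns.
    selected = [{c: row[c] for c in select_columns} for row in rows]

    # 2. column_mapping: rename source columns to their target names.
    renamed = [
        {column_mapping.get(name, name): value for name, value in row.items()}
        for row in selected
    ]

    # 3. Deduplicate: keep the row with the highest order_column per key,
    #    roughly what a descending ORDER BY fragment would pick.
    #    Assumption: merge_keys refer to the renamed column names.
    latest = {}
    for row in renamed:
        key = tuple(row[k] for k in merge_keys)
        if key not in latest or row[order_column] > latest[key][order_column]:
            latest[key] = row
    return list(latest.values())


rows = [
    {"id": 1, "nm": "Ada", "ts": 1, "junk": "x"},
    {"id": 1, "nm": "Ada L.", "ts": 2, "junk": "y"},
    {"id": 2, "nm": "Bob", "ts": 1, "junk": "z"},
]
prepared = prepare_rows(rows, ["id", "nm", "ts"], {"nm": "name"}, ["id"], "ts")
```

In this example `prepared` holds one row per `id`, with `nm` renamed to `name` and the `junk` column dropped.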

Quality rule fields

| Field | Use | Quarantine behavior |
|---|---|---|
| required_columns | Ensure fields exist in the source schema. | Set-level rule; fails/aborts rather than quarantining individual rows. |
| not_null | Reject nulls in required business fields. | Row-level and quarantineable. |
| unique_key | Ensure a key is unique in the prepared source. | Set-level rule; duplicate ambiguity usually aborts writes. |
| accepted_values | Restrict categorical values. | Row-level and quarantineable. |
| min_rows | Reject empty or unexpectedly small source batches. | Set-level rule. |
| max_null_ratio | Reject columns whose null ratio exceeds a threshold. | Aggregate rule; not a row-isolation rule. |
| expressions | Named Spark SQL boolean rules with severity and message. | Can warn, quarantine or abort depending on severity. |
| custom | Custom registered quality rule payloads. | Defined by the registered rule implementation. |
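
The difference between row-level and set-level rules can be shown with a small pure-Python sketch. It is an illustration of the concept, not ContractForge's quality engine: `apply_quality_sketch` and the `_reasons` field are hypothetical, and raising `ValueError` stands in for whatever abort behavior the runtime applies.

```python
def apply_quality_sketch(rows, not_null, accepted_values, min_rows):
    """Split rows into (passed, quarantined); raise on a set-level failure."""
    # Set-level rule: judged on the whole batch, so no single row can be
    # blamed and isolated. The only options are to proceed or to stop.
    if len(rows) < min_rows:
        raise ValueError(f"min_rows failed: got {len(rows)}, need {min_rows}")

    passed, quarantined = [], []
    for row in rows:
        reasons = []
        # Row-level rules: each failing row can be isolated individually.
        for column in not_null:
            if row.get(column) is None:
                reasons.append(f"not_null:{column}")
        for column, allowed in accepted_values.items():
            if row.get(column) is not None and row[column] not in allowed:
                reasons.append(f"accepted_values:{column}")
        if reasons:
            quarantined.append({**row, "_reasons": reasons})
        else:
            passed.append(row)
    return passed, quarantined


batch = [
    {"customer_id": 1, "status": "active"},
    {"customer_id": None, "status": "active"},
    {"customer_id": 3, "status": "unknown"},
]
good, bad = apply_quality_sketch(
    batch,
    not_null=["customer_id"],
    accepted_values={"status": {"active", "inactive"}},
    min_rows=1,
)
```

Here one row passes and two are quarantined with their reasons attached, while an empty batch with `min_rows=1` would stop the run before any row is inspected.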

Error handling

from contractforge import ContractForgeExecutionError, ingest_bundle

try:
    result = ingest_bundle("contracts/silver/s_customers")
except ContractForgeExecutionError as exc:
    # The failed run is already recorded in ctrl tables.
    print(exc.result["run_id"])
    print(exc.result["error_message"])
    raise