1. Install the runtime package

pip install contractforge

On Databricks, install the released wheel in the job, cluster, or notebook environment. The runtime provides Spark and Delta, so only local development needs the optional Spark extra:

pip install "contractforge[spark]"

Databricks recommendation

Pin a released wheel version in jobs. Serverless can install from PyPI or workspace files when allowed by the environment; classic clusters can also use cluster libraries.
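
A pinned version is only useful if the environment really resolved to it. The sketch below is one way to check this at the top of a job or notebook, using only the Python standard library. The version string "1.2.3" is a placeholder, not a real release number, and the helper names are hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def matches_pin(package: str, pinned: str) -> bool:
    """True only when `package` is installed at exactly the pinned version."""
    return installed_version(package) == pinned


# Placeholder pin: replace "1.2.3" with the release your job actually installs.
if not matches_pin("contractforge", "1.2.3"):
    print("contractforge is missing or not at the pinned version")
```

In a job, you might raise an error rather than print, so that a drifted environment fails fast.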

2. Create a minimal YAML contract

source:
  type: connector
  connector: table
  table: main.raw.orders

target:
  catalog: main
  schema: bronze
  table: b_orders

layer: bronze
mode: scd0_append
source_system: crm
quality_rules:
  not_null: [order_id]
  expressions:
    - name: positive_amount
      expression: amount >= 0
      severity: abort
schema_policy: additive_only
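
To make the two quality rules concrete, here is a plain-Python sketch of what they express on a handful of sample rows. It is an illustration only, not how contractforge evaluates rules (that work happens in Spark). The sample data is invented, and treating a null amount as passing the positive_amount check is an assumption.

```python
from typing import Any, Dict, List


def check_rows(rows: List[Dict[str, Any]]) -> List[str]:
    """Return readable violations of the not_null and positive_amount rules."""
    violations = []
    for index, row in enumerate(rows):
        # not_null: [order_id]
        if row.get("order_id") is None:
            violations.append(f"row {index}: order_id is null")
        # expression: amount >= 0 (assumption: a null amount is not a violation)
        amount = row.get("amount")
        if amount is not None and not amount >= 0:
            violations.append(f"row {index}: positive_amount failed (amount={amount})")
    return violations


# Invented sample rows, for illustration only.
sample = [
    {"order_id": 1, "amount": 10.0},
    {"order_id": None, "amount": 5.0},
    {"order_id": 3, "amount": -2.5},
]

for violation in check_rows(sample):
    print(violation)
# prints:
# row 1: order_id is null
# row 2: positive_amount failed (amount=-2.5)
```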

3. Validate before running

contractforge validate contracts/bronze/b_orders.ingestion.yaml

Validation catches missing mode requirements, invalid enum values, malformed quality expressions and unsupported connector combinations before Spark work starts.
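
When a repository holds many contracts, the same command can be run over all of them, for example in CI. The sketch below wraps the `contractforge validate` command shown above with `subprocess`. It assumes the CLI exits with a non-zero code for an invalid contract, which this guide does not state. The function name is hypothetical.

```python
import subprocess
import sys
from pathlib import Path
from typing import Iterable, List, Sequence


def validate_contracts(
    paths: Iterable[Path],
    command: Sequence[str] = ("contractforge", "validate"),
) -> List[Path]:
    """Run the validate command once per contract and return the paths that failed.

    Assumption: the command exits with a non-zero code when a contract is invalid.
    """
    failed = []
    for path in paths:
        completed = subprocess.run(
            [*command, str(path)], capture_output=True, text=True
        )
        if completed.returncode != 0:
            failed.append(path)
            # Surface the tool's own error output next to the failing path.
            print(f"FAILED {path}\n{completed.stderr}", file=sys.stderr)
    return failed
```

A caller could collect paths with `sorted(Path("contracts").rglob("*.ingestion.yaml"))`, a layout inferred from the example path above, and fail the build when the returned list is not empty.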

4. Execute from a generic notebook

from contractforge import ingest_bundle, load_contract_bundle

bundle = load_contract_bundle("/Workspace/Shared/contracts/bronze/b_orders")
result = ingest_bundle(bundle)

display(result)

Execution APIs raise ContractForgeExecutionError by default when the final status is FAILED or ABORTED. The run is written to the control tables before the exception is raised, so the evidence survives the failure. Use raise_on_failure=False only when a test or diagnostic notebook needs to inspect the failed payload directly.
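
If a diagnostic notebook should keep the default raising behavior and still look at the error, one option is a small wrapper that turns the exception into a return value. This is a generic sketch: `run_and_capture` is a hypothetical helper, and it takes the callable and the exception type as arguments because this guide does not say which module exports ContractForgeExecutionError.

```python
from typing import Any, Callable, Optional, Tuple, Type


def run_and_capture(
    run: Callable[[], Any],
    error_type: Type[Exception],
) -> Tuple[Optional[Any], Optional[Exception]]:
    """Call `run` and return (result, None), or (None, error) if `error_type` is raised.

    Any other exception type propagates unchanged.
    """
    try:
        return run(), None
    except error_type as exc:
        return None, exc
```

In a notebook, `run` could be `lambda: ingest_bundle(bundle)` and `error_type` the framework's execution error. Scheduled jobs are usually better served by letting the exception propagate, so the job itself is marked as failed.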

5. Inspect evidence

SELECT
  status,
  target_table,
  mode,
  rows_read,
  rows_written,
  quality_status,
  framework_version,
  error_message
FROM main.ops.ctrl_ingestion_runs
ORDER BY started_at_utc DESC
LIMIT 20;

What to check first

Look at run status, row counts, quality status, source metadata, framework version and error summaries. Those fields are the first layer of operational diagnosis.
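
A first-pass triage over those columns can be sketched in plain Python. The function below works on rows represented as dictionaries keyed by the column names in the query above. The two checks are heuristics chosen for illustration, not rules defined by contractforge, and the sample rows are invented.

```python
from typing import Any, Dict, List

# The two failing statuses named in this guide.
FAILING_STATUSES = {"FAILED", "ABORTED"}


def reasons_to_inspect(run: Dict[str, Any]) -> List[str]:
    """Return the reasons, if any, why a control-table row deserves a closer look."""
    reasons = []
    if run.get("status") in FAILING_STATUSES:
        reasons.append(f"status={run['status']}: {run.get('error_message')}")
    rows_read = run.get("rows_read")
    rows_written = run.get("rows_written")
    # Heuristic: data was read but nothing reached the target table.
    if rows_read and rows_written == 0:
        reasons.append(f"read {rows_read} rows but wrote 0")
    return reasons


# Invented sample rows, for illustration only.
runs = [
    {"status": "ABORTED", "rows_read": 120, "rows_written": 0,
     "error_message": "positive_amount failed"},
    {"status": "SUCCEEDED", "rows_read": 80, "rows_written": 80,
     "error_message": None},
]
flagged = [run for run in runs if reasons_to_inspect(run)]
```

In a Databricks notebook, the dictionaries could come from `[row.asDict() for row in spark.sql(query).collect()]`, where `query` holds the SELECT statement above.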

6. Evolve the first contract

Incremental

Add watermarking

Use watermark_columns for append or upsert patterns where the source exposes a monotonic update timestamp.
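
The idea behind a watermark can be shown without Spark: keep only rows whose timestamp is later than the highest value already processed, then advance the stored value. The sketch below is a conceptual illustration in plain Python, not contractforge's implementation. The column name `updated_at` and the function name are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple


def select_new_rows(
    rows: List[Dict[str, Any]],
    watermark_column: str,
    last_watermark: Optional[datetime],
) -> Tuple[List[Dict[str, Any]], Optional[datetime]]:
    """Return rows newer than `last_watermark` and the watermark to store next."""
    if last_watermark is None:
        # First run: no watermark yet, so every row is new.
        new_rows = list(rows)
    else:
        new_rows = [r for r in rows if r[watermark_column] > last_watermark]
    # If nothing is new, the watermark stays where it was.
    next_watermark = max(
        (r[watermark_column] for r in new_rows), default=last_watermark
    )
    return new_rows, next_watermark


# Invented sample rows, for illustration only.
source_rows = [
    {"order_id": 1, "updated_at": datetime(2024, 1, 1)},
    {"order_id": 2, "updated_at": datetime(2024, 1, 3)},
]
new_rows, watermark = select_new_rows(source_rows, "updated_at", datetime(2024, 1, 2))
```

The strict `>` comparison is why the column needs to be monotonic: a row that arrives late with an older timestamp would be skipped.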

Structure

Add transform.shape

Parse raw JSON payloads, flatten structs, zip parallel arrays and project target columns without custom notebook code.
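
For readers unfamiliar with these shaping steps, the plain-Python sketch below applies each of them to a single record: parse a JSON payload, flatten a nested struct, zip two parallel arrays into one array of objects, and project the target columns. It illustrates the operations only. The helper names and the sample payload are invented, and the transform.shape syntax itself is not shown here.

```python
import json
from typing import Any, Dict, List


def flatten(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into one level, joining key names with underscores."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}_"))
        else:
            flat[name] = value
    return flat


def zip_arrays(record: Dict[str, Any], columns: List[str], into: str) -> Dict[str, Any]:
    """Replace parallel array columns with a single array of per-position objects."""
    zipped = [dict(zip(columns, values)) for values in zip(*(record[c] for c in columns))]
    rest = {k: v for k, v in record.items() if k not in columns}
    return {**rest, into: zipped}


# Invented raw payload, for illustration only.
raw = '{"order_id": 7, "customer": {"id": "c1", "country": "NL"}, "skus": ["a", "b"], "qtys": [2, 1]}'

parsed = json.loads(raw)                                    # parse the JSON payload
flat = flatten(parsed)                                      # customer.id -> customer_id
shaped = zip_arrays(flat, columns=["skus", "qtys"], into="lines")
projected = {k: shaped[k] for k in ("order_id", "customer_id", "lines")}
print(projected)
# prints: {'order_id': 7, 'customer_id': 'c1', 'lines': [{'skus': 'a', 'qtys': 2}, {'skus': 'b', 'qtys': 1}]}
```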

Governance

Split annotations

Add descriptions, tags and PII classification after the data table exists and the schema is stable.

Operations

Add ownership

Record owner, support group, criticality, SLA and runbook so dashboards can identify the right response path.