Operations
ContractForge writes operational evidence to Delta control tables so teams can diagnose failures, track freshness, audit governance, inspect streaming batches, estimate runtime cost and verify framework versions without depending on notebook output.
Control tables are created by ensure_ctrl_tables() in the configured control catalog and schema. They are additive: framework upgrades add known columns when missing and do not remove existing columns automatically.
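One way to picture the additive rule is as a column diff: compare the columns a control table already has with the columns the installed framework version expects, add what is missing, and leave everything else in place. The sketch below only illustrates that idea; `plan_additive_columns` and both column layouts are hypothetical and are not part of the ContractForge API.

```python
def plan_additive_columns(existing: dict, expected: dict) -> list:
    """Return (column, type) pairs that are expected but missing.

    Columns that exist only in `existing` are never scheduled for removal,
    which is what keeps the upgrade additive.
    """
    return [(name, dtype) for name, dtype in expected.items() if name not in existing]


# Hypothetical layouts: an older table and the layout a newer wheel expects.
existing = {"run_id": "STRING", "status": "STRING", "legacy_note": "STRING"}
expected = {"run_id": "STRING", "status": "STRING", "runtime_type": "STRING"}

to_add = plan_additive_columns(existing, expected)
print(to_add)  # [('runtime_type', 'STRING')]
```

Note that `legacy_note` stays in the table: nothing in the plan drops a column.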
The default prefix is ctrl_ingestion_*. Names can be changed through FrameworkConfig, but dashboards should preferably resolve names from the configured control schema.
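A dashboard can avoid hard-coded names by building them from the configured location. A minimal sketch, assuming the catalog, schema and prefix are available as plain strings; `ControlLocation` is a hypothetical helper for illustration, not the `FrameworkConfig` API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ControlLocation:
    """Hypothetical holder for the control catalog, schema and table prefix."""

    catalog: str
    schema: str
    prefix: str = "ctrl_ingestion_"  # default prefix described above

    def table(self, suffix: str) -> str:
        """Build a fully qualified control table name, for example for 'runs'."""
        return f"{self.catalog}.{self.schema}.{self.prefix}{suffix}"


loc = ControlLocation(catalog="main", schema="ops")
runs_table = loc.table("runs")
query = f"SELECT status, count(*) AS runs FROM {runs_table} GROUP BY status"
print(query)
```

If the prefix is changed later, only the `ControlLocation` values need to change; the dashboard queries keep working.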
Most operational tables include framework_version and ctrl_schema_version. Use them to detect mixed wheel versions and outdated metadata layouts.
ctrl_ingestion_runs and ctrl_ingestion_errors are partitioned by date. Use date filters in dashboards and retention jobs.
Connector options, source metadata, stage durations, schema changes and operation metrics are stored as JSON strings to keep the table schema stable.
| Table | Purpose | Primary operational questions |
|---|---|---|
ctrl_ingestion_runs | One batch execution row per run, including source, target, status, metrics, runtime, schema and governance summaries. | What ran? Did it succeed? How many rows moved? Which connector, mode, runtime and version were used? |
ctrl_ingestion_state | Latest state per target table, including watermark, last success, last error and last Delta version. | What is the current watermark? When did the table last succeed? Did the state advance? |
ctrl_ingestion_quality | Quality rule results for failed, warned, quarantined or aborting rules. | Which rules failed? How many records failed? Was the run aborted or quarantined? |
ctrl_ingestion_quarantine | Quarantined row payloads with rule and reason. | Which records were isolated and why? |
ctrl_ingestion_errors | Full failure details and stack traces. | What is the real exception behind a short run error? |
ctrl_ingestion_schema_changes | Detected and applied schema evolution events. | Which columns were added? Which type changes were detected or rejected? |
ctrl_ingestion_streams | Outer streaming or Auto Loader execution row with aggregated micro-batch metrics. | How many batches ran? Did the aggregate match child run totals? |
ctrl_ingestion_locks | Best-effort lock state per target. | Is a target currently locked? Who owns the lock? Has it expired? |
ctrl_ingestion_explain | Captured Spark explain plan when enabled. | What Spark plan was used in a dry run, CI check or diagnostic run? |
ctrl_ingestion_lineage | OpenLineage-compatible event payloads. | Which source produced which target, and what lineage event was emitted? |
ctrl_ingestion_metadata | Control-plane framework metadata. | Which framework version and control schema version are installed? |
ctrl_ingestion_annotations | Catalog comments, tags, aliases, PII and deprecation annotation actions. | Which metadata was applied, skipped, warned or failed? |
ctrl_ingestion_operations | Declared ownership, SLA, criticality, alerting and support metadata snapshots. | Who owns the table? What is the expected freshness? Which runbook applies? |
ctrl_ingestion_access | Access contract validation/application evidence for grants, row filters and masks. | Which access changes were applied or deferred? Was drift detected? |
Use this section as the operational data dictionary for dashboards, alert rules and support queries. JSON fields are stored as strings; parse them with from_json, get_json_object or SQL JSON functions when building curated operational views.
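Outside SQL, the same fields can be decoded with any JSON parser. The sketch below assumes `stage_durations_json` holds an object keyed by stage name with durations in seconds; `parse_json_field` and the sample value are made up for illustration.

```python
import json


def parse_json_field(value):
    """Parse a JSON string column; return {} for NULL, empty or invalid input."""
    if not value:
        return {}
    try:
        parsed = json.loads(value)
    except (TypeError, ValueError):
        return {}
    # Only objects are useful here; arrays or scalars are treated as empty.
    return parsed if isinstance(parsed, dict) else {}


# Made-up stage_durations_json value for one run.
raw = '{"read": 12.5, "quality": 1.2, "write": 30.0, "state": 0.4, "lineage": 0.1}'
stages = parse_json_field(raw)
slowest = max(stages, key=stages.get)
print(slowest, stages[slowest])  # write 30.0
```

Returning an empty dict for bad input keeps a curated view from failing on a single malformed row.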
ctrl_ingestion_runs: batch execution ledger
Central fact table for batch ingestion. Join it to errors, quality, schema changes, annotations and access by run_id.
| Field | Meaning |
|---|---|
run_id | Unique execution identifier for this batch run. |
run_ts_utc | Run timestamp used as the logical event time for the row. |
run_date | Date partition derived from the run timestamp. |
notebook_name | Notebook or caller name when available. |
layer | Logical layer declared by the contract, such as bronze, silver, gold, stage or a custom value. |
source_table | Legacy or resolved source table identifier when the source is table-like. |
source_type | Source category, for example table, sql, path, connector or stream. |
source_connector | Connector name, such as jdbc, s3, azure_blob, rest_api, http_file or autoloader. |
source_name | Logical source name from the source contract. |
source_provider | Provider or platform for the source, when declared. |
source_format | Input format such as csv, json, parquet, avro, delta, xml, text or jdbc. |
source_path | Path or URL used by file/object-storage connectors. |
source_options_json | Redacted connector options. |
source_read_json | Read configuration, such as schema, glob filters, recursion, partitioning or connector-specific read options. |
source_request_json | Request configuration for HTTP/REST sources. |
source_auth_json | Redacted authentication configuration. |
source_pagination_json | Pagination configuration for REST-style sources. |
source_response_json | Response extraction configuration for REST-style sources. |
source_incremental_json | Incremental source settings, when present. |
source_limits_json | Connector safety limits such as timeout, page count or payload size limits. |
source_capabilities_json | Declared or inferred connector capabilities. |
source_metrics_json | Connector-side read metrics, when available. |
target_table | Fully qualified target table. |
mode | Write mode, such as scd0_append, scd0_overwrite, scd1_upsert, scd1_hash_diff, scd2 or snapshot_soft_delete. |
status | Final run status: SUCCESS, FAILED, ABORTED or SKIPPED. |
rows_read | Logical number of source rows processed by the framework. |
rows_written | Logical number of rows written to the target. |
rows_inserted | Logical inserted row count when the mode can report it. |
rows_updated | Logical updated row count when the mode can report it. |
rows_deleted | Logical deleted or soft-deleted row count when applicable. |
rows_quarantined | Rows written to quarantine instead of the target. |
watermark_column | Watermark column used for incremental reads. |
watermark_previous | Previous committed watermark value. |
watermark_current | New committed watermark value for successful runs. |
started_at_utc | Run start timestamp. |
finished_at_utc | Run finish timestamp. |
duration_seconds | Total elapsed execution time in seconds. |
quality_status | Quality summary: NOT_CONFIGURED, PASSED, WARNED, QUARANTINED or FAILED. |
schema_policy | Schema policy used by the run. |
schema_changes_json | Structured schema-change summary returned by schema validation/evolution. |
stage_durations_json | Per-stage timings such as read, quality, write, state and lineage. |
contract_description | Contract-level description snapshot. |
contract_owner | Contract-level owner snapshot. |
contract_domain | Business or data domain snapshot. |
contract_tags_json | Contract tags serialized as JSON. |
contract_sla | Contract SLA value, if declared in the ingestion contract. |
runtime_parameters_json | Runtime parameters snapshot. |
operation_metrics_json | Delta and logical write metrics serialized as JSON. |
write_started_at_utc | Write stage start timestamp. |
write_finished_at_utc | Write stage finish timestamp. |
delta_version_before | Delta table version before the write, when known. |
delta_version_after | Delta table version after the write, when known. |
write_committed | Whether the write reached the target commit path. |
error_message | Short diagnostic message. Use ctrl_ingestion_errors for full stack traces. |
parent_run_id | Parent stream run identifier for micro-batch child runs. |
run_group_id | Optional group identifier for related runs. |
master_job_id | Databricks job id when available. |
master_run_id | Databricks run id when available. |
idempotency_key | User-provided idempotency key. |
idempotency_policy | Policy used for duplicate execution handling. |
skip_reason | Reason for SKIPPED runs. |
skipped_by_run_id | Prior run that caused the skip. |
metrics_source | Metric origin: logical, delta or mixed. |
framework_version | ContractForge package version used by the run. |
ctrl_schema_version | Control schema version used by the run. |
runtime_type | Runtime classification, such as classic or serverless. |
spark_version | Spark runtime version. |
python_version | Python runtime version. |
annotations_status | Annotation application summary for the run. |
annotations_result_json | Detailed annotation result payload. |
ownership_json | Ownership metadata snapshot from the operations contract. |
operations_json | Operations contract snapshot. |
ctrl_ingestion_state: latest target state
Current operational state by target table. Watermark and Delta version updates happen only when the execution commits successfully.
| Field | Meaning |
|---|---|
target_table | Fully qualified target table and logical key for the state row. |
watermark_column | Watermark column currently tracked for the target. |
watermark_value | Last committed watermark value. |
last_success_at_utc | Timestamp of the latest successful run. |
last_run_id | Most recent run id touching this target. |
last_status | Status of the most recent run touching this target. |
last_rows_written | Rows written by the most recent run. |
last_error_message | Short error from the most recent failed run. |
parent_run_id | Parent stream run when the latest state update came from a micro-batch. |
run_group_id | Group id for related runs. |
master_job_id | Databricks job id when available. |
master_run_id | Databricks run id when available. |
last_delta_version | Last known target Delta version after successful write. |
last_write_completed_at_utc | Timestamp when the last write finished. |
last_watermark_candidate | Candidate watermark observed during the last run. |
last_updated_at_utc | Timestamp when this state row was updated. |
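The rule that the watermark and Delta version only move on a committed success can be modelled as a small state transition. This is one reading of the field descriptions above, not the framework's code: `apply_run_to_state`, the `candidate_watermark` key and all sample values are hypothetical.

```python
def apply_run_to_state(state: dict, run: dict) -> dict:
    """Return the next state row for a target after one run (illustrative)."""
    nxt = dict(state)
    # Bookkeeping fields follow the most recent run, whatever its outcome.
    nxt["last_run_id"] = run["run_id"]
    nxt["last_status"] = run["status"]
    nxt["last_watermark_candidate"] = run.get("candidate_watermark")
    if run["status"] == "SUCCESS" and run.get("write_committed"):
        # Only a committed success advances the watermark and Delta version.
        nxt["watermark_value"] = run.get("candidate_watermark")
        nxt["last_delta_version"] = run.get("delta_version_after")
        nxt["last_success_at_utc"] = run.get("finished_at_utc")
    elif run["status"] == "FAILED":
        nxt["last_error_message"] = run.get("error_message")
    return nxt


state = {"target_table": "main.silver.orders", "watermark_value": "2024-01-01",
         "last_delta_version": 41}
failed = {"run_id": "r2", "status": "FAILED", "write_committed": False,
          "candidate_watermark": "2024-01-02", "error_message": "timeout"}
ok = {"run_id": "r3", "status": "SUCCESS", "write_committed": True,
      "candidate_watermark": "2024-01-02", "delta_version_after": 42,
      "finished_at_utc": "2024-01-02T03:00:00Z"}

after_failed = apply_run_to_state(state, failed)
after_ok = apply_run_to_state(after_failed, ok)
print(after_failed["watermark_value"], after_ok["watermark_value"])
```

After the failed run the watermark is unchanged, so the next run re-reads the same window; it only advances once a write commits.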
| Table | Field | Meaning |
|---|---|---|
ctrl_ingestion_quality | run_id | Run that produced the quality result. |
ctrl_ingestion_quality | target_table | Target table evaluated by the rule. |
ctrl_ingestion_quality | rule_name | Rule identifier, built-in rule name or expression name. |
ctrl_ingestion_quality | status | Rule outcome. |
ctrl_ingestion_quality | severity | Configured action: warn, quarantine or abort. |
ctrl_ingestion_quality | failed_count | Number of rows or checks that failed. |
ctrl_ingestion_quality | checked_at_utc | Timestamp of quality evaluation. |
ctrl_ingestion_quality | message | Rule message, generated or custom. |
ctrl_ingestion_quality | details_json | Additional rule details. |
ctrl_ingestion_quarantine | run_id | Run that quarantined the row. |
ctrl_ingestion_quarantine | target_table | Target table the row was intended for. |
ctrl_ingestion_quarantine | rule_name | Rule that caused quarantine. |
ctrl_ingestion_quarantine | error_reason | Human-readable quarantine reason. |
ctrl_ingestion_quarantine | record_payload | Quarantined record serialized as JSON. |
ctrl_ingestion_quarantine | quarantined_at_utc | Quarantine timestamp. |
ctrl_ingestion_schema_changes | run_id | Run that detected the schema change. |
ctrl_ingestion_schema_changes | change_ts_utc | Change detection timestamp. |
ctrl_ingestion_schema_changes | target_table | Target table affected by the change. |
ctrl_ingestion_schema_changes | change_type | Change class, such as add_column or type_change. |
ctrl_ingestion_schema_changes | column_name | Column affected by the change. |
ctrl_ingestion_schema_changes | source_type | Source data type observed. |
ctrl_ingestion_schema_changes | target_type | Target data type observed. |
ctrl_ingestion_schema_changes | applied | Whether the change was applied to the Delta table. |
ctrl_ingestion_schema_changes | details_json | Full change payload. |
ctrl_ingestion_schema_changes | framework_version | Framework version that logged the change. |
ctrl_ingestion_schema_changes | ctrl_schema_version | Control schema version. |
| Table | Fields | Meaning |
|---|---|---|
ctrl_ingestion_errors | run_id, error_ts_utc, error_date | Failure identity and date partition. |
ctrl_ingestion_errors | target_table, source_table, mode, status | Context for the failed execution. |
ctrl_ingestion_errors | error_type, error_message, stack_trace | Exception type, short message and full traceback. |
ctrl_ingestion_errors | framework_version, ctrl_schema_version, runtime_type, spark_version, python_version | Runtime and version context. |
ctrl_ingestion_streams | stream_run_id, parent_run_id, run_group_id | Outer stream identity and grouping. |
ctrl_ingestion_streams | idempotency_key, idempotency_policy, skip_reason, skipped_by_stream_run_id | Idempotency behavior for streaming executions. |
ctrl_ingestion_streams | target_table, target_catalog, target_layer, notebook_name | Target and caller metadata. |
ctrl_ingestion_streams | source_type, source_path, trigger, checkpoint_location | Streaming source and checkpoint configuration. |
ctrl_ingestion_streams | status, started_at_utc, ended_at_utc, duration_seconds | Lifecycle timing and final status. |
ctrl_ingestion_streams | batches_processed, total_rows_read, total_rows_written, total_rows_quarantined | Aggregated child micro-batch metrics. |
ctrl_ingestion_streams | framework_version, ctrl_schema_version, runtime_type, spark_version, python_version, error_message, master_job_id, master_run_id | Runtime, version, failure and job context. |
ctrl_ingestion_explain | run_id, target_table, source_table, mode, explain_format, plan_text, captured_at_utc | Captured Spark plan for diagnostic executions. |
ctrl_ingestion_lineage | run_id, event_time_utc, event_type, target_table, source_table, namespace, producer, event_json | OpenLineage-compatible event evidence. |
ctrl_ingestion_metadata | component, framework_version, ctrl_schema_version, updated_at_utc | Control-plane metadata row for installed framework state. |
| Table | Fields | Meaning |
|---|---|---|
ctrl_ingestion_annotations | run_id, target_table | Run and target for the metadata action. |
ctrl_ingestion_annotations | annotation_scope, annotation_type, column_name, key | Annotation scope and logical field, such as table tag, column comment, PII or deprecation. |
ctrl_ingestion_annotations | previous_value, value, status, error_message, applied_sql | Before/after evidence, outcome, diagnostic and SQL action. |
ctrl_ingestion_annotations | annotation_ts_utc, annotation_date, framework_version, ctrl_schema_version | Timing and version context. |
ctrl_ingestion_operations | run_id, target_table | Run and target for the operations metadata snapshot. |
ctrl_ingestion_operations | criticality, expected_frequency, freshness_sla_minutes | SLA and severity metadata used by dashboards and alert routing. |
ctrl_ingestion_operations | alert_on_failure, alert_on_quality_fail, runbook_url | External alerting and response metadata. ContractForge records this; alerting systems consume it. |
ctrl_ingestion_operations | ownership_json, owners_json, groups_json, tags_json | Ownership, support groups and custom tags serialized as JSON. |
ctrl_ingestion_operations | status, recorded_at_utc, framework_version, ctrl_schema_version | Recording outcome, timestamp and version context. |
ctrl_ingestion_access | access_run_id, run_id, target_table | Access workflow identity and target. |
ctrl_ingestion_access | access_type, principal, privilege | Grant, row filter, column mask or validation action and affected principal/privilege. |
ctrl_ingestion_access | column_name, function_name, object_name | Column, function or object affected by access governance. |
ctrl_ingestion_access | status, error_message, applied_sql | Outcome, diagnostic and SQL command. |
ctrl_ingestion_access | previous_value, new_value, mode, drift_policy, revoke_unmanaged | Drift and reconciliation context. |
ctrl_ingestion_access | access_ts_utc, access_date, framework_version, ctrl_schema_version | Timing and version context. |
ctrl_ingestion_locks: best-effort coordination
Locks reduce predictable collisions between cooperative ContractForge writers. They do not replace Delta optimistic concurrency.
| Field | Meaning |
|---|---|
target_table | Target table protected by the lock. |
run_id | Run currently owning or previously owning the lock. |
owner | Logical owner or caller identity. |
acquired_at_utc | Lock acquisition timestamp. |
expires_at_utc | TTL expiration timestamp. |
ttl_minutes | Configured lock TTL in minutes. |
released_at_utc | Release timestamp, when released cleanly. |
status | Lock status such as ACTIVE, RELEASED or EXPIRED. |
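Because a writer can crash without releasing its lock, a row may still say ACTIVE after its TTL has passed. The sketch below mirrors the CASE expression used in the lock query on this page; it assumes `expires_at_utc` equals `acquired_at_utc` plus `ttl_minutes`, which is an interpretation of the field descriptions rather than documented behavior, and `effective_lock_status` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def effective_lock_status(status, expires_at_utc, now=None):
    """Treat an ACTIVE lock whose TTL has passed as expired but not released."""
    now = now or datetime.now(timezone.utc)
    if status == "ACTIVE" and expires_at_utc < now:
        return "EXPIRED_BUT_NOT_RELEASED"
    return status


# Made-up lock row: acquired at 12:00 UTC with a 30 minute TTL.
acquired = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
expires = acquired + timedelta(minutes=30)

during = effective_lock_status("ACTIVE", expires, now=acquired + timedelta(minutes=10))
after = effective_lock_status("ACTIVE", expires, now=acquired + timedelta(minutes=60))
print(during, after)  # ACTIVE EXPIRED_BUT_NOT_RELEASED
```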
The queries below use main.ops as the control location. Replace it with the catalog and schema used for your control tables.

-- Recent runs: status, volume and duration for the last 7 days.
SELECT
  status,
  target_table,
  mode,
  source_connector,
  runtime_type,
  rows_read,
  rows_written,
  rows_quarantined,
  duration_seconds,
  error_message,
  run_id
FROM main.ops.ctrl_ingestion_runs
WHERE run_date >= current_date() - INTERVAL 7 DAYS
ORDER BY started_at_utc DESC
LIMIT 100;

-- Failed runs in the last 7 days, joined to full error details by run_id.
SELECT
  r.started_at_utc,
  r.target_table,
  r.source_connector,
  r.mode,
  r.runtime_type,
  r.error_message AS short_error,
  e.error_type,
  e.stack_trace
FROM main.ops.ctrl_ingestion_runs r
LEFT JOIN main.ops.ctrl_ingestion_errors e
  ON r.run_id = e.run_id
WHERE r.status = 'FAILED'
  AND r.run_date >= current_date() - INTERVAL 7 DAYS
ORDER BY r.started_at_utc DESC;

-- Freshness by target: targets that never succeeded come first, then the stalest.
SELECT
  target_table,
  watermark_column,
  watermark_value,
  last_success_at_utc,
  last_status,
  last_rows_written,
  last_delta_version,
  current_timestamp() - last_success_at_utc AS time_since_success
FROM main.ops.ctrl_ingestion_state
ORDER BY last_success_at_utc ASC NULLS FIRST;

-- Quality rules with at least one failure in the last 14 days.
SELECT
  q.target_table,
  q.rule_name,
  q.severity,
  q.status,
  q.failed_count,
  q.message,
  r.run_id,
  r.started_at_utc
FROM main.ops.ctrl_ingestion_quality q
JOIN main.ops.ctrl_ingestion_runs r
  ON q.run_id = r.run_id
WHERE r.run_date >= current_date() - INTERVAL 14 DAYS
  AND q.failed_count > 0
ORDER BY r.started_at_utc DESC, q.failed_count DESC;

-- Quarantine volume per target and rule.
SELECT
  target_table,
  rule_name,
  count(*) AS quarantined_records,
  max(quarantined_at_utc) AS latest_quarantine
FROM main.ops.ctrl_ingestion_quarantine
GROUP BY target_table, rule_name
ORDER BY quarantined_records DESC;

-- Schema change history, newest first.
SELECT
  target_table,
  change_type,
  column_name,
  source_type,
  target_type,
  applied,
  change_ts_utc,
  details_json
FROM main.ops.ctrl_ingestion_schema_changes
ORDER BY change_ts_utc DESC;

-- Stream reconciliation: compare each outer stream row with the totals
-- of its child micro-batch runs (linked through parent_run_id).
WITH child AS (
  SELECT
    parent_run_id AS stream_run_id,
    count(*) AS child_batches,
    sum(coalesce(rows_read, 0)) AS child_rows_read,
    sum(coalesce(rows_written, 0)) AS child_rows_written,
    sum(coalesce(rows_quarantined, 0)) AS child_rows_quarantined
  FROM main.ops.ctrl_ingestion_runs
  WHERE parent_run_id IS NOT NULL
  GROUP BY parent_run_id
)
SELECT
  s.stream_run_id,
  s.status,
  s.batches_processed,
  c.child_batches,
  s.total_rows_read,
  c.child_rows_read,
  s.total_rows_written,
  c.child_rows_written,
  s.total_rows_quarantined,
  c.child_rows_quarantined
FROM main.ops.ctrl_ingestion_streams s
LEFT JOIN child c
  ON s.stream_run_id = c.stream_run_id
ORDER BY s.started_at_utc DESC;
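
The reconciliation query shows stream and child totals side by side; an alert rule usually needs only the rows where they disagree. The sketch below applies the same comparison to rows already fetched as dictionaries. `reconcile_stream` and the sample numbers are hypothetical, and NULL child metrics are treated as zero, as the query's coalesce does.

```python
def reconcile_stream(stream: dict, children: list) -> dict:
    """Return {metric: (stream_value, child_total)} for every mismatch."""
    pairs = {
        "total_rows_read": "rows_read",
        "total_rows_written": "rows_written",
        "total_rows_quarantined": "rows_quarantined",
    }
    mismatches = {}
    if stream["batches_processed"] != len(children):
        mismatches["batches_processed"] = (stream["batches_processed"], len(children))
    for stream_col, child_col in pairs.items():
        # Missing or NULL child metrics count as zero.
        child_total = sum((c.get(child_col) or 0) for c in children)
        if (stream.get(stream_col) or 0) != child_total:
            mismatches[stream_col] = (stream.get(stream_col), child_total)
    return mismatches


# Made-up stream row and its two child micro-batch runs.
stream = {"batches_processed": 2, "total_rows_read": 150,
          "total_rows_written": 140, "total_rows_quarantined": 10}
children = [
    {"rows_read": 100, "rows_written": 95, "rows_quarantined": 5},
    {"rows_read": 50, "rows_written": 40, "rows_quarantined": None},
]

diff = reconcile_stream(stream, children)
print(diff)
```

An empty result means the aggregate matches its children; anything else names the metric that drifted.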

-- Slowest runs of the last 7 days with per-stage timings
-- extracted from stage_durations_json.
SELECT
  run_id,
  target_table,
  mode,
  duration_seconds,
  get_json_object(stage_durations_json, '$.read') AS read_seconds,
  get_json_object(stage_durations_json, '$.quality') AS quality_seconds,
  get_json_object(stage_durations_json, '$.write') AS write_seconds,
  get_json_object(stage_durations_json, '$.state') AS state_seconds,
  get_json_object(stage_durations_json, '$.lineage') AS lineage_seconds
FROM main.ops.ctrl_ingestion_runs
WHERE run_date >= current_date() - INTERVAL 7 DAYS
ORDER BY duration_seconds DESC
LIMIT 50;

-- Estimated runtime cost for successful runs in the last 30 days.
-- The values in `rates` are user-supplied placeholders, not billing data.
WITH rates AS (
  SELECT 2.0 AS dbu_per_hour, 0.40 AS currency_per_dbu
)
SELECT
  r.target_table,
  r.mode,
  r.source_connector,
  r.rows_read,
  r.rows_written,
  r.duration_seconds,
  CASE WHEN r.duration_seconds > 0 THEN r.rows_written / r.duration_seconds END AS rows_written_per_second,
  (r.duration_seconds / 3600.0) * rates.dbu_per_hour AS estimated_dbu,
  (r.duration_seconds / 3600.0) * rates.dbu_per_hour * rates.currency_per_dbu AS estimated_cost
FROM main.ops.ctrl_ingestion_runs r
CROSS JOIN rates
WHERE r.status = 'SUCCESS'
  AND r.run_date >= current_date() - INTERVAL 30 DAYS
ORDER BY estimated_cost DESC;
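
The cost columns in that query are plain arithmetic on the run duration. As a worked example with the same placeholder rates, a 30 minute run at 2 DBU per hour and 0.40 per DBU comes to 1 DBU and 0.40 in the chosen currency. `estimate_cost` is a hypothetical helper that reproduces the formula.

```python
def estimate_cost(duration_seconds, dbu_per_hour, currency_per_dbu):
    """Return (estimated_dbu, estimated_cost) for one run."""
    hours = duration_seconds / 3600.0
    dbu = hours * dbu_per_hour
    return dbu, dbu * currency_per_dbu


# 1800 seconds = 0.5 hours; 0.5 * 2.0 = 1.0 DBU; 1.0 * 0.40 = 0.40.
dbu, cost = estimate_cost(1800, dbu_per_hour=2.0, currency_per_dbu=0.40)
print(dbu, cost)  # 1.0 0.4
```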

-- Annotation actions (comments, tags, PII, deprecation), newest first.
SELECT
  target_table,
  annotation_scope,
  annotation_type,
  column_name,
  key,
  status,
  error_message,
  annotation_ts_utc
FROM main.ops.ctrl_ingestion_annotations
ORDER BY annotation_ts_utc DESC;

-- Access actions whose status is anything other than APPLIED.
SELECT
  target_table,
  access_type,
  principal,
  privilege,
  column_name,
  function_name,
  mode,
  drift_policy,
  status,
  error_message,
  access_ts_utc
FROM main.ops.ctrl_ingestion_access
WHERE status <> 'APPLIED'
ORDER BY access_ts_utc DESC;

-- Lock state per target; ACTIVE locks past their TTL are flagged explicitly.
SELECT
  target_table,
  run_id,
  owner,
  status,
  acquired_at_utc,
  expires_at_utc,
  released_at_utc,
  CASE
    WHEN status = 'ACTIVE' AND expires_at_utc < current_timestamp() THEN 'EXPIRED_BUT_NOT_RELEASED'
    ELSE status
  END AS effective_status
FROM main.ops.ctrl_ingestion_locks
ORDER BY acquired_at_utc DESC;

-- Framework and runtime version mix over the last 30 days.
-- More than one row usually means mixed wheel versions or runtimes.
SELECT
  framework_version,
  ctrl_schema_version,
  runtime_type,
  spark_version,
  python_version,
  count(*) AS runs
FROM main.ops.ctrl_ingestion_runs
WHERE run_date >= current_date() - INTERVAL 30 DAYS
GROUP BY framework_version, ctrl_schema_version, runtime_type, spark_version, python_version
ORDER BY runs DESC;

For a run overview dashboard, start from ctrl_ingestion_runs, grouped by status, target_table, mode, source_connector and runtime_type. Include recent error messages and links to run ids.
For freshness alerts, compare ctrl_ingestion_state.last_success_at_utc with ctrl_ingestion_operations.freshness_sla_minutes. Alerting happens outside ContractForge; the library only records the metadata and evidence.
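An external alerting job could evaluate that comparison roughly as follows. This is a sketch under stated assumptions: `freshness_breached` is a hypothetical helper, timestamps are timezone-aware UTC values, and a target that has never succeeded is treated as breached (a choice, not documented behavior).

```python
from datetime import datetime, timedelta, timezone


def freshness_breached(last_success_at_utc, freshness_sla_minutes, now=None):
    """Return True when the last success is older than the declared SLA."""
    now = now or datetime.now(timezone.utc)
    if last_success_at_utc is None:
        return True  # assumption: never succeeded counts as a breach
    return now - last_success_at_utc > timedelta(minutes=freshness_sla_minutes)


# Made-up values: last success two hours before the check.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
last_success = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)

print(freshness_breached(last_success, 60, now=now))   # True
print(freshness_breached(last_success, 180, now=now))  # False
```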
Use ctrl_ingestion_quality for rule-level summary and ctrl_ingestion_quarantine for record-level remediation. Abort rules and quarantine rules require different response paths.
Start with ctrl_ingestion_runs.error_message, then join ctrl_ingestion_errors for full trace, and check framework_version, runtime_type and connector metadata before changing code.
Retention and cost reports are operational utilities. Cost reports estimate runtime cost from user-provided rates; they are not a substitute for cloud billing exports.
contractforge maintenance ctrl-retention --catalog main --ctrl-schema ops --retention-days 30
contractforge maintenance cost-report --catalog main --ctrl-schema ops --dbu-per-hour 2 --currency-per-dbu 0.40
Apply retention first to high-volume date-partitioned tables such as runs and errors. Keep quarantine and schema-change history according to the organization's audit and data-quality remediation policy.
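Date partitioning is what makes retention on runs and errors cheap: a cutoff on the partition column lets whole partitions be skipped or removed. The sketch below only shows how such a cutoff could be derived; it is not what the `ctrl-retention` command executes. `retention_statement` is hypothetical, and it assumes the default `ctrl_ingestion_` prefix and the `run_date` and `error_date` partition columns listed in the data dictionary.

```python
from datetime import date, timedelta

# Partition columns taken from the data dictionary on this page.
PARTITION_COLUMNS = {"runs": "run_date", "errors": "error_date"}


def retention_statement(catalog, schema, table_suffix, retention_days, today=None):
    """Build an illustrative DELETE that filters on the date partition column."""
    today = today or date.today()
    cutoff = today - timedelta(days=retention_days)
    column = PARTITION_COLUMNS[table_suffix]
    table = f"{catalog}.{schema}.ctrl_ingestion_{table_suffix}"
    return f"DELETE FROM {table} WHERE {column} < DATE '{cutoff.isoformat()}'"


stmt = retention_statement("main", "ops", "runs", 30, today=date(2024, 3, 31))
print(stmt)
# DELETE FROM main.ops.ctrl_ingestion_runs WHERE run_date < DATE '2024-03-01'
```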