Control schema model

Control tables are created by ensure_ctrl_tables() in the configured control catalog and schema. They are additive: framework upgrades add known columns when missing and do not remove existing columns automatically.
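The additive rule works like a set difference. The sketch below plans an upgrade from plain column lists. Known columns that are missing become ADD COLUMNS clauses, and existing columns that the framework does not know about are left alone. The helpers `plan_additive_upgrade` and `alter_statement` and the column types are hypothetical. This is not how ensure_ctrl_tables() is implemented.

```python
from typing import Dict, List, Optional


def plan_additive_upgrade(existing: List[str], known: Dict[str, str]) -> List[str]:
    """Return "name TYPE" clauses for known columns missing from the table.

    Columns that exist but are not in `known` are ignored: an additive
    upgrade never drops them. Names are compared case-insensitively,
    assuming Spark's default case-insensitive column resolution.
    """
    present = {name.lower() for name in existing}
    return [
        f"{name} {sql_type}"
        for name, sql_type in known.items()
        if name.lower() not in present
    ]


def alter_statement(table: str, clauses: List[str]) -> Optional[str]:
    """Build one ALTER TABLE ... ADD COLUMNS statement, or None if nothing is missing."""
    if not clauses:
        return None
    return f"ALTER TABLE {table} ADD COLUMNS ({', '.join(clauses)})"


# Example: an older table that lacks ctrl_schema_version and has an extra custom column.
existing_cols = ["run_id", "status", "legacy_note"]
known_cols = {"run_id": "STRING", "status": "STRING", "ctrl_schema_version": "STRING"}
print(alter_statement("main.ops.ctrl_ingestion_runs", plan_additive_upgrade(existing_cols, known_cols)))
```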

Default names

Stable table names

The default prefix is ctrl_ingestion_*. Names can be changed through FrameworkConfig. Dashboards should therefore resolve table names from the configured control catalog, schema and prefix instead of hard-coding the defaults.
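Here is a minimal sketch of resolving names from configuration. `ctrl_table` is a hypothetical helper. It assumes you already have the catalog, schema and prefix values from your own FrameworkConfig setup and does not read FrameworkConfig itself.

```python
DEFAULT_PREFIX = "ctrl_ingestion_"


def ctrl_table(catalog: str, schema: str, suffix: str, prefix: str = DEFAULT_PREFIX) -> str:
    """Return a fully qualified control table name such as main.ops.ctrl_ingestion_runs.

    Hypothetical helper: pass catalog, schema and prefix from your own
    configuration instead of hard-coding them in each dashboard query.
    """
    for part in (catalog, schema, suffix):
        # Reject empty and dotted parts, which would break the three-level name.
        if not part or "." in part:
            raise ValueError(f"invalid name part: {part!r}")
    return f"{catalog}.{schema}.{prefix}{suffix}"


runs_table = ctrl_table("main", "ops", "runs")
print(f"SELECT status, count(*) FROM {runs_table} GROUP BY status")
```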

Versioning

Framework and control schema

Most operational tables include framework_version and ctrl_schema_version. Use them to detect mixed wheel versions and outdated metadata layouts.
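One way to spot mixed wheel versions is to collect the distinct version pairs for each target from recent run rows. The sketch below works on plain dictionaries shaped like ctrl_ingestion_runs rows. `mixed_versions` is a hypothetical helper, and the version strings are sample values.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping, Set, Tuple


def mixed_versions(rows: Iterable[Mapping[str, str]]) -> Dict[str, Set[Tuple[str, str]]]:
    """Return targets written by more than one (framework_version, ctrl_schema_version) pair."""
    seen: Dict[str, Set[Tuple[str, str]]] = defaultdict(set)
    for row in rows:
        seen[row["target_table"]].add((row["framework_version"], row["ctrl_schema_version"]))
    return {target: pairs for target, pairs in seen.items() if len(pairs) > 1}


# Sample rows shaped like ctrl_ingestion_runs; the version values are made up.
recent_runs = [
    {"target_table": "main.sales.orders", "framework_version": "1.4.0", "ctrl_schema_version": "7"},
    {"target_table": "main.sales.orders", "framework_version": "1.5.0", "ctrl_schema_version": "8"},
    {"target_table": "main.sales.customers", "framework_version": "1.5.0", "ctrl_schema_version": "8"},
]
print(mixed_versions(recent_runs))
```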

Partitions

Date-partitioned high-volume tables

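ctrl_ingestion_runs (partitioned by run_date) and ctrl_ingestion_errors (partitioned by error_date) are date-partitioned. Filter on these date columns in dashboards and retention jobs so that queries prune partitions instead of scanning the full history.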

JSON payloads

Flexible operational details

Connector options, source metadata, stage durations, schema changes and operation metrics are stored as JSON strings to keep the table schema stable.
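Inside Spark, these strings are usually decoded with get_json_object or from_json. Scripts that handle exported control rows outside Spark can use a defensive standard-library decoder like the one below. `parse_json_field` is a hypothetical helper. It assumes that a corrupt payload should not break the caller.

```python
import json
from typing import Any, Optional


def parse_json_field(value: Optional[str], default: Any = None) -> Any:
    """Decode a *_json control column, returning `default` for NULL, blank or malformed input."""
    if value is None or not value.strip():
        return default
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        # Keep dashboards and alert scripts running even if one payload is corrupt.
        return default


print(parse_json_field('{"read": 12.5, "write": 40.2}', default={}))
print(parse_json_field(None, default={}))
```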

Table map

Table | Purpose | Primary operational questions
ctrl_ingestion_runs | One batch execution row per run, including source, target, status, metrics, runtime, schema and governance summaries. | What ran? Did it succeed? How many rows moved? Which connector, mode, runtime and version were used?
ctrl_ingestion_state | Latest state per target table, including watermark, last success, last error and last Delta version. | What is the current watermark? When did the table last succeed? Did the state advance?
ctrl_ingestion_quality | Quality rule results for failed, warned, quarantined or aborting rules. | Which rules failed? How many records failed? Was the run aborted or quarantined?
ctrl_ingestion_quarantine | Quarantined row payloads with rule and reason. | Which records were isolated and why?
ctrl_ingestion_errors | Full failure details and stack traces. | What is the real exception behind a short run error?
ctrl_ingestion_schema_changes | Detected and applied schema evolution events. | Which columns were added? Which type changes were detected or rejected?
ctrl_ingestion_streams | Outer streaming or Auto Loader execution row with aggregated micro-batch metrics. | How many batches ran? Did the aggregate match child run totals?
ctrl_ingestion_locks | Best-effort lock state per target. | Is a target currently locked? Who owns the lock? Has it expired?
ctrl_ingestion_explain | Captured Spark explain plan when enabled. | What Spark plan was used in a dry run, CI check or diagnostic run?
ctrl_ingestion_lineage | OpenLineage-compatible event payloads. | Which source produced which target, and what lineage event was emitted?
ctrl_ingestion_metadata | Control-plane framework metadata. | Which framework version and control schema version are installed?
ctrl_ingestion_annotations | Catalog comments, tags, aliases, PII and deprecation annotation actions. | Which metadata was applied, skipped, warned or failed?
ctrl_ingestion_operations | Declared ownership, SLA, criticality, alerting and support metadata snapshots. | Who owns the table? What is the expected freshness? Which runbook applies?
ctrl_ingestion_access | Access contract validation/application evidence for grants, row filters and masks. | Which access changes were applied or deferred? Was drift detected?

Field reference

Use this section as the operational data dictionary for dashboards, alert rules and support queries. JSON fields are stored as strings. Parse them with from_json, get_json_object or other Spark SQL JSON functions when building curated operational views.

ctrl_ingestion_runs — batch execution ledger

Central fact table for batch ingestion. Join it to errors, quality, schema changes, annotations and access by run_id.

Field | Meaning
run_id | Unique execution identifier for this batch run.
run_ts_utc | Run timestamp used as the logical event time for the row.
run_date | Date partition derived from the run timestamp.
notebook_name | Notebook or caller name when available.
layer | Logical layer declared by the contract, such as bronze, silver, gold, stage or a custom value.
source_table | Legacy or resolved source table identifier when the source is table-like.
source_type | Source category, for example table, sql, path, connector or stream.
source_connector | Connector name, such as jdbc, s3, azure_blob, rest_api, http_file or autoloader.
source_name | Logical source name from the source contract.
source_provider | Provider or platform for the source, when declared.
source_format | Input format such as csv, json, parquet, avro, delta, xml, text or jdbc.
source_path | Path or URL used by file/object-storage connectors.
source_options_json | Redacted connector options.
source_read_json | Read configuration, such as schema, glob filters, recursion, partitioning or connector-specific read options.
source_request_json | Request configuration for HTTP/REST sources.
source_auth_json | Redacted authentication configuration.
source_pagination_json | Pagination configuration for REST-style sources.
source_response_json | Response extraction configuration for REST-style sources.
source_incremental_json | Incremental source settings, when present.
source_limits_json | Connector safety limits such as timeout, page count or payload size limits.
source_capabilities_json | Declared or inferred connector capabilities.
source_metrics_json | Connector-side read metrics, when available.
target_table | Fully qualified target table.
mode | Write mode, such as scd0_append, scd0_overwrite, scd1_upsert, scd1_hash_diff, scd2 or snapshot_soft_delete.
status | Final run status: SUCCESS, FAILED, ABORTED or SKIPPED.
rows_read | Logical number of source rows processed by the framework.
rows_written | Logical number of rows written to the target.
rows_inserted | Logical inserted row count when the mode can report it.
rows_updated | Logical updated row count when the mode can report it.
rows_deleted | Logical deleted or soft-deleted row count when applicable.
rows_quarantined | Rows written to quarantine instead of the target.
watermark_column | Watermark column used for incremental reads.
watermark_previous | Previous committed watermark value.
watermark_current | New committed watermark value for successful runs.
started_at_utc | Run start timestamp.
finished_at_utc | Run finish timestamp.
duration_seconds | Total elapsed execution time in seconds.
quality_status | Quality summary: NOT_CONFIGURED, PASSED, WARNED, QUARANTINED or FAILED.
schema_policy | Schema policy used by the run.
schema_changes_json | Structured schema-change summary returned by schema validation/evolution.
stage_durations_json | Per-stage timings such as read, quality, write, state and lineage.
contract_description | Contract-level description snapshot.
contract_owner | Contract-level owner snapshot.
contract_domain | Business or data domain snapshot.
contract_tags_json | Contract tags serialized as JSON.
contract_sla | Contract SLA value, if declared in the ingestion contract.
runtime_parameters_json | Runtime parameters snapshot.
operation_metrics_json | Delta and logical write metrics serialized as JSON.
write_started_at_utc | Write stage start timestamp.
write_finished_at_utc | Write stage finish timestamp.
delta_version_before | Delta table version before the write, when known.
delta_version_after | Delta table version after the write, when known.
write_committed | Whether the write reached the target commit path.
error_message | Short diagnostic message. Use ctrl_ingestion_errors for full stack traces.
parent_run_id | Parent stream run identifier for micro-batch child runs.
run_group_id | Optional group identifier for related runs.
master_job_id | Databricks job id when available.
master_run_id | Databricks run id when available.
idempotency_key | User-provided idempotency key.
idempotency_policy | Policy used for duplicate execution handling.
skip_reason | Reason for SKIPPED runs.
skipped_by_run_id | Prior run that caused the skip.
metrics_source | Metric origin: logical, delta or mixed.
framework_version | ContractForge package version used by the run.
ctrl_schema_version | Control schema version used by the run.
runtime_type | Runtime classification, such as classic or serverless.
spark_version | Spark runtime version.
python_version | Python runtime version.
annotations_status | Annotation application summary for the run.
annotations_result_json | Detailed annotation result payload.
ownership_json | Ownership metadata snapshot from the operations contract.
operations_json | Operations contract snapshot.

ctrl_ingestion_state — latest target state

Current operational state by target table. Watermark and Delta version updates happen only when the execution commits successfully.

Field | Meaning
target_table | Fully qualified target table and logical key for the state row.
watermark_column | Watermark column currently tracked for the target.
watermark_value | Last committed watermark value.
last_success_at_utc | Timestamp of the latest successful run.
last_run_id | Most recent run id touching this target.
last_status | Status of the most recent run touching this target.
last_rows_written | Rows written by the most recent run.
last_error_message | Short error from the most recent failed run.
parent_run_id | Parent stream run when the latest state update came from a micro-batch.
run_group_id | Group id for related runs.
master_job_id | Databricks job id when available.
master_run_id | Databricks run id when available.
last_delta_version | Last known target Delta version after a successful write.
last_write_completed_at_utc | Timestamp when the last write finished.
last_watermark_candidate | Candidate watermark observed during the last run.
last_updated_at_utc | Timestamp when this state row was updated.

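The commit rule for this table can be sketched as a pure function over a simplified state record. The dataclass and `apply_run` are hypothetical illustrations that reuse the field names above. They are not ContractForge's state writer. Recording error messages for ABORTED runs as well as FAILED runs is an assumption.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class TargetState:
    """Simplified, hypothetical in-memory mirror of one ctrl_ingestion_state row."""
    target_table: str
    watermark_value: Optional[str] = None
    last_delta_version: Optional[int] = None
    last_success_at_utc: Optional[datetime] = None
    last_run_id: Optional[str] = None
    last_status: Optional[str] = None
    last_error_message: Optional[str] = None
    last_watermark_candidate: Optional[str] = None


def apply_run(state: TargetState, run_id: str, status: str, write_committed: bool,
              finished_at: datetime, watermark_candidate: Optional[str] = None,
              delta_version_after: Optional[int] = None,
              error_message: Optional[str] = None) -> TargetState:
    # Every run touching the target refreshes the "last run" bookkeeping.
    new_state = replace(state, last_run_id=run_id, last_status=status,
                        last_watermark_candidate=watermark_candidate)
    if status == "SUCCESS" and write_committed:
        # Only a committed, successful run advances the watermark and Delta version.
        return replace(
            new_state,
            watermark_value=watermark_candidate if watermark_candidate is not None else state.watermark_value,
            last_delta_version=delta_version_after if delta_version_after is not None else state.last_delta_version,
            last_success_at_utc=finished_at,
        )
    if status in ("FAILED", "ABORTED"):
        # Keep the short error; the committed watermark stays where it was.
        return replace(new_state, last_error_message=error_message)
    return new_state


s = TargetState("main.sales.orders")
s = apply_run(s, "r1", "SUCCESS", True, datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc),
              watermark_candidate="2024-01-02", delta_version_after=12)
s = apply_run(s, "r2", "FAILED", False, datetime(2024, 1, 2, 7, 0, tzinfo=timezone.utc),
              watermark_candidate="2024-01-03", error_message="timeout")
print(s.watermark_value, s.last_delta_version, s.last_status)  # 2024-01-02 12 FAILED
```

The failed run updates last_status and last_watermark_candidate, but watermark_value and last_delta_version keep the values from the last committed success.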
Quality, quarantine and schema tables

Table | Field | Meaning
ctrl_ingestion_quality | run_id | Run that produced the quality result.
ctrl_ingestion_quality | target_table | Target table evaluated by the rule.
ctrl_ingestion_quality | rule_name | Rule identifier, built-in rule name or expression name.
ctrl_ingestion_quality | status | Rule outcome.
ctrl_ingestion_quality | severity | Configured action: warn, quarantine or abort.
ctrl_ingestion_quality | failed_count | Number of rows or checks that failed.
ctrl_ingestion_quality | checked_at_utc | Timestamp of quality evaluation.
ctrl_ingestion_quality | message | Rule message, generated or custom.
ctrl_ingestion_quality | details_json | Additional rule details.
ctrl_ingestion_quarantine | run_id | Run that quarantined the row.
ctrl_ingestion_quarantine | target_table | Target table the row was intended for.
ctrl_ingestion_quarantine | rule_name | Rule that caused quarantine.
ctrl_ingestion_quarantine | error_reason | Human-readable quarantine reason.
ctrl_ingestion_quarantine | record_payload | Quarantined record serialized as JSON.
ctrl_ingestion_quarantine | quarantined_at_utc | Quarantine timestamp.
ctrl_ingestion_schema_changes | run_id | Run that detected the schema change.
ctrl_ingestion_schema_changes | change_ts_utc | Change detection timestamp.
ctrl_ingestion_schema_changes | target_table | Target table affected by the change.
ctrl_ingestion_schema_changes | change_type | Change class, such as add_column or type_change.
ctrl_ingestion_schema_changes | column_name | Column affected by the change.
ctrl_ingestion_schema_changes | source_type | Source data type observed.
ctrl_ingestion_schema_changes | target_type | Target data type observed.
ctrl_ingestion_schema_changes | applied | Whether the change was applied to the Delta table.
ctrl_ingestion_schema_changes | details_json | Full change payload.
ctrl_ingestion_schema_changes | framework_version | Framework version that logged the change.
ctrl_ingestion_schema_changes | ctrl_schema_version | Control schema version.

Diagnostics, lineage and streaming tables

Table | Fields | Meaning
ctrl_ingestion_errors | run_id, error_ts_utc, error_date | Failure identity and date partition.
ctrl_ingestion_errors | target_table, source_table, mode, status | Context for the failed execution.
ctrl_ingestion_errors | error_type, error_message, stack_trace | Exception type, short message and full traceback.
ctrl_ingestion_errors | framework_version, ctrl_schema_version, runtime_type, spark_version, python_version | Runtime and version context.
ctrl_ingestion_streams | stream_run_id, parent_run_id, run_group_id | Outer stream identity and grouping.
ctrl_ingestion_streams | idempotency_key, idempotency_policy, skip_reason, skipped_by_stream_run_id | Idempotency behavior for streaming executions.
ctrl_ingestion_streams | target_table, target_catalog, target_layer, notebook_name | Target and caller metadata.
ctrl_ingestion_streams | source_type, source_path, trigger, checkpoint_location | Streaming source and checkpoint configuration.
ctrl_ingestion_streams | status, started_at_utc, ended_at_utc, duration_seconds | Lifecycle timing and final status.
ctrl_ingestion_streams | batches_processed, total_rows_read, total_rows_written, total_rows_quarantined | Aggregated child micro-batch metrics.
ctrl_ingestion_streams | framework_version, ctrl_schema_version, runtime_type, spark_version, python_version, error_message, master_job_id, master_run_id | Runtime, version, failure and job context.
ctrl_ingestion_explain | run_id, target_table, source_table, mode, explain_format, plan_text, captured_at_utc | Captured Spark plan for diagnostic executions.
ctrl_ingestion_lineage | run_id, event_time_utc, event_type, target_table, source_table, namespace, producer, event_json | OpenLineage-compatible event evidence.
ctrl_ingestion_metadata | component, framework_version, ctrl_schema_version, updated_at_utc | Control-plane metadata row for installed framework state.

Governance and access tables

Table | Fields | Meaning
ctrl_ingestion_annotations | run_id, target_table | Run and target for the metadata action.
ctrl_ingestion_annotations | annotation_scope, annotation_type, column_name, key | Annotation scope and logical field, such as table tag, column comment, PII or deprecation.
ctrl_ingestion_annotations | previous_value, value, status, error_message, applied_sql | Before/after evidence, outcome, diagnostic and SQL action.
ctrl_ingestion_annotations | annotation_ts_utc, annotation_date, framework_version, ctrl_schema_version | Timing and version context.
ctrl_ingestion_operations | run_id, target_table | Run and target for the operations metadata snapshot.
ctrl_ingestion_operations | criticality, expected_frequency, freshness_sla_minutes | SLA and severity metadata used by dashboards and alert routing.
ctrl_ingestion_operations | alert_on_failure, alert_on_quality_fail, runbook_url | External alerting and response metadata. ContractForge records this; alerting systems consume it.
ctrl_ingestion_operations | ownership_json, owners_json, groups_json, tags_json | Ownership, support groups and custom tags serialized as JSON.
ctrl_ingestion_operations | status, recorded_at_utc, framework_version, ctrl_schema_version | Recording outcome, timestamp and version context.
ctrl_ingestion_access | access_run_id, run_id, target_table | Access workflow identity and target.
ctrl_ingestion_access | access_type, principal, privilege | Grant, row filter, column mask or validation action and affected principal/privilege.
ctrl_ingestion_access | column_name, function_name, object_name | Column, function or object affected by access governance.
ctrl_ingestion_access | status, error_message, applied_sql | Outcome, diagnostic and SQL command.
ctrl_ingestion_access | previous_value, new_value, mode, drift_policy, revoke_unmanaged | Drift and reconciliation context.
ctrl_ingestion_access | access_ts_utc, access_date, framework_version, ctrl_schema_version | Timing and version context.

ctrl_ingestion_locks — best-effort coordination

Locks reduce predictable collisions between cooperative ContractForge writers. They do not replace Delta optimistic concurrency.

Field | Meaning
target_table | Target table protected by the lock.
run_id | Run that currently owns, or last owned, the lock.
owner | Logical owner or caller identity.
acquired_at_utc | Lock acquisition timestamp.
expires_at_utc | TTL expiration timestamp.
ttl_minutes | Configured lock TTL in minutes.
released_at_utc | Release timestamp, when released cleanly.
status | Lock status such as ACTIVE, RELEASED or EXPIRED.
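The TTL fields relate in a simple way. This sketch assumes that expires_at_utc equals acquired_at_utc plus ttl_minutes. It applies the same effective-status rule as the lock query in the SQL section. Both helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def lock_expiry(acquired_at_utc: datetime, ttl_minutes: int) -> datetime:
    """Assumed relation: expires_at_utc = acquired_at_utc + ttl_minutes."""
    return acquired_at_utc + timedelta(minutes=ttl_minutes)


def effective_lock_status(status: str, expires_at_utc: datetime,
                          now: Optional[datetime] = None) -> str:
    """Flag ACTIVE locks whose TTL has passed without a clean release."""
    now = now or datetime.now(timezone.utc)
    if status == "ACTIVE" and expires_at_utc < now:
        return "EXPIRED_BUT_NOT_RELEASED"
    return status


acquired = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
expires = lock_expiry(acquired, ttl_minutes=30)
print(effective_lock_status("ACTIVE", expires, now=datetime(2024, 1, 1, 13, 0, tzinfo=timezone.utc)))
```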

Useful SQL queries

Replace main.ops with the catalog and schema used for control tables.

Recent run health

SELECT
  status,
  target_table,
  mode,
  source_connector,
  runtime_type,
  rows_read,
  rows_written,
  rows_quarantined,
  duration_seconds,
  error_message,
  run_id
FROM main.ops.ctrl_ingestion_runs
WHERE run_date >= current_date() - INTERVAL 7 DAYS
ORDER BY started_at_utc DESC
LIMIT 100;

Failure triage with full stack trace

SELECT
  r.started_at_utc,
  r.target_table,
  r.source_connector,
  r.mode,
  r.runtime_type,
  r.error_message AS short_error,
  e.error_type,
  e.stack_trace
FROM main.ops.ctrl_ingestion_runs r
LEFT JOIN main.ops.ctrl_ingestion_errors e
  ON r.run_id = e.run_id
WHERE r.status = 'FAILED'
  AND r.run_date >= current_date() - INTERVAL 7 DAYS
ORDER BY r.started_at_utc DESC;

Freshness and watermark drift

SELECT
  target_table,
  watermark_column,
  watermark_value,
  last_success_at_utc,
  last_status,
  last_rows_written,
  last_delta_version,
  current_timestamp() - last_success_at_utc AS time_since_success
FROM main.ops.ctrl_ingestion_state
ORDER BY last_success_at_utc ASC NULLS FIRST;

Quality rules requiring attention

SELECT
  q.target_table,
  q.rule_name,
  q.severity,
  q.status,
  q.failed_count,
  q.message,
  r.run_id,
  r.started_at_utc
FROM main.ops.ctrl_ingestion_quality q
JOIN main.ops.ctrl_ingestion_runs r
  ON q.run_id = r.run_id
WHERE r.run_date >= current_date() - INTERVAL 14 DAYS
  AND q.failed_count > 0
ORDER BY r.started_at_utc DESC, q.failed_count DESC;

Quarantine volume by target and rule

SELECT
  target_table,
  rule_name,
  count(*) AS quarantined_records,
  max(quarantined_at_utc) AS latest_quarantine
FROM main.ops.ctrl_ingestion_quarantine
GROUP BY target_table, rule_name
ORDER BY quarantined_records DESC;

Schema changes by table

SELECT
  target_table,
  change_type,
  column_name,
  source_type,
  target_type,
  applied,
  change_ts_utc,
  details_json
FROM main.ops.ctrl_ingestion_schema_changes
ORDER BY change_ts_utc DESC;

Streaming aggregate versus child runs

WITH child AS (
  SELECT
    parent_run_id AS stream_run_id,
    count(*) AS child_batches,
    sum(coalesce(rows_read, 0)) AS child_rows_read,
    sum(coalesce(rows_written, 0)) AS child_rows_written,
    sum(coalesce(rows_quarantined, 0)) AS child_rows_quarantined
  FROM main.ops.ctrl_ingestion_runs
  WHERE parent_run_id IS NOT NULL
  GROUP BY parent_run_id
)
SELECT
  s.stream_run_id,
  s.status,
  s.batches_processed,
  c.child_batches,
  s.total_rows_read,
  c.child_rows_read,
  s.total_rows_written,
  c.child_rows_written,
  s.total_rows_quarantined,
  c.child_rows_quarantined
FROM main.ops.ctrl_ingestion_streams s
LEFT JOIN child c
  ON s.stream_run_id = c.stream_run_id
ORDER BY s.started_at_utc DESC;
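A script can run the same reconciliation over exported rows. `reconcile_stream` is a hypothetical helper. Like the query, it treats NULL child metrics as 0. It also treats a NULL stream total as 0, which the query does not do.

```python
from typing import Dict, Iterable, Mapping, Optional, Tuple


def reconcile_stream(
    stream: Mapping[str, Optional[int]],
    children: Iterable[Mapping[str, Optional[int]]],
) -> Dict[str, Tuple[int, int]]:
    """Return {stream_field: (stream_value, child_total)} for every mismatch."""
    rows = list(children)
    child_totals = {
        "batches_processed": len(rows),
        "total_rows_read": sum(r.get("rows_read") or 0 for r in rows),
        "total_rows_written": sum(r.get("rows_written") or 0 for r in rows),
        "total_rows_quarantined": sum(r.get("rows_quarantined") or 0 for r in rows),
    }
    mismatches: Dict[str, Tuple[int, int]] = {}
    for field, child_total in child_totals.items():
        stream_value = stream.get(field) or 0  # NULL stream total counts as 0
        if stream_value != child_total:
            mismatches[field] = (stream_value, child_total)
    return mismatches


stream_row = {"batches_processed": 2, "total_rows_read": 100,
              "total_rows_written": 95, "total_rows_quarantined": 5}
child_rows = [
    {"rows_read": 60, "rows_written": 55, "rows_quarantined": 5},
    {"rows_read": 40, "rows_written": 35, "rows_quarantined": None},
]
print(reconcile_stream(stream_row, child_rows))
```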

Slow stages from stage durations

SELECT
  run_id,
  target_table,
  mode,
  duration_seconds,
  get_json_object(stage_durations_json, '$.read') AS read_seconds,
  get_json_object(stage_durations_json, '$.quality') AS quality_seconds,
  get_json_object(stage_durations_json, '$.write') AS write_seconds,
  get_json_object(stage_durations_json, '$.state') AS state_seconds,
  get_json_object(stage_durations_json, '$.lineage') AS lineage_seconds
FROM main.ops.ctrl_ingestion_runs
WHERE run_date >= current_date() - INTERVAL 7 DAYS
ORDER BY duration_seconds DESC
LIMIT 50;
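To rank the stages of a single run outside SQL, the following sketch picks the longest stage from the payload. It assumes stage_durations_json is a flat object that maps stage names to seconds, which is what the `$.read`-style paths above imply. `slowest_stage` is hypothetical.

```python
import json
from typing import Optional, Tuple


def slowest_stage(stage_durations_json: Optional[str]) -> Optional[Tuple[str, float]]:
    """Return (stage_name, seconds) for the longest stage, or None if nothing is parseable."""
    if not stage_durations_json:
        return None
    try:
        stages = json.loads(stage_durations_json)
    except json.JSONDecodeError:
        return None
    if not isinstance(stages, dict):
        return None
    # Keep only numeric values; bool is excluded because it subclasses int.
    numeric = {
        name: float(value)
        for name, value in stages.items()
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    }
    if not numeric:
        return None
    name = max(numeric, key=numeric.get)
    return name, numeric[name]


print(slowest_stage('{"read": 12.5, "quality": 3, "write": 40.2, "state": 0.4, "lineage": 1.1}'))
```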

Throughput and operational cost estimate

WITH rates AS (
  SELECT 2.0 AS dbu_per_hour, 0.40 AS currency_per_dbu
)
SELECT
  r.target_table,
  r.mode,
  r.source_connector,
  r.rows_read,
  r.rows_written,
  r.duration_seconds,
  CASE WHEN r.duration_seconds > 0 THEN r.rows_written / r.duration_seconds END AS rows_written_per_second,
  (r.duration_seconds / 3600.0) * rates.dbu_per_hour AS estimated_dbu,
  (r.duration_seconds / 3600.0) * rates.dbu_per_hour * rates.currency_per_dbu AS estimated_cost
FROM main.ops.ctrl_ingestion_runs r
CROSS JOIN rates
WHERE r.status = 'SUCCESS'
  AND r.run_date >= current_date() - INTERVAL 30 DAYS
ORDER BY estimated_cost DESC;
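The cost columns follow a simple formula: hours multiplied by DBU per hour, multiplied by the price per DBU. Restating it in Python makes the arithmetic easy to check. The function names are hypothetical, and the rates are the same sample values used in the query.

```python
from typing import Optional, Tuple


def estimate_run_cost(duration_seconds: float, dbu_per_hour: float,
                      currency_per_dbu: float) -> Tuple[float, float]:
    """Return (estimated_dbu, estimated_cost) using the same formula as the query."""
    hours = duration_seconds / 3600.0
    estimated_dbu = hours * dbu_per_hour
    return estimated_dbu, estimated_dbu * currency_per_dbu


def rows_per_second(rows_written: int, duration_seconds: float) -> Optional[float]:
    """Mirror the CASE guard: no throughput value for zero-length runs."""
    if duration_seconds <= 0:
        return None
    return rows_written / duration_seconds


# A 30-minute run at 2 DBU/hour and 0.40 per DBU.
print(estimate_run_cost(1800, 2.0, 0.40))
print(rows_per_second(90_000, 1800))
```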

Governance application status

SELECT
  target_table,
  annotation_scope,
  annotation_type,
  column_name,
  key,
  status,
  error_message,
  annotation_ts_utc
FROM main.ops.ctrl_ingestion_annotations
ORDER BY annotation_ts_utc DESC;

Access drift and failed security operations

SELECT
  target_table,
  access_type,
  principal,
  privilege,
  column_name,
  function_name,
  mode,
  drift_policy,
  status,
  error_message,
  access_ts_utc
FROM main.ops.ctrl_ingestion_access
WHERE status <> 'APPLIED'
ORDER BY access_ts_utc DESC;

Active or expired locks

SELECT
  target_table,
  run_id,
  owner,
  status,
  acquired_at_utc,
  expires_at_utc,
  released_at_utc,
  CASE
    WHEN status = 'ACTIVE' AND expires_at_utc < current_timestamp() THEN 'EXPIRED_BUT_NOT_RELEASED'
    ELSE status
  END AS effective_status
FROM main.ops.ctrl_ingestion_locks
ORDER BY acquired_at_utc DESC;

Framework version inventory

SELECT
  framework_version,
  ctrl_schema_version,
  runtime_type,
  spark_version,
  python_version,
  count(*) AS runs
FROM main.ops.ctrl_ingestion_runs
WHERE run_date >= current_date() - INTERVAL 30 DAYS
GROUP BY framework_version, ctrl_schema_version, runtime_type, spark_version, python_version
ORDER BY runs DESC;

Operational workflows

Daily health dashboard

Base it on ctrl_ingestion_runs, grouped by status, target_table, mode, source_connector and runtime_type. Include recent error messages and links to run ids.
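Here is a script-side version of that grouping. It assumes the run rows have already been fetched as dictionaries. `health_counts` and `recent_errors` are hypothetical helpers, and the sample rows are made up.

```python
from collections import Counter
from datetime import datetime
from typing import Iterable, List, Mapping, Tuple

GROUP_KEYS = ("status", "target_table", "mode", "source_connector", "runtime_type")


def health_counts(runs: Iterable[Mapping[str, object]]) -> Counter:
    """Count runs per (status, target_table, mode, source_connector, runtime_type)."""
    return Counter(tuple(run[key] for key in GROUP_KEYS) for run in runs)


def recent_errors(runs: Iterable[Mapping[str, object]], limit: int = 10) -> List[Tuple[object, object]]:
    """Return (run_id, error_message) for the newest failed runs, newest first."""
    failed = [r for r in runs if r["status"] == "FAILED"]
    failed.sort(key=lambda r: r["started_at_utc"], reverse=True)
    return [(r["run_id"], r["error_message"]) for r in failed[:limit]]


base = {"target_table": "main.sales.orders", "mode": "scd1_upsert",
        "source_connector": "jdbc", "runtime_type": "serverless"}
runs = [
    {**base, "run_id": "r1", "status": "SUCCESS", "started_at_utc": datetime(2024, 1, 2, 6), "error_message": None},
    {**base, "run_id": "r2", "status": "FAILED", "started_at_utc": datetime(2024, 1, 2, 7), "error_message": "timeout"},
    {**base, "run_id": "r3", "status": "FAILED", "started_at_utc": datetime(2024, 1, 2, 8), "error_message": "auth error"},
]
print(health_counts(runs))
print(recent_errors(runs))
```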

Freshness monitoring

Compare ctrl_ingestion_state.last_success_at_utc with ctrl_ingestion_operations.freshness_sla_minutes for the same target_table. Alerting happens outside ContractForge; the library only records the metadata and evidence.
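The staleness check itself is small. In this sketch, `is_stale` is hypothetical. It assumes you have joined each state row to the latest operations snapshot for the same target_table. It also treats a target that has never succeeded as stale, which is a policy choice rather than documented behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(last_success_at_utc: Optional[datetime],
             freshness_sla_minutes: Optional[int], now: datetime) -> bool:
    """True when the target has not succeeded within its declared freshness SLA."""
    if freshness_sla_minutes is None:
        return False  # No SLA declared, so there is nothing to evaluate.
    if last_success_at_utc is None:
        return True  # A target that has never succeeded is treated as stale.
    return now - last_success_at_utc > timedelta(minutes=freshness_sla_minutes)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
last_ok = datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc)
print(is_stale(last_ok, 30, now), is_stale(last_ok, 90, now))
```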

Quality response

Use ctrl_ingestion_quality for rule-level summary and ctrl_ingestion_quarantine for record-level remediation. Abort rules and quarantine rules require different response paths.
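The following sketch splits those response paths by the configured severity. The queue names in `RESPONSE_QUEUES` and the `route_quality_rows` helper are hypothetical placeholders for your own incident and remediation tooling.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Mapping

# Hypothetical queue names; map them to your own incident and remediation tooling.
RESPONSE_QUEUES = {
    "abort": "run_triage",
    "quarantine": "record_remediation",
    "warn": "rule_review",
}


def route_quality_rows(rows: Iterable[Mapping[str, object]]) -> Dict[str, List[str]]:
    """Group failing rules by response queue based on their configured severity."""
    routed: Dict[str, List[str]] = defaultdict(list)
    for row in rows:
        if not row.get("failed_count"):
            continue  # Nothing failed, so there is nothing to respond to.
        queue = RESPONSE_QUEUES.get(str(row["severity"]), "unrouted")
        routed[queue].append(f'{row["target_table"]}:{row["rule_name"]}')
    return dict(routed)


quality_rows = [
    {"target_table": "main.sales.orders", "rule_name": "order_id_not_null", "severity": "abort", "failed_count": 3},
    {"target_table": "main.sales.orders", "rule_name": "amount_positive", "severity": "quarantine", "failed_count": 12},
    {"target_table": "main.sales.orders", "rule_name": "currency_known", "severity": "warn", "failed_count": 0},
]
print(route_quality_rows(quality_rows))
```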

Runtime support

Start with ctrl_ingestion_runs.error_message, then join ctrl_ingestion_errors for full trace, and check framework_version, runtime_type and connector metadata before changing code.

Maintenance commands

Retention and cost reports are operational utilities. Cost reports estimate runtime cost from user-provided rates; they are not a substitute for cloud billing exports.

contractforge maintenance ctrl-retention --catalog main --ctrl-schema ops --retention-days 30
contractforge maintenance cost-report --catalog main --ctrl-schema ops --dbu-per-hour 2 --currency-per-dbu 0.40

Retention guidance

Apply retention first to high-volume date-partitioned tables such as runs and errors. Keep quarantine and schema-change history according to the organization's audit and data-quality remediation policy.
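Before running ctrl-retention, you can preview which date partitions a policy would touch. The exact boundary semantics of the CLI are not documented here. This sketch therefore assumes that partitions strictly older than today minus the retention days are eligible. Both helpers are hypothetical.

```python
from datetime import date, timedelta
from typing import Iterable, List


def retention_cutoff(today: date, retention_days: int) -> date:
    """Oldest partition date that is kept (assumed inclusive)."""
    if retention_days < 0:
        raise ValueError("retention_days must be non-negative")
    return today - timedelta(days=retention_days)


def partitions_to_drop(partition_dates: Iterable[date], today: date,
                       retention_days: int) -> List[date]:
    """Return partition dates strictly older than the cutoff, oldest first."""
    cutoff = retention_cutoff(today, retention_days)
    return sorted(d for d in partition_dates if d < cutoff)


dates = [date(2024, 2, 29), date(2024, 3, 1), date(2024, 3, 15)]
print(partitions_to_drop(dates, today=date(2024, 3, 31), retention_days=30))
```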