When to use it

Supported

Available-now ingestion

Run Auto Loader until currently available files are processed, then stop and return a stream result.

Not the goal

Continuous streaming apps

Long-running streaming applications should be implemented as dedicated streaming jobs outside ContractForge.

Observed

Child runs

Each micro-batch executes an internal ingest_plan with a child run id linked to the stream run.

Audited

Aggregated metrics

ctrl_ingestion_streams stores processed batch count and total rows read/written/quarantined.

Runtime requirements

| Requirement | Details |
| --- | --- |
| Databricks Auto Loader | The runtime must support cloudFiles and Structured Streaming available-now execution. |
| Source access | The cluster or serverless environment must read the landing path through a Volume, an External Location, or cloud credentials. |
| Checkpoint access | The runtime must read and write the checkpoint location across retries and job restarts. |
| Schema location | The schema tracking path must be durable and table-specific. |

JSON available-now example

source:
  type: connector
  connector: autoloader
  format: json
  path: /Volumes/main/landing/orders_json
  read:
    checkpoint_location: /Volumes/main/ops/checkpoints/orders_json
    schema_location: /Volumes/main/ops/schemas/orders_json
    available_now: true
    schema: "order_id STRING, updated_at TIMESTAMP, payload STRUCT<amount:DOUBLE,status:STRING>"
    cloudFiles.includeExistingFiles: true
    cloudFiles.maxFilesPerTrigger: 100

target:
  catalog: main
  schema: bronze_sales
  table: b_orders_autoloader

layer: bronze
mode: scd0_append
quality_rules:
  not_null: [order_id]

CSV available-now example

source:
  type: connector
  connector: autoloader
  format: csv
  path: /Volumes/main/landing/cdc_csv
  read:
    checkpoint_location: /Volumes/main/ops/checkpoints/cdc_csv
    schema_location: /Volumes/main/ops/schemas/cdc_csv
    available_now: true
    schema: "submission_date DATE, state STRING, tot_cases BIGINT"
    cloudFiles.includeExistingFiles: true
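    # Auto Loader applies schema hints only when no explicit schema is supplied,
    # so this line matters only if the schema key above is removed.
    cloudFiles.schemaHints: "submission_date DATE, state STRING, tot_cases BIGINT"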
  options:
    header: true
    delimiter: ","

target:
  catalog: main
  schema: bronze_health
  table: b_cdc_autoloader

layer: bronze
mode: scd0_append
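
The contract keys above mix ContractForge settings with Auto Loader reader options. The sketch below shows one way such a `source` block could be split into Spark reader options and control settings. It is illustrative only: `build_reader_options`, `CONTROL_KEYS`, and the pass-through treatment of the `options` block are assumptions, not ContractForge's actual resolver.

```python
from typing import Any, Dict, Tuple

# Keys in `read` assumed to be ContractForge settings rather than reader options
# (hypothetical split; the real resolver may differ).
CONTROL_KEYS = {"checkpoint_location", "schema_location", "available_now", "schema"}


def _as_option(value: Any) -> str:
    """Render a YAML value in the string form Spark reader options expect."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def build_reader_options(source: Dict[str, Any]) -> Tuple[Dict[str, str], Dict[str, Any]]:
    """Split a contract `source` block into reader options and control settings."""
    read = source.get("read", {})
    options = {
        "cloudFiles.format": source["format"],
        "cloudFiles.schemaLocation": read["schema_location"],
    }
    # Pass through cloudFiles.* keys and file-format options unchanged.
    for key, value in read.items():
        if key not in CONTROL_KEYS:
            options[key] = _as_option(value)
    for key, value in source.get("options", {}).items():
        options[key] = _as_option(value)
    control = {key: read[key] for key in CONTROL_KEYS if key in read}
    return options, control


csv_source = {
    "format": "csv",
    "path": "/Volumes/main/landing/cdc_csv",
    "read": {
        "checkpoint_location": "/Volumes/main/ops/checkpoints/cdc_csv",
        "schema_location": "/Volumes/main/ops/schemas/cdc_csv",
        "available_now": True,
        "schema": "submission_date DATE, state STRING, tot_cases BIGINT",
        "cloudFiles.includeExistingFiles": True,
    },
    "options": {"header": True, "delimiter": ","},
}
reader_options, control = build_reader_options(csv_source)

# On Databricks the result could feed the standard PySpark calls
# (handle_batch is a hypothetical foreachBatch callback):
#   df = (spark.readStream.format("cloudFiles").options(**reader_options)
#         .schema(control["schema"]).load(csv_source["path"]))
#   (df.writeStream.option("checkpointLocation", control["checkpoint_location"])
#      .trigger(availableNow=True).foreachBatch(handle_batch).start())
```

The checkpoint location is kept out of the reader options because Structured Streaming expects it on the writer side, as the commented usage shows.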

Execution lifecycle

  1. Start stream run. ContractForge writes a parent record in ctrl_ingestion_streams.
  2. Open Auto Loader readStream. The source resolver builds a cloudFiles reader from contract options.
  3. Process micro-batches. Each batch calls ingest_plan internally with source=batch_df.
  4. Aggregate metrics. Batch result metrics are normalized and written to the stream control table.
  5. Finish or fail. Stream status and errors are persisted before the caller receives success or an exception.
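
The five steps above can be sketched in plain Python without Spark. This is a simplified model, not ContractForge code: `run_available_now`, the in-memory `control_table` dict, the status strings, and the child run id format are all assumptions chosen for illustration.

```python
from typing import Callable, Dict, Iterable, List

METRIC_KEYS = ("rows_read", "rows_written", "rows_quarantined")


def run_available_now(
    stream_run_id: str,
    batches: Iterable[List[dict]],
    ingest_batch: Callable[[List[dict], str], Dict[str, int]],
    control_table: Dict[str, dict],
) -> dict:
    """Model of the lifecycle; control_table stands in for ctrl_ingestion_streams."""
    # 1. Start stream run: write the parent record before any work happens.
    record = {
        "status": "RUNNING",  # status values are hypothetical
        "batches_processed": 0,
        "total_rows_read": 0,
        "total_rows_written": 0,
        "total_rows_quarantined": 0,
        "error_message": None,
    }
    control_table[stream_run_id] = record
    try:
        # 2-3. Iterate over micro-batches; each one runs under its own child run id.
        for batch_id, batch in enumerate(batches):
            child_run_id = f"{stream_run_id}-{batch_id}"  # hypothetical id format
            metrics = ingest_batch(batch, child_run_id)
            # 4. Aggregate the child metrics into the parent record.
            record["batches_processed"] += 1
            for key in METRIC_KEYS:
                record[f"total_{key}"] += int(metrics.get(key, 0))
    except Exception as exc:
        # 5. Persist the failure before the exception reaches the caller.
        record["status"] = "FAILED"
        record["error_message"] = str(exc)
        raise
    record["status"] = "SUCCEEDED"
    return record


def fake_ingest(batch: List[dict], child_run_id: str) -> Dict[str, int]:
    """Stand-in for ingest_plan: rows without order_id count as quarantined."""
    valid = [row for row in batch if row.get("order_id") is not None]
    return {
        "rows_read": len(batch),
        "rows_written": len(valid),
        "rows_quarantined": len(batch) - len(valid),
    }


table: Dict[str, dict] = {}
result = run_available_now(
    "stream-1",
    [[{"order_id": "a"}, {"order_id": None}], [{"order_id": "b"}]],
    fake_ingest,
    table,
)
```

Writing the parent record first means a failed run still leaves a row with its status and error message to inspect.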

Checkpoint and schema locations

Use stable locations that survive job restarts. Avoid ephemeral workspace paths for production checkpoints.

| Location | Purpose | Recommendation |
| --- | --- | --- |
| checkpoint_location | Tracks processed files and stream state. | Use a governed Volume or cloud storage path dedicated to the table. |
| schema_location | Stores Auto Loader schema tracking information. | Use a stable path separate from source data and target table. |
| Source path | Incoming files. | Use one table-specific landing prefix when possible. |
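
These recommendations could be checked before a contract is deployed. The sketch below is a hypothetical pre-flight helper, not a ContractForge feature: `check_locations` and the `EPHEMERAL_PREFIXES` list are assumptions, and the prefixes should be adjusted to whatever counts as ephemeral in a given workspace.

```python
from typing import Dict, List

# Prefixes treated as ephemeral here; an illustrative assumption.
EPHEMERAL_PREFIXES = ("/tmp/", "/local_disk0/", "file:/tmp/")


def _is_within(path: str, parent: str) -> bool:
    """True when `path` equals `parent` or sits below it."""
    path, parent = path.rstrip("/"), parent.rstrip("/")
    return path == parent or path.startswith(parent + "/")


def check_locations(source: Dict) -> List[str]:
    """Return a list of problems with the checkpoint and schema locations."""
    read = source.get("read", {})
    problems: List[str] = []
    for name in ("checkpoint_location", "schema_location"):
        value = read.get(name)
        if not value:
            problems.append(f"{name} is missing")
        elif value.startswith(EPHEMERAL_PREFIXES):
            problems.append(f"{name} uses an ephemeral path: {value}")
        elif _is_within(value, source["path"]):
            problems.append(f"{name} sits inside the source path")
    return problems


good = {
    "path": "/Volumes/main/landing/orders_json",
    "read": {
        "checkpoint_location": "/Volumes/main/ops/checkpoints/orders_json",
        "schema_location": "/Volumes/main/ops/schemas/orders_json",
    },
}
bad = {
    "path": "/Volumes/main/landing/orders_json",
    "read": {
        "checkpoint_location": "/tmp/checkpoints/orders_json",
        "schema_location": "/Volumes/main/landing/orders_json/_schema",
    },
}
good_problems = check_locations(good)
bad_problems = check_locations(bad)
```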

Stream observability

Use stream metrics to verify that available-now ingestion processed files and wrote rows. ContractForge aggregates micro-batch results and child run metrics into the stream control table.

SELECT
  stream_run_id,
  status,
  target_table,
  batches_processed,
  total_rows_read,
  total_rows_written,
  total_rows_quarantined,
  error_message
FROM main.ops.ctrl_ingestion_streams
ORDER BY started_at_utc DESC;

SELECT
  parent_run_id,
  run_id,
  status,
  rows_read,
  rows_written,
  rows_quarantined
FROM main.ops.ctrl_ingestion_runs
WHERE parent_run_id = '<stream_run_id>'
ORDER BY started_at_utc;
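
Once both result sets are fetched, the stream totals can be compared with the sum of the child runs. The sketch below assumes the rows arrive as plain dicts keyed by the column names used in the queries, and that each micro-batch produces exactly one child run; `reconcile` is a hypothetical helper, not part of ContractForge.

```python
from typing import Dict, Iterable, List

# Child-run column -> stream-level total column, as named in the queries above.
COLUMN_PAIRS = (
    ("rows_read", "total_rows_read"),
    ("rows_written", "total_rows_written"),
    ("rows_quarantined", "total_rows_quarantined"),
)


def reconcile(stream_row: Dict, child_rows: Iterable[Dict]) -> List[str]:
    """Return a description of every mismatch between stream and child metrics."""
    children = list(child_rows)
    mismatches: List[str] = []
    # Assumes one child run per micro-batch.
    if stream_row["batches_processed"] != len(children):
        mismatches.append(
            f"batches_processed: stream={stream_row['batches_processed']} "
            f"children={len(children)}"
        )
    for child_col, total_col in COLUMN_PAIRS:
        summed = sum(row[child_col] or 0 for row in children)  # NULL counts as 0
        if summed != stream_row[total_col]:
            mismatches.append(
                f"{total_col}: stream={stream_row[total_col]} children={summed}"
            )
    return mismatches


stream_row = {
    "batches_processed": 2,
    "total_rows_read": 0,  # simulates a stream row that reports zero rows read
    "total_rows_written": 150,
    "total_rows_quarantined": 5,
}
child_rows = [
    {"rows_read": 100, "rows_written": 97, "rows_quarantined": 3},
    {"rows_read": 55, "rows_written": 53, "rows_quarantined": 2},
]
issues = reconcile(stream_row, child_rows)
```

An empty list means the stream record agrees with its child runs. In this sample only the read total is reported as inconsistent.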

Quality and quarantine in micro-batches

Quality rules apply inside each micro-batch. If rows are quarantined, the child batch result records row-level metrics and the stream aggregate sums them.

quality_rules:
  not_null: [order_id]
  expressions:
    - name: positive_amount
      expression: payload.amount >= 0
      severity: quarantine
      message: Amount must be non-negative
on_quality_fail: quarantine
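
The effect of these rules on a single micro-batch can be modeled in plain Python. This is a sketch under stated assumptions, not the ContractForge rule engine: `split_batch` and `positive_amount` are hypothetical names, the SQL expression is replaced by a Python function, and a missing amount is treated as a failure, which may differ from how NULLs are actually handled.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


def positive_amount(row: Row) -> bool:
    """Python stand-in for the expression `payload.amount >= 0`."""
    amount = (row.get("payload") or {}).get("amount")
    # Assumption: a missing amount fails the check; real NULL handling may differ.
    return amount is not None and amount >= 0


def split_batch(
    rows: List[Row],
    not_null: List[str],
    checks: Dict[str, Callable[[Row], bool]],
) -> Tuple[List[Row], List[Tuple[Row, List[str]]]]:
    """Split one micro-batch into valid rows and (row, failed rules) pairs."""
    valid: List[Row] = []
    quarantined: List[Tuple[Row, List[str]]] = []
    for row in rows:
        failures = [f"not_null:{col}" for col in not_null if row.get(col) is None]
        failures += [name for name, check in checks.items() if not check(row)]
        if failures:
            quarantined.append((row, failures))
        else:
            valid.append(row)
    return valid, quarantined


batch = [
    {"order_id": "o1", "payload": {"amount": 10.0, "status": "paid"}},
    {"order_id": None, "payload": {"amount": 5.0, "status": "paid"}},
    {"order_id": "o3", "payload": {"amount": -1.0, "status": "refund"}},
]
valid, quarantined = split_batch(batch, ["order_id"], {"positive_amount": positive_amount})

# Per-batch metrics in the shape the stream aggregate would sum.
metrics = {
    "rows_read": len(batch),
    "rows_written": len(valid),
    "rows_quarantined": len(quarantined),
}
```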

Common issues

| Symptom | Likely cause | Action |
| --- | --- | --- |
| Files not picked up | Checkpoint already marked them processed or includeExistingFiles is false. | Use a new checkpoint for tests or validate cloudFiles options. |
| Schema errors | Inferred schema drift or missing schema location. | Declare schema/schema hints and use stable schema location. |
| Stream says zero rows but target has data | Metric aggregation bug or child run mismatch. | Check ctrl_ingestion_runs.parent_run_id; current versions aggregate child metrics. |
| Access denied | Runtime cannot access source/checkpoint/schema path. | Fix External Location, Volume grants or cluster storage credentials. |
| Continuous stream expected | Contract uses available-now semantics. | Use a dedicated streaming job outside ContractForge for always-on streams. |