Available-now ingestion
Run Auto Loader until currently available files are processed, then stop and return a stream result.
Connector
Use Auto Loader when files arrive over time and Databricks should discover new files incrementally, tracking progress in a checkpoint. ContractForge supports finite available_now execution and records metrics at both the stream level and the child-batch level.
Long-running streaming applications should be implemented as dedicated streaming jobs outside ContractForge.
Each micro-batch executes an internal ingest_plan under a child run id that is linked to the stream run.
ctrl_ingestion_streams stores the number of processed batches and the total rows read, written, and quarantined.
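The relationship between a stream run and its child runs can be modelled in plain Python. This is a minimal sketch, not ContractForge code: `StreamRun`, `handle_batch`, the `count_only` ingest stand-in, and the child run id format are all hypothetical. The callback is shaped like Structured Streaming's `foreachBatch`, which passes `(batch_df, batch_id)`; here a plain list stands in for the DataFrame.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

Metrics = Dict[str, int]


@dataclass
class StreamRun:
    """Hypothetical in-memory stand-in for one ctrl_ingestion_streams row."""
    stream_run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    batches_processed: int = 0
    total_rows_read: int = 0
    total_rows_written: int = 0
    total_rows_quarantined: int = 0
    # Stand-in for the child rows written to ctrl_ingestion_runs.
    child_runs: List[Dict[str, Any]] = field(default_factory=list)

    def handle_batch(self, batch: List[Any], batch_id: int,
                     ingest: Callable[[List[Any]], Metrics]) -> Dict[str, Any]:
        """Process one micro-batch as a child run linked to this stream run."""
        metrics = ingest(batch)  # placeholder for the internal ingest_plan call
        child = {
            "run_id": f"{self.stream_run_id}-batch-{batch_id}",  # hypothetical id format
            "parent_run_id": self.stream_run_id,
            **metrics,
        }
        self.child_runs.append(child)
        # Roll the child metrics up into the stream-level totals.
        self.batches_processed += 1
        self.total_rows_read += metrics["rows_read"]
        self.total_rows_written += metrics["rows_written"]
        self.total_rows_quarantined += metrics["rows_quarantined"]
        return child


def count_only(rows: List[Any]) -> Metrics:
    """Trivial ingest stand-in: writes every row and quarantines nothing."""
    return {"rows_read": len(rows), "rows_written": len(rows), "rows_quarantined": 0}


stream = StreamRun(stream_run_id="stream-001")
stream.handle_batch(["a", "b", "c"], 0, count_only)
stream.handle_batch(["d"], 1, count_only)
print(stream.batches_processed, stream.total_rows_written)  # 2 4
```

Keeping the totals on the stream object and the per-batch detail in `child_runs` mirrors the split between the stream control table and the child run records.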
| Requirement | Details |
|---|---|
| Databricks Auto Loader | The runtime must support cloudFiles and Structured Streaming available-now execution. |
| Source access | The cluster or serverless environment must read the landing path through a Volume, an External Location, or cloud credentials. |
| Checkpoint access | The runtime must read and write the checkpoint location across retries and job restarts. |
| Schema location | The schema tracking path must be durable and table-specific. |
```yaml
source:
  type: connector
  connector: autoloader
  format: json
  path: /Volumes/main/landing/orders_json
  read:
    checkpoint_location: /Volumes/main/ops/checkpoints/orders_json
    schema_location: /Volumes/main/ops/schemas/orders_json
    available_now: true
    schema: "order_id STRING, updated_at TIMESTAMP, payload STRUCT<amount:DOUBLE,status:STRING>"
    cloudFiles.includeExistingFiles: true
    cloudFiles.maxFilesPerTrigger: 100
target:
  catalog: main
  schema: bronze_sales
  table: b_orders_autoloader
  layer: bronze
  mode: scd0_append
quality_rules:
  not_null: [order_id]
```
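A simplified model of what `available_now: true` does together with `cloudFiles.maxFilesPerTrigger: 100` and `cloudFiles.includeExistingFiles: true`: the run takes the files visible when it starts, skips anything already recorded in the checkpoint, processes the rest in micro-batches of at most 100 files, then stops. `plan_available_now` is a hypothetical helper and the in-memory set stands in for the checkpoint; real Auto Loader keeps file state inside the checkpoint directory and does not guarantee processing order.

```python
from typing import Iterable, List, Set


def plan_available_now(
    listed_files: Iterable[str],
    checkpoint: Set[str],
    max_files_per_trigger: int,
    include_existing_files: bool = True,
) -> List[List[str]]:
    """Return the micro-batches one available-now run would process, then stop.

    `checkpoint` is an in-memory stand-in for processed-file state; an empty
    set models a brand-new checkpoint. It is updated in place.
    """
    # Files visible at start; sorting only makes the example deterministic.
    snapshot = sorted(set(listed_files))
    if not checkpoint and not include_existing_files:
        # New checkpoint with includeExistingFiles=false: pre-existing files are skipped.
        checkpoint.update(snapshot)
        return []
    pending = [f for f in snapshot if f not in checkpoint]
    batches = [
        pending[i:i + max_files_per_trigger]
        for i in range(0, len(pending), max_files_per_trigger)
    ]
    for batch in batches:
        checkpoint.update(batch)  # each batch is committed before the next starts
    return batches


state: Set[str] = set()
files = [f"orders_{i:03d}.json" for i in range(250)]
print([len(b) for b in plan_available_now(files, state, 100)])  # [100, 100, 50]
print(plan_available_now(files, state, 100))  # [] because every file is checkpointed
```

The second call returning nothing is the same effect described under "Files not picked up" in the troubleshooting table: a reused checkpoint already considers those files processed.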
```yaml
source:
  type: connector
  connector: autoloader
  format: csv
  path: /Volumes/main/landing/cdc_csv
  read:
    checkpoint_location: /Volumes/main/ops/checkpoints/cdc_csv
    schema_location: /Volumes/main/ops/schemas/cdc_csv
    available_now: true
    schema: "submission_date DATE, state STRING, tot_cases BIGINT"
    cloudFiles.includeExistingFiles: true
    cloudFiles.schemaHints: "submission_date DATE, state STRING, tot_cases BIGINT"
    options:
      header: true
      delimiter: ","
target:
  catalog: main
  schema: bronze_health
  table: b_cdc_autoloader
  layer: bronze
  mode: scd0_append
```
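For each run, ContractForge builds a cloudFiles reader from the contract options, runs ingest_plan internally on every micro-batch with source=batch_df, and records the aggregated stream result in ctrl_ingestion_streams.

Use stable locations that survive job restarts, and avoid ephemeral workspace paths for production checkpoints.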
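One way to picture "a cloudFiles reader from contract options" is a pure function that turns the contract's `source` block into Auto Loader settings. This is a sketch under assumptions, not ContractForge's implementation: `build_autoloader_plan` is hypothetical, and because the exact nesting of `options` is not spelled out, it accepts format options at either the `source` or the `read` level. The option names `cloudFiles.format` and `cloudFiles.schemaLocation`, the `checkpointLocation` writer option, and `trigger(availableNow=True)` are standard Auto Loader / Structured Streaming settings.

```python
from typing import Any, Dict


def _spark_str(value: Any) -> str:
    """Render an option value as Spark expects it: lowercase booleans, strings otherwise."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def build_autoloader_plan(source: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a contract `source` block into reader and writer settings (hypothetical mapping)."""
    read = dict(source.get("read", {}))  # shallow copy so the contract is not mutated
    # Format options (header, delimiter, ...) may sit under source or read.
    format_options = {**source.get("options", {}), **read.pop("options", {})}
    reader_options = {
        "cloudFiles.format": source["format"],
        "cloudFiles.schemaLocation": read["schema_location"],
    }
    for key, value in read.items():
        if key.startswith("cloudFiles."):
            reader_options[key] = _spark_str(value)  # pass cloudFiles.* through unchanged
    for key, value in format_options.items():
        reader_options[key] = _spark_str(value)
    return {
        "path": source["path"],
        "schema": read.get("schema"),
        "reader_options": reader_options,
        "checkpoint_location": read["checkpoint_location"],
        "trigger": {"availableNow": True} if read.get("available_now") else None,
    }

# On Databricks, such a plan would feed roughly:
#   df = (spark.readStream.format("cloudFiles")
#         .options(**plan["reader_options"])
#         .schema(plan["schema"])            # only when a schema is declared
#         .load(plan["path"]))
#   (df.writeStream
#      .option("checkpointLocation", plan["checkpoint_location"])
#      .trigger(availableNow=True)
#      .foreachBatch(handle_batch)           # handle_batch is hypothetical
#      .start())
```

Keeping the mapping pure makes it easy to unit-test a contract without a Spark session.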
| Location | Purpose | Recommendation |
|---|---|---|
| checkpoint_location | Tracks processed files and stream state. | Use a governed Volume or cloud storage path dedicated to the table. |
| schema_location | Stores Auto Loader schema tracking information. | Use a stable path separate from source data and target table. |
| Source path | Incoming files. | Use one table-specific landing prefix when possible. |
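The recommendations in this table can be turned into a pre-flight check. This is a minimal sketch: `check_locations` is hypothetical, and the list of "ephemeral" prefixes is an assumption to adjust for your workspace. It flags checkpoint or schema paths that look temporary, and paths that sit inside (or above) the landing prefix.

```python
from typing import List

# Assumed examples of non-durable locations; not an exhaustive or official list.
EPHEMERAL_PREFIXES = ("/tmp/", "dbfs:/tmp/", "file:/", "/local_disk0/")


def _as_dir(path: str) -> str:
    """Normalize to a trailing slash so /landing/orders does not match /landing/orders_json."""
    return path.rstrip("/") + "/"


def check_locations(source_path: str, checkpoint_location: str, schema_location: str) -> List[str]:
    """Return human-readable problems; an empty list means the layout looks reasonable."""
    problems: List[str] = []
    src = _as_dir(source_path)
    for name, raw in (("checkpoint_location", checkpoint_location),
                      ("schema_location", schema_location)):
        loc = _as_dir(raw)
        if loc.startswith(EPHEMERAL_PREFIXES):
            problems.append(f"{name} looks ephemeral: {raw}")
        if loc.startswith(src) or src.startswith(loc):
            # Keep stream state out of (and above) the landing prefix being listed.
            problems.append(f"{name} overlaps the source path: {raw}")
    return problems


print(check_locations(
    "/Volumes/main/landing/orders_json",
    "/Volumes/main/ops/checkpoints/orders_json",
    "/Volumes/main/ops/schemas/orders_json",
))  # []
```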
Use stream metrics to verify that available-now ingestion processed files and wrote rows. ContractForge aggregates micro-batch results and child run metrics into the stream control table.
```sql
SELECT
  stream_run_id,
  status,
  target_table,
  batches_processed,
  total_rows_read,
  total_rows_written,
  total_rows_quarantined,
  error_message
FROM main.ops.ctrl_ingestion_streams
ORDER BY started_at_utc DESC;
```
```sql
SELECT
  parent_run_id,
  run_id,
  status,
  rows_read,
  rows_written,
  rows_quarantined
FROM main.ops.ctrl_ingestion_runs
WHERE parent_run_id = '<stream_run_id>'
ORDER BY started_at_utc;
```
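Once both result sets are fetched (for example as lists of dicts), a small check can confirm that the stream totals equal the sum of their child runs. That is the first thing to look at when a stream reports zero rows but the target has data. `reconcile` and `METRIC_COLUMNS` are hypothetical helpers, and treating one child run per processed micro-batch as the expected count is an assumption.

```python
from typing import Any, Dict, List, Tuple

# Stream-level column -> matching child-run column, as named in the two queries.
METRIC_COLUMNS = {
    "total_rows_read": "rows_read",
    "total_rows_written": "rows_written",
    "total_rows_quarantined": "rows_quarantined",
}


def reconcile(stream_row: Dict[str, Any],
              child_rows: List[Dict[str, Any]]) -> Dict[str, Tuple[int, int]]:
    """Return {column: (stream_value, child_value)} for every metric that disagrees."""
    children = [r for r in child_rows if r["parent_run_id"] == stream_row["stream_run_id"]]
    mismatches: Dict[str, Tuple[int, int]] = {}
    # Assumption: one child run per processed micro-batch.
    batches = stream_row["batches_processed"] or 0
    if batches != len(children):
        mismatches["batches_processed"] = (batches, len(children))
    for stream_col, child_col in METRIC_COLUMNS.items():
        stream_value = stream_row[stream_col] or 0  # NULL metrics count as zero
        child_value = sum(r[child_col] or 0 for r in children)
        if stream_value != child_value:
            mismatches[stream_col] = (stream_value, child_value)
    return mismatches
```

An empty result means the stream row is consistent with its child runs; any entry points to the metric that was not aggregated.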
Quality rules apply inside each micro-batch. If rows are quarantined, the child batch result records row-level metrics and the stream aggregate sums them.
```yaml
quality_rules:
  not_null: [order_id]
  expressions:
    - name: positive_amount
      expression: payload.amount >= 0
      severity: quarantine
      message: Amount must be non-negative
on_quality_fail: quarantine
```
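For these rules, each micro-batch splits into rows that are written and rows that are quarantined, and the counts become the child run's rows_read, rows_written, and rows_quarantined. The sketch models that split in plain Python for a single batch. `split_batch` and the `_quarantine_reasons` field are hypothetical; a real implementation would presumably evaluate the rules as Spark expressions on the batch DataFrame.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def split_batch(rows: List[Row]) -> Tuple[List[Row], List[Row], Dict[str, int]]:
    """Apply the not_null and positive_amount rules to one micro-batch.

    Returns (valid_rows, quarantined_rows, metrics).
    """
    valid: List[Row] = []
    quarantined: List[Row] = []
    for row in rows:
        reasons = []
        if row.get("order_id") is None:
            reasons.append("not_null:order_id")
        amount = (row.get("payload") or {}).get("amount")
        # Assumption: a NULL amount is not quarantined here; check how your
        # version treats NULL expression results.
        if amount is not None and amount < 0:
            reasons.append("positive_amount")
        if reasons:
            quarantined.append({**row, "_quarantine_reasons": reasons})
        else:
            valid.append(row)
    metrics = {
        "rows_read": len(rows),
        "rows_written": len(valid),
        "rows_quarantined": len(quarantined),
    }
    return valid, quarantined, metrics
```

Summing these per-batch metrics across child runs gives the stream-level totals.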
| Symptom | Likely cause | Action |
|---|---|---|
| Files not picked up | The checkpoint already marked them as processed, or cloudFiles.includeExistingFiles was false when the stream first started. | Use a fresh checkpoint for tests, or validate the cloudFiles options. |
| Schema errors | Inferred schema drift or a missing schema location. | Declare a schema or cloudFiles.schemaHints and use a stable schema location. |
| Stream says zero rows but target has data | Metric aggregation bug or child run mismatch. | Check ctrl_ingestion_runs.parent_run_id; current versions aggregate child metrics. |
| Access denied | Runtime cannot access source/checkpoint/schema path. | Fix External Location, Volume grants or cluster storage credentials. |
| Continuous stream expected | Contract uses available-now semantics. | Use a dedicated streaming job outside ContractForge for always-on streams. |