Auto Loader

validated

Use Auto Loader when files arrive over time and Databricks should discover new files incrementally, using a checkpoint to track which files have already been processed. ContractForge supports finite available_now execution and records evidence at both the stream level and the child batch level.

When to use it

Bounded streaming

Available-now ingestion

Run Auto Loader until currently available files are processed, then stop and return a stream result.

Not this model

Continuous streaming apps

Long-running streaming applications should be implemented as dedicated streaming jobs outside ContractForge.

Operational evidence

Child runs

Each micro-batch executes an internal ingest_plan with a child run id linked to the stream run.

Stream metrics

Aggregated metrics

ctrl_ingestion_streams stores processed batch count and total rows read/written/quarantined.

Runtime requirements

| Requirement | Details |
| --- | --- |
| Databricks Auto Loader | The runtime must support cloudFiles and Structured Streaming available-now execution. |
| Source access | The cluster or serverless environment must read the landing path through Volume, External Location or cloud credentials. |
| Checkpoint access | The runtime must read and write the checkpoint location across retries and job restarts. |
| Schema location | The schema tracking path must be durable and table-specific. |

JSON available-now example

source:
  type: connector
  connector: autoloader
  format: json
  path: /Volumes/main/landing/orders_json
  read:
    checkpoint_location: /Volumes/main/ops/checkpoints/orders_json
    schema_location: /Volumes/main/ops/schemas/orders_json
    available_now: true
    schema: "order_id STRING, updated_at TIMESTAMP, payload STRUCT<amount:DOUBLE,status:STRING>"
    cloudFiles.includeExistingFiles: true
    cloudFiles.maxFilesPerTrigger: 100

target:
  catalog: main
  schema: bronze_sales
  table: b_orders_autoloader

layer: bronze
mode: append
quality_rules:
  not_null: [order_id]

CSV available-now example

source:
  type: connector
  connector: autoloader
  format: csv
  path: /Volumes/main/landing/cdc_csv
  read:
    checkpoint_location: /Volumes/main/ops/checkpoints/cdc_csv
    schema_location: /Volumes/main/ops/schemas/cdc_csv
    available_now: true
    schema: "submission_date DATE, state STRING, tot_cases BIGINT"
    cloudFiles.includeExistingFiles: true
    cloudFiles.schemaHints: "submission_date DATE, state STRING, tot_cases BIGINT"
  options:
    header: true
    delimiter: ","

target:
  catalog: main
  schema: bronze_health
  table: b_cdc_autoloader

layer: bronze
mode: append
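
To make the relationship between these contracts and Auto Loader more concrete, the following Python sketch shows one way the `source` section could be translated into `cloudFiles` reader options and run with an available-now trigger. It is not ContractForge's resolver: the helper names `build_autoloader_options` and `run_available_now` are hypothetical, and the sketch assumes that `cloudFiles.*` keys live under `read` and that format options such as `header` live under `source.options`. The PySpark calls it uses (`readStream.format("cloudFiles")`, `foreachBatch`, `trigger(availableNow=True)`) are standard Structured Streaming APIs.

```python
from typing import Any, Callable, Dict


def _as_option(value: Any) -> str:
    """Render a contract value as a Spark option string (booleans in lowercase)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def build_autoloader_options(source: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical mapping from a contract `source` section to reader options."""
    read = source.get("read", {})
    options = {
        "cloudFiles.format": source["format"],
        "cloudFiles.schemaLocation": read["schema_location"],
    }
    # Pass through any cloudFiles.* keys declared under `read`.
    for key, value in read.items():
        if key.startswith("cloudFiles."):
            options[key] = _as_option(value)
    # Format-specific reader options such as header or delimiter (assumed location).
    for key, value in source.get("options", {}).items():
        options[key] = _as_option(value)
    return options


def run_available_now(spark: Any, source: Dict[str, Any],
                      process_batch: Callable[[Any, int], None]) -> None:
    """Process all currently available files, then stop (requires a Databricks runtime)."""
    read = source["read"]
    reader = spark.readStream.format("cloudFiles").options(**build_autoloader_options(source))
    if "schema" in read:
        reader = reader.schema(read["schema"])  # DDL string, as in the contracts above
    query = (
        reader.load(source["path"])
        .writeStream
        .foreachBatch(process_batch)  # called as process_batch(batch_df, batch_id)
        .option("checkpointLocation", read["checkpoint_location"])
        .trigger(availableNow=True)
        .start()
    )
    query.awaitTermination()
```

Only `build_autoloader_options` can be exercised outside Databricks; `run_available_now` needs a runtime that supports `cloudFiles`.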

Execution lifecycle

  1. Start stream run. ContractForge writes a parent record in ctrl_ingestion_streams.
  2. Open Auto Loader readStream. The source resolver builds a cloudFiles reader from contract options.
  3. Process micro-batches. Each batch calls ingest_plan internally with source=batch_df.
  4. Aggregate metrics. Batch result metrics are normalized and written to the stream control table.
  5. Finish or fail. Stream status and errors are persisted before the caller receives success or an exception.
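
The five steps above can be sketched in plain Python to show the control flow, in particular that the final status is persisted before the caller sees success or an exception. This is a simplified model under stated assumptions, not the ContractForge implementation: `StreamRecord`, `run_stream`, the `persist` callback, the status strings and the child run id format are all hypothetical, and the micro-batches are simulated with ordinary iterables instead of a real Auto Loader stream.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, List, Optional


@dataclass
class StreamRecord:
    """Hypothetical in-memory stand-in for a row in ctrl_ingestion_streams."""
    stream_run_id: str
    status: str = "RUNNING"
    batches_processed: int = 0
    total_rows_read: int = 0
    total_rows_written: int = 0
    total_rows_quarantined: int = 0
    error_message: Optional[str] = None
    child_run_ids: List[str] = field(default_factory=list)


def run_stream(
    batches: Iterable[Any],
    ingest_batch: Callable[[Any, str], Dict[str, int]],
    persist: Callable[[StreamRecord], None],
) -> StreamRecord:
    record = StreamRecord(stream_run_id=str(uuid.uuid4()))
    persist(record)  # Step 1: write the parent record.
    try:
        for batch in batches:  # Steps 2-3: one child run per micro-batch.
            child_run_id = f"{record.stream_run_id}-{record.batches_processed}"
            metrics = ingest_batch(batch, child_run_id)
            record.child_run_ids.append(child_run_id)
            # Step 4: aggregate child metrics into the stream totals.
            record.batches_processed += 1
            record.total_rows_read += metrics.get("rows_read", 0)
            record.total_rows_written += metrics.get("rows_written", 0)
            record.total_rows_quarantined += metrics.get("rows_quarantined", 0)
        record.status = "SUCCEEDED"
    except Exception as exc:
        record.status = "FAILED"
        record.error_message = str(exc)
        raise
    finally:
        # Step 5: persist the final state before returning or re-raising.
        persist(record)
    return record
```

Because the final `persist` call sits in a `finally` block, a failing batch still leaves a `FAILED` record with its error message before the exception reaches the caller.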

Checkpoint and schema locations

Use stable locations that survive job restarts. Avoid ephemeral workspace paths for production checkpoints.

| Location | Purpose | Recommendation |
| --- | --- | --- |
| checkpoint_location | Tracks processed files and stream state. | Use a governed Volume or cloud storage path dedicated to the table. |
| schema_location | Stores Auto Loader schema tracking information. | Use a stable path separate from source data and target table. |
| Source path | Incoming files. | Use one table-specific landing prefix when possible. |
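
As an illustration of these recommendations, the sketch below derives table-specific checkpoint and schema locations from a shared operations root and flags two common mistakes: placing a location inside the landing path, or under a temporary directory. The helper names `stream_locations` and `validate_locations` are hypothetical, the `checkpoints/` and `schemas/` layout simply mirrors the example contracts, and treating `/tmp` as ephemeral is an illustrative heuristic, not a ContractForge rule.

```python
from pathlib import PurePosixPath
from typing import Dict, List


def stream_locations(ops_root: str, table: str) -> Dict[str, str]:
    """Build per-table checkpoint and schema paths under a shared ops root."""
    root = ops_root.rstrip("/")
    return {
        "checkpoint_location": f"{root}/checkpoints/{table}",
        "schema_location": f"{root}/schemas/{table}",
    }


def _is_within(child: str, parent: str) -> bool:
    """True if `child` equals `parent` or is nested below it."""
    c, p = PurePosixPath(child), PurePosixPath(parent)
    return c == p or p in c.parents


def validate_locations(source_path: str, checkpoint_location: str,
                       schema_location: str) -> List[str]:
    """Return a list of human-readable problems; an empty list means no findings."""
    problems = []
    for name, location in (("checkpoint_location", checkpoint_location),
                           ("schema_location", schema_location)):
        if _is_within(location, source_path):
            problems.append(f"{name} is inside the source path: {location}")
        if _is_within(location, "/tmp"):  # heuristic for ephemeral storage
            problems.append(f"{name} looks ephemeral: {location}")
    return problems
```

For example, `stream_locations("/Volumes/main/ops", "orders_json")` yields the same two paths used in the JSON contract above.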

Stream observability

Use stream metrics to verify that available-now ingestion processed files and wrote rows. ContractForge aggregates micro-batch results and child run metrics into the stream control table.

SELECT
  stream_run_id,
  status,
  target_table,
  batches_processed,
  total_rows_read,
  total_rows_written,
  total_rows_quarantined,
  error_message
FROM main.ops.ctrl_ingestion_streams
ORDER BY started_at_utc DESC;

SELECT
  parent_run_id,
  run_id,
  status,
  rows_read,
  rows_written,
  rows_quarantined
FROM main.ops.ctrl_ingestion_runs
WHERE parent_run_id = '<stream_run_id>'
ORDER BY started_at_utc;
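
The results of the two queries can be cross-checked in Python. The sketch below compares one stream row against the sum of its child run rows and reports any metric that disagrees. The `reconcile` function is hypothetical, and it assumes that rows are plain dictionaries using the column names from the queries above and that each micro-batch produces exactly one child run.

```python
from typing import Any, Dict, List, Tuple

# Stream-level total column -> child-run column it should equal when summed.
METRIC_PAIRS = {
    "total_rows_read": "rows_read",
    "total_rows_written": "rows_written",
    "total_rows_quarantined": "rows_quarantined",
}


def reconcile(stream_row: Dict[str, Any],
              child_rows: List[Dict[str, Any]]) -> Dict[str, Tuple[int, int]]:
    """Return {metric: (stream_value, child_sum)} for every metric that differs."""
    mismatches = {}
    for total_col, child_col in METRIC_PAIRS.items():
        child_sum = sum((row.get(child_col) or 0) for row in child_rows)
        stream_value = stream_row.get(total_col) or 0
        if stream_value != child_sum:
            mismatches[total_col] = (stream_value, child_sum)
    # Assumption: one child run per processed micro-batch.
    batches = stream_row.get("batches_processed") or 0
    if batches != len(child_rows):
        mismatches["batches_processed"] = (batches, len(child_rows))
    return mismatches
```

On Databricks, rows returned by `spark.sql(...).collect()` can be converted with `row.asDict()` before being passed in. An empty result means the stream aggregate matches its child runs.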

Quality and quarantine in micro-batches

Quality rules apply inside each micro-batch. If rows are quarantined, the child batch result records row-level metrics and the stream aggregate sums them.

quality_rules:
  not_null: [order_id]
  expressions:
    - name: positive_amount
      expression: payload.amount >= 0
      severity: quarantine
      message: Amount must be non-negative
on_quality_fail: quarantine
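
To illustrate how per-batch quarantine feeds the stream totals, the following sketch applies the two rules from the contract above to a micro-batch represented as a list of dictionaries. It is a conceptual model only: `split_batch` is a hypothetical function, real batches are Spark DataFrames, and treating a missing `payload.amount` as a failure is an assumption, since the contract does not say how nulls are handled.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def split_batch(rows: List[Row]) -> Tuple[List[Row], List[Row], Dict[str, int]]:
    """Split a micro-batch into valid and quarantined rows and count both."""
    valid, quarantined = [], []
    for row in rows:
        reasons = []
        # not_null: [order_id]
        if row.get("order_id") is None:
            reasons.append("order_id must not be null")
        # expression: payload.amount >= 0 (a missing amount fails; this is an assumption)
        amount = (row.get("payload") or {}).get("amount")
        if amount is None or amount < 0:
            reasons.append("Amount must be non-negative")
        if reasons:
            quarantined.append({**row, "_reasons": reasons})
        else:
            valid.append(row)
    metrics = {
        "rows_read": len(rows),
        "rows_written": len(valid),
        "rows_quarantined": len(quarantined),
    }
    return valid, quarantined, metrics
```

Summing the `metrics` dictionaries across all micro-batches gives the kind of totals that the stream aggregate reports.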

Common issues

| Symptom | Likely cause | Action |
| --- | --- | --- |
| Files not picked up | Checkpoint already marked them processed or includeExistingFiles is false. | Use a new checkpoint for tests or validate cloudFiles options. |
| Schema errors | Inferred schema drift or missing schema location. | Declare schema/schema hints and use stable schema location. |
| Stream says zero rows but target has data | Metric aggregation bug or child run mismatch. | Check ctrl_ingestion_runs.parent_run_id; current versions aggregate child metrics. |
| Access denied | Runtime cannot access source/checkpoint/schema path. | Fix External Location, Volume grants or cluster storage credentials. |
| Continuous stream expected | Contract uses available-now semantics. | Use a dedicated streaming job outside ContractForge for always-on streams. |