Auto Loader
Use Auto Loader when files arrive over time and Databricks should discover them incrementally, using a checkpoint to track which files have already been processed. ContractForge supports finite available_now execution and records evidence at both the stream level and the child batch level.
When to use it
- Available-now ingestion. Run Auto Loader until currently available files are processed, then stop and return a stream result.
- Continuous streaming apps. Long-running streaming applications should be implemented as dedicated streaming jobs outside ContractForge.
- Child runs. Each micro-batch executes an internal ingest_plan with a child run id linked to the stream run.
- Aggregated metrics. ctrl_ingestion_streams stores processed batch count and total rows read/written/quarantined.
Runtime requirements
| Requirement | Details |
|---|---|
| Databricks Auto Loader | The runtime must support cloudFiles and Structured Streaming available-now execution. |
| Source access | The cluster or serverless environment must read the landing path through Volume, External Location or cloud credentials. |
| Checkpoint access | The runtime must read and write the checkpoint location across retries and job restarts. |
| Schema location | The schema tracking path must be durable and table-specific. |
JSON available-now example
YAML
source:
  type: connector
  connector: autoloader
  format: json
  path: /Volumes/main/landing/orders_json
  read:
    checkpoint_location: /Volumes/main/ops/checkpoints/orders_json
    schema_location: /Volumes/main/ops/schemas/orders_json
    available_now: true
    schema: "order_id STRING, updated_at TIMESTAMP, payload STRUCT<amount:DOUBLE,status:STRING>"
    cloudFiles.includeExistingFiles: true
    cloudFiles.maxFilesPerTrigger: 100
target:
  catalog: main
  schema: bronze_sales
  table: b_orders_autoloader
  layer: bronze
  mode: append
quality_rules:
  not_null: [order_id]
Python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "autoloader",
        "format": "json",
        "path": "/Volumes/main/landing/orders_json",
        "read": {
            "checkpoint_location": "/Volumes/main/ops/checkpoints/orders_json",
            "schema_location": "/Volumes/main/ops/schemas/orders_json",
            "available_now": True,
            "schema": "order_id STRING, updated_at TIMESTAMP, payload STRUCT<amount:DOUBLE,status:STRING>",
            "cloudFiles.includeExistingFiles": True,
            "cloudFiles.maxFilesPerTrigger": 100,
        },
    },
    catalog="main",
    target_schema="bronze_sales",
    target_table="b_orders_autoloader",
    layer="bronze",
    mode="append",
    quality_rules={"not_null": ["order_id"]},
)
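For orientation, the read block in the contract above can be thought of as a set of Auto Loader reader options. The sketch below is an assumption about that mapping, not ContractForge's actual source resolver: to_cloudfiles_options is a hypothetical helper, and only the cloudFiles.* option names are standard Auto Loader options.

```python
def to_cloudfiles_options(source: dict) -> dict:
    """Hypothetical helper: flatten a contract source into Auto Loader reader options."""
    read = source.get("read", {})
    options = {"cloudFiles.format": source["format"]}
    if "schema_location" in read:
        options["cloudFiles.schemaLocation"] = read["schema_location"]
    # Keys already spelled as Auto Loader options pass through unchanged.
    options.update({k: v for k, v in read.items() if k.startswith("cloudFiles.")})
    # Format options such as header or delimiter, assumed to live under "options".
    options.update(source.get("options", {}))
    return options


source = {
    "type": "connector",
    "connector": "autoloader",
    "format": "json",
    "path": "/Volumes/main/landing/orders_json",
    "read": {
        "checkpoint_location": "/Volumes/main/ops/checkpoints/orders_json",
        "schema_location": "/Volumes/main/ops/schemas/orders_json",
        "available_now": True,
        "cloudFiles.includeExistingFiles": True,
        "cloudFiles.maxFilesPerTrigger": 100,
    },
}
reader_options = to_cloudfiles_options(source)
# checkpoint_location and available_now are deliberately left out: in Structured
# Streaming they belong to the write side (the checkpointLocation option and
# trigger(availableNow=True)), not to the cloudFiles reader.
```

On Databricks, a dictionary like reader_options could be passed to spark.readStream.format("cloudFiles").options(**reader_options), which is a useful way to reproduce a contract's read behavior in a notebook when debugging.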
CSV available-now example
YAML
source:
  type: connector
  connector: autoloader
  format: csv
  path: /Volumes/main/landing/cdc_csv
  read:
    checkpoint_location: /Volumes/main/ops/checkpoints/cdc_csv
    schema_location: /Volumes/main/ops/schemas/cdc_csv
    available_now: true
    schema: "submission_date DATE, state STRING, tot_cases BIGINT"
    cloudFiles.includeExistingFiles: true
    cloudFiles.schemaHints: "submission_date DATE, state STRING, tot_cases BIGINT"
  options:
    header: true
    delimiter: ","
target:
  catalog: main
  schema: bronze_health
  table: b_cdc_autoloader
  layer: bronze
  mode: append
Python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "autoloader",
        "format": "csv",
        "path": "/Volumes/main/landing/cdc_csv",
        "read": {
            "checkpoint_location": "/Volumes/main/ops/checkpoints/cdc_csv",
            "schema_location": "/Volumes/main/ops/schemas/cdc_csv",
            "available_now": True,
            "schema": "submission_date DATE, state STRING, tot_cases BIGINT",
            "cloudFiles.includeExistingFiles": True,
            "cloudFiles.schemaHints": "submission_date DATE, state STRING, tot_cases BIGINT",
        },
        "options": {
            "header": True,
            "delimiter": ",",
        },
    },
    catalog="main",
    target_schema="bronze_health",
    target_table="b_cdc_autoloader",
    layer="bronze",
    mode="append",
)
Execution lifecycle
- Start stream run. ContractForge writes a parent record in ctrl_ingestion_streams.
- Open Auto Loader readStream. The source resolver builds a cloudFiles reader from contract options.
- Process micro-batches. Each batch calls ingest_plan internally with source=batch_df.
- Aggregate metrics. Batch result metrics are normalized and written to the stream control table.
- Finish or fail. Stream status and errors are persisted before the caller receives success or an exception.
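The lifecycle above can be sketched in plain Python without Spark. This is a simplified model, not ContractForge's implementation: StreamRecord, run_stream, count_rows, the in-memory control_table and the status values are all hypothetical stand-ins for the control table and the internal ingest_plan call.

```python
import uuid
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, List, Optional


@dataclass
class StreamRecord:
    """Stand-in for one row of the stream control table."""
    stream_run_id: str
    status: str = "RUNNING"  # status values here are illustrative
    batches_processed: int = 0
    total_rows_read: int = 0
    total_rows_written: int = 0
    total_rows_quarantined: int = 0
    error_message: Optional[str] = None
    child_run_ids: List[str] = field(default_factory=list)


def run_stream(
    batches: Iterable[list],
    process_batch: Callable[[list, str], Dict[str, int]],
    control_table: Dict[str, StreamRecord],
) -> StreamRecord:
    # Start stream run: register the parent record before any batch runs.
    record = StreamRecord(stream_run_id=str(uuid.uuid4()))
    control_table[record.stream_run_id] = record
    try:
        for index, batch in enumerate(batches):
            # Each micro-batch gets a child run id linked to the parent.
            child_run_id = f"{record.stream_run_id}:{index}"
            metrics = process_batch(batch, child_run_id)
            record.child_run_ids.append(child_run_id)
            # Aggregate the batch metrics into the parent record.
            record.batches_processed += 1
            record.total_rows_read += metrics.get("rows_read", 0)
            record.total_rows_written += metrics.get("rows_written", 0)
            record.total_rows_quarantined += metrics.get("rows_quarantined", 0)
        record.status = "SUCCEEDED"
    except Exception as exc:
        # Record the failure before the caller sees the exception.
        record.status = "FAILED"
        record.error_message = str(exc)
        raise
    return record


def count_rows(batch: list, child_run_id: str) -> Dict[str, int]:
    """Toy batch processor: every row read is written."""
    return {"rows_read": len(batch), "rows_written": len(batch), "rows_quarantined": 0}


control_table: Dict[str, StreamRecord] = {}
result = run_stream([[1, 2, 3], [4, 5]], count_rows, control_table)
print(result.status, result.batches_processed, result.total_rows_read)
```

Because the parent record is registered first and updated in the except branch, a failed run still leaves a record with its status and error message behind, which mirrors the "finish or fail" step.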
Checkpoint and schema locations
Use stable locations that survive job restarts. Avoid ephemeral workspace paths for production checkpoints.
| Location | Purpose | Recommendation |
|---|---|---|
| checkpoint_location | Tracks processed files and stream state. | Use a governed Volume or cloud storage path dedicated to the table. |
| schema_location | Stores Auto Loader schema tracking information. | Use a stable path separate from source data and target table. |
| Source path | Incoming files. | Use one table-specific landing prefix when possible. |
Stream observability
Use stream metrics to verify that available-now ingestion processed files and wrote rows. ContractForge aggregates micro-batch results and child run metrics into the stream control table.
SELECT
stream_run_id,
status,
target_table,
batches_processed,
total_rows_read,
total_rows_written,
total_rows_quarantined,
error_message
FROM main.ops.ctrl_ingestion_streams
ORDER BY started_at_utc DESC;
SELECT
parent_run_id,
run_id,
status,
rows_read,
rows_written,
rows_quarantined
FROM main.ops.ctrl_ingestion_runs
WHERE parent_run_id = '<stream_run_id>'
ORDER BY started_at_utc;
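Once both result sets are fetched, the stream totals can be checked against the child runs. The function below is a sketch that assumes the rows have been converted to plain dictionaries (for example with Row.asDict() on a collected Spark result) and that each micro-batch produces exactly one child run; reconcile is a hypothetical helper, not part of ContractForge.

```python
def reconcile(stream_row: dict, child_rows: list) -> dict:
    """Compare stream totals with the sum of child run metrics.

    Returns {column: (stream_value, child_value)} for every mismatch.
    """
    column_pairs = {
        "total_rows_read": "rows_read",
        "total_rows_written": "rows_written",
        "total_rows_quarantined": "rows_quarantined",
    }
    mismatches = {}
    for total_col, child_col in column_pairs.items():
        # Treat NULL metrics as zero so partial rows do not raise.
        child_sum = sum(row.get(child_col) or 0 for row in child_rows)
        stream_value = stream_row.get(total_col) or 0
        if stream_value != child_sum:
            mismatches[total_col] = (stream_value, child_sum)
    # Assumption: one child run per processed micro-batch.
    batches = stream_row.get("batches_processed") or 0
    if batches != len(child_rows):
        mismatches["batches_processed"] = (batches, len(child_rows))
    return mismatches


stream_row = {
    "batches_processed": 2,
    "total_rows_read": 5,
    "total_rows_written": 4,
    "total_rows_quarantined": 1,
}
child_rows = [
    {"rows_read": 3, "rows_written": 3, "rows_quarantined": 0},
    {"rows_read": 2, "rows_written": 1, "rows_quarantined": 1},
]
mismatches = reconcile(stream_row, child_rows)
```

An empty result means the parent record agrees with its children; any entry points at the column to investigate first.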
Quality and quarantine in micro-batches
Quality rules apply inside each micro-batch. If rows are quarantined, the child batch result records row-level metrics and the stream aggregate sums them.
YAML
quality_rules:
  not_null: [order_id]
  expressions:
    - name: positive_amount
      expression: payload.amount >= 0
      severity: quarantine
      message: Amount must be non-negative
on_quality_fail: quarantine
Python
from contractforge import ingest

autoloader_source = {
    "type": "connector",
    "connector": "autoloader",
    "format": "json",
    "path": "/Volumes/main/landing/orders_json",
    "read": {
        "checkpoint_location": "/Volumes/main/ops/checkpoints/orders_json",
        "schema_location": "/Volumes/main/ops/schemas/orders_json",
        "available_now": True,
        "schema": "order_id STRING, updated_at TIMESTAMP, payload STRUCT<amount:DOUBLE,status:STRING>",
    },
}

quality_rules = {
    "not_null": ["order_id"],
    "expressions": [
        {
            "name": "positive_amount",
            "expression": "payload.amount >= 0",
            "severity": "quarantine",
            "message": "Amount must be non-negative",
        }
    ],
}

result = ingest(
    source=autoloader_source,
    catalog="main",
    target_schema="bronze_sales",
    target_table="b_orders_autoloader",
    layer="bronze",
    mode="append",
    quality_rules=quality_rules,
    on_quality_fail="quarantine",
)
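To make the per-batch accounting concrete, the sketch below splits one in-memory batch into valid and quarantined rows and derives the three row metrics. It is an illustration only: split_batch and positive_amount are hypothetical, the rule is expressed as a Python callable rather than a SQL expression, and counting rows_written as the rows that passed every rule is an assumption of this sketch.

```python
def split_batch(rows: list, not_null: list, expressions: list) -> dict:
    """Route each row to valid or quarantined and count the outcome."""
    valid, quarantined = [], []
    for row in rows:
        reasons = [f"{column} is null" for column in not_null if row.get(column) is None]
        reasons += [rule["message"] for rule in expressions if not rule["check"](row)]
        if reasons:
            # Keep the failure reasons next to the quarantined row.
            quarantined.append({**row, "_reasons": reasons})
        else:
            valid.append(row)
    metrics = {
        "rows_read": len(rows),
        "rows_written": len(valid),
        "rows_quarantined": len(quarantined),
    }
    return {"valid": valid, "quarantined": quarantined, "metrics": metrics}


def positive_amount(row: dict) -> bool:
    amount = (row.get("payload") or {}).get("amount")
    # A missing amount counts as a failure here; that is a choice of this sketch.
    return amount is not None and amount >= 0


expressions = [
    {"name": "positive_amount", "check": positive_amount, "message": "Amount must be non-negative"}
]
batch = [
    {"order_id": "A1", "payload": {"amount": 10.0, "status": "paid"}},
    {"order_id": None, "payload": {"amount": 5.0, "status": "paid"}},
    {"order_id": "A3", "payload": {"amount": -2.0, "status": "refund"}},
]
batch_result = split_batch(batch, not_null=["order_id"], expressions=expressions)
```

Summing the metrics dictionaries of all batches gives the stream-level totals described above.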
Common issues
| Symptom | Likely cause | Action |
|---|---|---|
| Files not picked up | Checkpoint already marked them processed or includeExistingFiles is false. | Use a new checkpoint for tests or validate cloudFiles options. |
| Schema errors | Inferred schema drift or missing schema location. | Declare schema/schema hints and use stable schema location. |
| Stream reports zero rows but target has data | Metric aggregation bug or child run mismatch. | Filter ctrl_ingestion_runs by parent_run_id and compare the child rows with the stream totals; current versions aggregate child metrics. |
| Access denied | Runtime cannot access source/checkpoint/schema path. | Fix External Location, Volume grants or cluster storage credentials. |
| Continuous stream expected | Contract uses available-now semantics. | Use a dedicated streaming job outside ContractForge for always-on streams. |