Finite file sets
Use file connectors for bounded folders, one-time backfills, exports and public datasets that Spark can read directly.
Connector
Use file connectors when the runtime can already read the path through Spark: Workspace files, Volumes, DBFS, mounted storage, local paths, External Locations or cloud storage paths configured by the platform.
Declare Spark DDL schemas for repeatable ingestion instead of relying on inference in scheduled jobs.
Use transform.shape to parse, flatten or explode nested JSON/struct/array content after extraction.
Use glob filters, recursive lookup and regex file selection only when the runtime can list the path efficiently.
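Every contract on this page declares its schema as a Spark DDL string. A pre-flight lint that lists the declared columns can catch typos before a scheduled job runs. The sketch below splits a DDL string into (name, type) pairs in plain Python. split_ddl_columns is a hypothetical helper, not a ContractForge or Spark API. It only tracks nesting so that commas inside STRUCT<...> or DECIMAL(p,s) do not start a new column, and it deliberately ignores backticked names and COMMENT clauses.

```python
def split_ddl_columns(ddl: str) -> list[tuple[str, str]]:
    """Split a Spark DDL schema string into (column name, type) pairs.

    Hypothetical helper: commas nested inside <...> (STRUCT, ARRAY, MAP) or
    (...) (DECIMAL precision/scale) do not start a new column.
    """
    columns: list[tuple[str, str]] = []
    depth = 0
    current: list[str] = []
    for ch in ddl:
        if ch in "<(":
            depth += 1
        elif ch in ">)":
            depth -= 1
        if ch == "," and depth == 0:
            columns.append(_name_and_type("".join(current)))
            current = []
        else:
            current.append(ch)
    if "".join(current).strip():
        columns.append(_name_and_type("".join(current)))
    return columns


def _name_and_type(column: str) -> tuple[str, str]:
    # The first whitespace-separated token is the name; the rest is the type.
    name, _, data_type = column.strip().partition(" ")
    return name, data_type.strip()
```

Comparing the returned names against the target table's columns is one way to use it in a CI check.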
| Requirement | Details |
|---|---|
| Path access | The Spark runtime must be able to list/read the path through local filesystem, DBFS, Volume, External Location or cloud storage configuration. |
| Format support | CSV, JSON, Parquet, ORC and text are built into Apache Spark. Delta requires Delta Lake, which Databricks runtimes include. Avro and XML may require extra runtime libraries. |
| Schema stability | Production contracts should provide explicit schema for ambiguous formats such as CSV, JSON and XML. |
| Listing scale | Regex filtering requires file listing. Use safety limits for very large folders. |
ContractForge delegates file parsing to Spark readers and adds contract validation, source metadata, regex file selection and consistent operational logging.
| Format | Connector value | Typical use | Notes |
|---|---|---|---|
| CSV | csv | Delimited extracts, public datasets, SaaS exports. | Always declare delimiter/header/schema for repeatability. |
| JSON | json | One JSON object per line or Spark-readable JSON files. | For nested documents, combine with transform.shape. |
| JSONL / NDJSON | jsonl, ndjson | Line-delimited API dumps and event logs. | Aliases normalize to Spark JSON reading behavior. |
| Parquet | parquet | Columnar landing data and internal lakehouse exchange. | Preferred for typed high-volume file ingestion. |
| Delta path | delta | Path-based Delta tables outside catalog registration. | Use catalog table connector when the table is registered. |
| ORC | orc | Hive-compatible columnar datasets. | Runtime must support ORC. |
| Avro | avro | Schema-driven event or messaging extracts. | Requires Spark Avro support in the runtime. |
| XML | xml | Legacy systems and regulatory files. | Requires a Spark XML reader package in many runtimes. |
| Text | text | Logs, fixed-width staging or raw capture. | Often followed by explicit parsing in a downstream step. |
Options under source.read are validated by ContractForge when they affect framework behavior. Other reader options under source.options are passed to Spark.
| Option | Use |
|---|---|
| schema | Explicit Spark DDL schema. Recommended for most production contracts. |
| recursiveFileLookup | Read files recursively from nested folders. |
| pathGlobFilter | Spark-native glob filtering, for example *.json. |
| file_regex | ContractForge regex filtering over filename or relative path after listing. |
| file_regex_scope | filename or relative_path. |
| file_regex_max_listed | Safety limit for listing-based regex selection. |
| source_complete | Evidence that a read represents the full expected source slice, required by some modes. |
CSV is common but ambiguous. Declare header, delimiter, quote/escape behavior and schema instead of relying on inference.
```yaml
source:
  type: connector
  connector: csv
  path: /Volumes/main/landing/cdc/covid/*.csv
  options:
    header: true
    delimiter: ","
    quote: "\""
    escape: "\""
    mode: PERMISSIVE
    dateFormat: yyyy-MM-dd
    timestampFormat: yyyy-MM-dd'T'HH:mm:ssX
  read:
    schema: "submission_date DATE, state STRING, tot_cases BIGINT, new_case BIGINT"
    source_complete: true
target:
  catalog: main
  schema: bronze_health
  table: b_cdc_covid
  layer: bronze
  mode: scd0_overwrite
  schema_policy: additive_only
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "csv",
        "path": "/Volumes/main/landing/cdc/covid/*.csv",
        "options": {
            "header": True,
            "delimiter": ",",
            "quote": "\"",
            "escape": "\"",
            "mode": "PERMISSIVE",
            "dateFormat": "yyyy-MM-dd",
            "timestampFormat": "yyyy-MM-dd'T'HH:mm:ssX",
        },
        "read": {
            "schema": "submission_date DATE, state STRING, tot_cases BIGINT, new_case BIGINT",
            "source_complete": True,
        },
    },
    catalog="main",
    target_schema="bronze_health",
    target_table="b_cdc_covid",
    layer="bronze",
    mode="scd0_overwrite",
    schema_policy="additive_only",
)
```
For complete public CSV extracts, combine an explicit schema with full overwrite semantics and mark the source as complete when downstream modes depend on that guarantee.
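The reason to spell out quote and escape behavior can be shown with Python's csv module standing in for Spark's reader. The data row below uses RFC 4180 doubled quotes inside a quoted field. Parsed with quote handling, it yields two fields. Parsed with quoting disabled, the embedded comma splits it into three. Spark's CSV reader defaults to a backslash escape character, which is one reason to set escape to a double quote explicitly for this style of file. The function names are illustrative only.

```python
import csv
import io

# One header row plus one data row. The note field contains the delimiter and
# RFC 4180 doubled quotes, the convention that quote='"' with escape='"' describes.
RAW = 'state,note\nNY,"Queens, ""Kings"" and Bronx"\n'


def parse_quoted(raw: str) -> list[list[str]]:
    # Quote-aware parsing: "" inside a quoted field becomes one literal quote.
    return list(csv.reader(io.StringIO(raw), delimiter=",", quotechar='"', doublequote=True))


def parse_unquoted(raw: str) -> list[list[str]]:
    # Quote characters are treated as ordinary data, so every comma splits a field.
    return list(csv.reader(io.StringIO(raw), delimiter=",", quoting=csv.QUOTE_NONE))


if __name__ == "__main__":
    print(parse_quoted(RAW)[1])
    print(parse_unquoted(RAW)[1])
```

The mismatch in field counts is exactly the kind of silent drift that schema inference would hide.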
Use Spark JSON for files that are already line-oriented or otherwise Spark-readable. If a file contains a raw document that needs domain modeling, read it as text or as a raw REST payload, then parse it with transform.shape.parse_json.
```yaml
source:
  type: connector
  connector: json
  path: /Volumes/main/landing/events/date=2026-05-17/
  read:
    schema: "event_id STRING, event_ts TIMESTAMP, properties STRUCT<country:STRING,amount:DOUBLE>"
    recursiveFileLookup: true
    pathGlobFilter: "*.json"
target:
  catalog: main
  schema: bronze_events
  table: b_events
  layer: bronze
  mode: scd0_append
quality_rules:
  not_null: [event_id]
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "json",
        "path": "/Volumes/main/landing/events/date=2026-05-17/",
        "read": {
            "schema": "event_id STRING, event_ts TIMESTAMP, properties STRUCT<country:STRING,amount:DOUBLE>",
            "recursiveFileLookup": True,
            "pathGlobFilter": "*.json",
        },
    },
    catalog="main",
    target_schema="bronze_events",
    target_table="b_events",
    layer="bronze",
    mode="scd0_append",
    quality_rules={"not_null": ["event_id"]},
)
```
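The flatten and explode steps that transform.shape performs after extraction can be illustrated in plain Python. This is a sketch of the semantics, not ContractForge's implementation: flatten and explode are hypothetical helpers, and the underscore naming for flattened columns is an assumption. flatten turns nested structs into prefixed columns. explode emits one row per array element and drops records whose array is missing or empty, as Spark's explode function does.

```python
from typing import Any


def flatten(record: dict[str, Any], prefix: str = "", sep: str = "_") -> dict[str, Any]:
    """Turn nested dicts (structs) into flat, prefixed keys."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


def explode(record: dict[str, Any], array_field: str) -> list[dict[str, Any]]:
    """Emit one copy of the record per element of record[array_field]."""
    items = record.get(array_field) or []  # missing or empty array -> no rows
    base = {k: v for k, v in record.items() if k != array_field}
    return [{**base, array_field: item} for item in items]
```

Exploding first and flattening second keeps parent fields such as event_id on every child row.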
Use jsonl or ndjson aliases when the naming helps reviewers understand that each line is an independent record.
```yaml
source:
  type: connector
  connector: ndjson
  path: /Volumes/main/landing/api/orders/orders_20260517.ndjson
  read:
    schema: "order_id STRING, updated_at TIMESTAMP, customer_id STRING, amount DOUBLE"
target:
  catalog: main
  schema: bronze_sales
  table: b_orders_api
  layer: bronze
  mode: scd0_append
  watermark_columns: [updated_at]
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "ndjson",
        "path": "/Volumes/main/landing/api/orders/orders_20260517.ndjson",
        "read": {"schema": "order_id STRING, updated_at TIMESTAMP, customer_id STRING, amount DOUBLE"},
    },
    catalog="main",
    target_schema="bronze_sales",
    target_table="b_orders_api",
    layer="bronze",
    mode="scd0_append",
    watermark_columns=["updated_at"],
)
```
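The ndjson alias signals that every line is an independent JSON record. The sketch below reads such a payload with the standard json module to make that contract concrete. read_ndjson is a hypothetical helper. Unlike Spark's JSON reader, whose default PERMISSIVE mode turns malformed lines into null fields (or a corrupt-record column) instead of failing, this version fails fast and reports the offending line number.

```python
import json
from typing import Any


def read_ndjson(text: str) -> list[dict[str, Any]]:
    """Parse newline-delimited JSON: one independent object per line."""
    records: list[dict[str, Any]] = []
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # skip blank lines, such as a trailing newline
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            # Fail fast and point at the offending record.
            raise ValueError(f"line {line_number}: {exc.msg}") from exc
    return records
```

Because each line parses on its own, one bad record never corrupts its neighbors, which is what makes line-delimited dumps easy to split and reprocess.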
Parquet is the safest default for large typed file ingestion. It preserves schema and is efficient for column pruning and partitioned folders.
```yaml
source:
  type: connector
  connector: parquet
  path: s3://company-landing/orders/year=2026/month=05/
  read:
    recursiveFileLookup: true
    schema: "order_id STRING, updated_at TIMESTAMP, amount DOUBLE, status STRING"
target:
  catalog: main
  schema: silver_sales
  table: s_orders
  layer: silver
  mode: scd1_upsert
  merge_keys: [order_id]
transform:
  deduplicate:
    keys: [order_id]
    order_by: updated_at DESC NULLS LAST
quality_rules:
  not_null: [order_id]
  unique_key: [order_id]
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "parquet",
        "path": "s3://company-landing/orders/year=2026/month=05/",
        "read": {
            "recursiveFileLookup": True,
            "schema": "order_id STRING, updated_at TIMESTAMP, amount DOUBLE, status STRING",
        },
    },
    catalog="main",
    target_schema="silver_sales",
    target_table="s_orders",
    layer="silver",
    mode="scd1_upsert",
    merge_keys=["order_id"],
    transform={"deduplicate": {"keys": ["order_id"], "order_by": "updated_at DESC NULLS LAST"}},
    quality_rules={"not_null": ["order_id"], "unique_key": ["order_id"]},
)
```
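The deduplicate settings in the Parquet example keep one row per order_id, preferring the latest updated_at and ranking NULL timestamps last. The plain-Python sketch below mirrors that rule so reviewers can reason about which record survives. keep_latest is a hypothetical helper, not ContractForge's implementation. Here, ties on updated_at keep the first row seen. In Spark, the winner among rows with equal sort keys is not guaranteed unless the ordering is made unique.

```python
from typing import Any


def keep_latest(rows: list[dict[str, Any]], keys: list[str], order_col: str) -> list[dict[str, Any]]:
    """Keep one row per key: highest order_col first, NULL (None) values last."""
    best: dict[tuple, dict[str, Any]] = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        incumbent = best.get(key)
        if incumbent is None or _outranks(row[order_col], incumbent[order_col]):
            best[key] = row
    return list(best.values())


def _outranks(candidate: Any, incumbent: Any) -> bool:
    # DESC NULLS LAST: any non-null value beats NULL; otherwise the larger value wins.
    if candidate is None:
        return False
    if incumbent is None:
        return True
    return candidate > incumbent  # strict comparison: on ties the first row seen is kept
```

A key whose only versions have NULL timestamps still produces one row, which is why the example also asserts not_null and unique_key afterwards.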
Use the Delta path connector when data is stored as Delta but not registered as a table in the catalog. If it is registered, prefer the catalog table connector (see Table and SQL).
```yaml
source:
  type: connector
  connector: delta
  path: /mnt/legacy/delta/customers
  read:
    source_complete: true
target:
  catalog: main
  schema: silver_crm
  table: s_customers
  layer: silver
  mode: snapshot_soft_delete
  merge_keys: [customer_id]
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "delta",
        "path": "/mnt/legacy/delta/customers",
        "read": {"source_complete": True},
    },
    catalog="main",
    target_schema="silver_crm",
    target_table="s_customers",
    layer="silver",
    mode="snapshot_soft_delete",
    merge_keys=["customer_id"],
)
```
snapshot_soft_delete requires a complete source. Do not combine this pattern with watermark filters or partial folder slices.
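The reason a partial slice is dangerous follows from how a soft-delete snapshot is typically applied: keys missing from the snapshot are flagged as deleted. The sketch below models that rule in plain Python with a hypothetical is_deleted flag. It illustrates the reasoning only and is not ContractForge's merge logic.

```python
from typing import Any

Row = dict[str, Any]


def apply_snapshot(target: dict[str, Row], snapshot: dict[str, Row]) -> dict[str, Row]:
    """Model a soft-delete snapshot merge keyed by merge key.

    Keys in the snapshot are upserted as active; keys found only in the target
    are kept but flagged with the hypothetical is_deleted column.
    """
    result: dict[str, Row] = {key: {**row, "is_deleted": False} for key, row in snapshot.items()}
    for key, row in target.items():
        if key not in snapshot:
            result[key] = {**row, "is_deleted": True}
    return result


def deleted_keys(state: dict[str, Row]) -> set[str]:
    # Keys currently flagged as soft-deleted.
    return {key for key, row in state.items() if row["is_deleted"]}
```

Applying a one-customer slice to a three-customer target flags the other two as deleted even though they still exist upstream. This is the failure a watermark filter or a partial folder would cause.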
ORC is useful for Hive-compatible systems and historical Hadoop datasets. ContractForge passes the read to Spark, so runtime ORC support is required.
```yaml
source:
  type: connector
  connector: orc
  path: /Volumes/main/landing/hive/orders_orc/
  read:
    schema: "order_id STRING, updated_at TIMESTAMP, amount DECIMAL(18,2)"
    recursiveFileLookup: true
target:
  catalog: main
  schema: bronze_legacy
  table: b_orders_orc
  layer: bronze
  mode: scd0_append
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "orc",
        "path": "/Volumes/main/landing/hive/orders_orc/",
        "read": {
            "schema": "order_id STRING, updated_at TIMESTAMP, amount DECIMAL(18,2)",
            "recursiveFileLookup": True,
        },
    },
    catalog="main",
    target_schema="bronze_legacy",
    target_table="b_orders_orc",
    layer="bronze",
    mode="scd0_append",
)
```
Avro support depends on the runtime. Databricks runtimes commonly include Avro support; local Spark may need the Spark Avro package.
```yaml
source:
  type: connector
  connector: avro
  path: /Volumes/main/landing/kafka/orders_avro/
  read:
    recursiveFileLookup: true
target:
  catalog: main
  schema: bronze_streams
  table: b_orders_avro
  layer: bronze
  mode: scd0_append
quality_rules:
  expressions:
    - name: has_event_id
      expression: event_id IS NOT NULL
      severity: abort
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "avro",
        "path": "/Volumes/main/landing/kafka/orders_avro/",
        "read": {"recursiveFileLookup": True},
    },
    catalog="main",
    target_schema="bronze_streams",
    target_table="b_orders_avro",
    layer="bronze",
    mode="scd0_append",
    quality_rules={
        "expressions": [
            {"name": "has_event_id", "expression": "event_id IS NOT NULL", "severity": "abort"}
        ]
    },
)
```
If your runtime lacks Spark Avro, install the matching Spark package before using the connector.
Apache Spark only added a built-in XML reader in version 4.0, and many runtimes still rely on a separate Spark XML package. Use this connector when the runtime has a compatible XML reader installed, and always declare the row tag and schema.
```yaml
source:
  type: connector
  connector: xml
  path: /Volumes/main/landing/regulatory/invoices/*.xml
  options:
    rowTag: invoice
    attributePrefix: "_"
    valueTag: "_value"
  read:
    schema: "invoice_id STRING, issued_at TIMESTAMP, total DOUBLE, customer STRUCT<id:STRING,name:STRING>"
target:
  catalog: main
  schema: bronze_regulatory
  table: b_invoices_xml
  layer: bronze
  mode: scd0_append
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "xml",
        "path": "/Volumes/main/landing/regulatory/invoices/*.xml",
        "options": {"rowTag": "invoice", "attributePrefix": "_", "valueTag": "_value"},
        "read": {
            "schema": "invoice_id STRING, issued_at TIMESTAMP, total DOUBLE, customer STRUCT<id:STRING,name:STRING>"
        },
    },
    catalog="main",
    target_schema="bronze_regulatory",
    target_table="b_invoices_xml",
    layer="bronze",
    mode="scd0_append",
)
```
Install the XML Spark datasource on classic clusters, or use a runtime where the XML reader is already available. If the library is missing, the connector should fail with the underlying Spark reader error.
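The rowTag, attributePrefix and valueTag options follow the XML reader's mapping conventions. Each rowTag element becomes a row, and attributes become fields named with attributePrefix. When an element carries attributes and text but no children, its text is stored under valueTag. The sketch below approximates that mapping with Python's xml.etree.ElementTree so the example options can be checked against a sample file. It is not the Spark reader. It keeps all values as strings (typing is left to the declared schema), and it ignores repeated child elements, which the reader turns into arrays.

```python
import xml.etree.ElementTree as ET
from typing import Any


def row_from_element(elem: ET.Element, attribute_prefix: str = "_",
                     value_tag: str = "_value") -> dict[str, Any]:
    """Map one element to a dict: prefixed attributes plus one entry per child."""
    row: dict[str, Any] = {f"{attribute_prefix}{name}": value for name, value in elem.attrib.items()}
    for child in elem:
        # Repeated child tags overwrite each other here; the Spark reader builds arrays.
        row[child.tag] = _child_value(child, attribute_prefix, value_tag)
    return row


def _child_value(child: ET.Element, attribute_prefix: str, value_tag: str) -> Any:
    text = (child.text or "").strip()
    if not child.attrib and len(child) == 0:
        return text  # plain leaf element -> scalar value
    value = row_from_element(child, attribute_prefix, value_tag)
    if child.attrib and len(child) == 0 and text:
        value[value_tag] = text  # attributes plus text -> text stored under valueTag
    return value


def read_rows(xml_text: str, row_tag: str = "invoice", attribute_prefix: str = "_",
              value_tag: str = "_value") -> list[dict[str, Any]]:
    # Every element named row_tag becomes one row.
    root = ET.fromstring(xml_text)
    return [row_from_element(elem, attribute_prefix, value_tag) for elem in root.iter(row_tag)]
```

With an element such as `<total currency="EUR">120.50</total>`, the sketch yields a struct with `_currency` and `_value` fields. A contract that declares total as DOUBLE would therefore need the schema adjusted for files shaped like that.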
Use text for raw line capture, logs, fixed-width staging or files that will be parsed later. The output contains Spark's standard value column unless a downstream transformation changes it.
```yaml
source:
  type: connector
  connector: text
  path: /Volumes/main/landing/logs/app/*.log
  read:
    recursiveFileLookup: true
    file_regex: "^app-[0-9]{8}.*\\.log$"
    file_regex_scope: filename
target:
  catalog: main
  schema: bronze_logs
  table: b_app_logs
  layer: bronze
  mode: scd0_append
column_mapping:
  value: raw_line
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "text",
        "path": "/Volumes/main/landing/logs/app/*.log",
        "read": {
            "recursiveFileLookup": True,
            "file_regex": r"^app-[0-9]{8}.*\.log$",
            "file_regex_scope": "filename",
        },
    },
    catalog="main",
    target_schema="bronze_logs",
    target_table="b_app_logs",
    layer="bronze",
    mode="scd0_append",
    column_mapping={"value": "raw_line"},
)
```
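Once raw lines land in raw_line, a downstream step parses them. For fixed-width files, that parsing is positional slicing. The sketch below shows it in plain Python with a hypothetical three-column layout. In Spark, the same step would typically use pyspark.sql.functions.substring, whose positions are 1-based rather than the 0-based offsets used here.

```python
# Hypothetical record layout: (column, start, end), 0-based and end-exclusive.
LAYOUT = [("record_type", 0, 2), ("account_id", 2, 10), ("amount_cents", 10, 20)]


def parse_fixed_width(raw_line: str, layout=LAYOUT) -> dict[str, str]:
    """Slice one fixed-width line into named, whitespace-trimmed fields."""
    width = max(end for _, _, end in layout)
    if len(raw_line) < width:
        # Short lines usually mean truncation or a different record type.
        raise ValueError(f"expected at least {width} characters, got {len(raw_line)}")
    return {name: raw_line[start:end].strip() for name, start, end in layout}
```

Keeping the layout as data rather than hard-coded slices makes it easy to review alongside the contract.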
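Folder reads should be explicit about recursion, schema and filtering. Spark glob filters are cheap and should be preferred for simple suffix or prefix matching. ContractForge regex filtering is useful when the selection logic cannot be expressed as a glob, for example when it depends on the relative folder path. Note that Spark's recursiveFileLookup disables partition discovery, so key=value folder names such as year=2026 are not added as columns.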
```yaml
source:
  type: connector
  connector: json
  path: abfss://landing@account.dfs.core.windows.net/orders/
  read:
    schema: "order_id STRING, event_date DATE, updated_at TIMESTAMP, amount DOUBLE"
    recursiveFileLookup: true
    pathGlobFilter: "*.json"
    file_regex: "^year=2026/month=05/day=[0-9]{2}/orders_.*\\.json$"
    file_regex_scope: relative_path
    file_regex_max_listed: 100000
target:
  catalog: main
  schema: bronze_sales
  table: b_orders_files
  layer: bronze
  mode: scd0_append
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "json",
        "path": "abfss://landing@account.dfs.core.windows.net/orders/",
        "read": {
            "schema": "order_id STRING, event_date DATE, updated_at TIMESTAMP, amount DOUBLE",
            "recursiveFileLookup": True,
            "pathGlobFilter": "*.json",
            "file_regex": r"^year=2026/month=05/day=[0-9]{2}/orders_.*\.json$",
            "file_regex_scope": "relative_path",
            "file_regex_max_listed": 100000,
        },
    },
    catalog="main",
    target_schema="bronze_sales",
    target_table="b_orders_files",
    layer="bronze",
    mode="scd0_append",
)
```
Regex selection records listing and match metrics in source_metrics_json, including files_listed, files_matched and whether regex filtering was applied.
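The listing-then-filter flow can be sketched in plain Python. The sketch applies an optional glob to each file name, matches the regex against either the file name or the path relative to the base folder, and records counts in a metrics dictionary. select_files is hypothetical. files_listed and files_matched reuse the metric names above, while regex_applied is an assumed key name. Raising an error when the listing exceeds max_listed is an assumption about how the safety limit behaves.

```python
import re
from fnmatch import fnmatchcase
from typing import Optional


def select_files(
    base: str,
    listed_paths: list[str],
    pattern: str,
    scope: str = "filename",
    glob: Optional[str] = None,
    max_listed: Optional[int] = None,
) -> tuple[list[str], dict]:
    """Hypothetical sketch of listing-based regex file selection."""
    if scope not in ("filename", "relative_path"):
        raise ValueError(f"unsupported scope: {scope}")
    # Safety limit: refuse to filter an unexpectedly large listing (assumed behavior).
    if max_listed is not None and len(listed_paths) > max_listed:
        raise RuntimeError(f"listed {len(listed_paths)} files, above max_listed={max_listed}")
    prefix = base.rstrip("/") + "/"
    regex = re.compile(pattern)
    matched = []
    for path in listed_paths:
        filename = path.rsplit("/", 1)[-1]
        # In this sketch the glob is checked against the file name only.
        if glob is not None and not fnmatchcase(filename, glob):
            continue
        relative = path[len(prefix):] if path.startswith(prefix) else path
        candidate = filename if scope == "filename" else relative
        if regex.search(candidate):
            matched.append(path)
    metrics = {
        "files_listed": len(listed_paths),
        "files_matched": len(matched),
        "regex_applied": True,  # assumed key name
    }
    return matched, metrics
```

Because the regex runs only after a full listing, the cost grows with the folder size regardless of how few files match. That is the reason to narrow the base path first and keep a listing limit in place.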
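Spark usually infers compression from the file extension, so a .csv.gz file is read through the gzip codec without extra settings. When a format requires additional reader behavior, keep those values in source.options. Spark documents compression as a write-side CSV option, so on read it is the .gz extension, not the option, that selects the codec.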
```yaml
source:
  type: connector
  connector: csv
  path: /Volumes/main/landing/orders/orders_20260517.csv.gz
  options:
    header: true
    compression: gzip
    multiLine: false
  read:
    schema: "order_id STRING, updated_at TIMESTAMP, amount DOUBLE"
target:
  catalog: main
  schema: bronze_sales
  table: b_orders_compressed
  layer: bronze
  mode: scd0_append
```

```python
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "csv",
        "path": "/Volumes/main/landing/orders/orders_20260517.csv.gz",
        "options": {"header": True, "compression": "gzip", "multiLine": False},
        "read": {"schema": "order_id STRING, updated_at TIMESTAMP, amount DOUBLE"},
    },
    catalog="main",
    target_schema="bronze_sales",
    target_table="b_orders_compressed",
    layer="bronze",
    mode="scd0_append",
)
```
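Extension-based codec detection can be mirrored locally when spot-checking a compressed landing file before a run. The sketch below picks a decompressor from the file suffix and reads the CSV with the standard library. open_by_extension and read_csv_rows are hypothetical helpers, and only gzip and bzip2 are covered.

```python
import bz2
import csv
import gzip
from typing import IO, Callable

# Suffix-to-opener table standing in for codec detection by file extension.
OPENERS: dict[str, Callable[..., IO[str]]] = {".gz": gzip.open, ".bz2": bz2.open}


def open_by_extension(path: str) -> IO[str]:
    """Open a possibly compressed text file, choosing the codec from its suffix."""
    for suffix, opener in OPENERS.items():
        if path.endswith(suffix):
            return opener(path, "rt", encoding="utf-8", newline="")
    return open(path, "r", encoding="utf-8", newline="")


def read_csv_rows(path: str) -> list[dict[str, str]]:
    # Like header: true, the first row supplies the column names.
    with open_by_extension(path) as handle:
        return list(csv.DictReader(handle))
```

Gzip is not a splittable format, so Spark reads each .gz file in a single task no matter how large it is.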
| Use file connector when... | Use Auto Loader when... |
|---|---|
| The file set is finite and known for the run. | Files arrive continuously or in repeated drops. |
| You want one batch read with deterministic folder filters. | You need checkpointed discovery and schema tracking. |
| The source is small enough that file listing is not a bottleneck. | The source has many arriving files and needs cloudFiles semantics. |
| You are backfilling fixed folders or validating static datasets. | You need available_now micro-batches and stream metrics. |
- Set source_complete: true only when the loaded folder really represents the complete source slice required by the mode.
- Use transform.shape for nested JSON, structs and arrays; do not embed business flattening in connector behavior.
- Use transform.deduplicate before merge modes when the source can contain multiple records per key.

Recent file-connector runs, including their listing metrics, can be reviewed in the ingestion control table:

```sql
SELECT
  run_id,
  status,
  source_connector,
  source_format,
  source_path,
  rows_read,
  rows_written,
  source_metrics_json
FROM main.ops.ctrl_ingestion_runs
WHERE source_connector IN ('csv', 'json', 'jsonl', 'ndjson', 'parquet', 'delta', 'orc', 'avro', 'xml', 'text')
ORDER BY started_at_utc DESC;
```
| Symptom | Likely cause | Action |
|---|---|---|
| Wrong column types | Schema inference changed between files or runs. | Declare an explicit Spark DDL schema. |
| Zero rows | Wrong path, glob filter or regex selection. | Validate file listing and inspect source_metrics_json. |
| XML/Avro reader missing | Runtime library is not installed. | Install the required Spark format package on the cluster/runtime. |
| Duplicate merge keys | Folder contains multiple versions per business key. | Use transform.deduplicate before merge modes. |
| Slow startup | Large folder listing or schema inference. | Use explicit schema, partitioned paths, glob filters or Auto Loader. |
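For the zero-rows symptom, rows returned by the control-table query can be screened for selections that listed files but matched none. The sketch below assumes each run arrives as a dictionary with run_id and the source_metrics_json string, and it relies only on the files_listed and files_matched metrics. Fetching the rows is left out, and flag_empty_selections is a hypothetical helper.

```python
import json
from typing import Any, Iterable


def flag_empty_selections(runs: Iterable[dict[str, Any]]) -> list[tuple[str, int]]:
    """Return (run_id, files_listed) for runs whose file selection matched nothing."""
    flagged = []
    for run in runs:
        raw = run.get("source_metrics_json")
        metrics = json.loads(raw) if raw else {}
        listed = metrics.get("files_listed")
        # Listing happened but nothing survived the filters: suspect the path, glob or regex.
        if listed is not None and metrics.get("files_matched") == 0:
            flagged.append((run["run_id"], listed))
    return flagged
```

A flagged run with a large files_listed value points at an overly strict glob or regex. A run with no metrics at all usually means no regex selection was configured.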
Keep extraction concerns in source, structural normalization in transform, validation in quality_rules and target semantics in mode. This separation keeps examples portable and prevents connector-specific workarounds from becoming hidden business logic.