Connector
S3
Use the S3 connector for files stored in Amazon S3. On Databricks serverless, prefer Unity Catalog External Locations or Volumes. On classic/job clusters or local Spark, direct S3A credentials can be configured when the runtime allows Hadoop filesystem settings.
When to use it
| Runtime | Recommended access | Why |
|---|---|---|
| Databricks serverless on AWS | Unity Catalog External Location or Volume | Governed storage access is managed by the platform. |
| Databricks serverless from another cloud | External Location when available, otherwise network policy/egress setup | Direct S3A configuration may be blocked by Spark Connect/serverless controls. |
| Classic/job cluster | S3A credentials, instance profile or cluster IAM role | Cluster can usually accept Hadoop filesystem configuration. |
| Local Spark | S3A credentials or default AWS chain | Useful for development and compatibility checks. |
Runtime requirements
| Runtime | Access model | Use when |
|---|---|---|
| Databricks serverless | Unity Catalog External Location or Volume | The platform blocks direct Hadoop/S3A credential configuration. |
| Classic cluster | S3A credentials, instance profile or assumed role | You control cluster libraries and Spark/Hadoop configuration. |
| Local Spark | Local AWS profile, environment variables or explicit credentials | Development and compatibility tests outside Databricks. |
Basic example
When the workspace already exposes S3 through Unity Catalog, the contract should simply reference the governed path.
source:
  type: connector
  connector: s3
  path: s3://contractforge-s3/blob_teste/json/
  format: json
  read:
    schema: "id STRING, event_ts TIMESTAMP, payload STRING"
    recursiveFileLookup: true
target:
  catalog: contractforge
  schema: bronze_examples
  table: b_s3_json
layer: bronze
mode: scd0_append
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "s3",
        "path": "s3://contractforge-s3/blob_teste/json/",
        "format": "json",
        "read": {
            "schema": "id STRING, event_ts TIMESTAMP, payload STRING",
            "recursiveFileLookup": True,
        },
    },
    catalog="contractforge",
    target_schema="bronze_examples",
    target_table="b_s3_json",
    layer="bronze",
    mode="scd0_append",
)
If the path is not reachable through workspace governance, fix the External Location, network policy, or workspace storage access. The connector does not bypass platform restrictions.
Classic cluster direct credentials
Use direct credentials only on runtimes where setting S3A filesystem configuration is allowed. Store secrets outside the contract.
source:
  type: connector
  connector: s3
  path: s3a://contractforge-s3/blob_teste/parquet/
  format: parquet
  auth:
    access_key_id: "{{ secret:contractforge-aws/aws_access_key_id }}"
    secret_access_key: "{{ secret:contractforge-aws/aws_secret_access_key }}"
    session_token: "{{ secret:contractforge-aws/aws_session_token }}"
  read:
    schema: "id STRING, event_ts TIMESTAMP, amount DOUBLE"
    recursiveFileLookup: true
target:
  catalog: contractforge
  schema: bronze_examples
  table: b_s3_parquet
layer: bronze
mode: scd0_append
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "s3",
        "path": "s3a://contractforge-s3/blob_teste/parquet/",
        "format": "parquet",
        "auth": {
            "access_key_id": "{{ secret:contractforge-aws/aws_access_key_id }}",
            "secret_access_key": "{{ secret:contractforge-aws/aws_secret_access_key }}",
            "session_token": "{{ secret:contractforge-aws/aws_session_token }}",
        },
        "read": {
            "schema": "id STRING, event_ts TIMESTAMP, amount DOUBLE",
            "recursiveFileLookup": True,
        },
    },
    catalog="contractforge",
    target_schema="bronze_examples",
    target_table="b_s3_parquet",
    layer="bronze",
    mode="scd0_append",
)
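One way to enforce the rule about keeping secrets outside the contract is a pre-flight check that every auth value is a secret reference. The sketch below is illustrative only: literal_auth_fields and SECRET_REF are hypothetical names, and the placeholder shape is inferred from the examples on this page, not taken from ContractForge's own parser.

```python
import re

# Assumed placeholder shape, inferred from the examples: "{{ secret:<scope>/<key> }}".
SECRET_REF = re.compile(r"^\{\{\s*secret:[\w.-]+/[\w.-]+\s*\}\}$")


def literal_auth_fields(source: dict) -> list[str]:
    """Return the auth keys whose values are not secret references."""
    auth = source.get("auth", {})
    return sorted(
        key for key, value in auth.items()
        if not (isinstance(value, str) and SECRET_REF.match(value))
    )


source = {
    "type": "connector",
    "connector": "s3",
    "auth": {
        "access_key_id": "{{ secret:contractforge-aws/aws_access_key_id }}",
        "secret_access_key": "not-a-reference",  # literal value: should be flagged
    },
}
flagged = literal_auth_fields(source)
print(flagged)  # ['secret_access_key']
```

A check like this could run in CI before a contract is deployed, so a pasted credential fails review instead of reaching the repository.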
If you use a session token, ensure it stays valid for the full job duration. Folder reads and regex filters list objects under the prefix, so they also require s3:ListBucket.
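The token lifetime check can be sketched as a comparison between the credential's expiry and the expected job runtime. This is a minimal sketch under assumptions: token_covers_job, the five-minute safety margin, and the example timestamps are all hypothetical, and the expiry would come from wherever the temporary credentials are issued.

```python
from datetime import datetime, timedelta, timezone


def token_covers_job(expires_at, expected_runtime, margin=timedelta(minutes=5), now=None):
    """True when the token stays valid for the expected runtime plus a safety margin."""
    now = now or datetime.now(timezone.utc)
    return expires_at >= now + expected_runtime + margin


# Fixed timestamps keep the example deterministic.
start = datetime(2026, 5, 1, 12, 0, tzinfo=timezone.utc)
expires = start + timedelta(hours=1)

ok_short = token_covers_job(expires, timedelta(minutes=30), now=start)
ok_long = token_covers_job(expires, timedelta(hours=2), now=start)
print(ok_short, ok_long)  # True False
```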
Minimum IAM permissions
For read-only ingestion, the principal generally needs object read and bucket listing on the relevant prefix.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::contractforge-s3",
      "Condition": {"StringLike": {"s3:prefix": ["blob_teste/*"]}}
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::contractforge-s3/blob_teste/*"
    }
  ]
}
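When several buckets or prefixes need the same grant, the policy above can be generated instead of hand-edited. The helper below is a sketch, not part of ContractForge: read_only_policy is a hypothetical name, and it reproduces the same two statements for a given bucket and prefix.

```python
import json


def read_only_policy(bucket: str, prefix: str) -> dict:
    """Build a list + read policy scoped to one prefix of one bucket."""
    prefix = prefix.strip("/")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing is granted on the bucket ARN and narrowed by prefix.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
                "Condition": {"StringLike": {"s3:prefix": [f"{prefix}/*"]}},
            },
            {
                # Object reads are granted on the object ARNs under the prefix.
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}/*",
            },
        ],
    }


policy = read_only_policy("contractforge-s3", "blob_teste/")
print(json.dumps(policy, indent=2))
```

The split matters because s3:ListBucket applies to the bucket resource while s3:GetObject applies to object resources; putting both actions on a single resource leaves one of them ineffective.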
Multi-format folder examples
Use the same connector with different Spark formats. Keep each contract explicit about format, schema and folder behavior instead of relying on path naming conventions alone.
CSV
source:
  type: connector
  connector: s3
  path: s3://contractforge-s3/blob_teste/csv/
  format: csv
  options:
    header: true
    delimiter: ","
  read:
    schema: "id STRING, event_ts TIMESTAMP, amount DOUBLE"
    source_complete: true
target:
  catalog: contractforge
  schema: bronze_examples
  table: b_s3_csv
layer: bronze
mode: scd0_overwrite
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "s3",
        "path": "s3://contractforge-s3/blob_teste/csv/",
        "format": "csv",
        "options": {"header": True, "delimiter": ","},
        "read": {
            "schema": "id STRING, event_ts TIMESTAMP, amount DOUBLE",
            "source_complete": True,
        },
    },
    catalog="contractforge",
    target_schema="bronze_examples",
    target_table="b_s3_csv",
    layer="bronze",
    mode="scd0_overwrite",
)
JSON
source:
  type: connector
  connector: s3
  path: s3://contractforge-s3/blob_teste/json/
  format: json
  read:
    schema: "id STRING, event_ts TIMESTAMP, payload STRUCT<kind:STRING,value:DOUBLE>"
    recursiveFileLookup: true
target:
  catalog: contractforge
  schema: bronze_examples
  table: b_s3_nested_json
layer: bronze
mode: scd0_append
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "s3",
        "path": "s3://contractforge-s3/blob_teste/json/",
        "format": "json",
        "read": {
            "schema": "id STRING, event_ts TIMESTAMP, payload STRUCT<kind:STRING,value:DOUBLE>",
            "recursiveFileLookup": True,
        },
    },
    catalog="contractforge",
    target_schema="bronze_examples",
    target_table="b_s3_nested_json",
    layer="bronze",
    mode="scd0_append",
)
Parquet
source:
  type: connector
  connector: s3
  path: s3://contractforge-s3/blob_teste/parquet/
  format: parquet
  read:
    recursiveFileLookup: true
target:
  catalog: contractforge
  schema: bronze_examples
  table: b_s3_parquet_folder
layer: bronze
mode: scd0_append
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "s3",
        "path": "s3://contractforge-s3/blob_teste/parquet/",
        "format": "parquet",
        "read": {"recursiveFileLookup": True},
    },
    catalog="contractforge",
    target_schema="bronze_examples",
    target_table="b_s3_parquet_folder",
    layer="bronze",
    mode="scd0_append",
)
Avro
source:
  type: connector
  connector: s3
  path: s3://contractforge-s3/blob_teste/avro/
  format: avro
  read:
    recursiveFileLookup: true
target:
  catalog: contractforge
  schema: bronze_examples
  table: b_s3_avro
layer: bronze
mode: scd0_append
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "s3",
        "path": "s3://contractforge-s3/blob_teste/avro/",
        "format": "avro",
        "read": {"recursiveFileLookup": True},
    },
    catalog="contractforge",
    target_schema="bronze_examples",
    target_table="b_s3_avro",
    layer="bronze",
    mode="scd0_append",
)
CSV, JSON and Parquet are broadly available in Spark runtimes. Avro requires Spark Avro support. XML requires a Spark XML datasource. If the runtime lacks the reader, ContractForge surfaces the Spark error instead of silently changing behavior.
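The per-format contracts above share one shape, so teams that generate many of them may prefer a small builder that forces format and read settings to be stated explicitly. The sketch below is an assumption-laden illustration: s3_source and the two format sets are hypothetical and not part of the ContractForge API; the resulting dictionary simply mirrors the source mappings shown on this page.

```python
# Formats used on this page; the second set needs extra reader support in the runtime.
BUILT_IN = {"csv", "json", "parquet"}
NEEDS_EXTRA_READER = {"avro", "xml"}


def s3_source(path, fmt, schema=None, options=None, **read):
    """Build a source mapping shaped like the examples on this page (hypothetical helper)."""
    if fmt not in BUILT_IN | NEEDS_EXTRA_READER:
        raise ValueError(f"unexpected format: {fmt!r}")
    source = {"type": "connector", "connector": "s3", "path": path, "format": fmt}
    if options:
        source["options"] = dict(options)
    if schema:
        read = {"schema": schema, **read}
    if read:
        source["read"] = read
    return source


csv_source = s3_source(
    "s3://contractforge-s3/blob_teste/csv/",
    "csv",
    schema="id STRING, event_ts TIMESTAMP, amount DOUBLE",
    options={"header": True, "delimiter": ","},
    source_complete=True,
)
avro_source = s3_source(
    "s3://contractforge-s3/blob_teste/avro/", "avro", recursiveFileLookup=True
)
print(csv_source["read"], avro_source["read"])
```

The returned mapping could be passed as the source argument of ingest in the same way as the hand-written dictionaries.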
Many files and regex selection
Use Spark globbing for simple extension filtering and file_regex when selection depends on partition-style folders or naming conventions that glob patterns cannot express clearly.
source:
  type: connector
  connector: s3
  path: s3a://contractforge-s3/blob_teste/small_files/
  format: json
  auth:
    access_key_id: "{{ secret:contractforge-aws/aws_access_key_id }}"
    secret_access_key: "{{ secret:contractforge-aws/aws_secret_access_key }}"
    session_token: "{{ secret:contractforge-aws/aws_session_token }}"
  read:
    schema: "id STRING, event_ts TIMESTAMP, amount DOUBLE"
    recursiveFileLookup: true
    pathGlobFilter: "*.json"
    file_regex: "^partition_date=2026-05-[0-9]{2}/part-[0-9]+\\.json$"
    file_regex_scope: relative_path
    file_regex_max_listed: 100000
target:
  catalog: contractforge
  schema: bronze_examples
  table: b_s3_small_files
layer: bronze
mode: scd0_append
from contractforge import ingest

result = ingest(
    source={
        "type": "connector",
        "connector": "s3",
        "path": "s3a://contractforge-s3/blob_teste/small_files/",
        "format": "json",
        "auth": {
            "access_key_id": "{{ secret:contractforge-aws/aws_access_key_id }}",
            "secret_access_key": "{{ secret:contractforge-aws/aws_secret_access_key }}",
            "session_token": "{{ secret:contractforge-aws/aws_session_token }}",
        },
        "read": {
            "schema": "id STRING, event_ts TIMESTAMP, amount DOUBLE",
            "recursiveFileLookup": True,
            "pathGlobFilter": "*.json",
            "file_regex": r"^partition_date=2026-05-[0-9]{2}/part-[0-9]+\.json$",
            "file_regex_scope": "relative_path",
            "file_regex_max_listed": 100000,
        },
    },
    catalog="contractforge",
    target_schema="bronze_examples",
    target_table="b_s3_small_files",
    layer="bronze",
    mode="scd0_append",
)
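To preview which files a glob and regex pair would keep, the two filters can be approximated locally with the standard library. This is a sketch under assumptions, not the connector's implementation: select_files is a hypothetical helper, fnmatch only approximates Spark's glob matching on the file name, and the regex is assumed to be applied to the path relative to the source path, as file_regex_scope: relative_path suggests. The listing cap (file_regex_max_listed) is not modeled.

```python
import fnmatch
import posixpath
import re


def select_files(relative_paths, glob_filter, file_regex):
    """Keep paths whose file name matches the glob and whose relative path matches the regex."""
    pattern = re.compile(file_regex)
    return [
        path for path in relative_paths
        if fnmatch.fnmatchcase(posixpath.basename(path), glob_filter)
        and pattern.search(path)
    ]


# Made-up listing, relative to the source path.
listed = [
    "partition_date=2026-05-01/part-0001.json",
    "partition_date=2026-05-01/_SUCCESS",
    "partition_date=2026-04-30/part-0001.json",
    "partition_date=2026-05-02/part-0002.json.tmp",
    "partition_date=2026-05-17/part-42.json",
]
selected = select_files(
    listed,
    glob_filter="*.json",
    file_regex=r"^partition_date=2026-05-[0-9]{2}/part-[0-9]+\.json$",
)
print(selected)
```

Here the glob removes the marker and temporary files, while the regex removes the April partition that the glob alone would have kept.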
Operational metadata
SELECT
  run_id,
  source_connector,
  source_path,
  source_auth_redacted_json,
  source_read_redacted_json,
  source_metrics_json
FROM main.ops.ctrl_ingestion_runs
WHERE source_connector = 's3'
ORDER BY started_at_utc DESC;
Common issues
| Symptom | Likely cause | Action |
|---|---|---|
| Access denied on folder | Missing s3:ListBucket. | Add list permission for the prefix. |
| Access denied on file | Missing s3:GetObject. | Add object read permission for the path. |
| Works on classic, fails on serverless | Direct S3A config blocked. | Use External Location/Volume or workspace network policy. |
| Temporary token expires | STS session shorter than job. | Issue a longer-lived token or use a platform-managed identity. |
| Slow file discovery | Large prefix listing. | Use narrower paths, glob filters or Auto Loader. |
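For the slow discovery case, narrowing the path usually means pointing each run at only the partitions it needs. The sketch below assumes a partition_date=YYYY-MM-DD folder layout like the one in the regex example; daily_prefixes is a hypothetical helper, and whether the connector accepts several paths in one contract is not confirmed here, so each generated path is treated as a candidate for its own path value.

```python
from datetime import date, timedelta


def daily_prefixes(base_path, start, end):
    """Return one path per day in [start, end], using partition_date=YYYY-MM-DD folders."""
    base = base_path.rstrip("/")
    days = (end - start).days
    return [
        f"{base}/partition_date={(start + timedelta(days=offset)).isoformat()}/"
        for offset in range(days + 1)
    ]


paths = daily_prefixes(
    "s3a://contractforge-s3/blob_teste/small_files/",
    date(2026, 5, 1),
    date(2026, 5, 3),
)
print(paths)
```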