Normalize structure before quality and write.
Connectors retrieve data. Transformations make the physical DataFrame usable before quality rules and write modes run. Use this namespace for mutations that change rows, columns or values. Keep catalog descriptions, PII, aliases and business ownership in split governance contracts.
The canonical namespace
Use transform for physical DataFrame preparation. The top-level shape shortcut remains supported for simple notebooks, but new contracts should use the canonical namespace.
Execution order is deterministic:
1. select_columns
2. column_mapping
3. transform.shape
4. transform.cast
5. transform.standardize
6. transform.derive
7. filter_expression
8. custom_keys
9. watermark_columns
10. transform.deduplicate
11. encoding cleanup and control columns
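The documented order can be sketched as a small helper that takes the keys a contract declares and returns the steps in the order they would run. This is illustrative only: `EXECUTION_ORDER` and `planned_steps` are hypothetical names, not part of the ContractForge API, and the final encoding cleanup and control column step is left out because the text does not say how it is configured.

```python
# Hypothetical helper for illustration; not part of the ContractForge API.
EXECUTION_ORDER = [
    "select_columns",
    "column_mapping",
    "transform.shape",
    "transform.cast",
    "transform.standardize",
    "transform.derive",
    "filter_expression",
    "custom_keys",
    "watermark_columns",
    "transform.deduplicate",
]


def planned_steps(contract: dict) -> list[str]:
    """Return the steps a contract declares, in the documented execution order."""
    transform = contract.get("transform") or {}
    steps = []
    for name in EXECUTION_ORDER:
        if name.startswith("transform."):
            declared = name.split(".", 1)[1] in transform
        else:
            declared = name in contract
        if declared:
            steps.append(name)
    return steps


# The declaration order inside the contract does not matter.
contract = {
    "transform": {
        "deduplicate": {"keys": ["id"], "order_by": "ts"},
        "cast": {"id": "string"},
    },
    "filter_expression": "id IS NOT NULL",
    "column_mapping": {"ID": "id"},
}
print(planned_steps(contract))
```

The point of the sketch is that a contract author cannot reorder steps by rearranging keys: a cast always runs before a filter, and deduplication always runs after both.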
transform:
  shape:
    parse_json:
      - column: raw_response
        alias: payload
        schema: "STRUCT<events:ARRAY<STRUCT<id:STRING,title:STRING>>>"
    arrays:
      - path: payload.events
        mode: explode_outer
        alias: event
    columns:
      event.id:
        alias: event_id
        cast: STRING
      event.title:
        alias: event_title
        cast: STRING
  cast:
    event_id: string
  standardize:
    event_title:
      trim: true
      normalize_whitespace: true
  derive:
    event_date: to_date(ingestion_ts_utc)
  deduplicate:
    keys: [event_id]
    order_by:
      - column: ingestion_ts_utc
        direction: desc
        nulls: last
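To make the effect of the shape part of this contract concrete, the following pure-Python sketch mimics it on a single row: parse the JSON string, explode the events array with explode_outer semantics, then project the two declared columns. It is not how ContractForge executes the contract (that happens on a Spark DataFrame); `shape_row` is a hypothetical name, and treating unparseable JSON as null is an assumption of this sketch.

```python
import json


def shape_row(row: dict) -> list[dict]:
    """Mimic parse_json -> arrays (explode_outer) -> columns for one input row."""
    # parse_json: raw_response (string) becomes payload (dict).
    # Assumption of this sketch: missing or unparseable JSON yields a null payload.
    raw = row.get("raw_response")
    try:
        payload = json.loads(raw) if raw else None
    except json.JSONDecodeError:
        payload = None

    events = payload.get("events") if isinstance(payload, dict) else None

    # explode_outer: one row per element, or a single row with a null
    # element when the array is null or empty.
    exploded = events if events else [None]

    # columns: only the declared aliases survive, each cast to a string.
    result = []
    for event in exploded:
        event = event or {}
        event_id = event.get("id")
        title = event.get("title")
        result.append(
            {
                "event_id": None if event_id is None else str(event_id),
                "event_title": None if title is None else str(title),
            }
        )
    return result


rows = shape_row(
    {"raw_response": '{"events": [{"id": "a", "title": "A"}, {"id": "b", "title": "B"}]}'}
)
print(len(rows))
```

One input row with two events becomes two output rows, which is the cardinality change the design principles refer to. With plain explode instead of explode_outer, a row whose events array is empty would disappear rather than produce one row of nulls.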
Design principles
Explicit schema
Parsing JSON requires Spark DDL. Avoid relying on runtime inference for important contracts.
Cardinality is intentional
Explode operations change row counts. Bronze blocks unsafe cardinality changes unless explicitly allowed.
Columns are projections
When columns is declared, only declared aliases remain as business columns.
Connectors stay neutral
Business structuring belongs in transformations, not in connector-specific workarounds.
Metadata stays out of transform
If a field describes meaning, ownership, PII, classification or catalog discoverability, it belongs in annotations or operations, not in transform.
Where schema belongs
ContractForge uses schema declarations in two different places. They should not be treated as interchangeable fields.
| Location | Purpose | Use it for | Do not use it for |
|---|---|---|---|
| source.read.schema | Physical source-row schema used while a connector or Spark reader materializes data. | CSV, JSON, XML, Avro, file folders and REST response.mode: records where inference would be fragile. | Business JSON inside an already-read string column. |
| Top-level schemas with shape.parse_json.schema_ref | Reusable Spark DDL for parsing a JSON string after a DataFrame already exists. | Event Hubs body, REST raw response, text payload columns and envelope formats. | Configuring how Spark reads the original source files or connector records. |
| annotations | Catalog and governance metadata. | Descriptions, aliases, tags, PII, sensitivity and data-product context. | Any transformation, cast, explode or parsing behavior. |
Practical rule
If Spark needs the schema to read the source, put it in source.read.schema. If ContractForge already has a string column and must parse its JSON content, put the DDL in top-level schemas and reference it from transform.shape.parse_json.
Transform field reference
| Field | Type | Required | Behavior |
|---|---|---|---|
| shape | object | No | Parses JSON strings, flattens structs, handles arrays and projects nested columns. |
| cast | map string -> string | No | Casts existing columns to Spark SQL types such as string, double, timestamp or decimal(18,2). |
| standardize | map string -> object | No | Applies deterministic string cleanup to declared columns. |
| standardize.<column>.trim | boolean | No | Trims leading and trailing whitespace. |
| standardize.<column>.lower | boolean | No | Converts text to lowercase. Cannot be combined with upper. |
| standardize.<column>.upper | boolean | No | Converts text to uppercase. Cannot be combined with lower. |
| standardize.<column>.normalize_whitespace | boolean | No | Replaces repeated whitespace with a single space. |
| standardize.<column>.empty_as_null | boolean | No | Converts the final empty string to null. |
| derive | map string -> string | No | Creates or replaces columns using Spark SQL expressions. |
| deduplicate.keys | string or list | Yes when deduplicate exists | Key columns used to keep one row per key. |
| deduplicate.order_by | string or list | Yes when deduplicate exists | Deterministic ordering. Prefer the structured list form for new contracts. |
| deduplicate.order_by[].column | string | Yes for structured order items | Column used for ordering. |
| deduplicate.order_by[].direction | asc or desc | No | Sort direction. Defaults to desc. |
| deduplicate.order_by[].nulls | first or last | No | Null ordering clause. |
Cast, standardize and derive
Use these operations when the source is already tabular enough and the target contract needs explicit value preparation without custom notebook code.
YAML:

transform:
  cast:
    order_id: string
    amount: decimal(18,2)
    updated_at: timestamp
  standardize:
    customer_email:
      trim: true
      lower: true
      empty_as_null: true
    status:
      trim: true
      upper: true
      normalize_whitespace: true
  derive:
    order_date: to_date(updated_at)
    net_amount: amount - coalesce(discount_amount, 0)

Python:

from contractforge import ingest

result = ingest(
    source=df,
    target_table="orders",
    catalog="main",
    layer="silver",
    mode="append",
    transform={
        "cast": {
            "order_id": "string",
            "amount": "decimal(18,2)",
            "updated_at": "timestamp",
        },
        "standardize": {
            "customer_email": {
                "trim": True,
                "lower": True,
                "empty_as_null": True,
            },
            "status": {
                "trim": True,
                "upper": True,
                "normalize_whitespace": True,
            },
        },
        "derive": {
            "order_date": "to_date(updated_at)",
            "net_amount": "amount - coalesce(discount_amount, 0)",
        },
    },
)
Use cast for target type preparation, standardize for simple string cleanup and derive for Spark SQL expressions that create columns. If the expression becomes large business logic, promote it to a reviewed view, notebook step or downstream model.
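The standardize options are easiest to reason about on a single value. The sketch below is a plain-Python approximation, not ContractForge code: `standardize_value` is a hypothetical name, and the order in which trim, normalize_whitespace and case conversion are applied is an assumption. The only ordering the field reference implies is that empty_as_null acts on the final string.

```python
import re
from typing import Optional


def standardize_value(
    value: Optional[str],
    *,
    trim: bool = False,
    lower: bool = False,
    upper: bool = False,
    normalize_whitespace: bool = False,
    empty_as_null: bool = False,
) -> Optional[str]:
    """Approximate the standardize options for one string value."""
    if lower and upper:
        # Mirrors the documented rule that lower and upper cannot be combined.
        raise ValueError("lower and upper cannot be enabled together")
    if value is None:
        return None
    if trim:
        value = value.strip()
    if normalize_whitespace:
        # Collapse any run of whitespace into a single space.
        value = re.sub(r"\s+", " ", value)
    if lower:
        value = value.lower()
    if upper:
        value = value.upper()
    # Applied last, so a value that only becomes empty after trimming is nulled.
    if empty_as_null and value == "":
        return None
    return value


print(standardize_value("  Ana@Example.COM ", trim=True, lower=True, empty_as_null=True))
print(standardize_value(" in   progress ", trim=True, upper=True, normalize_whitespace=True))
```

Combining trim with empty_as_null is what turns whitespace-only values such as "   " into null; empty_as_null alone would leave them as non-empty strings.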
Shape field reference
The shape contract runs after the source connector and before quality rules and write modes. It is intended for physical normalization of nested or semi-structured records, not for business aggregation.
| Field | Type | Required | Behavior |
|---|---|---|---|
| parse_json | list | No | Parses declared string columns with an explicit Spark DDL schema using from_json. |
| parse_json[].column | string | Yes | Source string column or nested string path to parse. |
| parse_json[].schema | string | Yes | Spark DDL schema. Use STRUCT<...>, ARRAY<...> or any valid Spark data type accepted by from_json. |
| parse_json[].alias | string | No | Output column. Required when parsing a nested path. Must be a simple top-level column name. |
| parse_json[].drop_source | boolean | No | Drops the original source column after parsing. Only supported for top-level source columns. |
| flatten.enabled | boolean | No | Flattens struct fields into top-level columns. |
| flatten.separator | string | No | Separator used in generated column names. When omitted, the implementation default applies. |
| flatten.max_depth | integer | No | Maximum struct nesting depth to flatten. |
| flatten.include | list | No | Top-level struct columns to flatten. Other columns are kept as-is. |
| flatten.exclude | list | No | Paths to exclude from flattening. |
| zip_arrays | list | No | Combines parallel arrays into one array of structs before exploding. |
| zip_arrays[].alias | string | Yes | Output array column containing structs. |
| zip_arrays[].columns | map | Yes | Map of source array path to output struct field name. Requires at least two arrays. |
| arrays | list | No | Transforms arrays by keeping, serializing, sizing, taking first element or exploding. |
| arrays[].path | string | Yes | Array path. Use dot notation; do not use [] syntax. |
| arrays[].mode | enum | No | keep, to_json, size, first, explode or explode_outer. |
| arrays[].alias | string | No | Output column. Set it explicitly for cardinality-changing modes (explode, explode_outer) when the default alias is not wanted. |
| arrays[].allow_cartesian | boolean | No | Allows multiple sibling explodes that may multiply rows. Default protects against accidental Cartesian expansion. |
| columns | map | No | Final projection. When declared, only projected columns are kept. |
| columns.<path>.alias | string | No | Output column. Defaults to the path with dots replaced by underscores when no expression is used. |
| columns.<path>.cast | string | No | Spark SQL type used to cast the projected expression. |
| columns.<path>.expression | string | No | Spark SQL expression. Requires an explicit alias. |
| allow_cardinality_change_on_bronze | boolean | No | Allows explode/explode_outer in bronze contracts when the contract intentionally changes row cardinality. |
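The flatten options can be pictured on a nested Python dict standing in for a row with struct columns. This is a sketch under stated assumptions, not the ContractForge implementation: `flatten` is a hypothetical function, the "_" separator is chosen only for illustration because the default is not documented here, and max_depth is interpreted as the number of struct levels that get expanded.

```python
from typing import Any, Optional


def flatten(record: dict, separator: str = "_", max_depth: Optional[int] = None) -> dict:
    """Flatten nested dicts (structs) into top-level keys joined by `separator`."""

    def walk(value: Any, name: str, depth: int, out: dict) -> None:
        can_descend = max_depth is None or depth < max_depth
        if isinstance(value, dict) and value and can_descend:
            for key, child in value.items():
                walk(child, f"{name}{separator}{key}", depth + 1, out)
        else:
            # Scalars, arrays and structs beyond max_depth are kept as-is.
            out[name] = value

    out: dict = {}
    for key, value in record.items():
        walk(value, key, 0, out)
    return out


record = {"id": 1, "geo": {"lat": 1.5, "pos": {"x": 2}}, "tags": ["a"]}
print(flatten(record))
print(flatten(record, max_depth=1))
```

Arrays are deliberately left untouched: flattening only renames struct fields and never changes the row count, whereas the arrays options are where cardinality can change.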
Parallel arrays
Use zip_arrays before exploding when an API returns multiple arrays that represent aligned observations.
transform:
  shape:
    zip_arrays:
      - alias: hourly_observation
        columns:
          payload.hourly.time: time
          payload.hourly.temperature_2m: temperature
          payload.hourly.relative_humidity_2m: humidity
    arrays:
      - path: hourly_observation
        mode: explode_outer
        alias: observation
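The contract above can be mimicked in plain Python to show why zipping comes before exploding. The helpers `get_path`, `zip_arrays` and `explode_outer` are hypothetical stand-ins, not ContractForge functions. Padding shorter arrays with nulls follows the behavior of Spark's arrays_zip; whether ContractForge relies on that function is an assumption.

```python
from itertools import zip_longest


def get_path(row: dict, path: str):
    """Resolve a dot-notation path inside nested dicts; None when missing."""
    value = row
    for part in path.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(part)
    return value


def zip_arrays(row: dict, columns: dict, alias: str) -> dict:
    """Combine parallel arrays into one array of structs stored under `alias`."""
    if len(columns) < 2:
        raise ValueError("zip_arrays requires at least two arrays")
    arrays = [get_path(row, path) or [] for path in columns]
    fields = list(columns.values())
    # zip_longest pads shorter arrays with None, keeping positions aligned.
    zipped = [dict(zip(fields, values)) for values in zip_longest(*arrays)]
    return {**row, alias: zipped}


def explode_outer(row: dict, path: str, alias: str) -> list[dict]:
    """One output row per array element; a null element if the array is empty."""
    items = row.get(path) or [None]
    return [{**row, alias: item} for item in items]


row = {
    "payload": {
        "hourly": {
            "time": ["00:00", "01:00"],
            "temperature_2m": [10.5, 9.8],
            "relative_humidity_2m": [80],
        }
    }
}
zipped = zip_arrays(
    row,
    {
        "payload.hourly.time": "time",
        "payload.hourly.temperature_2m": "temperature",
        "payload.hourly.relative_humidity_2m": "humidity",
    },
    alias="hourly_observation",
)
rows = explode_outer(zipped, "hourly_observation", "observation")
print(len(rows))
```

Exploding the three arrays separately would pair every time with every temperature and humidity value. Zipping first keeps one row per position, so each observation keeps its own timestamp.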
Example patterns
NASA EONET
Raw REST JSON to rows
The REST connector captures raw JSON, parse_json applies an explicit DDL schema, and array explosion turns events into silver records.
Open-Meteo
Parallel arrays
Hourly weather responses return aligned arrays. zip_arrays preserves positional meaning before exploding.
USGS earthquakes
Nested structs and coordinates
Struct fields and array indexes can be projected declaratively, keeping notebooks free of Spark column plumbing.
Blob/S3 files
Schema first
Explicit file schemas plus transform.deduplicate make object storage examples deterministic and reviewable.
Deterministic deduplication
Use transform.deduplicate when a batch can contain multiple records for the same business key and the contract has a clear ordering rule.
YAML:

transform:
  deduplicate:
    keys: [order_id]
    order_by:
      - column: updated_at
        direction: desc
        nulls: last
      - column: sequence
        direction: desc

Python:

transform = {
    "deduplicate": {
        "keys": ["order_id"],
        "order_by": [
            {"column": "updated_at", "direction": "desc", "nulls": "last"},
            {"column": "sequence", "direction": "desc"},
        ],
    }
}
For existing contracts, SQL fragments such as updated_at DESC NULLS LAST are still accepted. For new contracts, prefer the structured list because it is easier to validate and review.
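The semantics of the structured order_by list can be sketched on a list of dicts: sort by each order item, then keep the first row per key. This is an illustration, not the ContractForge implementation. `deduplicate` is a hypothetical function, direction defaults to desc as in the field reference, and when nulls is omitted the sketch falls back to Spark SQL's defaults (nulls first for asc, nulls last for desc), which is an assumption about ContractForge.

```python
def deduplicate(rows: list[dict], keys: list[str], order_by: list[dict]) -> list[dict]:
    """Keep one row per key: the first row after applying order_by."""
    ordered = list(rows)
    # Stable sorts applied from the last order item to the first give
    # the same result as a single multi-column sort.
    for item in reversed(order_by):
        column = item["column"]
        reverse = item.get("direction", "desc") == "desc"
        nulls = item.get("nulls", "last" if reverse else "first")
        # Rank nulls so they land where requested even when the sort is reversed.
        null_rank = 1 if (nulls == "last") != reverse else 0

        def sort_key(row, column=column, null_rank=null_rank):
            value = row.get(column)
            if value is None:
                return (null_rank, 0)
            return (1 - null_rank, value)

        ordered.sort(key=sort_key, reverse=reverse)

    seen, kept = set(), []
    for row in ordered:
        key = tuple(row.get(k) for k in keys)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


rows = [
    {"order_id": "A", "updated_at": "2024-01-01", "sequence": 1},
    {"order_id": "A", "updated_at": "2024-01-02", "sequence": 1},
    {"order_id": "A", "updated_at": "2024-01-02", "sequence": 2},
    {"order_id": "A", "updated_at": None, "sequence": 9},
    {"order_id": "B", "updated_at": None, "sequence": 1},
]
order_by = [
    {"column": "updated_at", "direction": "desc", "nulls": "last"},
    {"column": "sequence", "direction": "desc"},
]
winners = deduplicate(rows, ["order_id"], order_by)
print(winners)
```

The second order item is what makes the result deterministic here: two rows for order A share the latest updated_at, and only sequence decides which one survives. Without nulls: last, the row with a null updated_at could win instead.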
Column mapping versus shape
Use column_mapping for simple source-to-target renames before technical columns are added. Use transform.shape.columns when the target projection needs casts, nested paths, expressions, array indexes or a complete structural rewrite.
column_mapping:
  id: customer_id
  ingestion_date: source_ingestion_date

transform:
  shape:
    columns:
      properties.mag:
        alias: magnitude
        cast: DOUBLE
      geometry.coordinates[0]:
        alias: longitude
        cast: DOUBLE
Nested arrays in any order
For nested arrays, declare the parent explode and then reference the generated alias in child operations. ContractForge resolves pending array paths iteratively, so the contract can express the intended hierarchy without custom Spark code.
transform:
  shape:
    arrays:
      - path: payload.orders
        mode: explode_outer
        alias: order
      - path: order.items
        mode: explode_outer
        alias: item
    columns:
      order.id:
        alias: order_id
        cast: STRING
      item.sku:
        alias: sku
        cast: STRING
      item.quantity:
        alias: quantity
        cast: INT
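One way to picture the iterative resolution of pending array paths is a loop that keeps applying any operation whose root column already exists, and registers its alias so that child operations become resolvable. The sketch below illustrates that idea only. `resolve_array_order` is a hypothetical name and the actual ContractForge algorithm may differ.

```python
def resolve_array_order(arrays: list[dict], available: set[str]) -> list[dict]:
    """Order array operations so every path root exists before it is used."""
    known = set(available)
    pending = list(arrays)
    ordered = []
    while pending:
        still_pending = []
        progressed = False
        for op in pending:
            root = op["path"].split(".")[0]
            if root in known:
                ordered.append(op)
                # The alias becomes a column that later operations can reference.
                if "alias" in op:
                    known.add(op["alias"])
                progressed = True
            else:
                still_pending.append(op)
        if not progressed:
            unresolved = [op["path"] for op in still_pending]
            raise ValueError(f"array paths cannot be resolved: {unresolved}")
        pending = still_pending
    return ordered


# The child is declared before its parent on purpose.
arrays = [
    {"path": "order.items", "mode": "explode_outer", "alias": "item"},
    {"path": "payload.orders", "mode": "explode_outer", "alias": "order"},
]
plan = resolve_array_order(arrays, available={"payload"})
print([op["path"] for op in plan])
```

A path whose root never appears, for example because of a typo in an alias, leaves the loop without progress and is reported instead of being silently skipped.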
Cardinality rule
Two sibling explodes under the same parent are blocked unless allow_cartesian is true. Parent-child explodes are valid because each step has a clear row-expansion path.
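The sibling rule can be expressed as a check over the arrays list: group explode operations by parent path and flag any parent with more than one. This is a hedged sketch, not ContractForge validation code. `blocked_sibling_explodes` and the `payload.refunds` path are hypothetical, an omitted mode is treated as non-exploding, and requiring allow_cartesian on every sibling (rather than on just one) is an assumption.

```python
from collections import defaultdict

EXPLODE_MODES = {"explode", "explode_outer"}


def blocked_sibling_explodes(arrays: list[dict]) -> dict[str, list[str]]:
    """Return parent paths with several explodes that did not opt in to Cartesian expansion."""
    groups = defaultdict(list)
    for op in arrays:
        if op.get("mode") in EXPLODE_MODES:
            # "payload.orders" has parent "payload"; a top-level array has parent "".
            parent = op["path"].rpartition(".")[0]
            groups[parent].append(op)
    return {
        parent: [op["path"] for op in ops]
        for parent, ops in groups.items()
        if len(ops) > 1 and not all(op.get("allow_cartesian", False) for op in ops)
    }


parent_child = [
    {"path": "payload.orders", "mode": "explode_outer", "alias": "order"},
    {"path": "order.items", "mode": "explode_outer", "alias": "item"},
]
siblings = [
    {"path": "payload.orders", "mode": "explode", "alias": "order"},
    {"path": "payload.refunds", "mode": "explode", "alias": "refund"},
]
print(blocked_sibling_explodes(parent_child))
print(blocked_sibling_explodes(siblings))
```

With 3 orders and 2 refunds, exploding both siblings would produce 6 rows that pair every order with every refund, which is rarely what the source data means.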
Guardrails
- JSON parsing requires a declared Spark DDL schema; this avoids accidental inference drift.
- Changing cardinality in Bronze is blocked by default because raw layers should normally preserve source records.
- Sibling arrays under the same parent are not exploded together unless allow_cartesian is set, which prevents accidental Cartesian products.
- Deduplication order must be deterministic for merge and hash-diff modes.
- lower and upper cannot be enabled together for the same standardized column.
- cast, standardize and deduplicate.keys require columns to exist after shape and before write.
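The column-existence guardrail can be approximated with a pre-flight check over a transform dict. `missing_columns` is a hypothetical helper, not part of ContractForge. It assumes the caller knows which columns exist after shape, and it treats derived columns as valid deduplication keys because derive runs before deduplicate in the documented execution order.

```python
def missing_columns(transform: dict, columns_after_shape: set[str]) -> dict[str, list[str]]:
    """Report columns referenced by cast, standardize and deduplicate.keys that do not exist."""
    available = set(columns_after_shape)
    problems = {}
    for section in ("cast", "standardize"):
        missing = sorted(set(transform.get(section, {})) - available)
        if missing:
            problems[section] = missing

    # Assumption: columns created by derive can be used as deduplication keys.
    available |= set(transform.get("derive", {}))
    keys = transform.get("deduplicate", {}).get("keys", [])
    if isinstance(keys, str):
        keys = [keys]
    missing = sorted(set(keys) - available)
    if missing:
        problems["deduplicate.keys"] = missing
    return problems


transform = {
    "cast": {"order_id": "string", "amount": "decimal(18,2)"},
    "standardize": {"status": {"trim": True}},
    "derive": {"order_date": "to_date(updated_at)"},
    "deduplicate": {"keys": ["order_id", "order_date"], "order_by": "updated_at DESC"},
}
print(missing_columns(transform, {"order_id", "status", "updated_at"}))
```

Running a check like this before the job starts turns a late Spark analysis error into an early, contract-level message that names the section at fault.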