Backend Registry
CFA supports pluggable code-generation backends through a registry. Each backend turns an approved `ExecutionPlan` into deterministic, governed code.
Built-in Backends
| Backend | Language | Merge | Anonymize | Partition Overwrite | Forbidden Tokens |
|---|---|---|---|---|---|
| `pyspark` | PySpark + Delta Lake | ✅ | ✅ (sha256, drop, tokenize, mask) | ✅ | `.collect()`, `.toPandas()`, `crossJoin()`, `import os`, `import subprocess` |
| `sql` | ANSI SQL | ✅ (`MERGE INTO`) | ✅ (sha256, drop, md5) | ✅ (`INSERT OVERWRITE`) | `DROP TABLE`, `TRUNCATE`, `DELETE FROM`, `ALTER TABLE` |
| `dbt` | dbt models + `schema.yml` | ✅ (`unique_key`) | ✅ (sha256, drop) | ✅ (`partition_by`) | `DROP TABLE`, `TRUNCATE`, `DELETE FROM` |
Each backend declares its own forbidden tokens via `BackendCapabilities.forbidden_tokens`. The `StaticValidator` reads them from the active backend instead of relying on a central hardcoded list.
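To make the per-backend lookup concrete, here is a minimal sketch of a validator scanning generated source for the tokens a backend declares. It assumes plain substring matching (the real `StaticValidator` may tokenize or use regular expressions), and the `ForbiddenToken` field names, the fault codes `STATIC_COLLECT`/`STATIC_OS`, and the `scan_forbidden` helper are hypothetical stand-ins, not CFA's API.

```python
from dataclasses import dataclass
from enum import Enum


class FaultSeverity(Enum):
    # Hypothetical stand-in for cfa.types.FaultSeverity
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass(frozen=True)
class ForbiddenToken:
    # Field names are assumptions; the docs only show positional arguments.
    pattern: str
    code: str
    severity: FaultSeverity
    message: str


@dataclass(frozen=True)
class Fault:
    code: str
    severity: FaultSeverity
    message: str
    line: int


def scan_forbidden(source: str, tokens: list) -> list:
    """Report each line of generated code that contains a backend-declared token."""
    faults = []
    for lineno, text in enumerate(source.splitlines(), start=1):
        for tok in tokens:
            if tok.pattern in text:  # naive substring match
                faults.append(Fault(tok.code, tok.severity, tok.message, lineno))
    return faults


# Hypothetical subset of the pyspark backend's declared tokens
PYSPARK_TOKENS = [
    ForbiddenToken(".collect()", "STATIC_COLLECT", FaultSeverity.CRITICAL,
                   "Driver-side collect is forbidden."),
    ForbiddenToken("import os", "STATIC_OS", FaultSeverity.CRITICAL,
                   "OS access is forbidden."),
]

code = 'df = spark.read.load("nfe")\nrows = df.collect()\n'
for fault in scan_forbidden(code, PYSPARK_TOKENS):
    print(fault.line, fault.code)  # → 2 STATIC_COLLECT
```

Because the token list is passed in, swapping backends changes what is forbidden without touching the validator. Plain substring matching is crude, though: `import os` would also flag a line such as `import osmnx`.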
Listing Backends
```shell
cfa backend list
```

```python
from cfa.backends import BackendRegistry

for name in BackendRegistry.singleton().list():
    print(name)
# prints dbt, pyspark, sql (one per line)
```
PySpark Backend
Generates PySpark code with Delta Lake operations:
```python
from pyspark.sql import SparkSession, functions as F
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

# EXTRACT with partition filter ({{date_param}} is filled in when the code is rendered)
df_nfe = spark.read.format("delta").load("nfe")
df_nfe = df_nfe.filter(F.col("processing_date") >= F.lit("{{date_param}}"))
df_clientes = spark.read.format("delta").load("clientes")

# ANONYMIZE PII columns: hash the CPF, then drop the raw value
df_clientes = df_clientes.withColumn("cpf_hash", F.sha2(F.col("cpf").cast("string"), 256))
df_clientes = df_clientes.drop("cpf")

# JOIN with catalog merge keys
df_joined = df_nfe.join(df_clientes, on=["nfe_id"], how="inner")

# LOAD with Delta merge: update matching rows, insert new ones
target_table = DeltaTable.forPath(spark, "silver_table")
target_table.alias("t").merge(
    df_joined.alias("s"),
    "t.nfe_id = s.nfe_id",
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```
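The sha256 anonymization step can be reproduced outside Spark, which helps when checking that a hashed key matches an expected value. The sketch below assumes string or integer inputs (Spark's `cast("string")` formats other types, such as booleans and floats, differently from Python's `str()`), and the helper names `sha256_hex` and `anonymize_sha256` are hypothetical.

```python
import hashlib
from typing import Optional


def sha256_hex(value: Optional[object]) -> Optional[str]:
    """Like F.sha2(F.col(c).cast("string"), 256): lowercase hex SHA-256 of the UTF-8 text.

    Nulls stay null, as they do in Spark.
    """
    if value is None:
        return None
    return hashlib.sha256(str(value).encode("utf-8")).hexdigest()


def anonymize_sha256(row: dict, column: str) -> dict:
    """Add '<column>_hash' and drop the raw column, like withColumn + drop above."""
    out = {k: v for k, v in row.items() if k != column}  # copy without the PII column
    out[f"{column}_hash"] = sha256_hex(row.get(column))
    return out


cliente = {"cliente_id": 7, "cpf": "12345678909"}  # made-up sample row
print(anonymize_sha256(cliente, "cpf"))
```

Because the digest is deterministic, the same CPF always maps to the same `cpf_hash`, so the hashed column can still serve as a join key after the raw value is dropped.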
SQL Backend
Generates standard SQL for Snowflake, BigQuery, Postgres, and DuckDB:
```sql
-- EXTRACT: nfe
SELECT * FROM "nfe"
WHERE "processing_date" >= '{date_param}'

-- JOIN: nfe + clientes
SELECT nfe_2.*, clien_2.*
FROM "nfe" nfe_2
INNER JOIN "clientes" clien_2
  ON nfe_2."nfe_id" = clien_2."cliente_id"

-- LOAD with MERGE
MERGE INTO "silver_table" AS target
USING (joined_cte) AS source
ON target."nfe_id" = source."nfe_id"
WHEN MATCHED THEN UPDATE SET "nfe_id" = source."nfe_id"
WHEN NOT MATCHED THEN INSERT (*)
```
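The `MERGE` above follows "update if matched, insert if not" semantics. SQLite has no `MERGE INTO`, so the sketch below uses its `INSERT ... ON CONFLICT DO UPDATE` upsert as a stand-in to show the effect on a single-key target; the `valor` column and the sample rows are made up. Unlike the generated SQL, whose `WHEN MATCHED` clause only sets `nfe_id`, this sketch updates every non-key column, closer to PySpark's `whenMatchedUpdateAll()`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE silver_table (nfe_id INTEGER PRIMARY KEY, valor REAL, cpf_hash TEXT)")
conn.execute("INSERT INTO silver_table VALUES (1, 10.0, 'aaa')")

# Source rows: nfe_id 1 already exists (matched), nfe_id 2 is new (not matched)
source_rows = [(1, 99.0, "aaa"), (2, 20.0, "bbb")]

# Upsert keyed on the merge key; needs SQLite 3.24+ for ON CONFLICT ... DO UPDATE
UPSERT_SQL = """
INSERT INTO silver_table (nfe_id, valor, cpf_hash) VALUES (?, ?, ?)
ON CONFLICT(nfe_id) DO UPDATE SET
    valor = excluded.valor,
    cpf_hash = excluded.cpf_hash
"""
conn.executemany(UPSERT_SQL, source_rows)

for row in conn.execute("SELECT * FROM silver_table ORDER BY nfe_id"):
    print(row)
# (1, 99.0, 'aaa')  <- matched: updated
# (2, 20.0, 'bbb')  <- not matched: inserted
```

Re-running the same upsert leaves the table unchanged, which is the property that makes merge-based loads safe to retry.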
dbt Backend
Generates dbt model files with config blocks and schema.yml:
```sql
-- models/silver/silver_fiscal_merge.sql
{{ config(
    materialized='table',
    partition_by={'field': ['processing_date'], 'data_type': 'date'},
    unique_key=['nfe_id'],
) }}

-- EXTRACT: nfe
SELECT * FROM {{ ref('nfe') }}
WHERE "processing_date" >= '{{ var("date_param") }}'

-- JOIN: nfe + clientes
SELECT nfe_2.*, clien_2.*
FROM {{ ref('nfe') }} nfe_2
INNER JOIN {{ ref('clientes') }} clien_2
  ON nfe_2."nfe_id" = clien_2."cliente_id"
```
It also generates the model's `schema.yml`:
```yaml
version: 2
models:
  - name: silver_fiscal_merge
    description: "Governed model generated by CFA"
    columns:
      - name: nfe_id
        tests:
          - not_null
          - unique
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - nfe_id
            - processing_date
```
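The checks in this `schema.yml` can be approximated in plain Python over a list of row dicts. This is a rough sketch of their semantics only (dbt actually compiles each test into a SQL query that returns failing rows); the helper names and sample rows are hypothetical.

```python
from collections import Counter


def not_null(rows, column):
    """dbt `not_null`: failing rows are those where the column is None."""
    return [r for r in rows if r.get(column) is None]


def unique(rows, column):
    """dbt `unique`: non-null values that occur more than once."""
    counts = Counter(r[column] for r in rows if r.get(column) is not None)
    return [value for value, n in counts.items() if n > 1]


def unique_combination(rows, columns):
    """dbt_utils.unique_combination_of_columns: column tuples that occur more than once."""
    counts = Counter(tuple(r.get(c) for c in columns) for r in rows)
    return [combo for combo, n in counts.items() if n > 1]


rows = [
    {"nfe_id": 1, "processing_date": "2024-01-01"},
    {"nfe_id": 2, "processing_date": "2024-01-01"},
    {"nfe_id": 2, "processing_date": "2024-01-02"},
]
print(not_null(rows, "nfe_id"))                                 # → []
print(unique(rows, "nfe_id"))                                   # → [2]
print(unique_combination(rows, ["nfe_id", "processing_date"]))  # → []
```

The sample shows the two uniqueness levels disagreeing: `nfe_id` alone repeats, yet each `(nfe_id, processing_date)` pair is distinct. While both column-level tests on `nfe_id` pass, the combination test cannot fail, so it adds protection only if the column-level `unique` is relaxed.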
BackendAdapter Interface
Create a custom backend by subclassing `BackendAdapter` and implementing `get_capabilities()` and `generate()`:
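```python
from __future__ import annotations  # ExecutionPlan/GeneratedCode appear only in annotations

from cfa.backends import BackendAdapter, BackendCapabilities
from cfa.validation.static import ForbiddenToken
from cfa.types import FaultSeverity

# ExecutionPlan and GeneratedCode are CFA types; import them from the module
# your CFA version defines them in (the path is not shown in this guide).


class MyBackend(BackendAdapter):
    def get_capabilities(self) -> BackendCapabilities:
        # Declare what this backend supports; the StaticValidator reads
        # forbidden_tokens from here when checking generated code.
        return BackendCapabilities(
            backend_name="my_backend",
            supports_merge=True,
            supported_languages=["my_language"],
            forbidden_tokens=[
                ForbiddenToken(
                    "dangerous_pattern",
                    "STATIC_DANGER",
                    FaultSeverity.CRITICAL,
                    "Dangerous pattern detected.",
                ),
            ],
        )

    def generate(self, plan: ExecutionPlan) -> GeneratedCode:
        # Translate the approved plan into governed code for your target
        ...
```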
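To see the contract end to end without installing CFA, the sketch below defines hypothetical stand-ins for `BackendAdapter`, `BackendCapabilities`, `ExecutionPlan`, and `GeneratedCode` and implements a toy backend against them. The plan shape (an ordered tuple of `(operation, table)` steps) and the `GeneratedCode` fields are assumptions for illustration; the real CFA types will differ.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


# --- Hypothetical stand-ins for CFA types (real fields may differ) ---
@dataclass(frozen=True)
class BackendCapabilities:
    backend_name: str
    supports_merge: bool = False
    supported_languages: list = field(default_factory=list)
    forbidden_tokens: list = field(default_factory=list)


@dataclass(frozen=True)
class ExecutionPlan:
    steps: tuple  # assumed shape: ordered (operation, table) pairs


@dataclass(frozen=True)
class GeneratedCode:
    language: str
    source: str


class BackendAdapter(ABC):
    @abstractmethod
    def get_capabilities(self) -> BackendCapabilities: ...

    @abstractmethod
    def generate(self, plan: ExecutionPlan) -> GeneratedCode: ...


class MyBackend(BackendAdapter):
    def get_capabilities(self) -> BackendCapabilities:
        return BackendCapabilities(
            backend_name="my_backend",
            supports_merge=True,
            supported_languages=["my_language"],
        )

    def generate(self, plan: ExecutionPlan) -> GeneratedCode:
        # A pure function of the plan: no clocks, randomness, or environment
        # reads, so the same approved plan always yields identical output.
        lines = [f"-- {op}: {table}" for op, table in plan.steps]
        return GeneratedCode(language="my_language", source="\n".join(lines) + "\n")


plan = ExecutionPlan(steps=(("EXTRACT", "nfe"), ("EXTRACT", "clientes"), ("LOAD", "silver_table")))
backend = MyBackend()
print(backend.generate(plan).source)
```

Keeping `generate` free of side inputs is what makes the output deterministic: two runs over the same approved plan can be compared byte for byte.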
Registering a Backend
```python
from cfa.backends import BackendRegistry

BackendRegistry.singleton().register("my_backend", lambda: MyBackend())
```
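Registering a factory (the `lambda`) rather than an instance suggests that backends are constructed only when needed. Below is a minimal sketch of that pattern, assuming a name-to-factory dict behind a process-wide singleton. `Registry`, its `create` method, and the duplicate-name check are hypothetical, not CFA's `BackendRegistry`.

```python
from typing import Callable, Dict, List, Optional


class Registry:
    """Hypothetical name-to-factory registry mirroring the documented calls."""

    _instance: Optional["Registry"] = None

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}

    @classmethod
    def singleton(cls) -> "Registry":
        # One shared registry per process, created on first use
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def register(self, name: str, factory: Callable[[], object]) -> None:
        # Rejecting duplicate names is an assumption of this sketch
        if name in self._factories:
            raise ValueError(f"backend {name!r} is already registered")
        self._factories[name] = factory

    def list(self) -> List[str]:
        # Sorted, so the listing does not depend on registration order
        return sorted(self._factories)

    def create(self, name: str) -> object:
        # The factory runs only here, so unused backends are never constructed
        return self._factories[name]()


constructed = []


class DemoBackend:
    def __init__(self) -> None:
        constructed.append("demo")


reg = Registry.singleton()
reg.register("pyspark", lambda: DemoBackend())
reg.register("dbt", lambda: DemoBackend())
print(reg.list())    # → ['dbt', 'pyspark']
print(constructed)   # → [] (nothing built yet)
reg.create("dbt")
print(constructed)   # → ['demo']
```

Sorting names in `list()` would explain the alphabetical `dbt, pyspark, sql` listing shown earlier, and deferring construction keeps importing the registry cheap even when a backend has heavy dependencies such as a Spark session.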