MOD-042: CDC pipeline (Neon logical replication → S3 Iceberg → Snowflake)
Purpose
Replicates committed Postgres changes from each Neon domain database into Iceberg tables on S3, which Snowflake reads through External Iceberg Tables. Because Snowflake queries the data in place, nothing is copied into Snowflake storage and there is no Snowpipe ingestion cost. This serves the analytics, ML, AML monitoring, and regulatory reporting workloads that need operational data in Snowflake within 5 minutes (p99) of commit.
FR scope: FR-265–268, FR-454–457, NFR-015, NFR-019, NFR-024. Architectural decisions: ADR-003.
Architecture
```text
EventBridge Scheduler (60s, ENABLED)
└── CDC poller Lambda × 7 (one per domain DB)
    ├── direct connection to Neon (NOT pooled: pgoutput needs the WAL stream)
    ├── ensure publication + slot (idempotent, FR-454)
    ├── schema-compat check (FR-457)
    ├── pg_logical_slot_get_changes via pg-logical-replication
    ├── Firehose put-record-batch
    │   └── Iceberg destination
    │       ├── s3://bank-iceberg-{env}/cdc/{domain}/<table>/...
    │       └── Glue catalog: bank_cdc_{env}_{domain}.<table>
    ├── persist last LSN to s3://.../cdc/{domain}/_lsn_ack.json (FR-268)
    ├── PutMetricData on BankPlatform/CDC (FR-456)
    └── PutEvents to bank-platform bus on lag breach / drift / replay
                ↓
Snowflake External Iceberg Tables
(CREATE OR REPLACE ICEBERG TABLE
   EXTERNAL_VOLUME=...
   CATALOG=...
   AUTO_REFRESH=TRUE)
```
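The per-invocation order in the diagram can be sketched as one function over injected dependencies. This is a minimal sketch, not the handler itself: the PollerDeps interface, runOnce, and the 10,000-change drain limit are hypothetical, and putRecordBatch is assumed to reject whenever Firehose reports a non-zero FailedPutCount. The 500-record chunking reflects the Firehose PutRecordBatch per-call limit.

```typescript
// Hypothetical shapes; the real logic lives in slot-bootstrap.ts,
// schema-compat.ts, firehose-writer.ts and event-publisher.ts.
interface PolledChange {
  lsn: string;
  envelope: string; // one JSON record in the per-record envelope shape
}

interface PollerDeps {
  ensurePublicationAndSlot(): Promise<void>;
  schemaViolations(): Promise<string[]>;
  drainChanges(limit: number): Promise<PolledChange[]>;
  putRecordBatch(records: string[]): Promise<void>; // assumed to reject on partial failure
  persistAckLsn(lsn: string): Promise<void>;
  publishEvent(detailType: string, detail: object): Promise<void>;
}

type RunResult =
  | { status: "aborted"; violations: string[] }
  | { status: "ok"; drained: number; lastLsn: string | null };

// Firehose PutRecordBatch accepts at most 500 records per call.
const FIREHOSE_BATCH_LIMIT = 500;

async function runOnce(deps: PollerDeps, limit = 10_000): Promise<RunResult> {
  await deps.ensurePublicationAndSlot(); // idempotent (FR-454)

  const violations = await deps.schemaViolations();
  if (violations.length > 0) {
    // FR-457: stop before draining so nothing incompatible reaches Iceberg.
    await deps.publishEvent("cdc.schema_drift_detected", { violations });
    return { status: "aborted", violations };
  }

  const changes = await deps.drainChanges(limit);
  for (let i = 0; i < changes.length; i += FIREHOSE_BATCH_LIMIT) {
    const batch = changes.slice(i, i + FIREHOSE_BATCH_LIMIT).map((c) => c.envelope);
    await deps.putRecordBatch(batch);
  }

  // Persist the ack only after every batch was accepted (FR-268 / NFR-019).
  const lastLsn = changes.length > 0 ? changes[changes.length - 1].lsn : null;
  if (lastLsn !== null) await deps.persistAckLsn(lastLsn);
  return { status: "ok", drained: changes.length, lastLsn };
}
```

Writing the ack last means the persisted LSN in the sketch never points past data that Firehose has not accepted.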
Domain configuration
Each of the seven domains gets its own Lambda, Firehose stream, Glue database, schedule, and alarms: core, kyc, aml, payments, credit, risk, platform. The domain is named aml (not fincrime) to match MOD-102's already-deployed BANK_PROD_AML_ROLE and the rest of the platform.
| Component | Naming pattern |
|---|---|
| Lambda function | bank-cdc-{env}-{domain} |
| Firehose stream | bank-cdc-{env}-{domain} |
| Glue database | bank_cdc_{env}_{domain} |
| Schedule | bank-cdc-{env}-{domain}-60s |
| Replication slot | cdc_{domain}_slot |
| Publication | cdc_{domain}_pub |
| LSN ack key | cdc/{domain}/_lsn_ack.json |
| Alarms | bank-cdc-lag-{env}-{domain} / bank-cdc-wal-... / bank-cdc-slot-health-... |
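Every name in the table can be derived from (env, domain). A hedged sketch, assuming a hypothetical resourceNames helper and DOMAINS constant; alarm names are left out because their full patterns are elided in the table. The tripwire follows the intent of the "domain config tripwire" unit test listed under Tests, not its actual code.

```typescript
// The seven domains; `as const` turns the list into a union type, so a
// misspelt domain such as "fincrime" is a compile-time error.
const DOMAINS = ["core", "kyc", "aml", "payments", "credit", "risk", "platform"] as const;
type Domain = (typeof DOMAINS)[number];

interface DomainResourceNames {
  lambdaFunction: string;
  firehoseStream: string;
  glueDatabase: string;
  schedule: string;
  replicationSlot: string;
  publication: string;
  lsnAckKey: string;
}

// Hypothetical helper mirroring the naming table.
function resourceNames(env: string, domain: Domain): DomainResourceNames {
  return {
    lambdaFunction: `bank-cdc-${env}-${domain}`,
    firehoseStream: `bank-cdc-${env}-${domain}`,
    glueDatabase: `bank_cdc_${env}_${domain}`,
    schedule: `bank-cdc-${env}-${domain}-60s`,
    // PostgreSQL replication slot names may only contain lower-case
    // letters, digits and underscores.
    replicationSlot: `cdc_${domain}_slot`,
    publication: `cdc_${domain}_pub`,
    lsnAckKey: `cdc/${domain}/_lsn_ack.json`,
  };
}

// Fail fast if the domain list drifts from seven unique entries.
function assertDomainConfig(domains: readonly string[]): void {
  const unique = new Set(domains);
  if (domains.length !== 7 || unique.size !== 7) {
    throw new Error(`expected 7 unique CDC domains, got [${domains.join(", ")}]`);
  }
}
```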
Per-record envelope
Every Firehose record is a JSON line with this shape (consumed by
Firehose's Iceberg destination via schema_table routing):
```json
{
  "schema_table": "public.accounts",
  "op": "insert",
  "lsn": "0/16B6A50",
  "committed_at": "2026-04-28T10:00:00.000Z",
  "row": { ... },
  "previous_row": null,
  "pii_classifications": { "email": "PII_HIGH" },
  "jurisdiction": "AU",
  "event_date": "2026-04-28",
  "source_module": "MOD-042",
  "domain": "core"
}
```
jurisdiction and event_date are the FR-455 partition keys of the Iceberg tables. The pii_classifications map is written as Iceberg column metadata; the Snowflake-side PII_CLASSIFICATION tag (MOD-102 0013) is then applied by an operator running snowflake/pii-tag-template.sql.
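A sketch of how one decoded change could become the envelope above. Assumptions: the DecodedChange shape and buildEnvelope are hypothetical, event_date is taken as the UTC calendar date of the commit timestamp, jurisdiction is supplied by the caller (its source is not specified here), and classifications come from a hypothetical per-table PII catalog keyed by schema_table.

```typescript
type Op = "insert" | "update" | "delete";
type Row = Record<string, unknown>;

// Hypothetical shape of one change after pgoutput decoding.
interface DecodedChange {
  schema: string;
  table: string;
  op: Op;
  lsn: string;
  committedAt: Date;
  row: Row | null;
  previousRow: Row | null;
}

// Hypothetical PII catalog: "schema.table" -> column -> classification.
type PiiCatalog = Record<string, Record<string, string>>;

function buildEnvelope(
  change: DecodedChange,
  domain: string,
  jurisdiction: string, // assumption: resolved by the caller
  pii: PiiCatalog,
): string {
  const schemaTable = `${change.schema}.${change.table}`;
  const committedAt = change.committedAt.toISOString(); // always UTC, "Z" suffix
  return JSON.stringify({
    schema_table: schemaTable,
    op: change.op,
    lsn: change.lsn,
    committed_at: committedAt,
    row: change.row,
    previous_row: change.previousRow,
    pii_classifications: pii[schemaTable] ?? {},
    jurisdiction,
    // Assumption: the partition date is the UTC date of the commit.
    event_date: committedAt.slice(0, 10),
    source_module: "MOD-042",
    domain,
  });
}
```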
SSM contract
Read (upstream)
| SSM path | Owner |
|---|---|
| /bank/{env}/network/vpc-id, /private-subnet-ids | MOD-104 |
| /bank/{env}/kms/financial/arn | MOD-104 |
| /bank/{env}/s3/iceberg/name, /iceberg/arn | MOD-104 |
| /bank/{env}/s3/firehose-landing/name | MOD-104 |
| /bank/{env}/eventbridge/bank-platform/arn | MOD-104 / MOD-043 |
| /bank/{env}/sns/alerts/arn | MOD-104 |
| /bank/{env}/neon/project-id, /branch-id, /direct-host | MOD-103 |
| bank-neon/{env}/{database}/replicator (Secrets Manager) | MOD-103 |
Write (downstream contract)
| SSM path | Value |
|---|---|
| /bank/{env}/mod042/firehose-stream-arn/{domain} | Firehose ARN per domain |
| /bank/{env}/mod042/iceberg-prefix/{domain} | s3://bank-iceberg-{env}/cdc/{domain}/ |
| /bank/{env}/mod042/glue-database/{domain} | bank_cdc_{env}_{domain} |
| /bank/{env}/mod042/lambda-name/{domain} | bank-cdc-{env}-{domain} |
| /bank/{env}/mod042/last-lsn-key/{domain} | cdc/{domain}/_lsn_ack.json |
5 paths × 7 domains = 35 SSM parameters.
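The 35 downstream parameters follow mechanically from the table. A minimal sketch with hypothetical helpers ssmContract and contractForAllDomains; region and account ID are parameters because the Firehose ARN format (arn:aws:firehose:&lt;region&gt;:&lt;account&gt;:deliverystream/&lt;name&gt;) needs them. Actually writing the map to SSM (for example with PutParameter) is left out.

```typescript
const CDC_DOMAINS = ["core", "kyc", "aml", "payments", "credit", "risk", "platform"];

// Hypothetical helper: the five downstream SSM parameters for one domain.
function ssmContract(env: string, domain: string, region: string, accountId: string): Record<string, string> {
  const base = `/bank/${env}/mod042`;
  const name = `bank-cdc-${env}-${domain}`; // Lambda and Firehose share this name
  return {
    [`${base}/firehose-stream-arn/${domain}`]: `arn:aws:firehose:${region}:${accountId}:deliverystream/${name}`,
    [`${base}/iceberg-prefix/${domain}`]: `s3://bank-iceberg-${env}/cdc/${domain}/`,
    [`${base}/glue-database/${domain}`]: `bank_cdc_${env}_${domain}`,
    [`${base}/lambda-name/${domain}`]: name,
    [`${base}/last-lsn-key/${domain}`]: `cdc/${domain}/_lsn_ack.json`,
  };
}

// All domains merged into one map: 5 paths x 7 domains = 35 parameters.
function contractForAllDomains(env: string, region: string, accountId: string): Record<string, string> {
  return Object.assign({}, ...CDC_DOMAINS.map((d) => ssmContract(env, d, region, accountId)));
}
```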
Events published (bank-platform bus, source bank.platform.cdc)
| Detail-type | When | Consumers |
|---|---|---|
| cdc.lag_breach | lag > 5 min OR wal_records_behind > 10k | MOD-076 alerts |
| cdc.slot_health_alert | slot inactive > 30h (Neon drops at ~40h) | MOD-076 alerts |
| cdc.schema_drift_detected | FR-457 startup check fails | MOD-076 alerts; engineer paged |
| cdc.replay_started / cdc.replay_complete | Operator-triggered replay | MOD-076 audit dashboard |
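The two threshold-driven events can be expressed as a pure function that returns PutEvents entries. Source, DetailType, Detail (a JSON string) and EventBusName are the field names of an EventBridge PutEvents request entry; the LagSample shape, the alertEntries helper, and the Detail payload fields are assumptions. Thresholds are strict (>), matching the table.

```typescript
interface LagSample {
  domain: string;
  lagSeconds: number; // assumed: now minus commit time of the oldest undrained change
  walRecordsBehind: number;
  slotInactiveHours: number;
}

// Field names of an EventBridge PutEvents request entry.
interface PutEventsEntry {
  Source: string;
  DetailType: string;
  Detail: string; // JSON-encoded payload
  EventBusName: string;
}

const LAG_LIMIT_SECONDS = 5 * 60;
const WAL_RECORDS_LIMIT = 10_000;
const SLOT_INACTIVE_ALERT_HOURS = 30; // Neon drops inactive slots at ~40h

function alertEntries(s: LagSample, eventBusArn: string): PutEventsEntry[] {
  const entry = (detailType: string, detail: object): PutEventsEntry => ({
    Source: "bank.platform.cdc",
    DetailType: detailType,
    Detail: JSON.stringify({ domain: s.domain, ...detail }),
    EventBusName: eventBusArn,
  });
  const entries: PutEventsEntry[] = [];
  if (s.lagSeconds > LAG_LIMIT_SECONDS || s.walRecordsBehind > WAL_RECORDS_LIMIT) {
    entries.push(entry("cdc.lag_breach", { lag_seconds: s.lagSeconds, wal_records_behind: s.walRecordsBehind }));
  }
  if (s.slotInactiveHours > SLOT_INACTIVE_ALERT_HOURS) {
    entries.push(entry("cdc.slot_health_alert", { slot_inactive_hours: s.slotInactiveHours }));
  }
  return entries;
}
```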
FR coverage
| FR | Where it lives |
|---|---|
| FR-265 (replicate every committed change, p99 ≤ 5 min) | cdc-poller Lambda + Firehose buffering 60s/128MB |
| FR-266 (schema evolution recorded in Iceberg metadata) | Firehose Iceberg destination native; column metadata via Lambda envelope |
| FR-267 (alert on lag > 5 min OR > 10k WAL) | metrics.ts + alarms.ts + cdc.lag_breach event |
| FR-268 (replay from arbitrary LSN within 7d) | Lambda payload replay_from_lsn + pg_replication_slot_advance |
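| FR-454 (one slot per DB, idempotent lifecycle) | slot-bootstrap.ts runs on every cold start and creates the publication and slot only if they are absent (PostgreSQL has no IF NOT EXISTS for either, so existence is checked first) |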
| FR-455 (jurisdiction + event_date partitions, PII tag) | firehose-writer.ts envelope + pii-tagger.ts + Snowflake DDL templates |
| FR-456 (5 metrics + cdc.lag_breach event) | metrics.ts + event-publisher.ts |
| FR-457 (schema-compat startup check) | schema-compat.ts + handler aborts before drain on violation |
| NFR-015 (≤ 5 min p99) | Same machinery as FR-265 |
| NFR-019 (RTO ≤ 4h / RPO ≤ 1h) | LSN ack persisted per run; replay via FR-268 |
| NFR-024 (audit log mutability = 0) | Committed Iceberg snapshots are immutable; every change, including updates and deletes, is recorded as a new snapshot |
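The rules inside schema-compat.ts are not spelled out here. One plausible rule set for FR-457, sketched under the assumption that the check compares the source table's columns (for example from information_schema.columns) with those already recorded in the Iceberg table: dropped columns and non-widening type changes are violations, Iceberg-style widenings (int to long, float to double, written here as PostgreSQL integer to bigint and real to double precision) pass, and new columns must be nullable because existing Iceberg data files hold no values for them. Column, SAFE_WIDENINGS and schemaCompatViolations are hypothetical.

```typescript
interface Column {
  name: string;
  type: string; // PostgreSQL type name, e.g. "integer"
  nullable: boolean;
}

// Widenings Iceberg schema evolution permits, in PostgreSQL type names.
// Hypothetical and not exhaustive (decimal precision widening is omitted).
const SAFE_WIDENINGS: Record<string, readonly string[]> = {
  integer: ["bigint"],
  real: ["double precision"],
};

// Returns readable violations; an empty array means the current source
// schema can still be written to the existing Iceberg table.
function schemaCompatViolations(recorded: Column[], source: Column[]): string[] {
  const violations: string[] = [];
  const sourceByName = new Map(source.map((c) => [c.name, c] as const));
  for (const col of recorded) {
    const now = sourceByName.get(col.name);
    if (now === undefined) {
      violations.push(`column dropped: ${col.name}`);
    } else if (now.type !== col.type && !(SAFE_WIDENINGS[col.type] ?? []).includes(now.type)) {
      violations.push(`type changed: ${col.name} ${col.type} -> ${now.type}`);
    }
  }
  const recordedNames = new Set(recorded.map((c) => c.name));
  for (const col of source) {
    if (!recordedNames.has(col.name) && !col.nullable) {
      violations.push(`new NOT NULL column: ${col.name}`);
    }
  }
  return violations;
}
```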
Policy coverage
| Policy | Mode | How verified |
|---|---|---|
| DT-004 | AUTO | __tests__/policy/dt-004-single-pipeline.test.ts: IAM policy simulation confirms that only the MOD-042 Firehose roles can write to s3://.../cdc/ and that all other principals are denied |
| REP-005 | AUTO | __tests__/policy/rep-005-single-source.test.ts: the SSM contract publishes a single prefix per domain, each Firehose stream is configured with that prefix, and no parallel streams write to it |
| AML-005 | AUTO | __tests__/policy/aml-005-five-min.test.ts: placeholder until bank_core has transactions; the full E2E test is gated on their arrival |
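The REP-005 assertion reduces to a prefix comparison. A sketch, assuming the test has already collected each domain's iceberg-prefix from SSM and each Firehose stream's configured S3 prefix; StreamConfig and rep005Violations are hypothetical. Prefixes are compared for overlap rather than equality, so a stream writing to a parent prefix such as the shared cdc/ root is caught as a parallel writer for every domain.

```typescript
interface StreamConfig {
  streamName: string;
  prefix: string; // S3 prefix the stream is configured to write to
}

// Two prefixes overlap when one contains the other.
const overlaps = (a: string, b: string): boolean => a.startsWith(b) || b.startsWith(a);

function rep005Violations(ssmPrefixByDomain: Record<string, string>, streams: StreamConfig[]): string[] {
  const violations: string[] = [];
  const contracted = new Set(Object.values(ssmPrefixByDomain));
  // Exactly one stream may write under each contracted domain prefix.
  for (const [domain, prefix] of Object.entries(ssmPrefixByDomain)) {
    const writers = streams.filter((s) => overlaps(s.prefix, prefix));
    if (writers.length !== 1) {
      violations.push(`${domain}: ${writers.length} streams write under ${prefix}`);
    }
  }
  // Every stream must target a prefix published in the SSM contract.
  for (const s of streams) {
    if (!contracted.has(s.prefix)) violations.push(`${s.streamName}: ${s.prefix} is not in the SSM contract`);
  }
  return violations;
}
```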
Tests
| Layer | Coverage |
|---|---|
| Unit | LSN compare/convert, schema-compat report, PII parser+lookup, Firehose envelope, lag computation, domain config tripwire |
| Integration | Resource existence per domain (Lambda, Firehose, Schedule, Glue DB, alarms), SSM-contract publish |
| Policy | DT-004 (IAM simulate), REP-005 (SSM ↔ Firehose prefix match), AML-005 (E2E latency, placeholder) |
| Contract | 5 detail-type payload shapes |
Operational notes
- wal_level = logical must be set on the Neon project. This is irreversible.
- The Lambda must use the Neon direct host, not the pooler.
- Neon drops inactive replication slots at ~40 hours. We poll every 60 seconds and alert at 30 hours of inactivity.
- Each Lambda is per-domain — a slow domain doesn't block others, and a code bug's blast radius is one domain.
- The schedule starts ENABLED in all stages. Unlike MOD-159, CDC has no dependency-gated activation: its dependencies must already resolve for the IaC to deploy, so once the deploy succeeds the data path can run.
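A cheap guard for the direct-host rule, sketched under the assumption that Neon marks pooled endpoints with a -pooler suffix on the endpoint ID (the first DNS label of the hostname); assertDirectNeonHost is hypothetical. It would be defence in depth on top of reading /bank/{env}/neon/direct-host from SSM.

```typescript
// Hypothetical guard: refuse pooled Neon hostnames before opening a
// replication connection. Assumes Neon's "-pooler" endpoint-ID suffix.
function assertDirectNeonHost(host: string): string {
  const endpointLabel = host.split(".")[0];
  if (endpointLabel.endsWith("-pooler")) {
    throw new Error(`refusing pooled Neon host ${host}; CDC needs the direct host`);
  }
  return host;
}
```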
Manual one-time prerequisites (per env, before first deploy)
- Neon project parameter: wal_level = logical, set on the Neon project that hosts the domain databases (wal_level is a server-level setting, so it applies to every domain database). MOD-103 should set this; verify before deploy.
- Replicator role: each domain database has a replicator role with the REPLICATION privilege, created by MOD-103's seed. Its secret is stored at bank-neon/{env}/{database}/replicator (Secrets Manager).
- Glue → Snowflake IAM role: the bank-snowflake-glue-{env} role in AWS, trusted by Snowflake's Glue catalog integration. It supplies the GLUE_AWS_ROLE_ARN field in external-table-template.sql. Provisioned by MOD-104; verify it is present at deploy time.
Snowflake operator workflow (per source table, post-deploy)
Once a source domain ships its first table and Firehose has written the first Iceberg snapshot to S3:
```shell
sed -e "s/{{DOMAIN}}/CORE/g" \
    -e "s/{{ENVIRONMENT}}/dev/g" \
    -e "s/{{SOURCE_SCHEMA}}/public/g" \
    -e "s/{{SOURCE_TABLE}}/accounts/g" \
    snowflake/external-table-template.sql \
  | snowsql -f /dev/stdin
```
PII tags are then applied per column with snowflake/pii-tag-template.sql. The External Iceberg Table auto-refreshes by polling every 60 seconds, so new columns appear automatically.
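sed leaves any placeholder it was not given in the output, and the mistake only surfaces later as a SQL error. A hedged TypeScript alternative that fails fast on unfilled placeholders; renderTemplate is hypothetical and assumes placeholders are upper-case {{NAME}} tokens, as in the command above.

```typescript
// Hypothetical helper: fills {{NAME}} placeholders and throws if any remain.
function renderTemplate(template: string, values: Record<string, string>): string {
  // A replacer function avoids String.replace's special "$" patterns in values.
  const rendered = template.replace(/\{\{([A-Z_]+)\}\}/g, (token: string, name: string) =>
    Object.prototype.hasOwnProperty.call(values, name) ? values[name] : token,
  );
  const leftover = rendered.match(/\{\{[A-Z_]+\}\}/g);
  if (leftover !== null) {
    throw new Error(`unfilled placeholders: ${Array.from(new Set(leftover)).join(", ")}`);
  }
  return rendered;
}
```

Rendering snowflake/external-table-template.sql with DOMAIN=CORE, ENVIRONMENT=dev, SOURCE_SCHEMA=public and SOURCE_TABLE=accounts would produce the same SQL as the sed pipeline, ready to pipe into snowsql.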
Replay procedure (FR-268)
```shell
aws lambda invoke \
  --function-name bank-cdc-dev-core \
  --payload '{"replay_from_lsn": "0/16B6A50"}' \
  --cli-binary-format raw-in-base64-out \
  /tmp/out.json
```
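The Lambda calls pg_replication_slot_advance to move the slot to that LSN, then drains forward, emitting cdc.replay_started and cdc.replay_complete events. PostgreSQL never moves a slot backwards: pg_replication_slot_advance fails if the target is earlier than the slot's confirmed_flush_lsn, so the target must be at or after the slot's current position.
Limit: 7-day WAL retention on Neon.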
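Before invoking the Lambda, an operator can sanity-check the target against the slot's confirmed_flush_lsn (for example from SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'cdc_core_slot'). pg_lsn values print as two hexadecimal 32-bit halves, so comparing them as strings is wrong ("0/9" sorts after "0/10"). A sketch with hypothetical helpers parseLsn, formatLsn, compareLsn and assertReplayTarget.

```typescript
// Parse "HI/LO" (two hex 32-bit halves) into one 64-bit position.
function parseLsn(lsn: string): bigint {
  const m = /^([0-9A-Fa-f]{1,8})\/([0-9A-Fa-f]{1,8})$/.exec(lsn);
  if (m === null) throw new Error(`invalid LSN: ${lsn}`);
  return (BigInt("0x" + m[1]) << BigInt(32)) | BigInt("0x" + m[2]);
}

// Format back to PostgreSQL's %X/%X style (upper-case, no padding).
function formatLsn(value: bigint): string {
  const hi = (value >> BigInt(32)).toString(16).toUpperCase();
  const lo = (value & BigInt(0xffffffff)).toString(16).toUpperCase();
  return `${hi}/${lo}`;
}

function compareLsn(a: string, b: string): -1 | 0 | 1 {
  const x = parseLsn(a);
  const y = parseLsn(b);
  return x < y ? -1 : x > y ? 1 : 0;
}

// Mirrors PostgreSQL's refusal to move a slot behind confirmed_flush_lsn.
function assertReplayTarget(target: string, confirmedFlushLsn: string): void {
  if (compareLsn(target, confirmedFlushLsn) < 0) {
    throw new Error(`cannot move slot backwards: ${target} < confirmed_flush_lsn ${confirmedFlushLsn}`);
  }
}
```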