feat: add load_to_lakehouse Temporal activity for Iceberg ingestion#1134
Add a new Temporal activity that calls the MDLH REST API to load extracted data files into Iceberg lakehouse tables. Raw parquet files are loaded after extraction completes, and transformed jsonl files are loaded during exit activities. Both loads are gated behind ENABLE_LAKEHOUSE_LOAD and per-table env var configuration. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…s handling, and session-per-poll

- Add 30s HTTP timeout to all aiohttp sessions to prevent indefinite blocking
- Fail fast on non-retryable poll status codes (4xx) instead of burning all attempts
- Create a fresh aiohttp session per poll iteration to avoid stale connections
- Rename _do_lakehouse_load -> do_lakehouse_load (public cross-module API)
- Add correlation headers (X-Atlan-Tenant-Id, X-Lakehouse-Job-Id) for debugging
- Add cross-field validation on LhLoadRequest (require file_keys or patterns)
- Catch asyncio.TimeoutError alongside aiohttp.ClientError during polling
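The fail-fast rule described in this commit (client errors are terminal, server errors and timeouts are retryable) can be sketched as a small classification helper. The function name and return values here are illustrative, not the SDK's actual API:

```python
def classify_poll_response(status_code: int) -> str:
    """Decide how a poll loop should react to an HTTP status code.

    Hypothetical helper: 4xx responses will not heal on retry, so the
    loop aborts immediately instead of burning all poll attempts, while
    5xx responses are treated as transient and retried.
    """
    if 200 <= status_code < 300:
        return "ok"          # inspect the job status in the response body
    if 400 <= status_code < 500:
        return "fail_fast"   # non-retryable: bad request, auth, missing job
    return "retry"           # 5xx or anything else: transient, try again
```

A timeout (asyncio.TimeoutError) would map to the same "retry" outcome as a 5xx, which is why the commit catches it alongside aiohttp.ClientError.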
Transformed data loading:

- Load transformed data into per-entity-type Iceberg tables in entity_metadata (e.g. entity_metadata.database, entity_metadata.table) instead of a single hardcoded table
- TYPENAME_TO_ICEBERG_TABLE maps SDK typenames to MDLH table names
- fetch_and_transform now returns typename for downstream routing
- Remove LH_LOAD_TRANSFORMED_TABLE_NAME (derived from typename)

Raw data loading:

- New prepare_raw_for_lakehouse activity converts raw parquet to JSONL with common metadata columns (typename, connection_qualified_name, workflow_id, workflow_run_id, extracted_at, tenant_id, entity_name, raw_record as JSON string)
- Per-connector raw table: LH_LOAD_RAW_TABLE_NAME defaults to APPLICATION_NAME (e.g. raw_metadata.redshift)
- Enables joins between raw and transformed data via shared fields
Move from a standalone @activity.defn function to a method on BaseMetadataExtractionActivities, so connector apps don't need to import and register it separately — it's available as activities.prepare_raw_for_lakehouse just like load_to_lakehouse.
…hardcoded map

Replace the TYPENAME_TO_ICEBERG_TABLE dict with _resolve_iceberg_table(), which defaults to typename.lower() — matching MDLH's naming convention (lowercase of the Atlas typedef). This works for all connectors (SQL, Looker, Snowflake, etc.) without needing a per-connector mapping. Only "extras-procedure" → "procedure" is kept as an override for the SDK-specific naming quirk.
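The resolution rule described in this commit is small enough to sketch directly; the function body below is an assumption based on the commit message, not the SDK's actual code:

```python
# Single override for the SDK-specific naming quirk mentioned above.
_OVERRIDES = {"extras-procedure": "procedure"}

def resolve_iceberg_table(typename: str) -> str:
    """Map an SDK typename to an MDLH Iceberg table name.

    Defaults to typename.lower(), matching MDLH's convention of using
    the lowercase Atlas typedef name, so no per-connector map is needed.
    """
    lowered = typename.lower()
    return _OVERRIDES.get(lowered, lowered)
```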
Match connector-framework convention: http://lakehouse.atlas.svc.cluster.local:4541
…e URL

- Add prepare_raw_for_lakehouse to BaseSQLMetadataExtractionActivities (separate class hierarchy from BaseMetadataExtractionActivities)
- Fix test_sql_workflow: assert 11 activities, include prepare_raw_for_lakehouse
- Fix example: use correct default MDLH URL
Move all lakehouse loading logic into MetadataExtractionWorkflow:

- load_raw_to_lakehouse(): prepare + load raw data (was inline in sql.py)
- load_transformed_to_lakehouse(): per-typename load (was _load_transformed_to_lakehouse)
- _submit_lakehouse_load(): private helper (was _execute_lakehouse_load)

sql.py run() is now a one-liner: await self.load_raw_to_lakehouse(...). All env var checks, config building, and MDLH interaction live in the base workflow — subclasses just call the public methods.
The retry policy is shared across upload_to_atlan and lakehouse activities — it's not lakehouse-specific.
- do_lakehouse_load -> submit_and_poll_mdlh_load
- _do_prepare_raw_for_lakehouse -> convert_raw_parquet_to_jsonl
New files:

- activities/metadata_extraction/lakehouse.py — all lakehouse implementation: submit_and_poll_mdlh_load, convert_raw_parquet_to_jsonl
- workflows/metadata_extraction/lakehouse.py — LakehouseLoadMixin with load_raw_to_lakehouse, load_transformed_to_lakehouse, _submit_lakehouse_load, resolve_iceberg_table

Existing files now only contain thin delegation:

- base.py: activity methods delegate to lakehouse.py functions
- sql.py: same delegation for the SQL activity class
- __init__.py: MetadataExtractionWorkflow inherits LakehouseLoadMixin
…AD is true
Remove the raw_lakehouse_config dict — the activity reads workflow_id, workflow_run_id, output_path, and connection_qualified_name directly from workflow_args. Only typenames is passed via _extracted_typenames.
os.listdir only works locally, not on S3 via Dapr. typenames are always provided by _extracted_typenames from fetch_and_transform.
…e_load

The load_to_lakehouse activity only reads lh_load_config — no need to pass the entire workflow_args through Temporal serialization.
Keep run_exit_activities and workflow_success outside the guard so they always run regardless of lakehouse config.
The asyncio.gather call was inside the ENABLE_LAKEHOUSE_LOAD block, causing extraction to silently skip when lakehouse loading is disabled.
Fix all @patch targets from ...base.X to ...lakehouse.X so mocks actually intercept the right module references. Add tests for convert_raw_parquet_to_jsonl, resolve_iceberg_table, and load_raw_to_lakehouse that were previously uncovered.
MDLH LhLoadActivityImpl.load() calls request.getFileKeys().size() without a null check. When the SDK sends only patterns (no fileKeys), exclude_none=True omits the field entirely, causing MDLH to deserialize it as null and NPE on the log line. Send file_keys=[] explicitly so the serialized payload always includes "fileKeys": [] — works around the MDLH bug.
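The workaround is easiest to see with a toy serializer (hypothetical; the SDK presumably uses Pydantic's exclude_none, but the effect is the same): dropping None-valued fields omits fileKeys from the payload entirely, while defaulting to an empty list keeps the key present so MDLH never sees null:

```python
import json

def serialize_load_request(patterns, file_keys=None, drop_none=True):
    """Toy serializer mimicking exclude_none=True behavior."""
    payload = {"patterns": patterns, "fileKeys": file_keys}
    if drop_none:
        # With file_keys=None, "fileKeys" disappears from the payload,
        # and MDLH deserializes the missing field as null -> NPE.
        payload = {k: v for k, v in payload.items() if v is not None}
    return json.dumps(payload, sort_keys=True)

# Buggy shape: no "fileKeys" key at all.
serialize_load_request(["raw/*.jsonl"])
# Workaround: send an explicit empty list so "fileKeys": [] is serialized.
serialize_load_request(["raw/*.jsonl"], file_keys=[])
```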
prepare_raw_for_lakehouse writes JSONL files to local disk but never uploads them to S3. MDLH resolves the glob pattern against S3 and finds 0 files, resulting in 0 rows loaded into the Iceberg table. Add ObjectStore.upload_prefix call after JSONL generation so files are available in S3 when MDLH processes the load request.
…without lakehouse
Code Review: This PR adds lakehouse loading capabilities to the metadata extraction workflow. It introduces two new Temporal activities (prepare_raw_for_lakehouse and load_to_lakehouse). Confidence Score: 3/5
Change Flow

sequenceDiagram
participant WF as SQL Workflow
participant Mixin as LakehouseLoadMixin
participant PrepAct as prepare_raw_for_lakehouse
participant LoadAct as load_to_lakehouse
participant HealthChk as check_lakehouse_enabled
participant MDLH as MDLH REST API
WF->>Mixin: load_raw_to_lakehouse()
Mixin->>PrepAct: convert parquet -> JSONL
PrepAct-->>Mixin: raw_lakehouse dir
Mixin->>LoadAct: submit load (raw)
LoadAct->>HealthChk: GET /actuator/health
HealthChk-->>LoadAct: healthy?
LoadAct->>MDLH: POST /load (JSONL pattern)
MDLH-->>LoadAct: 202 + jobId
LoadAct->>MDLH: GET /load/{jobId}/status (poll)
MDLH-->>LoadAct: COMPLETED
WF->>Mixin: load_transformed_to_lakehouse()
loop For each typename
Mixin->>LoadAct: submit load (transformed)
LoadAct->>HealthChk: GET /actuator/health
LoadAct->>MDLH: POST /load + poll
end
…-file processing, ensure base_dir exists
…ypename processing
… remove transformed loads

- Rename convert_raw_parquet_to_jsonl → convert_raw_parquet_to_parquet
- Replace the Daft + orjson row loop with DuckDB to_json() for ~50x faster raw_record serialization (C++ vectorized, zero Python object creation)
- Fix DuckDB SQL injection: escape column names in struct literals
- Fix DuckDB connection leak: wrap in try/finally
- Skip upload_prefix when no parquet files were produced
- Remove load_transformed_to_lakehouse, resolve_iceberg_table, and the LH_LOAD_TRANSFORMED_* constants — only raw lakehouse load for now
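The injection fix mentioned above likely amounts to escaping column names when building the DuckDB struct literal. A sketch of that SQL construction as pure string building (the helper names and exact query shape are assumptions, not the SDK's code):

```python
def quote_ident(name: str) -> str:
    """Escape a column name as a double-quoted SQL identifier."""
    return '"' + name.replace('"', '""') + '"'

def build_raw_record_sql(columns, parquet_glob: str) -> str:
    """Build a query serializing each parquet row into a JSON raw_record.

    Uses a DuckDB struct literal {'key': col, ...} fed to to_json(), so
    serialization happens inside the engine rather than a Python loop.
    Both the struct keys (string literals) and the column references
    (identifiers) are escaped to block injection via column names.
    """
    parts = []
    for c in columns:
        key = c.replace("'", "''")  # escape for the single-quoted struct key
        parts.append(f"'{key}': {quote_ident(c)}")
    struct = ", ".join(parts)
    return (
        f"SELECT to_json({{{struct}}}) AS raw_record "
        f"FROM read_parquet('{parquet_glob}')"
    )
```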
…ntirely if unavailable
RFC: entity_raw Namespace — Per-Application Raw Data Tables

PRs:
Author: Mrunmayi Tripathi
Date: 2026-03-23
Status: In Review
Problem
Today, the metadata lakehouse only stores transformed/enriched data in `entity_metadata`. The raw records from source systems are discarded after transformation. This creates four key gaps:

1. No auditability or compliance — we cannot trace back to exactly what the source system sent. If a customer disputes a metadata value, there's no source of truth to verify against. For regulated environments, there may be requirements to retain source data for audit trails — without raw storage, MDLH cannot serve as the system of record for metadata provenance.
2. No reprocessing or debugging — if transformation logic changes (bug fix, schema evolution, new fields), we cannot re-derive transformed data from source records. Re-extraction from the source is expensive and may not reproduce historical state. Investigating data quality issues (missing assets, wrong counts, stale data) also requires going back to the source system, which is slow, often requires customer credentials, and the source state may have changed since the original extraction.
3. No diffing — we cannot compare what changed between two extraction runs at the raw level. Without access to both the before and after raw records, deciding whether a discrepancy is caused by a source-side change or a transformation bug is guesswork.
4. No cross-connector visibility — there's no unified place to query raw data across connectors for features like raw-to-transformed lineage, coverage reports, or extraction health dashboards. Connector developers also have no visibility into what their extraction actually produced, independent of the transformation layer, making it hard to validate and debug connector output.
Proposal
Introduce a new Iceberg namespace `entity_raw` with one table per registered Application (e.g. `entity_raw.snowflake`, `entity_raw.redshift`).

Each table stores the full raw record as a JSON string (`raw_record` column) alongside common metadata columns (`typename`, `connection_qualified_name`, `workflow_run_id`, `extracted_at`, `tenant_id`). These metadata columns are intentionally aligned with `entity_metadata` fields so that raw and transformed data can be correlated with a simple equi-join — enabling debugging, diffing, and lineage across the two layers.

Table names are not arbitrary — they must match a registered `Application` entity in Atlas. MDLH proactively creates tables for all known applications at startup and every 10 minutes, and also validates on-demand creation requests against the Atlas application registry. This prevents namespace pollution while keeping the system self-service for onboarded connectors.

The feature is opt-in per connector via a single environment variable (`ENABLE_LAKEHOUSE_LOAD=true`). When disabled, behavior is unchanged — no raw data is written, no new API calls are made.

This is a coordinated change across three repos:

- MDLH: creates `entity_raw` tables in Iceberg and validates `/load` requests against the Atlas application registry
- Application SDK: prepares raw data in the `entity_raw` schema and submits load jobs to MDLH via the `/load` REST API
- Connector apps: register the SDK activities and enable the feature per environment

Design
Schema
Namespace: `entity_raw` | Table name: application name (e.g. `snowflake`, `redshift`)

| Column | Notes |
| --- | --- |
| `typename` | entity type (e.g. `table`, `column`) |
| `connection_qualified_name` | aligns with `entity_metadata.connectionqualifiedname` |
| `workflow_id` | |
| `workflow_run_id` | aligns with `entity_metadata.lastsyncrun` |
| `extracted_at` | |
| `tenant_id` | |
| `entity_name` | |
| `raw_record` | full source record as a JSON string |

Raw and transformed data can be correlated via the shared `connection_qualified_name` and `workflow_run_id` fields.
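As an illustration of that correlation, a join between the raw and transformed layers might look like the following. This SQL is illustrative only; the table choice (`snowflake` raw table joined to the transformed `table` entity table) and the `entity_metadata` column names follow the join notes in the schema table above:

```sql
-- Illustrative equi-join between raw and transformed layers.
SELECT
    r.typename,
    r.entity_name,
    r.raw_record,
    m.*
FROM entity_raw.snowflake AS r
JOIN entity_metadata."table" AS m
  ON r.connection_qualified_name = m.connectionqualifiedname
 AND r.workflow_run_id = m.lastsyncrun;
```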
Table Lifecycle
Tables are named after registered Application entities in Atlas.
Proactive (startup + every 10 min) — MDLH queries Atlas for all ACTIVE `Application` entities and pre-creates a table for each. This runs during MDLH init (first install) and on every Notification Processor cycle (`*/10 * * * *`). Any new Application registered in Atlas gets its table within 10 minutes.

Reactive (on `/load` request, guarded) — if a `/load` request targets a non-existent table in `entity_raw`, MDLH checks whether the table name matches a registered Application. If yes, it auto-creates the table. If not, it rejects the request with a clear error listing valid applications. This handles race conditions where a new app sends data before the 10-minute scheduler catches up, while preventing arbitrary table names from polluting the namespace.

End-to-End Data Flow
MDLH table management
Application SDK & Connector Integration
What the SDK does
The SDK adds two new Temporal activities available to all connectors:
- `prepare_raw_for_lakehouse` — reads raw parquet files produced during extraction and wraps each row into the `entity_raw` schema as JSONL, adding metadata columns (`typename`, `connection_qualified_name`, `workflow_run_id`, `extracted_at`, `tenant_id`) alongside the original row as a `raw_record` JSON string.
- `load_to_lakehouse` — submits a load job to the MDLH `/load` API with an S3 glob pattern, then polls the status endpoint until completion or terminal failure.

Workflow execution order
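The row-wrapping step might look like this minimal sketch. The field sources are assumptions based on this RFC (in particular, deriving `entity_name` from the row's `name` field is a guess), not the SDK's actual implementation:

```python
import json
from datetime import datetime, timezone

def wrap_raw_row(row: dict, *, typename: str, connection_qualified_name: str,
                 workflow_id: str, workflow_run_id: str, tenant_id: str) -> dict:
    """Wrap one extracted row into the entity_raw schema.

    The original row survives verbatim as a JSON string in raw_record;
    the metadata columns enable the equi-join with entity_metadata.
    """
    return {
        "typename": typename,
        "connection_qualified_name": connection_qualified_name,
        "workflow_id": workflow_id,
        "workflow_run_id": workflow_run_id,
        "extracted_at": datetime.now(timezone.utc).isoformat(),
        "tenant_id": tenant_id,
        # Assumption: entity name comes from the raw row's "name" field.
        "entity_name": row.get("name", ""),
        "raw_record": json.dumps(row),
    }
```

Each wrapped dict would then be written out as one JSONL line per row.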
Configuration
All lakehouse loading is controlled by environment variables — no code changes needed in connector apps beyond registering the two activities:
| Variable | Default |
| --- | --- |
| `ENABLE_LAKEHOUSE_LOAD` | `false` |
| `MDLH_BASE_URL` | `http://lakehouse.atlas.svc.cluster.local:4541` |
| `LH_LOAD_RAW_NAMESPACE` | `entity_raw` |
| `LH_LOAD_RAW_TABLE_NAME` | `APPLICATION_NAME` (e.g. `redshift`) |
| `LH_LOAD_RAW_MODE` | `APPEND` |
| `LH_LOAD_TRANSFORMED_NAMESPACE` | `entity_metadata` |
| `LH_LOAD_TRANSFORMED_MODE` | `APPEND` |
| `LH_LOAD_POLL_INTERVAL_SECONDS` | `10` |
| `LH_LOAD_MAX_POLL_ATTEMPTS` | `360` |

What connector apps need to do
Minimal — register the two SDK activities in the workflow's activity list and add a connection metadata normalizer to handle different connection object shapes. The Redshift app PR (#184) serves as the reference implementation.
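The environment variables above can be read into a plain settings dict. The loader function below is an illustration built from the configuration table (the function itself is not the SDK's config API; defaults are taken from the table):

```python
import os

def load_lakehouse_config(env=os.environ) -> dict:
    """Read lakehouse settings from env vars, using the RFC's defaults."""
    return {
        "enabled": env.get("ENABLE_LAKEHOUSE_LOAD", "false").lower() == "true",
        "base_url": env.get(
            "MDLH_BASE_URL", "http://lakehouse.atlas.svc.cluster.local:4541"
        ),
        "raw_namespace": env.get("LH_LOAD_RAW_NAMESPACE", "entity_raw"),
        # Raw table name falls back to the application name (e.g. redshift).
        "raw_table": env.get(
            "LH_LOAD_RAW_TABLE_NAME", env.get("APPLICATION_NAME", "")
        ),
        "raw_mode": env.get("LH_LOAD_RAW_MODE", "APPEND"),
        "poll_interval_s": int(env.get("LH_LOAD_POLL_INTERVAL_SECONDS", "10")),
        "max_poll_attempts": int(env.get("LH_LOAD_MAX_POLL_ATTEMPTS", "360")),
    }
```

With the defaults of 10 seconds per poll and 360 attempts, a load job is given up to an hour to complete.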
Rollout
All changes are additive and idempotent. No migration needed.
- Enable with `ENABLE_LAKEHOUSE_LOAD=true` per-connector, per-environment

Rollback
- Set `ENABLE_LAKEHOUSE_LOAD=false` → stops writing immediately

Security
- The `/load` API requires the `X-Atlan-Tenant-Id` header

Open Questions
- … `typename`; add time-based partitioning on `extracted_at`?
- `raw_record`: use ZSTD-compressed bytes instead of plain JSON?
- Limits on `raw_record` to prevent extremely large JSON blobs?