
Lakeflow Connect (LFC) demo + two dataflow_pipeline.py bug fixes#267

Merged
ravi-databricks merged 13 commits into databrickslabs:issue_266 from rsleedbx:lfc-data
Apr 10, 2026

Conversation


@rsleedbx rsleedbx commented Mar 4, 2026

Lakeflow Connect (LFC) demo + three dataflow_pipeline.py fixes

This branch adds a full end-to-end demo of Lakeflow Connect → SDP-Meta bronze/silver pipelines
and fixes three bugs/feature gaps in src/databricks/labs/sdp_meta/dataflow_pipeline.py discovered
while building and testing the demo.


Bug fixes and feature gaps

1 — apply_changes_from_snapshot raises "Snapshot reader function not provided!" with snapshot_format: "delta" (#266)

write_layer_table() gated the apply_changes_from_snapshot() call solely on
self.next_snapshot_and_version. When source_details.snapshot_format: "delta" is
configured, is_create_view() correctly sets next_snapshot_and_version_from_source_view = True
and provides a DLT view as the snapshot source — but next_snapshot_and_version stays None,
so the gate always raised even though the source was fully configured.

One-line fix:

# before
if self.next_snapshot_and_version:
# after
if self.next_snapshot_and_version or self.next_snapshot_and_version_from_source_view:
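To see why the old gate misfired, here is a minimal stand-alone sketch. `PipelineStub` and `can_apply_snapshot` are hypothetical stand-ins for illustration only, not the real `DataflowPipeline` API; the point is that the `snapshot_format: "delta"` path sets only the view-based flag, so a gate that checks `next_snapshot_and_version` alone raises spuriously:

```python
# Hypothetical stub illustrating the two snapshot-source flags.
class PipelineStub:
    def __init__(self, custom_lambda=None, view_based=False):
        # set by a user-supplied next_snapshot_and_version lambda
        self.next_snapshot_and_version = custom_lambda
        # set by is_create_view() for snapshot_format: "delta"
        self.next_snapshot_and_version_from_source_view = view_based

    def can_apply_snapshot(self):
        # fixed gate: either a custom lambda OR a source-view snapshot qualifies
        return bool(self.next_snapshot_and_version
                    or self.next_snapshot_and_version_from_source_view)

# delta view path: the old single-flag gate would have raised here
print(PipelineStub(view_based=True).can_apply_snapshot())   # True
# nothing configured: still correctly rejected
print(PipelineStub().can_apply_snapshot())                  # False
```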

2 — CDC is silently skipped when both dataQualityExpectations and cdcApplyChanges are set (#265)

When both fields were present, write_layer_table() called write_layer_with_dqe() and
returned early — cdc_apply_changes() was never reached. The two paths were mutually exclusive.

Fix: New write_layer_with_dqe_then_cdc() method that:

  1. Writes DQE-passing rows into an intermediate {table}_dq table.
  2. Runs create_auto_cdc_flow using {table}_dq as the stream source.

Supporting changes to enable this:

  • _get_target_table_info(suffix=None) — optional suffix for the _dq intermediate table name.
  • write_layer_with_dqe(dqe_only=False, suffix=None) — new parameters for combined path.
  • cdc_apply_changes(source_table=None) — optional source table override (uses view_name when None).
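The ordering matters: DQE filtering must run before CDC so that failing rows never reach the merge. The following is a pure-Python simulation of that two-step flow (not the DLT API; `dqe_then_cdc`, the expectation lambda, and the sample rows are all made up for illustration):

```python
# Simulation of write_layer_with_dqe_then_cdc() ordering:
# step 1 keeps only DQE-passing rows (the {table}_dq intermediate),
# step 2 applies CDC-style upserts keyed on the primary key.
def dqe_then_cdc(rows, expectation, key):
    passed = [r for r in rows if expectation(r)]   # step 1: {table}_dq
    target = {}
    for r in passed:                               # step 2: upsert by key
        target[r[key]] = r
    return list(target.values())

rows = [
    {"pk": 1, "amount": 10},
    {"pk": 1, "amount": 20},
    {"pk": 2, "amount": -5},   # fails DQE, never reaches the CDC merge
]
result = dqe_then_cdc(rows, lambda r: r["amount"] >= 0, "pk")
print(result)  # [{'pk': 1, 'amount': 20}]
```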

3 — Custom next_snapshot_and_version lambda cannot override the built-in snapshot_format: "delta" view path (#268)

When snapshot_format: "delta" was configured together with a custom next_snapshot_and_version
lambda, is_create_view() always registered a DLT view (the built-in path) and
apply_changes_from_snapshot() always used the view as the source — the custom lambda was
silently ignored. This made it impossible to inject version-aware snapshot logic (e.g. an O(1)
Delta version check to skip unchanged tables) on top of a snapshot_format: "delta" spec.

Fix: Custom lambda takes priority over the view path for snapshot specs:

# is_create_view() — skip view creation when a custom lambda is provided for a snapshot spec
if self.next_snapshot_and_version and getattr(self, "_is_snapshot_spec", False):
    return False   # lambda will be the DLT source directly

# apply_changes_from_snapshot() — prefer custom lambda over view name
source = (
    (lambda latest_snapshot_version: self.next_snapshot_and_version(
        latest_snapshot_version, self.dataflowSpec
    ))
    if self.next_snapshot_and_version   # custom lambda takes priority over view
    else self.view_name
)

The _is_snapshot_spec guard ensures non-snapshot specs (e.g. CDF streaming tables) are
unaffected — they always get a DLT view regardless.


New features

Lakeflow Connect demo (demo/)

A complete, script-driven demo that streams two tables (from SQL Server, MySQL, or
PostgreSQL) through Lakeflow Connect into SDP-Meta bronze and silver pipelines, validated
against all three database sources.

| File | Purpose |
| --- | --- |
| demo/launch_lfc_demo.py | Single-command setup: creates UC resources, uploads config/notebooks/wheel, creates LFC + SDP-Meta jobs, triggers the run. Also handles incremental re-runs via --run_id. |
| demo/cleanup_lfc_demo.py | Tears down all objects created for a given --run_id: jobs, DLT pipelines, UC schemas/volumes, workspace notebooks, LFC gateway/ingestion pipelines. |
| demo/lfcdemo-database.ipynb | Notebook task that creates the LFC pipelines, waits for the initial full load, writes onboarding.json to a UC Volume, and triggers the downstream SDP-Meta job. |
| demo/notebooks/lfc_runners/init_sdp_meta_pipeline.py | DLT notebook: installs the sdp-meta wheel, registers a bronze_custom_transform / next_snapshot_and_version lambda that renames LFC reserved columns (__START_AT → lfc_start_at, __END_AT → lfc_end_at) for no-PK SCD Type 2 tables, then calls DataflowPipeline.invoke_dlt_pipeline. |
| demo/notebooks/lfc_runners/trigger_ingestion_and_wait.py | Notebook task in the incremental job: triggers the LFC scheduler job (or falls back to starting the ingestion pipeline directly), polls until the update completes, then yields to the bronze/silver tasks. |
| demo/check_run_summary.py | Utility: queries bronze/silver table row counts and Delta history for a given run_id. |

Source tables streamed:

| Table | LFC SCD type | DLT-Meta approach | Keys |
| --- | --- | --- | --- |
| intpk | Type 1 (has PK pk) | readChangeFeed + bronze_cdc_apply_changes + DQE | pk |
| dtix | Type 2 (no PK, index on dt) | source_format: snapshot + apply_changes_from_snapshot | dt, lfc_end_at |

Key design decisions documented in docs/content/demo/LakeflowConnectDemo.md:

  • DLT globally reserves __START_AT / __END_AT for all APPLY CHANGES operations (not
    just SCD2). Any LFC SCD2 source table carrying these columns must rename them before DLT
    analyses the schema. init_sdp_meta_pipeline.py performs this rename via either a
    bronze_custom_transform (full-scan path) or a next_snapshot_and_version lambda (CDF path).
  • For no-PK tables, (dt, lfc_start_at) is non-unique because multiple initial-load rows share
    the same dt and a null __START_AT. LFC always assigns a unique __END_AT per row (its
    __cdc_internal_value encodes the CDC log position plus a per-row sequence number), making
    (dt, lfc_end_at) the correct composite key.
  • lfcdemo-database.ipynb triggers the downstream SDP-Meta job
    (onboarding_job → bronze_dlt → silver_dlt) automatically so the full pipeline runs
    end-to-end without manual intervention.
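The no-PK key choice can be illustrated with a toy example (the row values below are invented; only the uniqueness argument comes from the PR): initial-load rows collide on `(dt, lfc_start_at)` but never on `(dt, lfc_end_at)`.

```python
# Two initial-load rows sharing the same dt and a null renamed __START_AT,
# but distinct renamed __END_AT values (made-up placeholders).
rows = [
    {"dt": "2026-03-01", "lfc_start_at": None, "lfc_end_at": "0001-01"},
    {"dt": "2026-03-01", "lfc_start_at": None, "lfc_end_at": "0001-02"},
]
start_keys = {(r["dt"], r["lfc_start_at"]) for r in rows}  # collapses to 1 key
end_keys = {(r["dt"], r["lfc_end_at"]) for r in rows}      # stays unique
print(len(start_keys), len(end_keys))  # 1 2
```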

Performance — apply_changes_from_snapshot at scale:

When the source is a view name (the built-in snapshot_format: "delta" path), DLT reads the
entire source table on every pipeline trigger. For production-scale SCD2 tables the
recommended path is to supply a custom next_snapshot_and_version lambda (enabled by fix #3
above) that uses Delta CDF internally to return only changed rows. The --snapshot_method=cdf
flag in launch_lfc_demo.py activates this optimised path.
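The skeleton of such a version-aware lambda can be sketched as follows. This is a simplified simulation, not the demo's actual implementation: the real lambda also receives the dataflow spec and reads Delta directly, whereas `current_version_fn` and `read_snapshot_fn` here are hypothetical stand-ins. The contract it mirrors is the `apply_changes_from_snapshot` one: return `None` to skip the update, or a `(DataFrame, version)` pair to process.

```python
# Factory for a version-checking next_snapshot_and_version lambda.
def make_next_snapshot_and_version(current_version_fn, read_snapshot_fn):
    def next_snapshot_and_version(latest_snapshot_version):
        current = current_version_fn()  # O(1) Delta table version lookup
        if latest_snapshot_version is not None and current <= latest_snapshot_version:
            return None                 # nothing changed since last run: skip
        # changed: return the snapshot for the current version
        return read_snapshot_fn(current), current
    return next_snapshot_and_version

fn = make_next_snapshot_and_version(lambda: 5, lambda v: f"df@{v}")
print(fn(5))     # None  (already processed version 5)
print(fn(3))     # ('df@5', 5)  (versions 4-5 pending)
print(fn(None))  # ('df@5', 5)  (first run)
```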

--snapshot_method flag for launch_lfc_demo.py

Controls how the dtix (LFC SCD2, no-PK) table is processed by the bronze DLT pipeline:

| Value | Behaviour |
| --- | --- |
| cdf (default) | Custom next_snapshot_and_version lambda. Checks the Delta table version first (O(1)); skips the pipeline run entirely when nothing changed, otherwise reads the full table. |
| full | Built-in view-based apply_changes_from_snapshot. Reads and materialises the full source table on every trigger (O(n) always). |

The value is passed as Spark conf dtix_snapshot_method to the bronze DLT pipeline and read by
init_sdp_meta_pipeline.py.

--sequence_by_pk flag for launch_lfc_demo.py

Allows the silver intpk CDC sequence_by column to be switched from dt (default) to pk,
useful when the source primary key is monotonically increasing and dt may have ties.

Incremental re-trigger support

python demo/launch_lfc_demo.py --profile=e2demofe --run_id=<run_id>

Re-uses all objects from the original setup run, re-uploads the latest notebooks, creates (or
reuses) an incremental job, triggers the LFC ingestion pipeline, and waits before firing
bronze/silver.


Documentation

| File | Content |
| --- | --- |
| docs/content/demo/LakeflowConnectDemo.md | Full walkthrough: LFC + SDP-Meta architecture, SCD1/SCD2 patterns, the reserved-column problem and solution, no-PK key selection (lfc_end_at), the CDF-based O(1) version check, combined DQE + CDC usage, and a history of approaches tried. |
| .cursor/skills/databricks-job-monitor/SKILL.md | Agent skill: monitoring job/pipeline runs, an error-diagnosis playbook, timing guide, test-cycle flow, cleanup instructions, and the incremental test procedure. |

Files changed

| Area | Files |
| --- | --- |
| Bug fixes / features | src/databricks/labs/sdp_meta/dataflow_pipeline.py |
| New demo | demo/launch_lfc_demo.py, demo/cleanup_lfc_demo.py, demo/lfcdemo-database.ipynb, demo/notebooks/lfc_runners/init_sdp_meta_pipeline.py, demo/notebooks/lfc_runners/trigger_ingestion_and_wait.py, demo/check_run_summary.py |
| Docs | docs/content/demo/LakeflowConnectDemo.md |
| Agent skill | .cursor/skills/databricks-job-monitor/SKILL.md |
| Integration tests | integration_tests/run_integration_tests.py (shared get_workspace_api_client + --snapshot_method CLI flag) |

Test plan

  • Fresh setup run (SQL Server): setup job (~1 hr) + downstream job (~10 min) both SUCCESS; bronze and silver tables populated for intpk and dtix — verified across all three database sources
  • Incremental run (MySQL): launch_lfc_demo.py --run_id=<run_id> — LFC ingestion triggered, trigger_ingestion_and_wait + bronze_dlt + silver_dlt all SUCCESS
  • --snapshot_method=cdf (default): O(1) version check lambda active for dtix; bronze and silver row counts match
  • Cleanup: cleanup_lfc_demo.py --run_id=<run_id> --include-all-lfc-pipelines — all schemas, pipelines, jobs, and workspace directories removed

Test results — all three database sources (initial downstream + incremental):

| DB | Initial downstream | Incremental | bronze.intpk | bronze.dtix |
| --- | --- | --- | --- | --- |
| SQL Server | SUCCESS | SUCCESS | 4,894 rows | 1,500 rows |
| MySQL | SUCCESS | SUCCESS | 3 rows | 81 rows |
| PostgreSQL | SUCCESS | SUCCESS | 26,143 rows | 3,981 rows |

Bronze and silver row counts match on every run. DESCRIBE HISTORY shows MERGE operations
at each update — confirming CDC apply_changes (intpk) and apply_changes_from_snapshot
(dtix) are both writing correctly.
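The verification step above amounts to filtering Delta history for MERGE operations. As a simple illustration (the history rows below are invented stand-ins for `DESCRIBE HISTORY` output, not real results from this run):

```python
# Toy stand-in for DESCRIBE HISTORY output on a bronze table.
history = [
    {"version": 3, "operation": "MERGE"},
    {"version": 2, "operation": "OPTIMIZE"},
    {"version": 1, "operation": "MERGE"},
]
# Keep only versions written by MERGE (i.e. CDC / snapshot apply updates).
merge_versions = [h["version"] for h in history if h["operation"] == "MERGE"]
print(merge_versions)  # [3, 1]
```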

rsleedbx and others added 11 commits March 3, 2026 16:36
…data generation

- Multi-section YAML support for enhanced dlt-meta functionality
- Synthetic data generation using dbldatagen with proper API
- Lakeflow Connect integration for database ingestion
- Complete examples with variables, transformations, and dataflows
- Enhanced CLI commands for single-file configuration

Co-authored-by: Cursor <cursoragent@cursor.com>
rsleedbx added 2 commits March 4, 2026 14:32
Delete 17 files that were not part of the LFC demo or main sdp_meta
package and were causing the CI lint step to fail:

Orphaned enhanced-CLI subsystem (never referenced by demo or docs):
  - src/enhanced_cli.py, src/lakeflow_connect.py, src/synthetic_data.py
  - src/archive/ (lakeflow_connect_specs, postgres_slot_manager,
    synthetic_data_notebook, __init__)
  - demo_enhanced_cli.py, test_enhanced_cli.py, bin/dlt-meta-enhanced
  - IMPLEMENTATION_SUMMARY.md, docs/dlt-meta-dab.md, docs/dbldatagen-yaml.md

Draft / planning / stale docs:
  - docs/content/demo/scdtype2as head.md (superseded draft)
  - docs/content/demo/LakeflowConnectMasterPlan.md (planning doc)
  - demo/notebooks/lfcdemo_lakeflow_connect.ipynb (old approach notebook)
  - demo/notebooks/synthetic_data.ipynb (enhanced-CLI notebook)

Fix remaining flake8 E241/E221/E261/E302/E305/W293/E501/F841 errors in
demo/launch_lfc_demo.py, demo/cleanup_lfc_demo.py,
demo/check_run_summary.py, integration_tests/run_integration_tests.py,
and src/databricks/labs/sdp_meta/pipeline_readers.py.
@rsleedbx rsleedbx marked this pull request as ready for review March 4, 2026 22:02
Comment thread — demo_enhanced_cli.py (Outdated)

Contributor: we should move this under demo folder

Comment thread — test_enhanced_cli.py (Outdated)

Contributor: move this file to test folder.

Comment thread — bin/dlt-meta-enhanced (Outdated)

Contributor: why do we need bin/dlt-meta-enhanced?

Comment thread — src/archive/lakeflow_connect_specs.py (Outdated)

Contributor: do we need files inside archive directory?

Contributor: remove cursor skills

@ravi-databricks ravi-databricks merged commit 51c97b6 into databrickslabs:issue_266 Apr 10, 2026
1 check passed

Labels: bug (Something isn't working)
Projects: none yet
2 participants