From c090fbe5b95c342db3f99be281087df4652847fe Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 3 Mar 2026 15:42:00 -0800 Subject: [PATCH 01/13] Wire branch support through OpenHouseDataLoader to PyIceberg scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The branch parameter was accepted but silently ignored — reads always hit the main branch. This wires it through via scan().use_ref(branch) so branch-based reads work. Also resolves the snapshot_id property to the branch tip and validates that branch and snapshot_id are mutually exclusive. Co-Authored-By: Claude Opus 4.6 --- .../src/openhouse/dataloader/data_loader.py | 15 +++- .../dataloader/tests/test_data_loader.py | 71 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index fdeb2fcde..eed27978b 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -91,6 +91,8 @@ def __init__( context: Data loader context max_attempts: Total number of attempts including the initial try (default 3) """ + if branch and snapshot_id is not None: + raise ValueError("Cannot specify both branch and snapshot_id") self._catalog = catalog self._table_id = TableIdentifier(database, table, branch) self._snapshot_id = snapshot_id @@ -115,7 +117,14 @@ def table_properties(self) -> Mapping[str, str]: @cached_property def snapshot_id(self) -> int | None: """Snapshot ID of the loaded table, or None if the table has no snapshots""" - return self._snapshot_id if self._snapshot_id is not None else self._iceberg_table.metadata.current_snapshot_id + if self._snapshot_id is not None: + return self._snapshot_id + if self._table_id.branch: + snapshot = self._iceberg_table.snapshot_by_name(self._table_id.branch) + if snapshot is None: + raise ValueError(f"Branch '{self._table_id.branch}' not found for table {self._table_id}") + return snapshot.snapshot_id + return self._iceberg_table.metadata.current_snapshot_id def _verify_snapshot(self, snapshot: Snapshot | None) -> None: """Log the resolved snapshot or raise if a user-provided snapshot_id was not found.""" @@ -137,12 +146,14 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: row_filter = _to_pyiceberg(self._filters) scan_kwargs: dict = {"row_filter": row_filter} - if self.snapshot_id is not None: + if self._table_id.branch is None and self.snapshot_id is not None: scan_kwargs["snapshot_id"] = self.snapshot_id if self._columns: scan_kwargs["selected_fields"] = tuple(self._columns) scan = table.scan(**scan_kwargs) + if self._table_id.branch: + scan = scan.use_ref(self._table_id.branch) self._verify_snapshot(scan.snapshot()) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index b092c1c0d..aff41c7b3 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -96,6 +96,7 @@ def fake_scan(**kwargs): scan = MagicMock() scan.projection.return_value = projected scan.plan_files.return_value = [task] + scan.use_ref.return_value = scan return scan mock_table = MagicMock() @@ -322,3 +323,73 @@ def test_snapshot_id_with_columns_and_filters(tmp_path): assert scan_kwargs["snapshot_id"] == 99 assert scan_kwargs["selected_fields"] == (COL_ID,) assert "row_filter" in scan_kwargs + + +# --- branch tests --- + + +def test_branch_calls_use_ref(tmp_path): + """scan.use_ref() is called when branch is set.""" + catalog = _make_real_catalog(tmp_path) + mock_table = catalog.load_table.return_value + original_side_effect = mock_table.scan.side_effect + scans: list = [] + + def capturing_scan(**kwargs): + scan = original_side_effect(**kwargs) + scans.append(scan) + return scan + + mock_table.scan.side_effect = capturing_scan + + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") + list(loader) + + mock_table.scan.assert_called_once() + assert len(scans) == 1 + scans[0].use_ref.assert_called_once_with("my-branch") + + +def test_branch_and_snapshot_id_raises(): + """ValueError is raised when both branch and snapshot_id are provided.""" + catalog = MagicMock() + + with pytest.raises(ValueError, match="Cannot specify both branch and snapshot_id"): + OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="b", snapshot_id=42) + + +def test_branch_snapshot_id_resolves(): + """snapshot_id property resolves via snapshot_by_name when branch is set.""" + catalog = MagicMock() + mock_snapshot = MagicMock() + mock_snapshot.snapshot_id = 123 + catalog.load_table.return_value.snapshot_by_name.return_value = mock_snapshot + + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") + + assert loader.snapshot_id == 123 + catalog.load_table.return_value.snapshot_by_name.assert_called_once_with("my-branch") + + +def test_branch_snapshot_id_not_found_raises(): + """ValueError is raised when branch does not exist in table metadata.""" + catalog = MagicMock() + catalog.load_table.return_value.snapshot_by_name.return_value = None + + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="missing") + + with pytest.raises(ValueError, match="Branch 'missing' not found"): + _ = loader.snapshot_id + + +def test_branch_snapshot_id_not_passed_to_scan(tmp_path): + """snapshot_id is not in scan kwargs when branch is used.""" + catalog = _make_real_catalog(tmp_path) + mock_table = catalog.load_table.return_value + + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") + list(loader) + + mock_table.scan.assert_called_once() + scan_kwargs = mock_table.scan.call_args.kwargs + assert "snapshot_id" not in scan_kwargs From a58eb7e5e34eca2f02e4aa826cb81247c0e2d8ab Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 3 Mar 2026 15:57:41 -0800 Subject: [PATCH 02/13] Simplify branch scan: resolve via snapshot_id property The snapshot_id property already resolves the branch tip, so pass the resolved ID to scan() directly. Removes the use_ref() call and the branch guard on scan kwargs. Co-Authored-By: Claude Opus 4.6 --- .../src/openhouse/dataloader/data_loader.py | 4 +-- .../dataloader/tests/test_data_loader.py | 32 ++++--------------- 2 files changed, 7 insertions(+), 29 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index eed27978b..f44b3042e 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -146,14 +146,12 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: row_filter = _to_pyiceberg(self._filters) scan_kwargs: dict = {"row_filter": row_filter} - if self._table_id.branch is None and self.snapshot_id is not None: + if self.snapshot_id is not None: scan_kwargs["snapshot_id"] = self.snapshot_id if self._columns: scan_kwargs["selected_fields"] = tuple(self._columns) scan = table.scan(**scan_kwargs) - if self._table_id.branch: - scan = scan.use_ref(self._table_id.branch) self._verify_snapshot(scan.snapshot()) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index aff41c7b3..910e79c9a 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -96,7 +96,6 @@ def fake_scan(**kwargs): scan = MagicMock() scan.projection.return_value = projected scan.plan_files.return_value = [task] - scan.use_ref.return_value = scan return scan mock_table = MagicMock() @@ -328,28 +327,6 @@ def test_snapshot_id_with_columns_and_filters(tmp_path): # --- branch tests --- -def test_branch_calls_use_ref(tmp_path): - """scan.use_ref() is called when branch is set.""" - catalog = _make_real_catalog(tmp_path) - mock_table = catalog.load_table.return_value - original_side_effect = mock_table.scan.side_effect - scans: list = [] - - def capturing_scan(**kwargs): - scan = original_side_effect(**kwargs) - scans.append(scan) - return scan - - mock_table.scan.side_effect = capturing_scan - - loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") - list(loader) - - mock_table.scan.assert_called_once() - assert len(scans) == 1 - scans[0].use_ref.assert_called_once_with("my-branch") - - def test_branch_and_snapshot_id_raises(): """ValueError is raised when both branch and snapshot_id are provided.""" catalog = MagicMock() @@ -382,14 +359,17 @@ def test_branch_snapshot_id_not_found_raises(): _ = loader.snapshot_id -def test_branch_snapshot_id_not_passed_to_scan(tmp_path): - """snapshot_id is not in scan kwargs when branch is used.""" +def test_branch_resolved_snapshot_id_passed_to_scan(tmp_path): + """Resolved branch snapshot_id is passed to scan kwargs.""" catalog = _make_real_catalog(tmp_path) mock_table = catalog.load_table.return_value + mock_snapshot = MagicMock() + mock_snapshot.snapshot_id = 456 + mock_table.snapshot_by_name.return_value = mock_snapshot loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") list(loader) mock_table.scan.assert_called_once() scan_kwargs = mock_table.scan.call_args.kwargs - assert "snapshot_id" not in scan_kwargs + assert scan_kwargs["snapshot_id"] == 456 From ae4a66d31e426dc1539a0f6dad8ebbb5cc3ff4da Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 3 Mar 2026 16:06:16 -0800 Subject: [PATCH 03/13] Replace mock-inspection branch test with behavioral test Instead of asserting on scan kwargs, set up two parquet files with different data and verify the loader returns data from the branch's snapshot. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/test_data_loader.py | 68 ++++++++++++++++--- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 910e79c9a..80898d5e4 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -52,9 +52,9 @@ def test_package_imports(): } -def _write_parquet(tmp_path, data: dict) -> str: +def _write_parquet(tmp_path, data: dict, filename: str = "test.parquet") -> str: """Write a Parquet file with Iceberg field IDs in column metadata.""" - file_path = str(tmp_path / "test.parquet") + file_path = str(tmp_path / filename) table = pa.table(data) fields = [field.with_metadata({b"PARQUET:field_id": str(i + 1).encode()}) for i, field in enumerate(table.schema)] pq.write_table(table.cast(pa.schema(fields)), file_path) @@ -359,17 +359,63 @@ def test_branch_snapshot_id_not_found_raises(): _ = loader.snapshot_id -def test_branch_resolved_snapshot_id_passed_to_scan(tmp_path): - """Resolved branch snapshot_id is passed to scan kwargs.""" - catalog = _make_real_catalog(tmp_path) - mock_table = catalog.load_table.return_value +def test_branch_reads_data_from_branch_snapshot(tmp_path): + """Branch reads return data from the branch's snapshot, not the main snapshot.""" + main_data = {COL_ID: [1, 2], COL_NAME: ["alice", "bob"], COL_VALUE: [1.1, 2.2]} + branch_data = {COL_ID: [10, 20, 30], COL_NAME: ["x", "y", "z"], COL_VALUE: [10.0, 20.0, 30.0]} + + main_path = _write_parquet(tmp_path, main_data, "main.parquet") + branch_path = _write_parquet(tmp_path, branch_data, "branch.parquet") + + metadata = new_table_metadata( + schema=TEST_SCHEMA, + partition_spec=UNPARTITIONED_PARTITION_SPEC, + sort_order=UNSORTED_SORT_ORDER, + location=str(tmp_path), + properties={}, + ) + io = load_file_io(properties={}, location=main_path) + + main_snapshot_id = 100 + branch_snapshot_id = 200 + + def _make_task(path): + data_file = DataFile.from_args( + file_path=path, + file_format=FileFormat.PARQUET, + record_count=len(next(iter(main_data.values()))), + file_size_in_bytes=os.path.getsize(path), + ) + data_file._spec_id = 0 + return FileScanTask(data_file=data_file) + + tasks_by_snapshot = { + main_snapshot_id: _make_task(main_path), + branch_snapshot_id: _make_task(branch_path), + } + + def fake_scan(**kwargs): + scan = MagicMock() + scan.projection.return_value = TEST_SCHEMA + scan.plan_files.return_value = [tasks_by_snapshot[kwargs["snapshot_id"]]] + return scan + + mock_table = MagicMock() + mock_table.metadata = metadata + mock_table.io = io + mock_table.scan.side_effect = fake_scan + mock_snapshot = MagicMock() - mock_snapshot.snapshot_id = 456 + mock_snapshot.snapshot_id = branch_snapshot_id mock_table.snapshot_by_name.return_value = mock_snapshot + catalog = MagicMock() + catalog.load_table.return_value = mock_table + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") - list(loader) + result = _materialize(loader) - mock_table.scan.assert_called_once() - scan_kwargs = mock_table.scan.call_args.kwargs - assert scan_kwargs["snapshot_id"] == 456 + assert result.num_rows == 3 + result = result.sort_by(COL_ID) + assert result.column(COL_ID).to_pylist() == branch_data[COL_ID] + assert result.column(COL_NAME).to_pylist() == branch_data[COL_NAME] From b7680533ecfbba73c5efa51034ca6dde8bdac566 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 3 Mar 2026 16:32:02 -0800 Subject: [PATCH 04/13] Add branch integration tests against Docker OpenHouse Test that different branches return data from their respective snapshots, that a branch with multiple snapshots in its lineage reads from the latest, and that a missing branch raises ValueError. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/integration_tests.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index 9d859ca32..e3d1b6c36 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -14,6 +14,9 @@ import pytest import requests from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.io import load_file_io +from pyiceberg.serializers import ToOutputFile +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from openhouse.dataloader import OpenHouseDataLoader from openhouse.dataloader.catalog import OpenHouseCatalog @@ -115,6 +118,23 @@ def _read_all(loader: OpenHouseDataLoader) -> pa.Table: return pa.concat_tables([pa.Table.from_batches([b]) for b in batches]).sort_by(COL_ID) +def _create_branch(catalog: OpenHouseCatalog, branch_name: str, snapshot_id: int) -> None: + """Create a named branch on a table by writing updated metadata with the new ref. + + Spark 3.1 does not support ALTER TABLE ... CREATE BRANCH, so we manipulate + the Iceberg metadata directly: load metadata, add a branch SnapshotRef, and + write the updated metadata back to the same location. + """ + table = catalog.load_table(f"{DATABASE_ID}.{TABLE_ID}") + metadata = table.metadata + branch_ref = SnapshotRef(snapshot_id=snapshot_id, snapshot_ref_type=SnapshotRefType.BRANCH) + new_refs = {**metadata.refs, branch_name: branch_ref} + updated_metadata = metadata.model_copy(update={"refs": new_refs}) + io = load_file_io(properties=catalog.properties, location=table.metadata_location) + ToOutputFile.table_metadata(updated_metadata, io.new_output(table.metadata_location), overwrite=True) + print(f" Created branch '{branch_name}' at snapshot {snapshot_id}") + + def read_token() -> str: """Read auth token from OH_TOKEN env var or file argument.""" token = os.environ.get("OH_TOKEN") @@ -224,6 +244,41 @@ def read_token() -> str: list(loader) print("PASS: invalid snapshot_id raised ValueError") + # 8. Branch tests — create branch_a at snap1, branch_b at snap2 + _create_branch(catalog, "branch_a", snap1) + _create_branch(catalog, "branch_b", snap2) + + # 8a. branch_a returns only snap1 data (3 rows) + loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_a") + result = _read_all(loader) + assert result.num_rows == 3 + assert result.column(COL_ID).to_pylist() == [1, 2, 3] + assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie"] + print(f"PASS: branch_a returned {result.num_rows} rows (snap1 data)") + + # 8b. branch_b returns all 4 rows (snap2 data) + loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_b") + result = _read_all(loader) + assert result.num_rows == 4 + assert result.column(COL_ID).to_pylist() == [1, 2, 3, 4] + assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie", "diana"] + print(f"PASS: branch_b returned {result.num_rows} rows (snap2 data)") + + # 8c. Branch resolves to the correct snapshot_id + loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_a") + assert loader.snapshot_id == snap1, f"Expected branch_a snapshot_id={snap1}, got {loader.snapshot_id}" + loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_b") + assert loader.snapshot_id == snap2, f"Expected branch_b snapshot_id={snap2}, got {loader.snapshot_id}" + print("PASS: branches resolve to correct snapshot IDs") + + # 8d. Non-existent branch raises ValueError + with pytest.raises(ValueError, match="Branch .* not found"): + loader = OpenHouseDataLoader( + catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="nonexistent" + ) + list(loader) + print("PASS: non-existent branch raised ValueError") + finally: livy.execute(f"DROP TABLE IF EXISTS {FQTN}") From 5287a2d60b0b34f960fb7b9191a7654d072b99f2 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 4 Mar 2026 08:53:10 -0800 Subject: [PATCH 05/13] Tighten branch test mocks to match on exact branch name Use side_effect lambdas so snapshot_by_name only returns a snapshot for the specific branch name, returning None for anything else. Co-Authored-By: Claude Opus 4.6 --- .../python/dataloader/tests/test_data_loader.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 80898d5e4..8229535e5 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -340,18 +340,17 @@ def test_branch_snapshot_id_resolves(): catalog = MagicMock() mock_snapshot = MagicMock() mock_snapshot.snapshot_id = 123 - catalog.load_table.return_value.snapshot_by_name.return_value = mock_snapshot + catalog.load_table.return_value.snapshot_by_name.side_effect = lambda name: mock_snapshot if name == "my-branch" else None loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") assert loader.snapshot_id == 123 - catalog.load_table.return_value.snapshot_by_name.assert_called_once_with("my-branch") def test_branch_snapshot_id_not_found_raises(): """ValueError is raised when branch does not exist in table metadata.""" catalog = MagicMock() - catalog.load_table.return_value.snapshot_by_name.return_value = None + catalog.load_table.return_value.snapshot_by_name.side_effect = lambda name: None loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="missing") @@ -400,14 +399,14 @@ def fake_scan(**kwargs): scan.plan_files.return_value = [tasks_by_snapshot[kwargs["snapshot_id"]]] return scan + mock_snapshot = MagicMock() + mock_snapshot.snapshot_id = branch_snapshot_id + mock_table = MagicMock() mock_table.metadata = metadata mock_table.io = io mock_table.scan.side_effect = fake_scan - - mock_snapshot = MagicMock() - mock_snapshot.snapshot_id = branch_snapshot_id - mock_table.snapshot_by_name.return_value = mock_snapshot + mock_table.snapshot_by_name.side_effect = lambda name: mock_snapshot if name == "my-branch" else None catalog = MagicMock() catalog.load_table.return_value = mock_table From 5cd06d8a8df7a8276fc93f1453662fdcd4ce046c Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 10 Mar 2026 12:04:47 -0700 Subject: [PATCH 06/13] Use Spark branch writes instead of metadata manipulation in integration tests Write to branches via Spark SQL (INSERT INTO table.branch_name) instead of directly manipulating Iceberg metadata files. This matches how users actually create and populate branches. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/integration_tests.py | 72 +++++++++---------- 1 file changed, 32 insertions(+), 40 deletions(-) diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index e3d1b6c36..ad44d5bc7 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -14,9 +14,6 @@ import pytest import requests from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.io import load_file_io -from pyiceberg.serializers import ToOutputFile -from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from openhouse.dataloader import OpenHouseDataLoader from openhouse.dataloader.catalog import OpenHouseCatalog @@ -118,23 +115,6 @@ def _read_all(loader: OpenHouseDataLoader) -> pa.Table: return pa.concat_tables([pa.Table.from_batches([b]) for b in batches]).sort_by(COL_ID) -def _create_branch(catalog: OpenHouseCatalog, branch_name: str, snapshot_id: int) -> None: - """Create a named branch on a table by writing updated metadata with the new ref. - - Spark 3.1 does not support ALTER TABLE ... CREATE BRANCH, so we manipulate - the Iceberg metadata directly: load metadata, add a branch SnapshotRef, and - write the updated metadata back to the same location. - """ - table = catalog.load_table(f"{DATABASE_ID}.{TABLE_ID}") - metadata = table.metadata - branch_ref = SnapshotRef(snapshot_id=snapshot_id, snapshot_ref_type=SnapshotRefType.BRANCH) - new_refs = {**metadata.refs, branch_name: branch_ref} - updated_metadata = metadata.model_copy(update={"refs": new_refs}) - io = load_file_io(properties=catalog.properties, location=table.metadata_location) - ToOutputFile.table_metadata(updated_metadata, io.new_output(table.metadata_location), overwrite=True) - print(f" Created branch '{branch_name}' at snapshot {snapshot_id}") - - def read_token() -> str: """Read auth token from OH_TOKEN env var or file argument.""" token = os.environ.get("OH_TOKEN") @@ -244,34 +224,46 @@ def read_token() -> str: list(loader) print("PASS: invalid snapshot_id raised ValueError") - # 8. Branch tests — create branch_a at snap1, branch_b at snap2 - _create_branch(catalog, "branch_a", snap1) - _create_branch(catalog, "branch_b", snap2) + # 8. Branch tests — write to branches via Spark, then read via DataLoader + livy.execute(f"INSERT INTO {FQTN}.branch_a VALUES (5, 'eve', 5.5)") + livy.execute(f"INSERT INTO {FQTN}.branch_b VALUES (6, 'frank', 6.6), (7, 'grace', 7.7)") - # 8a. branch_a returns only snap1 data (3 rows) + # 8a. branch_a has main data (4 rows) + 1 branch-only row = 5 rows loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_a") result = _read_all(loader) - assert result.num_rows == 3 - assert result.column(COL_ID).to_pylist() == [1, 2, 3] - assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie"] - print(f"PASS: branch_a returned {result.num_rows} rows (snap1 data)") + assert result.num_rows == 5 + assert result.column(COL_ID).to_pylist() == [1, 2, 3, 4, 5] + assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie", "diana", "eve"] + print(f"PASS: branch_a returned {result.num_rows} rows (main + branch-only data)") - # 8b. branch_b returns all 4 rows (snap2 data) + # 8b. branch_b has main data (4 rows) + 2 branch-only rows = 6 rows loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_b") result = _read_all(loader) + assert result.num_rows == 6 + assert result.column(COL_ID).to_pylist() == [1, 2, 3, 4, 6, 7] + assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie", "diana", "frank", "grace"] + print(f"PASS: branch_b returned {result.num_rows} rows (main + branch-only data)") + + # 8c. Main is unaffected by branch writes + loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID) + result = _read_all(loader) assert result.num_rows == 4 assert result.column(COL_ID).to_pylist() == [1, 2, 3, 4] - assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie", "diana"] - print(f"PASS: branch_b returned {result.num_rows} rows (snap2 data)") - - # 8c. Branch resolves to the correct snapshot_id - loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_a") - assert loader.snapshot_id == snap1, f"Expected branch_a snapshot_id={snap1}, got {loader.snapshot_id}" - loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_b") - assert loader.snapshot_id == snap2, f"Expected branch_b snapshot_id={snap2}, got {loader.snapshot_id}" - print("PASS: branches resolve to correct snapshot IDs") - - # 8d. Non-existent branch raises ValueError + print(f"PASS: main still has {result.num_rows} rows (unaffected by branch writes)") + + # 8d. Each branch has its own snapshot_id, different from main + snap_a = OpenHouseDataLoader( + catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_a" + ).snapshot_id + snap_b = OpenHouseDataLoader( + catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_b" + ).snapshot_id + assert snap_a != snap2, f"Expected branch_a snapshot != main, got {snap_a}" + assert snap_b != snap2, f"Expected branch_b snapshot != main, got {snap_b}" + assert snap_a != snap_b, f"Expected different branch snapshots, got {snap_a}" + print("PASS: branches have distinct snapshot IDs") + + # 8e. Non-existent branch raises ValueError with pytest.raises(ValueError, match="Branch .* not found"): loader = OpenHouseDataLoader( catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="nonexistent" From 93724a8f722e4be0ab0612a2973fc26e66fad26e Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 10 Mar 2026 14:27:09 -0700 Subject: [PATCH 07/13] Fix ruff line-too-long in branch test mock setup Co-Authored-By: Claude Opus 4.6 --- integrations/python/dataloader/tests/test_data_loader.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 8229535e5..9041e567f 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -340,7 +340,9 @@ def test_branch_snapshot_id_resolves(): catalog = MagicMock() mock_snapshot = MagicMock() mock_snapshot.snapshot_id = 123 - catalog.load_table.return_value.snapshot_by_name.side_effect = lambda name: mock_snapshot if name == "my-branch" else None + catalog.load_table.return_value.snapshot_by_name.side_effect = ( + lambda name: mock_snapshot if name == "my-branch" else None + ) loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") From 713a524efeec0a4af4cb99cdf768680f86e22d8b Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 10 Mar 2026 16:09:30 -0700 Subject: [PATCH 08/13] Remove branch integration tests The Docker Compose setup uses Spark 3.1, which does not support branch-qualified writes or ALTER TABLE CREATE BRANCH. Branch behavior is covered by unit tests. Integration tests can be added when the Docker setup moves to Spark 3.5. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/integration_tests.py | 47 ------------------- 1 file changed, 47 deletions(-) diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index ad44d5bc7..9d859ca32 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -224,53 +224,6 @@ def read_token() -> str: list(loader) print("PASS: invalid snapshot_id raised ValueError") - # 8. Branch tests — write to branches via Spark, then read via DataLoader - livy.execute(f"INSERT INTO {FQTN}.branch_a VALUES (5, 'eve', 5.5)") - livy.execute(f"INSERT INTO {FQTN}.branch_b VALUES (6, 'frank', 6.6), (7, 'grace', 7.7)") - - # 8a. branch_a has main data (4 rows) + 1 branch-only row = 5 rows - loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_a") - result = _read_all(loader) - assert result.num_rows == 5 - assert result.column(COL_ID).to_pylist() == [1, 2, 3, 4, 5] - assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie", "diana", "eve"] - print(f"PASS: branch_a returned {result.num_rows} rows (main + branch-only data)") - - # 8b. branch_b has main data (4 rows) + 2 branch-only rows = 6 rows - loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_b") - result = _read_all(loader) - assert result.num_rows == 6 - assert result.column(COL_ID).to_pylist() == [1, 2, 3, 4, 6, 7] - assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie", "diana", "frank", "grace"] - print(f"PASS: branch_b returned {result.num_rows} rows (main + branch-only data)") - - # 8c. Main is unaffected by branch writes - loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID) - result = _read_all(loader) - assert result.num_rows == 4 - assert result.column(COL_ID).to_pylist() == [1, 2, 3, 4] - print(f"PASS: main still has {result.num_rows} rows (unaffected by branch writes)") - - # 8d. Each branch has its own snapshot_id, different from main - snap_a = OpenHouseDataLoader( - catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_a" - ).snapshot_id - snap_b = OpenHouseDataLoader( - catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="branch_b" - ).snapshot_id - assert snap_a != snap2, f"Expected branch_a snapshot != main, got {snap_a}" - assert snap_b != snap2, f"Expected branch_b snapshot != main, got {snap_b}" - assert snap_a != snap_b, f"Expected different branch snapshots, got {snap_a}" - print("PASS: branches have distinct snapshot IDs") - - # 8e. Non-existent branch raises ValueError - with pytest.raises(ValueError, match="Branch .* not found"): - loader = OpenHouseDataLoader( - catalog=catalog, database=DATABASE_ID, table=TABLE_ID, branch="nonexistent" - ) - list(loader) - print("PASS: non-existent branch raised ValueError") - finally: livy.execute(f"DROP TABLE IF EXISTS {FQTN}") From 25c48abfd97555dc7e24d154e0bfb28e8f406b00 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 10 Mar 2026 16:51:25 -0700 Subject: [PATCH 09/13] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../python/dataloader/src/openhouse/dataloader/data_loader.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index f44b3042e..08ddac8c3 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -91,7 +91,9 @@ def __init__( context: Data loader context max_attempts: Total number of attempts including the initial try (default 3) """ - if branch and snapshot_id is not None: + if branch is not None and branch.strip() == "": + raise ValueError("branch must not be empty or whitespace") + if branch is not None and snapshot_id is not None: raise ValueError("Cannot specify both branch and snapshot_id") self._catalog = catalog self._table_id = TableIdentifier(database, table, branch) From 63c08ed03e8d00f4f89c08d7c708e546586a38dc Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 10 Mar 2026 16:51:57 -0700 Subject: [PATCH 10/13] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../dataloader/src/openhouse/dataloader/data_loader.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 08ddac8c3..8cdd59fb2 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -124,7 +124,10 @@ def snapshot_id(self) -> int | None: if self._table_id.branch: snapshot = self._iceberg_table.snapshot_by_name(self._table_id.branch) if snapshot is None: - raise ValueError(f"Branch '{self._table_id.branch}' not found for table {self._table_id}") + raise ValueError( + f"Branch '{self._table_id.branch}' not found for table " + f"{self._table_id.database}.{self._table_id.table}" + ) return snapshot.snapshot_id return self._iceberg_table.metadata.current_snapshot_id From 5715ecfed766ec974a755e3cd34d7f1b510340ca Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 11 Mar 2026 12:00:30 -0700 Subject: [PATCH 11/13] Simplify branch test by reusing _make_real_catalog Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/test_data_loader.py | 49 ++----------------- 1 file changed, 3 insertions(+), 46 deletions(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 9041e567f..473ad7aa7 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -362,57 +362,14 @@ def test_branch_snapshot_id_not_found_raises(): def test_branch_reads_data_from_branch_snapshot(tmp_path): """Branch reads return data from the branch's snapshot, not the main snapshot.""" - main_data = {COL_ID: [1, 2], COL_NAME: ["alice", "bob"], COL_VALUE: [1.1, 2.2]} branch_data = {COL_ID: [10, 20, 30], COL_NAME: ["x", "y", "z"], COL_VALUE: [10.0, 20.0, 30.0]} - - main_path = _write_parquet(tmp_path, main_data, "main.parquet") - branch_path = _write_parquet(tmp_path, branch_data, "branch.parquet") - - metadata = new_table_metadata( - schema=TEST_SCHEMA, - partition_spec=UNPARTITIONED_PARTITION_SPEC, - sort_order=UNSORTED_SORT_ORDER, - location=str(tmp_path), - properties={}, - ) - io = load_file_io(properties={}, location=main_path) - - main_snapshot_id = 100 - branch_snapshot_id = 200 - - def _make_task(path): - data_file = DataFile.from_args( - file_path=path, - file_format=FileFormat.PARQUET, - record_count=len(next(iter(main_data.values()))), - file_size_in_bytes=os.path.getsize(path), - ) - data_file._spec_id = 0 - return FileScanTask(data_file=data_file) - - tasks_by_snapshot = { - main_snapshot_id: _make_task(main_path), - branch_snapshot_id: _make_task(branch_path), - } - - def fake_scan(**kwargs): - scan = MagicMock() - scan.projection.return_value = TEST_SCHEMA - scan.plan_files.return_value = [tasks_by_snapshot[kwargs["snapshot_id"]]] - return scan + catalog = _make_real_catalog(tmp_path, data=branch_data) mock_snapshot = MagicMock() - mock_snapshot.snapshot_id = branch_snapshot_id - - mock_table = MagicMock() - mock_table.metadata = metadata - mock_table.io = io - mock_table.scan.side_effect = fake_scan + mock_snapshot.snapshot_id = 200 + mock_table = catalog.load_table.return_value mock_table.snapshot_by_name.side_effect = lambda name: mock_snapshot if name == "my-branch" else None - catalog = MagicMock() - catalog.load_table.return_value = mock_table - loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") result = _materialize(loader) From 9aab33a1387af6308ab562b52b9725339e2582d2 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 11 Mar 2026 12:14:36 -0700 Subject: [PATCH 12/13] Verify branch test returns different data than main The previous simplification lost the assertion that branch and non-branch reads return different data. Restore two parquet files with snapshot-based routing and assert both cases. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/test_data_loader.py | 65 ++++++++++++++++--- 1 file changed, 56 insertions(+), 9 deletions(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 473ad7aa7..96488cd93 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -362,18 +362,65 @@ def test_branch_snapshot_id_not_found_raises(): def test_branch_reads_data_from_branch_snapshot(tmp_path): """Branch reads return data from the branch's snapshot, not the main snapshot.""" + main_data = {COL_ID: [1, 2], COL_NAME: ["alice", "bob"], COL_VALUE: [1.1, 2.2]} branch_data = {COL_ID: [10, 20, 30], COL_NAME: ["x", "y", "z"], COL_VALUE: [10.0, 20.0, 30.0]} - catalog = _make_real_catalog(tmp_path, data=branch_data) + + main_path = _write_parquet(tmp_path, main_data, "main.parquet") + branch_path = _write_parquet(tmp_path, branch_data, "branch.parquet") + + metadata = new_table_metadata( + schema=TEST_SCHEMA, + partition_spec=UNPARTITIONED_PARTITION_SPEC, + sort_order=UNSORTED_SORT_ORDER, + location=str(tmp_path), + properties={}, + ) + io = load_file_io(properties={}, location=main_path) + + branch_snapshot_id = 200 + + def _file_scan_task(path, data): + data_file = DataFile.from_args( + file_path=path, + file_format=FileFormat.PARQUET, + record_count=len(next(iter(data.values()))), + file_size_in_bytes=os.path.getsize(path), + ) + data_file._spec_id = 0 + return FileScanTask(data_file=data_file) + + main_task = _file_scan_task(main_path, main_data) + branch_task = _file_scan_task(branch_path, branch_data) + + def fake_scan(**kwargs): + task = branch_task if kwargs.get("snapshot_id") == branch_snapshot_id else main_task + scan = MagicMock() + scan.projection.return_value = TEST_SCHEMA + scan.plan_files.return_value = [task] + return scan mock_snapshot = MagicMock() - mock_snapshot.snapshot_id = 200 - mock_table = catalog.load_table.return_value + mock_snapshot.snapshot_id = branch_snapshot_id + + mock_table = MagicMock() + mock_table.metadata = metadata + mock_table.io = io + mock_table.scan.side_effect = fake_scan mock_table.snapshot_by_name.side_effect = lambda name: mock_snapshot if name == "my-branch" else None - loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") - result = _materialize(loader) + catalog = MagicMock() + catalog.load_table.return_value = mock_table - assert result.num_rows == 3 - result = result.sort_by(COL_ID) - assert result.column(COL_ID).to_pylist() == branch_data[COL_ID] - assert result.column(COL_NAME).to_pylist() == branch_data[COL_NAME] + # Without branch: reads main data + main_loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl") + main_result = _materialize(main_loader) + assert main_result.num_rows == 2 + main_result = main_result.sort_by(COL_ID) + assert main_result.column(COL_ID).to_pylist() == main_data[COL_ID] + + # With branch: reads branch data + branch_loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") + branch_result = _materialize(branch_loader) + assert branch_result.num_rows == 3 + branch_result = branch_result.sort_by(COL_ID) + assert branch_result.column(COL_ID).to_pylist() == branch_data[COL_ID] From decf57205ba21d84c9659bb45db5aacb4d1f11fe Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 11 Mar 2026 12:18:07 -0700 Subject: [PATCH 13/13] Simplify branch test to verify splits instead of reading data Use mocks to verify that branch and non-branch loaders yield splits backed by different files, without needing real parquet data or exercising DataLoaderSplit read logic. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/test_data_loader.py | 63 +++++-------------- 1 file changed, 16 insertions(+), 47 deletions(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 96488cd93..d6c654532 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -360,67 +360,36 @@ def test_branch_snapshot_id_not_found_raises(): _ = loader.snapshot_id -def test_branch_reads_data_from_branch_snapshot(tmp_path): - """Branch reads return data from the branch's snapshot, not the main snapshot.""" - main_data = {COL_ID: [1, 2], COL_NAME: ["alice", "bob"], COL_VALUE: [1.1, 2.2]} - branch_data = {COL_ID: [10, 20, 30], COL_NAME: ["x", "y", "z"], COL_VALUE: [10.0, 20.0, 30.0]} - - main_path = _write_parquet(tmp_path, main_data, "main.parquet") - branch_path = _write_parquet(tmp_path, branch_data, "branch.parquet") +def test_branch_reads_data_from_branch_snapshot(): + """Branch splits come from the branch snapshot, not the main snapshot.""" + catalog = MagicMock() - metadata = new_table_metadata( - schema=TEST_SCHEMA, - partition_spec=UNPARTITIONED_PARTITION_SPEC, - sort_order=UNSORTED_SORT_ORDER, - location=str(tmp_path), - properties={}, - ) - io = load_file_io(properties={}, location=main_path) + main_task = MagicMock() + main_task.file.file_path = "main.parquet" + branch_task = MagicMock() + branch_task.file.file_path = "branch.parquet" branch_snapshot_id = 200 - def _file_scan_task(path, data): - data_file = DataFile.from_args( - file_path=path, - file_format=FileFormat.PARQUET, - record_count=len(next(iter(data.values()))), - file_size_in_bytes=os.path.getsize(path), - ) - data_file._spec_id = 0 - return FileScanTask(data_file=data_file) - - main_task = _file_scan_task(main_path, main_data) - branch_task = _file_scan_task(branch_path, branch_data) - def fake_scan(**kwargs): task = branch_task if kwargs.get("snapshot_id") == branch_snapshot_id else main_task scan = MagicMock() - scan.projection.return_value = TEST_SCHEMA scan.plan_files.return_value = [task] return scan mock_snapshot = MagicMock() mock_snapshot.snapshot_id = branch_snapshot_id - mock_table = MagicMock() - mock_table.metadata = metadata - mock_table.io = io + mock_table = catalog.load_table.return_value mock_table.scan.side_effect = fake_scan mock_table.snapshot_by_name.side_effect = lambda name: mock_snapshot if name == "my-branch" else None - catalog = MagicMock() - catalog.load_table.return_value = mock_table + # Without branch: splits come from main snapshot + main_splits = list(OpenHouseDataLoader(catalog=catalog, database="db", table="tbl")) + assert len(main_splits) == 1 + assert main_splits[0]._file_scan_task.file.file_path == "main.parquet" - # Without branch: reads main data - main_loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl") - main_result = _materialize(main_loader) - assert main_result.num_rows == 2 - main_result = main_result.sort_by(COL_ID) - assert main_result.column(COL_ID).to_pylist() == main_data[COL_ID] - - # With branch: reads branch data - branch_loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") - branch_result = _materialize(branch_loader) - assert branch_result.num_rows == 3 - branch_result = branch_result.sort_by(COL_ID) - assert branch_result.column(COL_ID).to_pylist() == branch_data[COL_ID] + # With branch: splits come from branch snapshot + branch_splits = list(OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch")) + assert len(branch_splits) == 1 + assert branch_splits[0]._file_scan_task.file.file_path == "branch.parquet"