[Data] Support incremental reads for Iceberg tables#64485
Conversation
Extend ray.data.read_iceberg(...) with start_snapshot_id / end_snapshot_id to read only the rows appended between two Iceberg snapshots, built on PyIceberg's Table.incremental_append_scan(...) (apache/iceberg-python#3512). - IcebergDatasource: incremental-scan branch reusing the existing plan_files()/projection() read path; validates mutual exclusivity with snapshot_id; guards on the PyIceberg capability with a clear error. - read_iceberg: additive params, docstring, and example. - Tests: validation, capability guard (runs on all PyIceberg versions), and functional incremental reads (skipped until the pin ships ray-project#3512). Related to ray-project#64464 Signed-off-by: Tanmay Rauth <t_rauth@apple.com>
There was a problem hiding this comment.
Code Review
This pull request adds support for incremental append scans in the Iceberg datasource by introducing start_snapshot_id and end_snapshot_id parameters to read_iceberg and IcebergDatasource. It includes compatibility checks for PyIceberg and comprehensive unit tests. The feedback recommends raising a clear ValueError when end_snapshot_id is provided without start_snapshot_id to prevent undefined behavior, along with adding a corresponding unit test to verify this validation.
| # An incremental (append) scan is requested when either snapshot bound is set. | ||
| self._incremental = start_snapshot_id is not None or end_snapshot_id is not None | ||
| if snapshot_id is not None and self._incremental: | ||
| raise ValueError( | ||
| "`snapshot_id` is mutually exclusive with `start_snapshot_id` / " | ||
| "`end_snapshot_id`. Pass `snapshot_id` to read a single snapshot, or " | ||
| "the start/end pair to read data appended between two snapshots." | ||
| ) | ||
| self._start_snapshot_id = start_snapshot_id | ||
| self._end_snapshot_id = end_snapshot_id |
There was a problem hiding this comment.
An incremental read is not well-defined without a starting snapshot ID. If a user provides end_snapshot_id but omits start_snapshot_id, we should raise a clear ValueError to prevent silent misbehavior or confusing errors from PyIceberg's incremental_append_scan.
| # An incremental (append) scan is requested when either snapshot bound is set. | |
| self._incremental = start_snapshot_id is not None or end_snapshot_id is not None | |
| if snapshot_id is not None and self._incremental: | |
| raise ValueError( | |
| "`snapshot_id` is mutually exclusive with `start_snapshot_id` / " | |
| "`end_snapshot_id`. Pass `snapshot_id` to read a single snapshot, or " | |
| "the start/end pair to read data appended between two snapshots." | |
| ) | |
| self._start_snapshot_id = start_snapshot_id | |
| self._end_snapshot_id = end_snapshot_id | |
| # An incremental (append) scan is requested when either snapshot bound is set. | |
| self._incremental = start_snapshot_id is not None or end_snapshot_id is not None | |
| if snapshot_id is not None and self._incremental: | |
| raise ValueError( | |
| "`snapshot_id` is mutually exclusive with `start_snapshot_id` / " | |
| "`end_snapshot_id`. Pass `snapshot_id` to read a single snapshot, or " | |
| "the start/end pair to read data appended between two snapshots." | |
| ) | |
| if self._incremental and start_snapshot_id is None: | |
| raise ValueError( | |
| "`start_snapshot_id` must be provided when performing an incremental read." | |
| ) | |
| self._start_snapshot_id = start_snapshot_id | |
| self._end_snapshot_id = end_snapshot_id |
| def test_incremental_snapshot_id_mutually_exclusive(): | ||
| """`snapshot_id` cannot be combined with the incremental snapshot bounds.""" | ||
| with pytest.raises(ValueError, match="mutually exclusive"): | ||
| IcebergDatasource( | ||
| table_identifier=f"{_DB_NAME}.{_TABLE_NAME}", | ||
| snapshot_id=123, | ||
| start_snapshot_id=456, | ||
| catalog_kwargs=_CATALOG_KWARGS.copy(), | ||
| ) | ||
|
|
||
| with pytest.raises(ValueError, match="mutually exclusive"): | ||
| IcebergDatasource( | ||
| table_identifier=f"{_DB_NAME}.{_TABLE_NAME}", | ||
| snapshot_id=123, | ||
| end_snapshot_id=789, | ||
| catalog_kwargs=_CATALOG_KWARGS.copy(), | ||
| ) |
There was a problem hiding this comment.
Add a test case to verify that a ValueError is raised when end_snapshot_id is provided without start_snapshot_id.
def test_incremental_snapshot_id_mutually_exclusive():
"""`snapshot_id` cannot be combined with the incremental snapshot bounds."""
with pytest.raises(ValueError, match="mutually exclusive"):
IcebergDatasource(
table_identifier=f"{_DB_NAME}.{_TABLE_NAME}",
snapshot_id=123,
start_snapshot_id=456,
catalog_kwargs=_CATALOG_KWARGS.copy(),
)
with pytest.raises(ValueError, match="mutually exclusive"):
IcebergDatasource(
table_identifier=f"{_DB_NAME}.{_TABLE_NAME}",
snapshot_id=123,
end_snapshot_id=789,
catalog_kwargs=_CATALOG_KWARGS.copy(),
)
with pytest.raises(ValueError, match="start_snapshot_id must be provided"):
IcebergDatasource(
table_identifier=f"{_DB_NAME}.{_TABLE_NAME}",
end_snapshot_id=789,
catalog_kwargs=_CATALOG_KWARGS.copy(),
)|
@abrarsheikh @rueian Can you please review it when you get chance? |
Extend ray.data.read_iceberg(...) with start_snapshot_id / end_snapshot_id to read only the rows appended between two Iceberg snapshots, built on PyIceberg's Table.incremental_append_scan(...) (apache/iceberg-python#3512).
Related to #64464