Commit 4f8a811

[python] fix data-evolution double-counting issue by using mergedRowCount in ray datasource (#7087)
1 parent 88332de commit 4f8a811

7 files changed

Lines changed: 424 additions & 6 deletions


paimon-python/pypaimon/globalindex/indexed_split.py

Lines changed: 3 additions & 0 deletions
@@ -76,6 +76,9 @@ def row_count(self) -> int:
         """
         return sum(r.count() for r in self._row_ranges)
 
+    def merged_row_count(self):
+        return self.row_count
+
     # Delegate other properties to data_split
 
     @property
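
For an indexed split the retained row ranges are already exact, so the merged count can simply reuse row_count. A minimal sketch of that identity, using plain (from, to) tuples as hypothetical stand-ins for pypaimon's Range objects:

# Hypothetical stand-in for IndexedSplit's row ranges; the real class
# holds Range objects with a count() method.
ranges = [(0, 99), (200, 249)]  # inclusive row-id ranges kept by the index

def row_count(row_ranges):
    # Each range contributes (to - from + 1) rows and ranges do not overlap,
    # so the sum is already the merged (deduplicated) row count.
    return sum(to - frm + 1 for frm, to in row_ranges)

assert row_count(ranges) == 150  # merged_row_count() returns the same value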

paimon-python/pypaimon/read/datasource/ray_datasource.py

Lines changed: 9 additions & 2 deletions
@@ -171,8 +171,15 @@ def _get_read_task(
         for split in chunk_splits:
             if predicate is None:
                 # Only estimate rows if no predicate (predicate filtering changes row count)
-                if hasattr(split, 'row_count') and split.row_count > 0:
-                    total_rows += split.row_count
+                row_count = None
+                if hasattr(split, 'merged_row_count'):
+                    merged_count = split.merged_row_count()
+                    if merged_count is not None:
+                        row_count = merged_count
+                if row_count is None and hasattr(split, 'row_count') and split.row_count > 0:
+                    row_count = split.row_count
+                if row_count is not None and row_count > 0:
+                    total_rows += row_count
             if hasattr(split, 'file_size') and split.file_size > 0:
                 total_size += split.file_size
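
The datasource now asks a split for its merged row count first and only falls back to the raw row_count when no merged figure is available. A minimal sketch of that fallback order, with a hypothetical stub in place of a real pypaimon split:

class StubSplit:
    # Hypothetical numbers: 100 raw rows, 80 after merging/deletes.
    row_count = 100

    def merged_row_count(self):
        return 80

def estimate_rows(split):
    # Prefer the merged count; fall back to the raw count if it is unknown.
    row_count = None
    if hasattr(split, 'merged_row_count'):
        merged = split.merged_row_count()
        if merged is not None:
            row_count = merged
    if row_count is None and getattr(split, 'row_count', 0) > 0:
        row_count = split.row_count
    return row_count

assert estimate_rows(StubSplit()) == 80  # no longer over-reports 100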

paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py

Lines changed: 57 additions & 3 deletions
@@ -23,7 +23,7 @@
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
-from pypaimon.read.split import Split
+from pypaimon.read.split import DataSplit, Split
 from pypaimon.read.sliced_split import SlicedSplit

@@ -104,8 +104,8 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
             for pack in packed_files
         ]
 
-        splits += self._build_split_from_pack(
-            flatten_packed_files, sorted_entries_list, False
+        splits += self._build_split_from_pack_for_data_evolution(
+            flatten_packed_files, packed_files, sorted_entries_list
         )
 
         if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:

@@ -117,6 +117,60 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
 
         return splits
 
+    def _build_split_from_pack_for_data_evolution(
+        self,
+        flatten_packed_files: List[List[DataFileMeta]],
+        packed_files: List[List[List[DataFileMeta]]],
+        file_entries: List[ManifestEntry]
+    ) -> List[Split]:
+        """
+        Build splits from packed files for data evolution tables.
+        raw_convertible is True only when each range (pack) contains exactly one file.
+        """
+        splits = []
+        for i, file_group in enumerate(flatten_packed_files):
+            # In Java: rawConvertible = f.stream().allMatch(file -> file.size() == 1)
+            # This means raw_convertible is True only when each range contains exactly one file
+            pack = packed_files[i] if i < len(packed_files) else []
+            raw_convertible = all(len(sub_pack) == 1 for sub_pack in pack)
+
+            file_paths = []
+            total_file_size = 0
+            total_record_count = 0
+
+            for data_file in file_group:
+                data_file.set_file_path(
+                    self.table.table_path,
+                    file_entries[0].partition,
+                    file_entries[0].bucket
+                )
+                file_paths.append(data_file.file_path)
+                total_file_size += data_file.file_size
+                total_record_count += data_file.row_count
+
+            if file_paths:
+                # Get deletion files for this split
+                data_deletion_files = None
+                if self.deletion_files_map:
+                    data_deletion_files = self._get_deletion_files_for_split(
+                        file_group,
+                        file_entries[0].partition,
+                        file_entries[0].bucket
+                    )
+
+                split = DataSplit(
+                    files=file_group,
+                    partition=file_entries[0].partition,
+                    bucket=file_entries[0].bucket,
+                    file_paths=file_paths,
+                    row_count=total_record_count,
+                    file_size=total_file_size,
+                    raw_convertible=raw_convertible,
+                    data_deletion_files=data_deletion_files
+                )
+                splits.append(split)
+        return splits
+
     def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
         """
         Wrap splits with SlicedSplit to add file-level slicing information.
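
The raw_convertible flag mirrors the Java check rawConvertible = f.stream().allMatch(file -> file.size() == 1): a pack is raw-convertible only when every range inside it holds exactly one file. A tiny sketch of the rule, with hypothetical pack shapes in place of real DataFileMeta lists:

def is_raw_convertible(pack):
    # True only when each range (sub-pack) contains exactly one file.
    return all(len(sub_pack) == 1 for sub_pack in pack)

assert is_raw_convertible([['f0'], ['f1']])            # one file per range
assert not is_raw_convertible([['f0', 'f1'], ['f2']])  # a range holds two files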

paimon-python/pypaimon/read/sliced_split.py

Lines changed: 78 additions & 0 deletions
@@ -96,6 +96,84 @@ def raw_convertible(self):
     def data_deletion_files(self):
         return self._data_split.data_deletion_files
 
+    def _get_sliced_file_row_count(self, file: 'DataFileMeta') -> int:
+        if file.file_name in self._shard_file_idx_map:
+            start, end = self._shard_file_idx_map[file.file_name]
+            return (end - start) if start != -1 and end != -1 else 0
+        return file.row_count
+
+    def merged_row_count(self):
+        if not self._shard_file_idx_map:
+            return self._data_split.merged_row_count()
+
+        underlying_merged = self._data_split.merged_row_count()
+        if underlying_merged is not None:
+            original_row_count = self._data_split.row_count
+            return int(underlying_merged * self.row_count / original_row_count) if original_row_count > 0 else 0
+
+        from pypaimon.read.split import DataSplit
+        from pypaimon.globalindex.range import Range
+
+        if not isinstance(self._data_split, DataSplit):
+            return None
+
+        if not all(f.first_row_id is not None for f in self._data_split.files):
+            return None
+
+        file_ranges = []
+        for file in self._data_split.files:
+            if file.first_row_id is not None:
+                sliced_count = self._get_sliced_file_row_count(file)
+                if sliced_count > 0:
+                    file_ranges.append((file, Range(file.first_row_id, file.first_row_id + sliced_count - 1)))
+
+        if not file_ranges:
+            return 0
+
+        file_ranges.sort(key=lambda x: x[1].from_)
+
+        groups = []
+        current_group = [file_ranges[0]]
+        current_range = file_ranges[0][1]
+
+        for file, file_range in file_ranges[1:]:
+            if file_range.from_ <= current_range.to + 1:
+                current_group.append((file, file_range))
+                current_range = Range(current_range.from_, max(current_range.to, file_range.to))
+            else:
+                groups.append(current_group)
+                current_group = [(file, file_range)]
+                current_range = file_range
+
+        if current_group:
+            groups.append(current_group)
+
+        sum_rows = 0
+        for group in groups:
+            max_count = 0
+            for file, _ in group:
+                max_count = max(max_count, self._get_sliced_file_row_count(file))
+            sum_rows += max_count
+
+        if self._data_split.data_deletion_files is not None:
+            if not all(f is None or f.cardinality is not None for f in self._data_split.data_deletion_files):
+                return None
+
+            for i, deletion_file in enumerate(self._data_split.data_deletion_files):
+                if (deletion_file is not None and deletion_file.cardinality is not None
+                        and i < len(self._data_split.files)):
+                    file = self._data_split.files[i]
+                    if file.first_row_id is not None:
+                        file_original_count = file.row_count
+                        file_sliced_count = self._get_sliced_file_row_count(file)
+                        if file_original_count > 0:
+                            deletion_ratio = deletion_file.cardinality / file_original_count
+                            sum_rows -= int(file_sliced_count * deletion_ratio)
+                        else:
+                            sum_rows -= deletion_file.cardinality
+
+        return sum_rows
+
     def __eq__(self, other):
         if not isinstance(other, SlicedSplit):
             return False
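
When the underlying split already knows its merged count, the sliced split scales it by the fraction of raw rows that fall into the slice. A worked example of that proportional estimate (all numbers illustrative):

underlying_merged = 900    # rows surviving merges/deletes in the whole split
original_row_count = 1000  # raw rows in the whole split
slice_row_count = 250      # raw rows assigned to this shard's slice

# The slice is assumed to inherit the split-wide survival ratio (90%).
estimate = int(underlying_merged * slice_row_count / original_row_count)
assert estimate == 225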

paimon-python/pypaimon/read/split.py

Lines changed: 94 additions & 0 deletions
@@ -55,6 +55,15 @@ def bucket(self) -> int:
         """Return the bucket of this split."""
         pass
 
+    def merged_row_count(self) -> Optional[int]:
+        """
+        Return the merged row count of data files. For example, when the delete vector is enabled in
+        the primary key table, the number of rows that have been deleted will be subtracted from the
+        returned result. In the Data Evolution mode of the Append table, the actual number of rows
+        will be returned.
+        """
+        return None
+
 
 class DataSplit(Split):
     """

@@ -106,3 +106,88 @@ def file_size(self) -> int:
     @property
     def file_paths(self) -> List[str]:
         return self._file_paths
+
+    def set_row_count(self, row_count: int) -> None:
+        self._row_count = row_count
+
+    def merged_row_count(self) -> Optional[int]:
+        """
+        Return the merged row count of data files. For example, when the delete vector is enabled in
+        the primary key table, the number of rows that have been deleted will be subtracted from the
+        returned result. In the Data Evolution mode of the Append table, the actual number of rows
+        will be returned.
+        """
+        if self._raw_merged_row_count_available():
+            return self._raw_merged_row_count()
+        if self._data_evolution_row_count_available():
+            return self._data_evolution_merged_row_count()
+        return None
+
+    def _raw_merged_row_count_available(self) -> bool:
+        return self.raw_convertible and (
+            self.data_deletion_files is None
+            or all(f is None or f.cardinality is not None for f in self.data_deletion_files)
+        )
+
+    def _raw_merged_row_count(self) -> int:
+        sum_rows = 0
+        for i, file in enumerate(self._files):
+            deletion_file = None
+            if self.data_deletion_files is not None and i < len(self.data_deletion_files):
+                deletion_file = self.data_deletion_files[i]
+
+            if deletion_file is None:
+                sum_rows += file.row_count
+            elif deletion_file.cardinality is not None:
+                sum_rows += file.row_count - deletion_file.cardinality
+
+        return sum_rows
+
+    def _data_evolution_row_count_available(self) -> bool:
+        for file in self._files:
+            if file.first_row_id is None:
+                return False
+        return True
+
+    def _data_evolution_merged_row_count(self) -> int:
+        if not self._files:
+            return 0
+
+        file_ranges = []
+        for file in self._files:
+            if file.first_row_id is not None and file.row_count > 0:
+                start = file.first_row_id
+                end = file.first_row_id + file.row_count - 1
+                file_ranges.append((file, start, end))
+
+        if not file_ranges:
+            return 0
+
+        file_ranges.sort(key=lambda x: (x[1], x[2]))
+
+        groups = []
+        current_group = [file_ranges[0]]
+        current_end = file_ranges[0][2]
+
+        for file_range in file_ranges[1:]:
+            file, start, end = file_range
+            if start <= current_end:
+                current_group.append(file_range)
+                if end > current_end:
+                    current_end = end
+            else:
+                groups.append(current_group)
+                current_group = [file_range]
+                current_end = end
+
+        if current_group:
+            groups.append(current_group)
+
+        sum_rows = 0
+        for group in groups:
+            max_count = 0
+            for file, _, _ in group:
+                max_count = max(max_count, file.row_count)
+            sum_rows += max_count
+
+        return sum_rows
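
This range grouping is what removes the double counting: in data evolution mode, files sharing a first_row_id range describe the same logical rows (new columns are written as parallel files), so a group of overlapping files contributes max(row_count) rather than the sum. A minimal sketch with (first_row_id, row_count) tuples as hypothetical stand-ins for DataFileMeta:

files = [
    (0, 100),   # base file covering rows 0..99
    (0, 100),   # evolved-column file over the same rows 0..99
    (100, 50),  # next range, rows 100..149
]

# Sort by range start, merge overlapping ranges into groups,
# and count each group once at its widest file.
ranges = sorted((start, start + count - 1, count) for start, count in files)
total, cur_end, cur_max = 0, ranges[0][1], ranges[0][2]
for start, end, count in ranges[1:]:
    if start <= cur_end:  # overlap: the same logical rows
        cur_end, cur_max = max(cur_end, end), max(cur_max, count)
    else:                 # gap: close the current group
        total += cur_max
        cur_end, cur_max = end, count
total += cur_max

assert total == 150  # a naive sum of row counts would report 250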
