From 6f56c7ccb319b50726d171790756e991abbc46d4 Mon Sep 17 00:00:00 2001 From: "langu.gjl" Date: Fri, 12 Jun 2026 10:42:16 +0800 Subject: [PATCH 1/4] [python] Fix schema fields callback to short-circuit current schema id --- .../pypaimon/read/scanner/file_scanner.py | 17 +++- .../file_scanner_schema_fields_test.py | 94 +++++++++++++++++++ 2 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 paimon-python/pypaimon/tests/scanner/file_scanner_schema_fields_test.py diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index 650bbe8ca402..b8b250baf260 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -227,14 +227,23 @@ def __init__( # bucket set per ``total_buckets`` value. self._bucket_selector = self._init_bucket_selector() - def schema_fields_func(schema_id: int): - return self.table.schema_manager.get_schema(schema_id).fields - self.simple_stats_evolutions = SimpleStatsEvolutions( - schema_fields_func, + self._schema_fields, self.table.table_schema.id ) + def _schema_fields(self, schema_id: int): + """Resolve schema fields, short-circuiting current table schema id to + avoid filesystem access (REST catalog would get 403). + + For historical schemas, delegates to schema_manager.get_schema(). + + Reference: AbstractFileStoreScan.scanTableSchema() in Java + """ + if schema_id == self.table.table_schema.id: + return self.table.table_schema.fields + return self.table.schema_manager.get_schema(schema_id).fields + def _deletion_files_map(self, entries: List[ManifestEntry]) -> Dict[tuple, Dict[str, DeletionFile]]: if not self.deletion_vectors_enabled: return {} diff --git a/paimon-python/pypaimon/tests/scanner/file_scanner_schema_fields_test.py b/paimon-python/pypaimon/tests/scanner/file_scanner_schema_fields_test.py new file mode 100644 index 000000000000..b75e7505d057 --- /dev/null +++ b/paimon-python/pypaimon/tests/scanner/file_scanner_schema_fields_test.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from unittest.mock import MagicMock + +from pypaimon.read.scanner.file_scanner import FileScanner +from pypaimon.schema.data_types import AtomicType, DataField + + +class FileScannerSchemaFieldsShortCircuitTest(unittest.TestCase): + """Test _schema_fields short-circuits current schema id (REST catalog 403 fix).""" + + def _make_scanner(self, current_schema_id, current_fields, + historical_fields_map=None): + """Build a FileScanner bypassing __init__ to isolate _schema_fields.""" + scanner = FileScanner.__new__(FileScanner) + scanner.table = MagicMock() + scanner.table.table_schema.id = current_schema_id + scanner.table.table_schema.fields = current_fields + scanner.table.schema_manager = MagicMock() + + def get_schema(schema_id): + if historical_fields_map is None or schema_id not in historical_fields_map: + raise AssertionError( + f"schema_manager.get_schema({schema_id}) was called " + "but no historical schema was registered for it" + ) + historical = MagicMock() + historical.fields = historical_fields_map[schema_id] + return historical + + scanner.table.schema_manager.get_schema.side_effect = get_schema + return scanner + + def test_short_circuits_current_schema_id(self): + """Current schema id returns in-memory fields without filesystem access.""" + current_fields = [DataField(0, "a", AtomicType("INT"))] + scanner = self._make_scanner( + current_schema_id=5, current_fields=current_fields + ) + + result = scanner._schema_fields(5) + + self.assertIs(result, current_fields) + scanner.table.schema_manager.get_schema.assert_not_called() + + def test_delegates_for_historical_schema(self): + """Historical schema id delegates to schema_manager.get_schema().""" + current_fields = [DataField(0, "a", AtomicType("INT"))] + historical_fields = [ + DataField(0, "a", AtomicType("INT")), + DataField(1, "b", AtomicType("STRING")), + ] + scanner = self._make_scanner( + current_schema_id=5, + current_fields=current_fields, + historical_fields_map={3: historical_fields}, + ) + + result = scanner._schema_fields(3) + + self.assertEqual(result, historical_fields) + scanner.table.schema_manager.get_schema.assert_called_once_with(3) + + def test_short_circuit_works_for_zero_schema_id(self): + """Schema id == 0 still short-circuits (guards against truthiness bugs).""" + current_fields = [DataField(0, "x", AtomicType("INT"))] + scanner = self._make_scanner( + current_schema_id=0, current_fields=current_fields + ) + + result = scanner._schema_fields(0) + + self.assertIs(result, current_fields) + scanner.table.schema_manager.get_schema.assert_not_called() + + +if __name__ == "__main__": + unittest.main() From e9f1267958c8056d22ab23d690505f9e9c6c9500 Mon Sep 17 00:00:00 2001 From: "langu.gjl" Date: Fri, 12 Jun 2026 14:26:06 +0800 Subject: [PATCH 2/4] [python] Add schema short-circuit to SplitRead and simplify FileScanner docstring --- .../pypaimon/read/scanner/file_scanner.py | 8 ++------ paimon-python/pypaimon/read/split_read.py | 15 +++++++++++---- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index b8b250baf260..e5f4e332a144 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -233,12 +233,8 @@ def __init__( ) def _schema_fields(self, schema_id: int): - """Resolve schema fields, short-circuiting current table schema id to - avoid filesystem access (REST catalog would get 403). - - For historical schemas, delegates to schema_manager.get_schema(). - - Reference: AbstractFileStoreScan.scanTableSchema() in Java + """Resolve schema fields, short-circuiting current table schema id to avoid + filesystem access (REST catalog would get 403). """ if schema_id == self.table.table_schema.id: return self.table.table_schema.fields diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 543c85893559..d962657944e3 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -167,6 +167,14 @@ def _compute_nested_path_by_name(self) -> Optional[Dict[str, List[str]]]: def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]: return self._cached_nested_path_by_name + def _resolve_schema(self, schema_id: int): + """Resolve schema, short-circuiting current table schema id to avoid + filesystem access (REST catalog would get 403). + """ + if schema_id == self.table.table_schema.id: + return self.table.table_schema + return self.table.schema_manager.get_schema(schema_id) + def _push_down_predicate(self) -> Optional[Predicate]: if self.predicate is None: return None @@ -304,8 +312,7 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, if has_nested: raise NotImplementedError( "Nested-field projection is not supported on ROW files") - file_schema = self.table.schema_manager.get_schema( - file.schema_id) + file_schema = self._resolve_schema(file.schema_id) if file.write_cols: field_map = {f.name: f for f in file_schema.fields} row_full_fields = [field_map[n] for n in file.write_cols @@ -389,7 +396,7 @@ def _get_fields_and_predicate(self, schema_id: int, read_fields): key = (schema_id, tuple(read_fields)) if key not in self.schema_id_2_fields: nested_path_by_name = self._nested_path_by_name() - schema = self.table.schema_manager.get_schema(schema_id) + schema = self._resolve_schema(schema_id) schema_fields = ( SpecialFields.row_type_with_row_tracking(schema.fields) if self.row_tracking_enabled else schema.fields @@ -461,7 +468,7 @@ def _file_read_fields(self, file: DataFileMeta) -> Optional[List[DataField]]: nested-projection reads.""" if self._nested_path_by_name() is not None: return None - file_schema = self.table.schema_manager.get_schema(file.schema_id) + file_schema = self._resolve_schema(file.schema_id) if file_schema is None: return None return self._final_data_fields_from( From 9f5d5211e86ccaf6b2c70cb622f8d40f3c3b540b Mon Sep 17 00:00:00 2001 From: "langu.gjl" Date: Fri, 12 Jun 2026 17:26:31 +0800 Subject: [PATCH 3/4] [python] Fix partition_predicate_test mock to set table_schema.fields The schema short-circuit in FileScanner._schema_fields() returns table.table_schema.fields when schema_id matches the current schema id. The test fixture only mocked Mock(id=0) without .fields, causing the short-circuit path to return a Mock auto-attribute that is not iterable when used by SimpleStatsEvolutions._create_index_cast_mapping. --- paimon-python/pypaimon/tests/partition_predicate_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py b/paimon-python/pypaimon/tests/partition_predicate_test.py index abf5212d0e75..93eace2795c0 100644 --- a/paimon-python/pypaimon/tests/partition_predicate_test.py +++ b/paimon-python/pypaimon/tests/partition_predicate_test.py @@ -65,7 +65,7 @@ def _mock_scanner_table(): table.options.data_evolution_enabled.return_value = False table.options.deletion_vectors_enabled.return_value = False table.options.scan_manifest_parallelism.return_value = 1 - table.table_schema = Mock(id=0) + table.table_schema = Mock(id=0, fields=TABLE_FIELDS) table.schema_manager = Mock() table.schema_manager.get_schema.return_value = Mock(fields=TABLE_FIELDS) return table From 8817e848b576b2f85405fb7d9173037ef7e24336 Mon Sep 17 00:00:00 2001 From: "langu.gjl" Date: Mon, 15 Jun 2026 10:35:07 +0800 Subject: [PATCH 4/4] [python] Trigger CI rerun