diff --git a/paimon-python/pypaimon/casting/__init__.py b/paimon-python/pypaimon/casting/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/paimon-python/pypaimon/casting/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/casting/data_type_casts.py b/paimon-python/pypaimon/casting/data_type_casts.py new file mode 100644 index 000000000000..819b9c115c00 --- /dev/null +++ b/paimon-python/pypaimon/casting/data_type_casts.py @@ -0,0 +1,257 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Type-cast support rules used to validate ``update column type`` schema +changes. + +The rules mirror the engine-wide cast specification so a type change accepted +here is one the read path can also materialize: an *implicit* cast is a safe +widening (e.g. INT -> BIGINT, any numeric -> DECIMAL/DOUBLE), while an +*explicit* cast covers the broader, possibly lossy conversions a user opts into +(e.g. DOUBLE -> INT truncation, anything -> STRING). Read-time execution then +applies the conversion leniently. +""" + +import pyarrow as pa + +from pypaimon.schema.data_types import (ArrayType, AtomicType, DataTypeParser, + MapType, MultisetType, + PyarrowFieldParser, RowType, + VectorType) + +# ---- Type roots -------------------------------------------------------------- + +CHAR = "CHAR" +VARCHAR = "VARCHAR" +BOOLEAN = "BOOLEAN" +BINARY = "BINARY" +VARBINARY = "VARBINARY" +DECIMAL = "DECIMAL" +TINYINT = "TINYINT" +SMALLINT = "SMALLINT" +INTEGER = "INTEGER" +BIGINT = "BIGINT" +FLOAT = "FLOAT" +DOUBLE = "DOUBLE" +DATE = "DATE" +TIME = "TIME" +TIMESTAMP = "TIMESTAMP" +TIMESTAMP_LTZ = "TIMESTAMP_LTZ" +ARRAY = "ARRAY" +MAP = "MAP" +MULTISET = "MULTISET" +ROW = "ROW" +VECTOR = "VECTOR" +VARIANT = "VARIANT" +BLOB = "BLOB" + +# ---- Families ---------------------------------------------------------------- + +CHARACTER_STRING = {CHAR, VARCHAR} +BINARY_STRING = {BINARY, VARBINARY} +INTEGER_NUMERIC = {TINYINT, SMALLINT, INTEGER, BIGINT} +NUMERIC = INTEGER_NUMERIC | {FLOAT, DOUBLE, DECIMAL} +TIMESTAMP_FAMILY = {TIMESTAMP, TIMESTAMP_LTZ} +TIME_FAMILY = {TIME} +DATETIME = {DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ} +PREDEFINED = { + CHAR, VARCHAR, BOOLEAN, BINARY, VARBINARY, DECIMAL, + TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, + DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, +} +CONSTRUCTED = {ARRAY, MAP, MULTISET, ROW, VECTOR} +# Constructed types the read path can render as a 
def _root(data_type) -> str:
    """Map a Paimon data type to its type-root constant.

    Constructed types (ROW / ARRAY / MAP / MULTISET / VECTOR) are recognised
    by their class. Atomic types are recognised from their upper-cased SQL
    type string: parameterised keywords are matched by prefix (so
    ``DECIMAL(10, 2)`` and ``VARCHAR(20)`` resolve), plain keywords by exact
    match.

    Returns ``None`` when the type is not recognised. The ``str`` annotation
    is kept only for interface compatibility.
    """
    # Checked in a fixed order, one class at a time.
    constructed_roots = (
        (RowType, ROW),
        (ArrayType, ARRAY),
        (MapType, MAP),
        (MultisetType, MULTISET),
        (VectorType, VECTOR),
    )
    for type_class, root in constructed_roots:
        if isinstance(data_type, type_class):
            return root
    if not isinstance(data_type, AtomicType):
        return None

    t = data_type.type.upper()
    # "DEC" also covers the full "DECIMAL" spelling.
    if t.startswith(("DEC", "NUMERIC")):
        return DECIMAL
    if t in ("INT", "INTEGER"):
        return INTEGER
    if t in (TINYINT, SMALLINT, BIGINT, FLOAT, DOUBLE, BOOLEAN, DATE):
        return t
    # VARCHAR / VARBINARY must be tested before their CHAR / BINARY prefixes
    # would otherwise never apply (they do not share a prefix, but keeping
    # the variable-length form first mirrors the STRING / BYTES aliases).
    if t == "STRING" or t.startswith("VARCHAR"):
        return VARCHAR
    if t.startswith("CHAR"):
        return CHAR
    if t == "BYTES" or t.startswith("VARBINARY"):
        return VARBINARY
    if t.startswith("BINARY"):
        return BINARY
    if t == "BLOB":
        return BLOB
    if t.startswith("TIMESTAMP"):
        # The local-zoned type has two spellings: the short keyword
        # ``TIMESTAMP_LTZ(p)`` and the SQL long form
        # ``TIMESTAMP(p) WITH LOCAL TIME ZONE``. Both start with
        # "TIMESTAMP", so the long form must be detected explicitly or it
        # would be misclassified as a plain TIMESTAMP.
        if t.startswith("TIMESTAMP_LTZ") or "WITH LOCAL TIME ZONE" in t:
            return TIMESTAMP_LTZ
        return TIMESTAMP
    # Only reached for non-TIMESTAMP strings, so this is the TIME type.
    if t.startswith("TIME"):
        return TIME
    if t == "VARIANT":
        return VARIANT
    return None
def supports_cast(source_type, target_type, allow_explicit: bool = True) -> bool:
    """Decide whether a column may change from ``source_type`` to ``target_type``.

    Implicit (safe widening) casts are always accepted; explicit (possibly
    lossy) casts are accepted only when ``allow_explicit`` is true. Types
    whose root cannot be determined are never castable.
    """
    src = _root(source_type)
    dst = _root(target_type)
    if src is None or dst is None:
        return False

    # Dropping nullability (nullable -> NOT NULL) needs explicit opt-in.
    drops_nullability = source_type.nullable and not target_type.nullable
    if drops_nullability and not allow_explicit:
        return False

    if src == dst:
        if src not in CONSTRUCTED:
            return True
        # Constructed types must be identical apart from outer nullability:
        # reshaping goes through sub-field / 'element' / 'value' paths, since
        # a whole-ROW replacement would carry caller-supplied nested field
        # ids and there is no runtime conversion between differently-shaped
        # constructed values.
        return _equals_ignore_nullable(source_type, target_type)

    if src in _IMPLICIT_RULES.get(dst, set()):
        return True
    if allow_explicit and src in _EXPLICIT_RULES.get(dst, set()):
        return True
    return False


def _equals_ignore_nullable(source_type, target_type) -> bool:
    """Compare two types for equality with their outer nullability ignored.

    Works on copies rebuilt from ``to_dict()`` so the caller's type objects
    are never mutated.
    """
    left = DataTypeParser.parse_data_type(source_type.to_dict())
    right = DataTypeParser.parse_data_type(target_type.to_dict())
    for clone in (left, right):
        clone.nullable = True
    return left == right
def _pyarrow_cast_supported(source_type, target_type) -> bool:
    """Probe, and memoize, whether PyArrow can cast between two leaf types.

    Both Paimon types are converted to PyArrow types; equal PyArrow types
    need no cast. Otherwise the answer comes from ``_EXECUTABLE_CAST_CACHE``
    (keyed by the two PyArrow type strings) or from a fresh probe whose
    result is stored there.
    """
    arrow_source = PyarrowFieldParser.from_paimon_type(source_type)
    arrow_target = PyarrowFieldParser.from_paimon_type(target_type)
    if arrow_source == arrow_target:
        return True

    key = (str(arrow_source), str(arrow_target))
    remembered = _EXECUTABLE_CAST_CACHE.get(key)
    if remembered is not None:
        return remembered

    # The probe uses one null-valued row instead of an empty array: an empty
    # array only resolves the cast kernel, while a non-empty input also
    # exercises the kernel's validation of the target type parameters
    # (e.g. a DECIMAL precision too small for the source's range). A null
    # row avoids per-value parse/overflow errors, and ``safe=False`` matches
    # the cast performed by the read path.
    try:
        pa.nulls(1, type=arrow_source).cast(arrow_target, safe=False)
    except (pa.lib.ArrowNotImplementedError, pa.lib.ArrowInvalid,
            pa.lib.ArrowTypeError):
        castable = False
    else:
        castable = True
    _EXECUTABLE_CAST_CACHE[key] = castable
    return castable
def _to_string_values(array, data_type) -> list:
    """Return one Python string per row of *array* (``None`` for NULL rows).

    ROW / ARRAY / MAP values use the custom rendering below; everything else
    is cast to a PyArrow string with ``safe=False``.
    """
    if not isinstance(data_type, (RowType, ArrayType, MapType)):
        return array.cast(pa.string(), safe=False).to_pylist()
    return _constructed_to_string_array(array, data_type).to_pylist()


def _constructed_to_string_array(array, file_type):
    """Render a struct/list/map array as a PyArrow string array.

    Formats: ROW -> ``{v1, v2}``, ARRAY -> ``[e1, e2]``,
    MAP -> ``{k1 -> v1, k2 -> v2}``. Nested values are rendered recursively.
    A NULL nested value is written as the literal ``null``; a NULL container
    row stays NULL in the output.

    Raises:
        ValueError: if *file_type* is not a ROW, ARRAY or MAP type.
    """
    array = _unslice(array)
    row_is_valid = pc.is_valid(array).to_pylist()

    def or_null(text):
        # NULL sub-values are spelled out inside the rendered container.
        return 'null' if text is None else text

    if isinstance(file_type, RowType):
        columns = [
            _to_string_values(array.field(idx), sub.type)
            for idx, sub in enumerate(file_type.fields)
        ]

        def render(row):
            return '{' + ', '.join(or_null(col[row]) for col in columns) + '}'
    elif isinstance(file_type, ArrayType):
        elements = _to_string_values(array.values, file_type.element)
        bounds = array.offsets.to_pylist()

        def render(row):
            window = elements[bounds[row]:bounds[row + 1]]
            return '[' + ', '.join(or_null(e) for e in window) + ']'
    elif isinstance(file_type, MapType):
        keys = _to_string_values(array.keys, file_type.key)
        items = _to_string_values(array.items, file_type.value)
        bounds = array.offsets.to_pylist()

        def render(row):
            pairs = (
                '{} -> {}'.format(or_null(keys[j]), or_null(items[j]))
                for j in range(bounds[row], bounds[row + 1])
            )
            return '{' + ', '.join(pairs) + '}'
    else:
        raise ValueError(
            'Unsupported constructed type for string rendering: {}'.format(file_type))

    rendered = [
        render(row) if ok else None
        for row, ok in enumerate(row_is_valid)
    ]
    return pa.array(rendered, type=pa.string())
+ Returns a list of ``(pos, file_field, target_field)`` -- one per target + field, in target order -- where ``pos`` is the column index in the + physically-read batch carrying ``target_field`` (matched by field id), + or -1 if the file does not contain that id (pad NULL). Returns ``None`` + when the file already matches the target exactly (no evolution), so the + caller stays zero-copy. """ if file_data_fields is None or target_data_fields is None: - return None, None + return None + # Recursive equality covers nested sub-field changes too: any rename / + # add / drop / type change at any depth makes the file != target. + if file_data_fields == target_data_fields: + return None file_id_to_pos = {f.id: i for i, f in enumerate(file_data_fields)} - positions = [] - names = [] - # Identity only when every target maps to the same physical position - # AND already carries the same name -- a rename keeps the position but - # changes the name, which still requires a relabel pass. - identity = len(file_data_fields) == len(target_data_fields) - for i, target in enumerate(target_data_fields): + plan = [] + for target in target_data_fields: pos = file_id_to_pos.get(target.id, -1) - positions.append(pos) - names.append(target.name) - if pos != i or (pos >= 0 and file_data_fields[pos].name != target.name): - identity = False - if identity: - return None, None - return positions, names + file_field = file_data_fields[pos] if pos >= 0 else None + plan.append((pos, file_field, target)) + return plan def _normalize_batch(self, record_batch: RecordBatch) -> RecordBatch: """Reorder/pad the physically-read batch onto the latest read target by - field id, and relabel columns to the latest names. Missing ids become - all-NULL columns; types are reconciled later by _align_batch_to_read_schema.""" - if self._normalize_positions is None: + field id, relabel columns to the latest names, and align nested ROW + sub-fields by id. 
Missing ids become typed all-NULL columns.""" + if self._normalize_plan is None: return record_batch num_rows = record_batch.num_rows arrays = [] - for pos in self._normalize_positions: + names = [] + for pos, file_field, target_field in self._normalize_plan: + target_pa_type = PyarrowFieldParser.from_paimon_type(target_field.type) if pos < 0: - arrays.append(pa.nulls(num_rows)) + arrays.append(pa.nulls(num_rows, type=target_pa_type)) else: - arrays.append(record_batch.column(pos)) - return pa.RecordBatch.from_arrays(arrays, names=self._normalize_names) + arrays.append(self._align_array_by_id( + record_batch.column(pos), file_field.type, target_field.type)) + names.append(target_field.name) + return pa.RecordBatch.from_arrays(arrays, names=names) + + def _align_array_by_id(self, array, file_type, target_type): + """Return *array* converted to *target_type*, matching ROW sub-fields by + field id (reorder, pad missing with NULL, follow renames, cast changed + types) recursively, transparently through ARRAY/MAP wrappers.""" + if isinstance(target_type, RowType) and isinstance(file_type, RowType): + n = len(array) + file_id_to_pos = {f.id: i for i, f in enumerate(file_type.fields)} + children = [] + pa_fields = [] + for tsub in target_type.fields: + p = file_id_to_pos.get(tsub.id, -1) + if p < 0: + child = pa.nulls(n, type=PyarrowFieldParser.from_paimon_type(tsub.type)) + else: + child = self._align_array_by_id( + array.field(p), file_type.fields[p].type, tsub.type) + children.append(child) + pa_fields.append(pa.field(tsub.name, child.type, nullable=tsub.type.nullable)) + # Preserve the struct's own null mask; child values under a null + # struct are irrelevant. 
+ return pa.StructArray.from_arrays( + children, fields=pa_fields, mask=pc.is_null(array)) + if isinstance(target_type, ArrayType) and isinstance(file_type, ArrayType): + array = _unslice(array) + aligned_values = self._align_array_by_id( + array.values, file_type.element, target_type.element) + return pa.ListArray.from_arrays( + array.offsets, aligned_values, mask=pc.is_null(array)) + if isinstance(target_type, MapType) and isinstance(file_type, MapType): + array = _unslice(array) + aligned_items = self._align_array_by_id( + array.items, file_type.value, target_type.value) + # MapArray.from_arrays cannot carry a null mask (a null map would + # collapse to an empty one), so rebuild from buffers, reusing the + # original validity/offset buffers and only swapping the value child. + target_pa = PyarrowFieldParser.from_paimon_type(target_type) + entries = pa.StructArray.from_arrays( + [array.keys, aligned_items], + fields=[target_pa.key_field, target_pa.item_field]) + return pa.Array.from_buffers( + target_pa, len(array), array.buffers()[:2], children=[entries]) + # A constructed type changed to a character string: pyarrow cannot + # cast struct/list/map to utf8 directly, so render the engine's + # string form instead. + if (isinstance(file_type, (RowType, ArrayType, MapType)) + and _is_character_string_type(target_type)): + return _constructed_to_string_array(array, file_type) + # Leaf / non-nested: cast to the target type when it differs. 
class NestedLeafBatchReader(RecordBatchReader):
    """Extract projected nested leaves from batches of full top-level columns.

    The inner reader yields batches carrying the widened top-level columns.
    Each requested name path is walked from its top-level column through the
    struct children with ``pyarrow.compute.struct_field``, and the resulting
    leaf is cast (``safe=False``) to the corresponding output field type when
    the types differ, producing the flat projected schema.
    """

    def __init__(self, inner: RecordBatchReader, name_paths: List[List[str]],
                 output_fields: List[DataField]):
        """Create the reader.

        Args:
            inner: reader producing batches of the top-level columns.
            name_paths: one path per output column; element 0 is the
                top-level column name, the rest are nested struct field names.
            output_fields: the flat output fields, parallel to *name_paths*.

        Raises:
            ValueError: if the two lists differ in length, or a path is empty.
        """
        if len(name_paths) != len(output_fields):
            raise ValueError(
                "name_paths length {} does not match output_fields length {}".format(
                    len(name_paths), len(output_fields)))
        # An empty path has no top-level column to start from; reject it here
        # instead of failing with an IndexError on the first batch.
        for index, path in enumerate(name_paths):
            if not path:
                raise ValueError(
                    "name_paths[{}] is empty; a path needs at least the "
                    "top-level column name".format(index))
        self._inner = inner
        self._paths = name_paths
        self._schema = PyarrowFieldParser.from_paimon_schema(output_fields)

    def read_arrow_batch(self) -> Optional[RecordBatch]:
        """Read the next inner batch and return it flattened to the output schema.

        Returns ``None`` once the inner reader is exhausted.
        """
        batch = self._inner.read_arrow_batch()
        if batch is None:
            return None
        arrays = []
        for i, path in enumerate(self._paths):
            column = batch.column(path[0])
            # Descend one struct level per remaining path element.
            for name in path[1:]:
                column = pc.struct_field(column, name)
            target_type = self._schema.field(i).type
            if column.type != target_type:
                column = column.cast(target_type, safe=False)
            arrays.append(column)
        return pa.RecordBatch.from_arrays(arrays, schema=self._schema)

    def close(self) -> None:
        """Close the wrapped inner reader."""
        self._inner.close()
Instead the read + # widens to the full top-level columns, which the per-file field-id + # normalization aligns to the latest schema, and the requested leaf + # paths are extracted afterwards (``outer_extract_name_paths``). + super().__init__( + table=table, + predicate=predicate, + read_type=read_type, + split=split, + row_tracking_enabled=row_tracking_enabled, + nested_name_paths=None, + limit=limit) + self.outer_extract_name_paths = outer_extract_name_paths + self.outer_flat_read_type = outer_flat_read_type + def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]: read_fields = self._get_final_read_data_fields() # Check if this is a SlicedSplit to get shard_file_idx_map @@ -676,10 +703,43 @@ def create_reader(self) -> RecordReader: # if the table is appendonly table, we don't need extra filter, all predicates has pushed down if self.table.is_primary_key_table and self.predicate_for_reader: reader = FilterRecordReader(concat_reader, self.predicate_for_reader) + if self.outer_extract_name_paths: + # Row-level extraction: the filter evaluates rows in the + # widened top-level coordinate space, so extract after it. 
+ from pypaimon.read.reader.outer_projection_record_reader import \ + OuterProjectionRecordReader + reader = OuterProjectionRecordReader( + reader, [f.name for f in self.read_fields], + self.outer_extract_name_paths, + file_io=self.table.file_io, + blob_field_indices=_blob_field_indices(self.read_fields), + vector_field_indices=_vector_field_indices(self.read_fields)) if self.limit is not None: reader = LimitedRecordReader(reader, self.limit) else: reader = concat_reader + if self.outer_extract_name_paths: + from pypaimon.read.reader.nested_leaf_batch_reader import \ + NestedLeafBatchReader + reader = NestedLeafBatchReader( + reader, self.outer_extract_name_paths, + self.outer_flat_read_type) + # A predicate on a projected nested leaf cannot be pushed down: + # its leaf path is absent from the widened top-level read + # fields, so SplitRead.__init__ dropped it (predicate_for_reader + # is None). Without re-applying it the filter is silently lost + # and every row is returned. Re-evaluate it on the extracted + # flat batches, whose column names match the predicate fields; + # trim to the projected columns so a filter on a non-projected + # column keeps the existing "dropped" semantics rather than + # referencing a missing column. 
+ if self.predicate is not None and self.predicate_for_reader is None: + flat_names = [f.name for f in self.outer_flat_read_type] + trimmed = trim_predicate_by_fields(self.predicate, flat_names) + if trimmed is not None: + from pypaimon.read.reader.filter_record_batch_reader \ + import FilterRecordBatchReader + reader = FilterRecordBatchReader(reader, trimmed) if self.limit is not None: reader = LimitedRecordBatchReader(reader, self.limit) return reader @@ -699,6 +759,7 @@ def __init__( split: Split, row_tracking_enabled: bool, outer_extract_name_paths: Optional[List[List[str]]] = None, + outer_flat_read_type: Optional[List[DataField]] = None, limit: Optional[int] = None): # Merge functions need full ROW sub-structures, so nested paths # are not pushed down here; sub-path extraction happens above @@ -713,6 +774,7 @@ def __init__( limit=limit, ) self.outer_extract_name_paths = outer_extract_name_paths + self.outer_flat_read_type = outer_flat_read_type # Built once per split-read (value_fields and options are constant # for the object's life), not per section. ``None`` when # ``sequence.field`` is unset, in which case the heap falls back to @@ -811,6 +873,21 @@ def create_reader(self) -> RecordReader: file_io=self.table.file_io, blob_field_indices=_blob_field_indices(inner_value_fields), vector_field_indices=_vector_field_indices(inner_value_fields)) + # A predicate on a projected nested leaf is not pushed down (its leaf + # path is absent from the widened-to-full-ROW read fields, so it was + # dropped in __init__). Without re-applying it after extraction the + # filter is silently lost. Evaluate it on the extracted flat rows, + # whose fields are outer_flat_read_type; trim to the projected + # columns and rewrite indices into that flat row. 
+ if (self.predicate is not None and self.predicate_for_reader is None + and self.outer_flat_read_type is not None): + flat_names = [f.name for f in self.outer_flat_read_type] + trimmed = trim_predicate_by_fields(self.predicate, flat_names) + if trimmed is not None: + reader = FilterRecordReader( + reader, + rewrite_predicate_indices( + trimmed, self.outer_flat_read_type)) if self.limit is not None: reader = LimitedRecordReader(reader, self.limit) return reader diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 57e704b2b550..1c29025fdb9e 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -585,6 +585,8 @@ def _create_split_read(self, split: Split) -> SplitRead: split=split, row_tracking_enabled=False, outer_extract_name_paths=outer_extract_name_paths, + outer_flat_read_type=( + self.read_type if outer_extract_name_paths else None), limit=self.limit, ) elif self.table.options.data_evolution_enabled(): @@ -603,13 +605,26 @@ def _create_split_read(self, split: Split) -> SplitRead: limit=self.limit, ) else: + inner_read_type = self.read_type + outer_extract_name_paths: Optional[List[List[str]]] = None + if self.nested_name_paths and any( + len(p) > 1 for p in self.nested_name_paths): + # Mirror the merge path: read the full top-level columns so + # the per-file field-id normalization applies (a leaf path is + # only valid against the latest schema, not each file's own + # names/types), then extract the requested sub-paths back to + # the user's flat schema. 
# System fields (sequence number, value kind, row id, ...) own every field id
# from this value upwards; user field ids are always strictly smaller, which
# lets the highest-user-field-id computation skip the reserved range.
SYSTEM_FIELD_ID_START = (2 ** 31 - 1) // 2


def is_system_field_id(field_id: int) -> bool:
    """Return True when *field_id* falls in the reserved system-field range."""
    return SYSTEM_FIELD_ID_START <= field_id
+ """ + if isinstance(data_type, RowType): + new_fields = [] + for field in data_type.fields: + # Visit the nested type first, then allocate this field's id, so the + # ordering matches the rest of the engine ecosystem. + new_type = reassign_field_id(field.type, field_id) + new_id = field_id.increment_and_get() + new_fields.append(DataField( + new_id, field.name, new_type, field.description, field.default_value)) + return RowType(data_type.nullable, new_fields) + if isinstance(data_type, ArrayType): + return ArrayType(data_type.nullable, reassign_field_id(data_type.element, field_id)) + if isinstance(data_type, VectorType): + return VectorType( + data_type.nullable, reassign_field_id(data_type.element, field_id), data_type.length) + if isinstance(data_type, MultisetType): + return MultisetType(data_type.nullable, reassign_field_id(data_type.element, field_id)) + if isinstance(data_type, MapType): + new_key = reassign_field_id(data_type.key, field_id) + new_value = reassign_field_id(data_type.value, field_id) + return MapType(data_type.nullable, new_key, new_value) + return data_type + + +def collect_field_ids(data_type: DataType, field_ids: set): + """Collect all (nested) field ids reachable from *data_type* into *field_ids*, + raising on a duplicate id (a broken schema).""" + if isinstance(data_type, RowType): + for field in data_type.fields: + if field.id in field_ids: + raise ValueError( + "Broken schema, field id {} is duplicated.".format(field.id)) + field_ids.add(field.id) + collect_field_ids(field.type, field_ids) + elif isinstance(data_type, (ArrayType, VectorType, MultisetType)): + collect_field_ids(data_type.element, field_ids) + elif isinstance(data_type, MapType): + collect_field_ids(data_type.key, field_ids) + collect_field_ids(data_type.value, field_ids) + + +def current_highest_field_id(fields: List[DataField]) -> int: + """Highest user field id across *fields*, recursing into nested ROW/ARRAY/MAP. + + System field ids are excluded. 
Returns -1 for an empty/system-only schema. + The result is persisted as ``highestFieldId``; later schema changes seed + their id counter from the stored value (not from the live fields, since a + dropped field may have carried a higher id than any survivor). + """ + field_ids = set() + for field in fields: + if field.id in field_ids: + raise ValueError("Broken schema, field id {} is duplicated.".format(field.id)) + field_ids.add(field.id) + collect_field_ids(field.type, field_ids) + user_ids = [fid for fid in field_ids if not is_system_field_id(fid)] + return max(user_ids) if user_ids else -1 + + class Keyword(Enum): CHAR = "CHAR" VARCHAR = "VARCHAR" @@ -408,7 +484,18 @@ def parse_nullability(type_string: str) -> bool: @staticmethod def parse_atomic_type_sql_string(type_string: str) -> DataType: + nullable = DataTypeParser.parse_nullability(type_string) type_upper = type_string.upper().strip() + # Strip the trailing nullability suffix so it is stored only in + # ``nullable``, not baked into the atomic type string. The space-split + # branch below drops it for plain types ("BIGINT NOT NULL"), but a + # parameterized type ("DECIMAL(12, 2) NOT NULL", "VARCHAR(10) NOT NULL") + # takes the paren branch and would otherwise keep the suffix in + # ``AtomicType.type`` -- doubling it on the next ``to_dict()``. 
+ for suffix in (" NOT NULL", " NULL"): + if type_upper.endswith(suffix): + type_upper = type_upper[: -len(suffix)].rstrip() + break if "(" in type_upper: base_type = type_upper.split("(")[0] @@ -420,9 +507,7 @@ def parse_atomic_type_sql_string(type_string: str) -> DataType: try: Keyword(base_type) - return AtomicType( - type_upper, DataTypeParser.parse_nullability(type_string) - ) + return AtomicType(type_upper, nullable) except ValueError: raise Exception("Unknown type: {}".format(base_type)) @@ -717,12 +802,21 @@ def to_paimon_field(field_idx: int, pa_field: pyarrow.Field) -> DataField: @staticmethod def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]: - # Convert PyArrow schema to Paimon fields + # Convert PyArrow schema to Paimon fields, assigning globally-unique ids: + # each top-level field takes the next id, then its (possibly nested) type + # has its subfield ids reassigned from the same running counter. A flat + # schema keeps the plain 0,1,2,... ids; nested subfields get ids that do + # not collide with top-level ones. 
+ field_id = AtomicInteger(-1) fields = [] - for i, pa_field in enumerate(pa_schema): + for pa_field in pa_schema: pa_field: pyarrow.Field - data_field = PyarrowFieldParser.to_paimon_field(i, pa_field) - fields.append(data_field) + top_id = field_id.increment_and_get() + data_type = PyarrowFieldParser.to_paimon_type(pa_field.type, pa_field.nullable) + data_type = reassign_field_id(data_type, field_id) + description = pa_field.metadata.get(b'description', b'').decode('utf-8') \ + if pa_field.metadata and b'description' in pa_field.metadata else None + fields.append(DataField(top_id, pa_field.name, data_type, description)) return fields @staticmethod diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index d01549c71bc5..d82a4f8a2012 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -26,7 +26,9 @@ from pypaimon.schema.column_directive_utils import ( apply_add_column_directive, apply_directives, remove_dropped_directive_options) -from pypaimon.schema.data_types import AtomicInteger, DataField +from pypaimon.casting.data_type_casts import can_execute_cast, supports_cast +from pypaimon.schema.data_types import (ArrayType, AtomicInteger, DataField, + MapType, RowType, reassign_field_id) from pypaimon.schema.schema import Schema from pypaimon.schema.schema_change import (AddColumn, DropColumn, RemoveOption, RenameColumn, SchemaChange, @@ -44,6 +46,123 @@ def _find_field_index(fields: List[DataField], field_name: str) -> Optional[int] return None +def _extract_row_data_fields(data_type, out_fields: List[DataField], + field_names: List[str], token_pos: int) -> int: + """Collect the immediate sub-fields reachable from *data_type* into + *out_fields* and return the path depth consumed. 
A ROW contributes its + fields (depth 1); an ARRAY/MAP is transparent and descends into its + element/value, consuming the ``element``/``value`` path token -- the + consumed token is validated so an unknown step cannot silently mutate + the schema; anything else contributes nothing (depth 1).""" + if isinstance(data_type, RowType): + out_fields.extend(data_type.fields) + return 1 + if isinstance(data_type, ArrayType): + _assert_wrapper_token(field_names, token_pos, 'element') + return _extract_row_data_fields( + data_type.element, out_fields, field_names, token_pos + 1) + 1 + if isinstance(data_type, MapType): + _assert_wrapper_token(field_names, token_pos, 'value') + return _extract_row_data_fields( + data_type.value, out_fields, field_names, token_pos + 1) + 1 + return 1 + + +def _assert_wrapper_token(field_names: List[str], token_pos: int, expected: str): + # A path that ends inside the wrappers (token_pos out of range) is the + # update-the-wrapped-type-itself case, handled by the caller's overflow + # branch; only a present-but-wrong token is rejected. + if token_pos < len(field_names) and field_names[token_pos] != expected: + raise ColumnNotExistException('.'.join(field_names)) + + +def _wrap_new_row_type(data_type, nested_fields: List[DataField]): + """Rebuild *data_type* substituting *nested_fields* at its innermost ROW, + preserving any ARRAY/MAP wrappers.""" + if isinstance(data_type, RowType): + return RowType(data_type.nullable, nested_fields) + if isinstance(data_type, ArrayType): + return ArrayType(data_type.nullable, _wrap_new_row_type(data_type.element, nested_fields)) + if isinstance(data_type, MapType): + return MapType( + data_type.nullable, data_type.key, + _wrap_new_row_type(data_type.value, nested_fields)) + return data_type + + +def _get_root_type(data_type, curr_depth: int, max_depth: int): + """Return the type sitting at ``max_depth`` when walking ARRAY/MAP wrappers + from *data_type* (e.g. 
+ the INT in ARRAY<MAP<STRING, ARRAY<INT>>>).""" + if curr_depth == max_depth - 1: + return data_type + if isinstance(data_type, ArrayType): + return _get_root_type(data_type.element, curr_depth + 1, max_depth) + if isinstance(data_type, MapType): + return _get_root_type(data_type.value, curr_depth + 1, max_depth) + return data_type + + +def _get_array_map_type_with_target_type_root(source, target, curr_depth: int, max_depth: int): + """Rebuild *source* with *target* substituted at ``max_depth``, keeping the + ARRAY/MAP wrappers around it intact.""" + if curr_depth == max_depth - 1: + return target + if isinstance(source, ArrayType): + return ArrayType( + source.nullable, + _get_array_map_type_with_target_type_root( + source.element, target, curr_depth + 1, max_depth)) + if isinstance(source, MapType): + return MapType( + source.nullable, source.key, + _get_array_map_type_with_target_type_root( + source.value, target, curr_depth + 1, max_depth)) + return target + + +def _update_intermediate_column(new_fields, previous_fields, depth, prev_depth, + field_names, update_last_fn): + """Walk *field_names* into nested ROW (transparently through ARRAY/MAP), + then run *update_last_fn(depth, fields, name)* on the field list that + directly contains the final path element, rebuilding parent types upward.""" + if depth == len(field_names) - 1: + update_last_fn(depth, new_fields, field_names[depth]) + return + if depth >= len(field_names): + # Path descended through ARRAY/MAP past the last ROW; operate on the + # field that owns the wrapper at the previous depth.
+ update_last_fn(prev_depth, previous_fields, field_names[prev_depth]) + return + for i, field in enumerate(new_fields): + if field.name != field_names[depth]: + continue + nested_fields: List[DataField] = [] + new_depth = depth + _extract_row_data_fields( + field.type, nested_fields, field_names, depth + 1) + _update_intermediate_column( + nested_fields, new_fields, new_depth, depth, field_names, update_last_fn) + field = new_fields[i] + new_fields[i] = DataField( + field.id, field.name, + _wrap_new_row_type(field.type, nested_fields), + field.description, field.default_value) + return + raise ColumnNotExistException('.'.join(field_names[:depth + 1])) + + +def _modify_nested_column(new_fields, field_names, update_last_fn): + _update_intermediate_column(new_fields, new_fields, 0, 0, field_names, update_last_fn) + + +def _update_nested_column(new_fields, field_names, update_func): + def update_last(depth, fields, field_name): + idx = _find_field_index(fields, field_name) + if idx is None: + raise ColumnNotExistException('.'.join(field_names)) + fields[idx] = update_func(fields[idx], depth) + _modify_nested_column(new_fields, field_names, update_last) + + def _get_rename_mappings(changes: List[SchemaChange]) -> dict: rename_mappings = {} for change in changes: @@ -55,49 +174,87 @@ def _get_rename_mappings(changes: List[SchemaChange]) -> dict: def _handle_update_column_comment( change: UpdateColumnComment, new_fields: List[DataField] ): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] - new_fields[field_index] = DataField( - field.id, field.name, field.type, change.new_comment, field.default_value - ) + def update_func(field: DataField, depth: int) -> DataField: + return DataField( + field.id, field.name, field.type, change.new_comment, field.default_value + ) + _update_nested_column(new_fields, change.field_names, 
update_func) + + +def _assert_nullability_change(old_nullability: bool, new_nullability: bool, + field_name: str, disable_null_to_not_null: bool): + if disable_null_to_not_null and old_nullability and not new_nullability: + raise ValueError( + "Cannot update column type from nullable to non nullable for {}. " + "You can set table configuration option " + "'alter-column-null-to-not-null.disabled' = 'false' " + "to allow converting null columns to not null".format(field_name) + ) def _handle_update_column_nullability( - change: UpdateColumnNullability, new_fields: List[DataField] + change: UpdateColumnNullability, new_fields: List[DataField], + disable_null_to_not_null: bool ): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] from pypaimon.schema.data_types import DataTypeParser - field_type_dict = field.type.to_dict() - new_type = DataTypeParser.parse_data_type(field_type_dict) - new_type.nullable = change.new_nullability - new_fields[field_index] = DataField( - field.id, field.name, new_type, field.description, field.default_value - ) + field_names = change.field_names + max_depth = len(field_names) + + def update_func(field: DataField, depth: int) -> DataField: + source_root = _get_root_type(field.type, depth, max_depth) + _assert_nullability_change( + source_root.nullable, change.new_nullability, + '.'.join(field_names), disable_null_to_not_null) + new_root = DataTypeParser.parse_data_type(source_root.to_dict()) + new_root.nullable = change.new_nullability + new_type = _get_array_map_type_with_target_type_root( + field.type, new_root, depth, max_depth) + return DataField( + field.id, field.name, new_type, field.description, field.default_value + ) + _update_nested_column(new_fields, field_names, update_func) def _handle_update_column_type( - change: UpdateColumnType, new_fields: List[DataField] + change: 
UpdateColumnType, new_fields: List[DataField], + disable_null_to_not_null: bool ): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] from pypaimon.schema.data_types import DataTypeParser - new_type_dict = change.new_data_type.to_dict() - new_type = DataTypeParser.parse_data_type(new_type_dict) - if change.keep_nullability: - new_type.nullable = field.type.nullable - new_fields[field_index] = DataField( - field.id, field.name, new_type, field.description, field.default_value - ) + field_names = change.field_names + max_depth = len(field_names) + + def update_func(field: DataField, depth: int) -> DataField: + source_root = _get_root_type(field.type, depth, max_depth) + target_root = DataTypeParser.parse_data_type(change.new_data_type.to_dict()) + if change.keep_nullability: + target_root.nullable = source_root.nullable + else: + # A type change carries its own nullability; guard nullable -> + # not null just like UpdateColumnNullability (mirrors Java + # SchemaManager#updateColumnType). + _assert_nullability_change( + source_root.nullable, target_root.nullable, + '.'.join(field_names), disable_null_to_not_null) + if not supports_cast(source_root, target_root): + raise ValueError( + "Column type {}[{}] cannot be converted to {} without losing information." + .format(field.name, source_root, target_root) + ) + # Logical cast support is not enough: the read path materializes the + # change via PyArrow when reading old files, so reject casts it cannot + # execute (mirrors Java's CastExecutors.resolve(...) != null check). + if not can_execute_cast(source_root, target_root): + raise ValueError( + "Column type {}[{}] cannot be converted to {}: the read path " + "has no executable cast for this conversion." 
+ .format(field.name, source_root, target_root) + ) + new_type = _get_array_map_type_with_target_type_root( + field.type, target_root, depth, max_depth) + return DataField( + field.id, field.name, new_type, field.description, field.default_value + ) + _update_nested_column(new_fields, field_names, update_func) def _drop_column_validation(schema: 'TableSchema', change: DropColumn): @@ -112,17 +269,20 @@ def _drop_column_validation(schema: 'TableSchema', change: DropColumn): def _handle_drop_column(change: DropColumn, new_fields: List[DataField], new_options: dict): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - if len(change.field_names) == 1: - field = new_fields[field_index] - type_root = _get_type_root(field.type) - remove_dropped_directive_options(field_name, type_root, new_options) - new_fields.pop(field_index) - if not new_fields: - raise ValueError("Cannot drop all fields in table") + field_names = change.field_names + + def update_last(depth, fields, field_name): + field_index = _find_field_index(fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + if len(field_names) == 1: + field = fields[field_index] + type_root = _get_type_root(field.type) + remove_dropped_directive_options(field_name, type_root, new_options) + fields.pop(field_index) + if not fields: + raise ValueError("Cannot drop all fields in table") + _modify_nested_column(new_fields, field_names, update_last) def _get_type_root(data_type) -> str: @@ -274,17 +434,19 @@ def _validate_blob_external_storage_fields(fields: List[DataField], options: dic def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]): - field_name = change.field_names[-1] new_name = change.new_name - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - if 
_find_field_index(new_fields, new_name) is not None: - raise ColumnAlreadyExistException(new_name) - field = new_fields[field_index] - new_fields[field_index] = DataField( - field.id, new_name, field.type, field.description, field.default_value - ) + + def update_last(depth, fields, field_name): + field_index = _find_field_index(fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + if _find_field_index(fields, new_name) is not None: + raise ColumnAlreadyExistException(new_name) + field = fields[field_index] + fields[field_index] = DataField( + field.id, new_name, field.type, field.description, field.default_value + ) + _modify_nested_column(new_fields, change.field_names, update_last) def _apply_move(fields: List[DataField], new_field: Optional[DataField], move): @@ -332,11 +494,11 @@ def _handle_add_column( f"Column {'.'.join(change.field_names)} cannot specify NOT NULL in the table." ) field_id = highest_field_id.increment_and_get() + # Reassign ids of any nested sub-fields the new column carries (a ROW/ARRAY/ + # MAP type) so they draw globally-unique ids from the running counter. 
+ data_type = reassign_field_id(change.data_type, highest_field_id) field_name = change.field_names[-1] - if _find_field_index(new_fields, field_name) is not None: - raise ColumnAlreadyExistException(field_name) - data_type = change.data_type comment = change.comment converted = apply_add_column_directive(comment, field_name, data_type, new_options) if converted is not None: @@ -349,21 +511,26 @@ def _handle_add_column( comment = converted.comment new_field = DataField(field_id, field_name, data_type, comment) - if change.move: - _apply_move(new_fields, new_field, change.move) - elif ( - add_column_before_partition - and partition_keys - and len(change.field_names) == 1 - ): - insert_index = len(new_fields) - for i, field in enumerate(new_fields): - if field.name in partition_keys: - insert_index = i - break - new_fields.insert(insert_index, new_field) - else: - new_fields.append(new_field) + + def update_last(depth, fields, fname): + if _find_field_index(fields, fname) is not None: + raise ColumnAlreadyExistException(fname) + if change.move: + _apply_move(fields, new_field, change.move) + elif ( + add_column_before_partition + and partition_keys + and len(change.field_names) == 1 + ): + insert_index = len(fields) + for i, field in enumerate(fields): + if field.name in partition_keys: + insert_index = i + break + fields.insert(insert_index, new_field) + else: + fields.append(new_field) + _modify_nested_column(new_fields, change.field_names, update_last) class SchemaManager: @@ -517,6 +684,10 @@ def _generate_table_schema( # Get add_column_before_partition option add_column_before_partition = CoreOptions(Options(old_table_schema.options)).add_column_before_partition() partition_keys = old_table_schema.partition_keys + # Converting a nullable column to NOT NULL is unsafe for existing + # data and is disabled by default; the table option below opts in. 
+ disable_null_to_not_null = str(old_table_schema.options.get( + 'alter-column-null-to-not-null.disabled', 'true')).lower() != 'false' for change in changes: if isinstance(change, SetOption): @@ -547,13 +718,15 @@ def _generate_table_schema( _assert_not_updating_primary_keys( old_table_schema, change.field_names, "update" ) - _handle_update_column_type(change, new_fields) + _handle_update_column_type( + change, new_fields, disable_null_to_not_null) elif isinstance(change, UpdateColumnNullability): if change.new_nullability: _assert_not_updating_primary_keys( old_table_schema, change.field_names, "change nullability of" ) - _handle_update_column_nullability(change, new_fields) + _handle_update_column_nullability( + change, new_fields, disable_null_to_not_null) elif isinstance(change, UpdateColumnComment): _handle_update_column_comment(change, new_fields) elif isinstance(change, UpdateColumnPosition): diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py index 52324ae7f607..d8586b28d139 100644 --- a/paimon-python/pypaimon/schema/table_schema.py +++ b/paimon-python/pypaimon/schema/table_schema.py @@ -23,7 +23,7 @@ from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.file_io import FileIO from pypaimon.common.json_util import json_field -from pypaimon.schema.data_types import DataField +from pypaimon.schema.data_types import DataField, current_highest_field_id from pypaimon.schema.schema import Schema @@ -115,7 +115,7 @@ def from_schema(schema_id: int, schema: Schema) -> "TableSchema": partition_keys: List[str] = schema.partition_keys primary_keys: List[str] = schema.primary_keys options: Dict[str, str] = schema.options - highest_field_id: int = max((field.id for field in fields), default=0) + highest_field_id: int = current_highest_field_id(fields) return TableSchema( TableSchema.CURRENT_VERSION, diff --git a/paimon-python/pypaimon/table/special_fields.py 
b/paimon-python/pypaimon/table/special_fields.py index 64d2429bef7d..94dbfacc9ad0 100644 --- a/paimon-python/pypaimon/table/special_fields.py +++ b/paimon-python/pypaimon/table/special_fields.py @@ -17,7 +17,7 @@ from typing import List -from ..schema.data_types import AtomicType, DataField +from ..schema.data_types import AtomicType, DataField, SYSTEM_FIELD_ID_START class SpecialFields: @@ -35,11 +35,18 @@ class SpecialFields: '_ROW_ID' } + SYSTEM_FIELD_ID_START = SYSTEM_FIELD_ID_START + @staticmethod def is_system_field(field_name: str) -> bool: """Check if a field is a system field.""" return field_name in SpecialFields.SYSTEM_FIELD_NAMES + @staticmethod + def is_system_field_id(field_id: int) -> bool: + """Check if a field id is reserved for system fields.""" + return field_id >= SYSTEM_FIELD_ID_START + @staticmethod def find_system_fields(read_fields: List[DataField]) -> dict: """Find system fields in read fields and return a mapping of field name to index.""" diff --git a/paimon-python/pypaimon/tests/data_types_test.py b/paimon-python/pypaimon/tests/data_types_test.py index 5c8d86e55b2a..fb817ba942bc 100755 --- a/paimon-python/pypaimon/tests/data_types_test.py +++ b/paimon-python/pypaimon/tests/data_types_test.py @@ -42,6 +42,23 @@ def test_atomic_type(self): self.assertEqual(str(AtomicType("INT")), str(AtomicType.from_dict(AtomicType("INT").to_dict()))) + def test_parameterized_atomic_type_not_null_roundtrip(self): + # ``to_dict`` appends " NOT NULL" to the type string; the parser must + # strip it back into ``nullable`` instead of keeping it inside + # ``AtomicType.type``. Parameterized types take the paren branch where + # this used to be missed, so a re-serialize doubled the suffix and + # ``from_paimon_type`` blew up with "... NOT NULL NOT NULL". 
+ for type_str in ("DECIMAL(12, 2)", "VARCHAR(10)", "CHAR(5)", + "TIMESTAMP(3)", "TIME(0)", "BINARY(12)"): + original = AtomicType(type_str, nullable=False) + parsed = AtomicType.from_dict(original.to_dict()) + self.assertEqual(parsed.type, type_str, type_str) + self.assertFalse(parsed.nullable, type_str) + self.assertEqual(parsed, original, type_str) + # Round-trips stably and stays materializable as a PyArrow type. + self.assertEqual(parsed.to_dict(), original.to_dict(), type_str) + PyarrowFieldParser.from_paimon_type(parsed) + @parameterized.expand([ (ArrayType, AtomicType("TIMESTAMP(6)"), "ARRAY", "ARRAY>"), (MultisetType, AtomicType("TIMESTAMP(6)"), "MULTISET", "MULTISET>") diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py index 5f5f21679479..0251e7fd8804 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py @@ -219,6 +219,115 @@ def test_alter_table(self): table = catalog.get_table(identifier) self.assertEqual(len(table.fields), 2) + def test_update_column_type_guards_null_to_not_null(self): + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db_guard", False) + + def _make_table(name, options): + identifier = "test_db_guard.{}".format(name) + schema = Schema( + fields=[ + DataField.from_dict({"id": 0, "name": "k", "type": "INT"}), + DataField.from_dict({"id": 1, "name": "v", "type": "BIGINT"}), + ], + partition_keys=[], primary_keys=[], options=options, comment="", + ) + catalog.create_table(identifier, schema, False) + return identifier + + # Default option (disabled=true) rejects nullable -> not null, mirroring + # Java SchemaManager#updateColumnType. 
+ default_id = _make_table("default_opt", {}) + with self.assertRaises(RuntimeError) as ctx: + catalog.alter_table( + default_id, + [SchemaChange.update_column_type( + "v", AtomicType("BIGINT", nullable=False))], + False) + self.assertIn("nullable to non nullable", str(ctx.exception)) + + # Opting out via the table option allows the transition. + allowed_id = _make_table( + "allow_opt", {"alter-column-null-to-not-null.disabled": "false"}) + catalog.alter_table( + allowed_id, + [SchemaChange.update_column_type( + "v", AtomicType("BIGINT", nullable=False))], + False) + table = catalog.get_table(allowed_id) + self.assertFalse(table.fields[1].type.nullable) + + def test_update_column_type_rejects_non_executable_cast(self): + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db_cast", False) + + identifier = "test_db_cast.ts_table" + schema = Schema( + fields=[ + DataField.from_dict({"id": 0, "name": "k", "type": "INT"}), + DataField.from_dict({"id": 1, "name": "ts", "type": "TIMESTAMP(3)"}), + ], + partition_keys=[], primary_keys=[], options={}, comment="", + ) + catalog.create_table(identifier, schema, False) + + # TIMESTAMP -> DECIMAL is logically allowed but has no PyArrow cast + # kernel, so the read path could not materialize it. Reject at alter + # time (mirrors Java's CastExecutors.resolve(...) != null check) instead + # of failing later at read with ArrowNotImplementedError. + with self.assertRaises(RuntimeError) as ctx: + catalog.alter_table( + identifier, + [SchemaChange.update_column_type( + "ts", AtomicType("DECIMAL(10, 0)"))], + False) + self.assertIn("no executable cast", str(ctx.exception)) + + # INT -> DECIMAL(10, 2) has a PyArrow kernel but the target precision is + # too small to hold an int's range at scale 2 (needs >= 12); the read + # path would fail with ArrowInvalid, so reject it at alter time too. 
+ with self.assertRaises(RuntimeError) as ctx: + catalog.alter_table( + identifier, + [SchemaChange.update_column_type( + "k", AtomicType("DECIMAL(10, 2)"))], + False) + self.assertIn("no executable cast", str(ctx.exception)) + + # A wide-enough DECIMAL is executable and succeeds. + catalog.alter_table( + identifier, + [SchemaChange.update_column_type("k", AtomicType("DECIMAL(12, 2)"))], + False) + table = catalog.get_table(identifier) + self.assertEqual(table.fields[0].type.type, "DECIMAL(12, 2)") + + def test_update_column_type_parameterized_not_null_target(self): + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db_nn_param", False) + identifier = "test_db_nn_param.t" + schema = Schema( + fields=[ + DataField.from_dict({"id": 0, "name": "v", "type": "INT NOT NULL"}), + DataField.from_dict({"id": 1, "name": "s", "type": "STRING"}), + ], + partition_keys=[], primary_keys=[], options={}, comment="", + ) + catalog.create_table(identifier, schema, False) + + # Widening a non-null INT to a non-null DECIMAL(12, 2) is valid. The + # target's to_dict() is "DECIMAL(12, 2) NOT NULL"; the atomic parser must + # keep the nullability in `nullable` so the executable-cast check does + # not choke on a doubled "NOT NULL NOT NULL" type string. 
+ catalog.alter_table( + identifier, + [SchemaChange.update_column_type( + "v", AtomicType("DECIMAL(12, 2)", nullable=False))], + False) + table = catalog.get_table(identifier) + self.assertEqual(table.fields[0].type.type, "DECIMAL(12, 2)") + self.assertFalse(table.fields[0].type.nullable) + def test_add_column_before_partition(self): catalog = CatalogFactory.create({ "warehouse": self.warehouse diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py index c9147b823fc1..7a9971c44956 100644 --- a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py +++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py @@ -22,11 +22,10 @@ * Whole-column evolution of a top-level struct/array/map column (add / drop / rename / projection) -- aligned by the column's field id. * Sub-field evolution INSIDE a struct (add/rename/update-type/drop a nested - field via a dotted ``field_names`` path) -- this is NOT implemented: - ``schema_manager`` only operates on the top-level ``field_names[-1]``. - ``SchemaEvolutionNestedGapTest`` locks in the current behaviour with - explicit assertions so the gap is documented and any future fix is - noticed. + field via a dotted ``field_names`` path), including sub-fields of a ROW + nested in an ARRAY/MAP. Sub-fields are aligned by field id, so a rename + follows the data, an added sub-field reads NULL for old rows, a dropped one + is not revived, and a type change is cast at read time. 
""" import os @@ -37,7 +36,13 @@ import pyarrow as pa from pypaimon import CatalogFactory, Schema -from pypaimon.schema.data_types import AtomicType, PyarrowFieldParser +from pypaimon.casting.data_type_casts import can_execute_cast, supports_cast +from pypaimon.schema.data_types import (ArrayType, AtomicInteger, AtomicType, + DataField, MapType, MultisetType, + PyarrowFieldParser, RowType, + VectorType, collect_field_ids, + current_highest_field_id, + reassign_field_id) from pypaimon.schema.schema_change import SchemaChange @@ -240,18 +245,12 @@ def test_rename_struct_column(self): {'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'}]) -class SchemaEvolutionNestedGapTest(_NestedBase): - """Sub-field-level evolution inside a struct is NOT implemented. +class SchemaEvolutionNestedSubfieldTest(_NestedBase): + """Sub-field evolution inside a struct, aligned by field id.""" - schema_manager handles only the top-level ``field_names[-1]``; a dotted - path like ``['mv', 'latest_value']`` never recurses into the RowType. - These tests assert the current behaviour (silent top-level mutation, or - ColumnNotExistException) so the gap is documented. 
- """ - - def _create_struct_table(self, name): + def _create_struct_table(self, name, primary_keys=None, bucket='-1'): s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())]) - table = self._create(name, s0) + table = self._create(name, s0, primary_keys=primary_keys, bucket=bucket) self._write(table, pa.Table.from_pylist([ {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'}, ], schema=s0)) @@ -268,49 +267,512 @@ def _top_level_names(self, table_name): 'default.{}'.format(table_name)).table_schema return [f.name for f in schema.fields] - # -- C7: add nested sub-field -> silently adds a TOP-LEVEL column ---- + def test_add_subfield_goes_inside_struct_and_pads_null(self): + table = self._create_struct_table('nsub_add') + self.catalog.alter_table( + 'default.nsub_add', + [SchemaChange.add_column(['mv', 'score'], AtomicType('INT'))], False) + # The sub-field lands inside mv, not as a stray top-level column. + self.assertEqual(self._mv_subfield_names('nsub_add'), + ['latest_version', 'latest_value', 'score']) + self.assertNotIn('score', self._top_level_names('nsub_add')) + + table = self.catalog.get_table('default.nsub_add') + s1 = pa.schema([ + ('id', pa.int64()), + ('mv', pa.struct([('latest_version', pa.int64()), + ('latest_value', pa.string()), + ('score', pa.int32())])), + ('val', pa.string())]) + self._write(table, pa.Table.from_pylist([ + {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b', 'score': 7}, + 'val': 'y'}], schema=s1)) + rows = self._read_sorted(table) + # Old row reads NULL for the added sub-field; new row carries it. 
+ self.assertEqual(rows[0]['mv'], + {'latest_version': 100, 'latest_value': 'a', 'score': None}) + self.assertEqual(rows[1]['mv'], + {'latest_version': 200, 'latest_value': 'b', 'score': 7}) + + def test_rename_subfield_follows_field_id(self): + table = self._create_struct_table('nsub_rename') + self.catalog.alter_table( + 'default.nsub_rename', + [SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], False) + self.assertEqual(self._mv_subfield_names('nsub_rename'), + ['latest_version', 'lv']) + table = self.catalog.get_table('default.nsub_rename') + rows = self._read_sorted(table) + # Old data follows the renamed sub-field by id, not by name. + self.assertEqual(rows[0]['mv'], {'latest_version': 100, 'lv': 'a'}) - def test_nested_add_subfield_mutates_top_level(self): - # GAP: add_column(['mv','new_inner']) does NOT add new_inner inside - # mv; it silently appends a top-level column 'new_inner' instead. - self._create_struct_table('gap_add') + def test_update_subfield_type_casts(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) self.catalog.alter_table( - 'default.gap_add', - [SchemaChange.add_column(['mv', 'new_inner'], AtomicType('INT'))], - False) - # mv's sub-fields are unchanged; a stray top-level column appeared. 
- self.assertEqual(self._mv_subfield_names('gap_add'), - ['latest_version', 'latest_value']) - self.assertIn('new_inner', self._top_level_names('gap_add')) + 'default.nsub_type', + [SchemaChange.update_column_type(['mv', 'v'], AtomicType('BIGINT'))], False) + table = self.catalog.get_table('default.nsub_type') + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('mv').type.field('v').type, pa.int64()) + self.assertEqual(arrow.to_pylist()[0]['mv'], {'v': 10, 's': 'a'}) - # -- C8/C9/C10: rename / update-type / drop nested sub-field --------- + def test_drop_subfield_not_revived(self): + table = self._create_struct_table('nsub_drop') + self.catalog.alter_table( + 'default.nsub_drop', + [SchemaChange.drop_column(['mv', 'latest_value'])], False) + self.assertEqual(self._mv_subfield_names('nsub_drop'), ['latest_version']) + table = self.catalog.get_table('default.nsub_drop') + rows = self._read_sorted(table) + # The dropped sub-field's old data is gone, not revived under its id. + self.assertEqual(rows[0]['mv'], {'latest_version': 100}) - def test_nested_rename_subfield_raises(self): - # GAP: field_names[-1]='latest_value' is looked up at the TOP level, - # where it does not exist -> ColumnNotExistException (wrapped). - self._create_struct_table('gap_rename') + def test_drop_all_subfields_rejected(self): + self._create_struct_table('nsub_dropall') + self.catalog.alter_table( + 'default.nsub_dropall', + [SchemaChange.drop_column(['mv', 'latest_value'])], False) + with self.assertRaises(RuntimeError): + self.catalog.alter_table( + 'default.nsub_dropall', + [SchemaChange.drop_column(['mv', 'latest_version'])], False) + + def test_null_to_not_null_disabled_by_default(self): + # Converting nullable -> NOT NULL is unsafe for existing data and is + # rejected unless the table opts in via + # 'alter-column-null-to-not-null.disabled' = 'false'. 
+ self._create_struct_table('nsub_nullability') with self.assertRaises(RuntimeError) as cm: self.catalog.alter_table( - 'default.gap_rename', - [SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], False) - self.assertIn('latest_value', str(cm.exception)) + 'default.nsub_nullability', + [SchemaChange.update_column_nullability( + ['mv', 'latest_value'], False)], False) + self.assertIn('nullable to non nullable', str(cm.exception)) + # Opt-in makes the same change succeed. + self.catalog.alter_table( + 'default.nsub_nullability', + [SchemaChange.set_option( + 'alter-column-null-to-not-null.disabled', 'false')], False) + self.catalog.alter_table( + 'default.nsub_nullability', + [SchemaChange.update_column_nullability( + ['mv', 'latest_value'], False)], False) + schema = self.catalog.get_table('default.nsub_nullability').table_schema + mv = next(f for f in schema.fields if f.name == 'mv') + lv = next(sf for sf in mv.type.fields if sf.name == 'latest_value') + self.assertFalse(lv.type.nullable) - def test_nested_update_subfield_type_raises(self): - self._create_struct_table('gap_update') + def test_unsupported_subfield_cast_rejected(self): + self._create_struct_table('nsub_badcast') with self.assertRaises(RuntimeError) as cm: self.catalog.alter_table( - 'default.gap_update', + 'default.nsub_badcast', [SchemaChange.update_column_type( - ['mv', 'latest_version'], AtomicType('BIGINT'))], False) - self.assertIn('latest_version', str(cm.exception)) + ['mv', 'latest_version'], AtomicType('DATE'))], False) + self.assertIn('cannot be converted', str(cm.exception)) + + def test_nested_projection_after_rename_subfield(self): + # Projecting a renamed leaf must follow the field id into old files, + # not look the new name up in the file's physical schema. 
+ s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_proj_rename', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_proj_rename', + [SchemaChange.rename_column(['mv', 's'], 'ss')], False) + table = self.catalog.get_table('default.nsub_proj_rename') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('ss', pa.string())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': {'v': 20, 'ss': 'b'}}], schema=s1)) + + rows = self._read_sorted(table, projection=['id', 'mv.ss']) + self.assertEqual(rows, [ + {'id': 1, 'mv_ss': 'a'}, + {'id': 2, 'mv_ss': 'b'}, + ]) + + def test_nested_projection_after_update_subfield_type(self): + # Projecting a type-changed leaf must cast old batches to the latest + # type instead of emitting mixed-type batches. + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_proj_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_proj_type', + [SchemaChange.update_column_type(['mv', 'v'], AtomicType('BIGINT'))], False) + table = self.catalog.get_table('default.nsub_proj_type') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int64()), ('s', pa.string())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': {'v': 20, 's': 'b'}}], schema=s1)) + + rb = table.new_read_builder().with_projection(['id', 'mv.v']) + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('mv_v').type, pa.int64()) + rows = sorted(arrow.to_pylist(), key=lambda r: r['id']) + self.assertEqual(rows, [ + {'id': 1, 'mv_v': 10}, + {'id': 2, 'mv_v': 20}, + ]) + + def test_pk_nested_subfield_evolution_merge(self): 
+ s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA)]) + table = self._create('nsub_pk', s0, primary_keys=['id'], bucket='1') + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'latest_version': 1, 'latest_value': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_pk', + [SchemaChange.add_column(['mv', 'score'], AtomicType('INT'))], False) + table = self.catalog.get_table('default.nsub_pk') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('latest_version', pa.int64()), + ('latest_value', pa.string()), + ('score', pa.int32())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'latest_version': 2, 'latest_value': 'b', 'score': 9}}], + schema=s1)) + rows = self._read_sorted(table) + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]['mv'], + {'latest_version': 2, 'latest_value': 'b', 'score': 9}) + + +class SchemaEvolutionNestedContainerTest(_NestedBase): + """Sub-field evolution of a ROW nested inside an ARRAY / MAP.""" + + def test_array_of_row_add_and_rename_subfield(self): + elem = pa.struct([('a', pa.int64()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(elem))]) + table = self._create('narr', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'arr': [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]}], schema=s0)) + # Descend through the array element into the ROW. 
+ self.catalog.alter_table('default.narr', [ + SchemaChange.add_column(['arr', 'element', 'c'], AtomicType('INT')), + SchemaChange.rename_column(['arr', 'element', 'b'], 'bb'), + ], False) + table = self.catalog.get_table('default.narr') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['arr'], + [{'a': 1, 'bb': 'x', 'c': None}, + {'a': 2, 'bb': 'y', 'c': None}]) + + def test_map_of_row_add_subfield(self): + val = pa.struct([('a', pa.int64()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('m', pa.map_(pa.string(), val))]) + table = self._create('nmap', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'm': [('k', {'a': 1, 'b': 'x'})]}], schema=s0)) + # Descend through the map value into the ROW. + self.catalog.alter_table( + 'default.nmap', + [SchemaChange.add_column(['m', 'value', 'c'], AtomicType('INT'))], False) + table = self.catalog.get_table('default.nmap') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['m'], [('k', {'a': 1, 'b': 'x', 'c': None})]) + + def test_array_wrapper_token_validated(self): + # The token consumed when descending through an ARRAY must be + # 'element'; an unknown step must not silently mutate the schema. + elem = pa.struct([('a', pa.int64())]) + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(elem))]) + self._create('ntok_arr', s0) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.ntok_arr', + [SchemaChange.add_column(['arr', 'wrong', 'c'], AtomicType('INT'))], + False) + self.assertIn('arr.wrong.c', str(cm.exception)) + # The canonical token still works. + self.catalog.alter_table( + 'default.ntok_arr', + [SchemaChange.add_column(['arr', 'element', 'c'], AtomicType('INT'))], + False) - def test_nested_drop_subfield_raises(self): - self._create_struct_table('gap_drop') + def test_array_element_type_update(self): + # The canonical path for promoting an array's element type descends + # through the 'element' token; old files are cast at read time. 
+ s0 = pa.schema([('id', pa.int64()), ('a2', pa.list_(pa.int32()))]) + table = self._create('nelem_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'a2': [1, 2]}], schema=s0)) + self.catalog.alter_table( + 'default.nelem_type', + [SchemaChange.update_column_type(['a2', 'element'], AtomicType('BIGINT'))], + False) + table = self.catalog.get_table('default.nelem_type') + s1 = pa.schema([('id', pa.int64()), ('a2', pa.list_(pa.int64()))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'a2': [3]}], schema=s1)) + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('a2').type, pa.list_(pa.int64())) + rows = sorted(arrow.to_pylist(), key=lambda r: r['id']) + self.assertEqual(rows, [{'id': 1, 'a2': [1, 2]}, {'id': 2, 'a2': [3]}]) + + def test_whole_struct_type_replacement_rejected(self): + # Replacing a whole ROW type would carry caller-supplied nested ids + # that corrupt the id model; it must be rejected at alter time. + elem = pa.struct([('a', pa.int32()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('mv', elem)]) + self._create('nrow_replace', s0) + new_row = _paimon_type(pa.struct([('a', pa.int64()), ('c', pa.string())])) with self.assertRaises(RuntimeError) as cm: self.catalog.alter_table( - 'default.gap_drop', - [SchemaChange.drop_column(['mv', 'latest_value'])], False) - self.assertIn('latest_value', str(cm.exception)) + 'default.nrow_replace', + [SchemaChange.update_column_type('mv', new_row)], False) + self.assertIn('cannot be converted', str(cm.exception)) + + def test_align_handles_sliced_arrays(self): + # The list/map rebuilds read offsets/raw buffers; a sliced input + # must be re-materialized, not read through stale parent offsets. 
+ from pypaimon.read.reader.data_file_batch_reader import \ + DataFileBatchReader + reader = DataFileBatchReader.__new__(DataFileBatchReader) + sliced_list = pa.array( + [[1, 2], [3], [4, 5, 6], None], type=pa.list_(pa.int32())).slice(1, 3) + out = reader._align_array_by_id( + sliced_list, + ArrayType(True, AtomicType('INT')), + ArrayType(True, AtomicType('BIGINT'))) + self.assertEqual(out.to_pylist(), [[3], [4, 5, 6], None]) + self.assertEqual(out.type, pa.list_(pa.int64())) + + sliced_map = pa.array( + [[('a', 1)], [('b', 2)], None], + type=pa.map_(pa.string(), pa.int32())).slice(1, 2) + out = reader._align_array_by_id( + sliced_map, + MapType(True, AtomicType('STRING'), AtomicType('INT')), + MapType(True, AtomicType('STRING'), AtomicType('BIGINT'))) + self.assertEqual(out.to_pylist(), [[('b', 2)], None]) + + def test_map_wrapper_token_validated(self): + # The token consumed when descending through a MAP must be 'value'. + val = pa.struct([('a', pa.int64())]) + s0 = pa.schema([('id', pa.int64()), ('m', pa.map_(pa.string(), val))]) + self._create('ntok_map', s0) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.ntok_map', + [SchemaChange.add_column(['m', 'wrong', 'c'], AtomicType('INT'))], + False) + self.assertIn('m.wrong.c', str(cm.exception)) + self.catalog.alter_table( + 'default.ntok_map', + [SchemaChange.add_column(['m', 'value', 'c'], AtomicType('INT'))], + False) + + +class SchemaEvolutionConstructedToStringTest(_NestedBase): + """update column type from ROW/ARRAY/MAP to STRING: old files must be + materialized as the engine's string rendering at read time.""" + + def test_row_to_string(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('a', pa.int32()), ('b', pa.string())]))]) + table = self._create('c2s_row', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'a': 1, 'b': 'x'}}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_row', + [SchemaChange.update_column_type('mv', 
AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_row') + s1 = pa.schema([('id', pa.int64()), ('mv', pa.string())]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': 's2'}], schema=s1)) + + rows = self._read_sorted(table) + self.assertEqual(rows, [ + {'id': 1, 'mv': '{1, x}'}, + {'id': 2, 'mv': 's2'}, + ]) + + def test_array_to_string(self): + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(pa.int32()))]) + table = self._create('c2s_arr', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'arr': [1, 2, 3]}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_arr', + [SchemaChange.update_column_type('arr', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_arr') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['arr'], '[1, 2, 3]') + + def test_map_to_string(self): + s0 = pa.schema([('id', pa.int64()), + ('m', pa.map_(pa.string(), pa.int32()))]) + table = self._create('c2s_map', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'm': [('k', 7)]}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_map', + [SchemaChange.update_column_type('m', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_map') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['m'], '{k -> 7}') + + def test_row_to_string_null_semantics(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('a', pa.int32()), ('b', pa.string())]))]) + table = self._create('c2s_null', s0) + self._write(table, pa.Table.from_pylist([ + {'id': 1, 'mv': None}, + {'id': 2, 'mv': {'a': None, 'b': 'x'}}, + ], schema=s0)) + self.catalog.alter_table( + 'default.c2s_null', + [SchemaChange.update_column_type('mv', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_null') + rows = self._read_sorted(table) + # A NULL container stays NULL; a NULL sub-value renders as 'null'. 
+ self.assertIsNone(rows[0]['mv']) + self.assertEqual(rows[1]['mv'], '{null, x}') + + def test_vector_to_string_rejected(self): + # There is no read-time string rendering for vectors, so the type + # change must be rejected at alter time instead of failing on read. + s0 = pa.schema([('id', pa.int64()), + ('embed', pa.list_(pa.float32(), 3))]) + table = self._create('c2s_vec', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'embed': [1.0, 2.0, 3.0]}], schema=s0)) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.c2s_vec', + [SchemaChange.update_column_type('embed', AtomicType('STRING'))], + False) + self.assertIn('cannot be converted', str(cm.exception)) + # The vector column itself still reads fine. + rows = self._read_sorted(table) + self.assertEqual(rows[0]['embed'], [1.0, 2.0, 3.0]) + + def test_nested_subfield_row_to_string(self): + inner = pa.struct([('a', pa.int32())]) + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('inner', inner)]))]) + table = self._create('c2s_nested', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'inner': {'a': 1}}}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_nested', + [SchemaChange.update_column_type(['mv', 'inner'], AtomicType('STRING'))], + False) + table = self.catalog.get_table('default.c2s_nested') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['mv'], {'inner': '{1}'}) + + +class NestedFieldIdModelTest(unittest.TestCase): + """Globally-unique nested field ids, mirrored from the engine id model.""" + + def test_nested_ids_are_globally_unique(self): + s = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('x', pa.string())]) + fields = PyarrowFieldParser.to_paimon_schema(s) + ids = set() + for f in fields: + ids.add(f.id) + collect_field_ids(f.type, ids) + # id(0), mv(1), latest_version(2), latest_value(3), x(4) + self.assertEqual(ids, {0, 1, 2, 3, 4}) + self.assertEqual(current_highest_field_id(fields), 4) + + def 
test_flat_schema_ids_unchanged(self): + fields = PyarrowFieldParser.to_paimon_schema( + pa.schema([('a', pa.int64()), ('b', pa.string()), ('c', pa.int32())])) + self.assertEqual([f.id for f in fields], [0, 1, 2]) + + def test_reassign_field_id_depth_first_order(self): + inner = RowType(True, [DataField(0, 'c', AtomicType('INT'))]) + mid = RowType(True, [DataField(0, 'b', inner)]) + outer = RowType(True, [DataField(0, 'a', mid), + DataField(0, 'd', AtomicType('INT'))]) + result = reassign_field_id(outer, AtomicInteger(2)) + ids = set() + collect_field_ids(result, ids) + self.assertEqual(ids, {3, 4, 5, 6}) + + def test_duplicate_field_id_raises(self): + bad = [ + DataField(0, 'a', AtomicType('INT')), + DataField(1, 'b', RowType(True, [DataField(0, 'c', AtomicType('INT'))])), + ] + with self.assertRaises(ValueError): + current_highest_field_id(bad) + + +class SupportsCastTest(unittest.TestCase): + + def test_supported_casts(self): + for src, dst in [('INT', 'BIGINT'), ('FLOAT', 'DOUBLE'), ('INT', 'STRING'), + ('DOUBLE', 'INT'), ('DECIMAL(10, 4)', 'DECIMAL(10, 2)')]: + self.assertTrue(supports_cast(AtomicType(src), AtomicType(dst)), + '{} -> {}'.format(src, dst)) + + def test_unsupported_casts(self): + for src, dst in [('BIGINT', 'DATE'), ('BOOLEAN', 'DATE')]: + self.assertFalse(supports_cast(AtomicType(src), AtomicType(dst)), + '{} -> {}'.format(src, dst)) + + def test_can_execute_cast_decimal_precision(self): + # A numeric -> decimal cast has a PyArrow kernel but is only executable + # when the target precision can hold the source's range at the target + # scale (INT needs >= 12 at scale 2, BIGINT >= 21). An empty-array probe + # misses this; can_execute_cast must reject the too-small targets so the + # read path does not later fail with ArrowInvalid. 
+ for src, dst in [('INT', 'DECIMAL(10, 2)'), ('BIGINT', 'DECIMAL(10, 2)'), + ('BIGINT', 'DECIMAL(20, 2)')]: + self.assertFalse(can_execute_cast(AtomicType(src), AtomicType(dst)), + '{} -> {}'.format(src, dst)) + for src, dst in [('INT', 'DECIMAL(12, 2)'), ('BIGINT', 'DECIMAL(21, 2)'), + ('INT', 'BIGINT'), ('DOUBLE', 'INT'), ('INT', 'STRING')]: + self.assertTrue(can_execute_cast(AtomicType(src), AtomicType(dst)), + '{} -> {}'.format(src, dst)) + + def test_constructed_to_string(self): + # ROW/ARRAY/MAP have a read-time string rendering; vector and + # multiset do not, so their type change must be rejected. + row = RowType(True, [DataField(0, 'a', AtomicType('INT'))]) + arr = ArrayType(True, AtomicType('INT')) + m = MapType(True, AtomicType('STRING'), AtomicType('INT')) + for src in (row, arr, m): + self.assertTrue(supports_cast(src, AtomicType('STRING')), str(src)) + vec = VectorType(True, AtomicType('FLOAT'), 3) + ms = MultisetType(True, AtomicType('INT')) + for src in (vec, ms): + self.assertFalse(supports_cast(src, AtomicType('STRING')), str(src)) + + def test_constructed_to_differently_shaped_constructed_rejected(self): + # Reshaping a constructed type must go through sub-field / + # 'element' / 'value' paths; a whole-type replacement would carry + # caller-supplied nested ids that corrupt the id model. + self.assertFalse(supports_cast( + RowType(True, [DataField(0, 'a', AtomicType('INT'))]), + RowType(True, [DataField(0, 'a', AtomicType('BIGINT'))]))) + self.assertFalse(supports_cast( + ArrayType(True, AtomicType('INT')), + ArrayType(True, AtomicType('BIGINT')))) + self.assertFalse(supports_cast( + VectorType(True, AtomicType('FLOAT'), 3), + VectorType(True, AtomicType('FLOAT'), 5))) + # Only the outer nullability differing is still an identity cast. 
+ self.assertTrue(supports_cast( + RowType(True, [DataField(2, 'a', AtomicType('INT'))]), + RowType(False, [DataField(2, 'a', AtomicType('INT'))]))) if __name__ == '__main__': diff --git a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py index 50a239a1d8bc..a775a6dffd33 100644 --- a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py +++ b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py @@ -193,6 +193,23 @@ def test_partitioned_table_with_nested_projection(self): [{'part': 'A', 'mv_latest_version': 100, 'val': 'x'}, {'part': 'B', 'mv_latest_version': 200, 'val': 'y'}]) + def test_filter_on_projected_nested_leaf(self): + """A predicate on a projected nested leaf must actually filter rows. + The read widens the projection to the top-level struct, which drops + the leaf predicate from push-down (its path is absent from the read + fields); without re-applying it after the leaves are extracted, every + row leaks through.""" + table = self._create_table('ao_nested_leaf_filter') + rb = table.new_read_builder().with_projection(['id', 'mv.latest_version']) + pred = rb.new_predicate_builder().greater_than('mv_latest_version', 150) + rb = rb.with_filter(pred) + got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist() + got = sorted(got, key=lambda r: r['id']) + self.assertEqual( + got, + [{'id': 2, 'mv_latest_version': 200}, + {'id': 3, 'mv_latest_version': 300}]) + def test_avro_nested_projection_python_fallback(self): """Avro has no native nested column pruning; the reader walks each fastavro record dict by path and assembles the column @@ -245,11 +262,43 @@ def _create_pk_table(self, name: str, file_format: str = 'parquet'): w.close() return table + def _create_pk_raw_table(self, name: str, file_format: str = 'parquet'): + """Single commit keeps the split raw-convertible, so the read stays on + the RawFileSplitRead fast path rather than the merge reader.""" + identifier 
= 'default.{}'.format(name) + schema = Schema.from_pyarrow_schema( + self.pa_schema, + primary_keys=['id'], + options={'bucket': '1', 'file.format': file_format}, + ) + self.catalog.create_table(identifier, schema, False) + table = self.catalog.get_table(identifier) + wb = table.new_batch_write_builder() + w = wb.new_write() + w.write_arrow(pa.Table.from_pylist(self.rows, schema=self.pa_schema)) + wb.new_commit().commit(w.prepare_commit()) + w.close() + return table + def _read_arrow(self, table, projection): rb = table.new_read_builder().with_projection(projection) splits = rb.new_scan().plan().splits() return rb.new_read().to_arrow(splits) + def test_raw_convertible_filter_on_projected_nested_leaf(self): + """PK raw-convertible split also widens nested projection and so drops + the leaf predicate from push-down. The filter must be re-applied on the + extracted leaves; otherwise all rows are returned (reviewer repro).""" + table = self._create_pk_raw_table('pk_raw_nested_leaf_filter') + rb = table.new_read_builder().with_projection(['id', 'mv.latest_version']) + pred = rb.new_predicate_builder().greater_than('mv_latest_version', 150) + rb = rb.with_filter(pred) + arrow = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + rows = sorted(zip( + arrow.column('id').to_pylist(), + arrow.column('mv_latest_version').to_pylist())) + self.assertEqual(rows, [(2, 200), (3, 300)]) + def test_extracts_single_nested_leaf(self): table = self._create_pk_table('pk_nested_single') arrow = self._read_arrow(table, ['mv.latest_version']) @@ -301,6 +350,21 @@ def test_dotted_top_level_field_kept(self): got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist() self.assertEqual(got, [{'id': 1, 'media.left': 'hello'}]) + def test_merge_filter_on_projected_nested_leaf(self): + """Non-raw-convertible PK splits go through the merge reader, which + widens the nested projection to the full ROW and so also drops the leaf + predicate from push-down. 
The filter must be re-applied on the extracted + leaves above the merge; otherwise all rows are returned.""" + table = self._create_pk_table('pk_merge_nested_leaf_filter') + rb = table.new_read_builder().with_projection(['id', 'mv.latest_version']) + pred = rb.new_predicate_builder().greater_than('mv_latest_version', 150) + rb = rb.with_filter(pred) + arrow = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + rows = sorted(zip( + arrow.column('id').to_pylist(), + arrow.column('mv_latest_version').to_pylist())) + self.assertEqual(rows, [(2, 200), (3, 300)]) + def test_avro_extracts_single_nested_leaf(self): # Avro PK reads resolve DataFields through ``full_fields_map`` which # historically only covered merge-internal aliases; without the