Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@

from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ResolveError
from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundReference, BoundTerm, Not, Or
from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundTerm, Not, Or
from pyiceberg.expressions.literals import Literal
from pyiceberg.expressions.visitors import (
BoundBooleanExpressionVisitor,
Expand Down Expand Up @@ -764,31 +764,18 @@ def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expressi
collector = _NullNaNUnmentionedTermsCollector()
collector.collect(expr)

def _downcast_term_to_reference(bound_terms: Set[BoundTerm[Any]]) -> Set[BoundReference[Any]]:
    """Narrow a set of bound terms to a set of bound references.

    This exists so the static type checker sees ``BoundReference`` elements;
    each element is verified with ``isinstance`` at runtime.

    Args:
        bound_terms: The bound terms to narrow.

    Returns:
        A new set holding the same elements, typed as bound references.

    Raises:
        ValueError: If any element is not a ``BoundReference``.
    """
    narrowed: Set[BoundReference[Any]] = set()
    for candidate in bound_terms:
        if isinstance(candidate, BoundReference):
            narrowed.add(candidate)
            continue
        # Anything other than a reference cannot be narrowed.
        raise ValueError("Collected Bound Term that is not reference.")
    return narrowed

null_unmentioned_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.null_unmentioned_bound_terms)
nan_unmentioned_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.nan_unmentioned_bound_terms)

# Convert the set of references to a sorted list so that layout of the expression to build is deterministic.
null_unmentioned_bound_refs_sorted: List[BoundReference[Any]] = sorted(
null_unmentioned_bound_refs, key=lambda ref: ref.field.name
# Convert the set of terms to a sorted list so that layout of the expression to build is deterministic.
null_unmentioned_bound_terms_sorted: List[BoundTerm[Any]] = sorted(
collector.null_unmentioned_bound_terms, key=lambda term: term.ref().field.name
)
nan_unmentioned_bound_refs_sorted: List[BoundReference[Any]] = sorted(
nan_unmentioned_bound_refs, key=lambda ref: ref.field.name
nan_unmentioned_bound_terms_sorted: List[BoundTerm[Any]] = sorted(
collector.nan_unmentioned_bound_terms, key=lambda term: term.ref().field.name
)

preserve_expr: BooleanExpression = Not(expr)
for term in null_unmentioned_bound_refs_sorted:
for term in null_unmentioned_bound_terms_sorted:
preserve_expr = Or(preserve_expr, BoundIsNull(term=term))
for term in nan_unmentioned_bound_refs_sorted:
for term in nan_unmentioned_bound_terms_sorted:
preserve_expr = Or(preserve_expr, BoundIsNaN(term=term))
return expression_to_pyarrow(preserve_expr)

Expand Down