[python] Add conflict check for blob updating #8244
Merged
Add testBlobCompactConflictWriteBase + testBlobCompactConflictRunCompact Java helpers and a paired pypaimon e2e test that:
1. Writes 2 blob data files (snapshot 2).
2. pypaimon scans + prepares an update-by-row-id for the blob column (stale layout, check_from_snapshot=2).
3. Java runs data-evolution compact (snapshot 3, COMPACT kind).
4. pypaimon commits the stale blob update.
Today step 4 silently succeeds: conflict_detection.py:310-311 skips COMPACT snapshots in check_row_id_from_snapshot, and the parallel check_row_id_range_conflicts path only catches the parquet sidecar, not blob-only deltas. The resulting layout has multiple blob files overlapping the same row-id range with sentinel max_sequence_number=0, which a second concurrent stale update would turn into an unreadable table via BlobFallbackRecordReader's intra-group overlap check.
The test asserts the post-fix behavior (RuntimeError) and is marked @unittest.expectedFailure so it xfails today; remove the decorator when the conflict-detection fix lands.
check_row_id_from_snapshot was unconditionally skipping COMPACT snapshots, letting a row-id-based update commit succeed even when the file layout it was prepared against had been replaced by a concurrent compact. For COMPACT snapshots, now check whether any file added by the compact overlaps the staged delta's row-id range; overlap means the delta's anchor is stale and the commit must be rejected. Drops @unittest.expectedFailure from test_blob_compact_conflict_update; the e2e now passes.
The previous fix flagged any COMPACT that touched a file overlapping the staged delta's row range. This false-positives when COMPACT only rewrites unrelated file types (e.g. parquet compact alone does not invalidate a blob delta whose anchor blob file is still on disk). Tighten the check to match the delta's anchor file type:
- Pull both ADD and DELETE entries via new read_incremental_raw_entries_from_changed_partitions on CommitScanner (the regular method funnels through read_entries_parallel which drops standalone DELETEs).
- Only conflict when the COMPACT deleted a file of the same type as the staged delta (.blob vs non-.blob) and the deleted file's row-id range overlaps the delta's.
E2E test updated: doDataEvolutionCompact now takes compactBlob; testBlobCompactConflictRunCompact passes true so blob files actually get compacted, exercising the precise check.
Unit tests rewritten around DELETE-based matching: new test covers the non-matching file type case (parquet compact + blob delta) which the previous version would have incorrectly flagged.
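The DELETE-based rule described above can be sketched in plain Python. This is a minimal illustration, not pypaimon's code: `FileEntry`, `is_blob`, and `compact_deletes_anchor` are hypothetical names, row-id ranges are assumed to be inclusive, and blob files are assumed to be recognisable by a `.blob` suffix.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class FileEntry:
    """Hypothetical stand-in for a manifest entry (not pypaimon's class)."""
    kind: str          # "ADD" or "DELETE"
    file_name: str
    first_row_id: int  # inclusive
    last_row_id: int   # inclusive


def is_blob(file_name: str) -> bool:
    # Assumption: blob files are identified by a ".blob" suffix.
    return file_name.endswith(".blob")


def ranges_overlap(a: FileEntry, b: FileEntry) -> bool:
    # Two inclusive ranges overlap when each starts before the other ends.
    return a.first_row_id <= b.last_row_id and b.first_row_id <= a.last_row_id


def compact_deletes_anchor(compact_entries: Iterable[FileEntry],
                           delta_files: Iterable[FileEntry]) -> bool:
    """True when the COMPACT deleted a same-type file overlapping the delta."""
    delta_files = list(delta_files)
    # ADD entries are ignored: only a removed anchor makes the delta stale.
    deleted = [e for e in compact_entries if e.kind == "DELETE"]
    return any(
        is_blob(d.file_name) == is_blob(f.file_name) and ranges_overlap(d, f)
        for d in deleted
        for f in delta_files
    )
```

Under these assumptions, a blob delta over row ids 0..99 conflicts with a COMPACT that deleted `data-0.blob` (rows 0..49), but not with one that only deleted a parquet file covering the same rows.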
- Hoist ManifestFileManager import to module top (verified no circular dep)
- Match existing RuntimeError style in the file (.format() instead of f-string)
- Shorten the file-type-pairing comment to one line
- Replace inline _is_blob with DataFileMeta.is_blob_file utility used elsewhere in this file, avoiding silent drift if the blob suffix convention ever changes.
- Extract the ~25-line COMPACT branch body into _compact_conflicts_with_delta helper. Outer loop becomes a clean dispatch; OVERWRITE handling (if added later) is a sibling helper.
- Add two unit tests for read_incremental_raw_entries_from_changed_partitions in partition_predicate_test.py: one asserts DELETE entries are preserved (the load-bearing diff vs the regular method), the other asserts unmatched partitions are filtered out. Previously this code path was only exercised by the mvn-subprocess e2e.
Combine positive + first-match-wins into one test that uses a two-snapshot chain. Combine the two negative scenarios (disjoint range, add-only) into one subTest. Same code coverage, half the boilerplate.
Same-file-type + range-overlap alone false-positives when COMPACT rewrites another column's shard (e.g. compacting f1-only parquet should not block an update touching only f2). Reuse the existing RowIdColumnConflictChecker, which already handles row range + write column overlap; keep the file-type filter to handle write_cols=None ambiguity (initial full-row parquet doesn't actually contain blob columns). Adds a regression subcase 'other_column_shard' to test_compact_no_conflict_when_no_matching_delete.
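A rough sketch of why the column check and the file-type filter are both needed follows. It does not reproduce RowIdColumnConflictChecker; `columns_overlap` and `shard_conflicts` are hypothetical helpers, and treating `write_cols=None` as "touches every column" is an assumption made for illustration.

```python
from typing import Optional, Sequence, Tuple

RowRange = Tuple[int, int]  # inclusive (first_row_id, last_row_id)


def columns_overlap(a: Optional[Sequence[str]],
                    b: Optional[Sequence[str]]) -> bool:
    # Assumption: None means a full-row file, treated as touching all columns.
    if a is None or b is None:
        return True
    return bool(set(a) & set(b))


def shard_conflicts(deleted_range: RowRange,
                    deleted_cols: Optional[Sequence[str]],
                    deleted_is_blob: bool,
                    delta_range: RowRange,
                    delta_cols: Optional[Sequence[str]],
                    delta_is_blob: bool) -> bool:
    """Conflict only if file type, row range, and write columns all match."""
    if deleted_is_blob != delta_is_blob:
        # The file-type filter resolves the None ambiguity: a full-row
        # parquet file does not actually hold the blob column.
        return False
    rows_overlap = (deleted_range[0] <= delta_range[1]
                    and delta_range[0] <= deleted_range[1])
    return rows_overlap and columns_overlap(deleted_cols, delta_cols)
```

With this shape, compacting an f1-only parquet shard does not block an update that writes only f2, while a full-row parquet file with `write_cols=None` is still kept from conflicting with a blob delta by the type filter.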
… < 7 Table.sort_by is PyArrow 7+. The Python 3.6 CI job pins pyarrow==6.0.1, where the previous call raised AttributeError. Use sort_by when present and pc.sort_indices + take otherwise; both produce the same sorted Table.
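The version fallback described here might look like the sketch below. `sort_table` is a hypothetical helper name, not the function in the PR; it assumes an ascending sort on a single column and imports `pyarrow.compute` lazily so the fast path has no extra dependency.

```python
def sort_table(table, key):
    """Sort a pyarrow.Table by `key` (ascending) on old and new PyArrow."""
    if hasattr(table, "sort_by"):
        # PyArrow 7+: Table.sort_by accepts a column name.
        return table.sort_by(key)
    # Older PyArrow (e.g. 6.0.1): compute the sort order, then reorder rows.
    import pyarrow.compute as pc
    indices = pc.sort_indices(table, sort_keys=[(key, "ascending")])
    return table.take(indices)
```

Feature detection with `hasattr` avoids parsing version strings, so the same code path is chosen correctly regardless of how the installed PyArrow reports its version.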
PyArrow 6.0.1 (pinned in the Python 3.6 CI job) rejects the two-positional form of pc.is_in with 'Function is_in accepts 1 arguments but attempted to look up kernel(s) with 2'. Use the keyword form, which works on both old and new PyArrow.
Contributor: +1
Purpose
A blob update prepared against snapshot N commits successfully even when a COMPACT between N and now has removed the anchor file. Subsequent reads can fail, and two such stale commits in a row break the table.
Detect the COMPACT in the row-id conflict check and reject the commit with a RuntimeError.
Tests
- test_blob_compact_conflict_update (e2e)
- TestCheckRowIdFromSnapshot.test_compact_blob_delete_raises_at_first_match
- TestCheckRowIdFromSnapshot.test_compact_other_file_type_does_not_raise
- TestCheckRowIdFromSnapshot.test_compact_no_conflict_when_no_matching_delete
- TestCommitScannerPartitionPredicate.test_raw_entries_preserve_delete_kind
- TestCommitScannerPartitionPredicate.test_raw_entries_filter_unmatched_partition