diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 379331a1c221..543ca8eab627 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -1288,6 +1288,73 @@ public void testCompactConflictRunCompact() throws Exception {
         LOG.info("compact_conflict_test: compact done, 5 files merged into 1 (1000 rows)");
     }
 
+    /**
+     * Step 1 for blob compact conflict test: write a blob table with 2 data files so compaction
+     * will merge them. Each file has 100 rows of (f0 id, f1 name, f2 blob_data).
+     */
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testBlobCompactConflictWriteBase() throws Exception {
+        Identifier id = identifier("blob_compact_conflict_test");
+        try {
+            catalog.dropTable(id, true); // any failure is swallowed by the catch below
+        } catch (Exception ignore) {
+        }
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.STRING())
+                        .column("f2", DataTypes.BLOB())
+                        .option("target-file-size", "100 MB")
+                        .option(ROW_TRACKING_ENABLED.key(), "true")
+                        .option(DATA_EVOLUTION_ENABLED.key(), "true")
+                        .option("compaction.min.file-num", "2")
+                        .option(BUCKET.key(), "-1")
+                        .build();
+        catalog.createTable(id, schema, false);
+
+        byte[] blobBytes = new byte[64]; // refilled and cloned for every row
+        java.util.Random rng = new java.util.Random(42); // fixed seed
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(id);
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite w = builder.newWrite()) {
+            for (int i = 0; i < 100; i++) {
+                rng.nextBytes(blobBytes);
+                w.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name" + i),
+                                new org.apache.paimon.data.BlobData(blobBytes.clone())));
+            }
+            builder.newCommit().commit(w.prepareCommit());
+        }
+
+        table = (FileStoreTable) catalog.getTable(id);
+        builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite w = builder.newWrite()) {
+            for (int i = 100; i < 200; i++) {
+                rng.nextBytes(blobBytes);
+                w.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name" + i),
+                                new org.apache.paimon.data.BlobData(blobBytes.clone())));
+            }
+            builder.newCommit().commit(w.prepareCommit());
+        }
+        LOG.info("blob_compact_conflict_test: 2 base files written (100 rows each, total 200)");
+    }
+
+    /** Step 3 for blob compact conflict test: run compact with compactBlob=true. */
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testBlobCompactConflictRunCompact() throws Exception {
+        Identifier id = identifier("blob_compact_conflict_test");
+        doDataEvolutionCompact((FileStoreTable) catalog.getTable(id), true);
+        LOG.info("blob_compact_conflict_test: compact done (compactBlob=true)");
+    }
+
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testDataEvolutionWrite() throws Exception {
@@ -1373,8 +1440,13 @@ private void setFirstRowId(List messages, long firstRowId) {
     }
 
     private void doDataEvolutionCompact(FileStoreTable table) throws Exception {
+        doDataEvolutionCompact(table, false); // previous behaviour: compactBlob=false
+    }
+
+    private void doDataEvolutionCompact(FileStoreTable table, boolean compactBlob)
+            throws Exception {
         DataEvolutionCompactCoordinator coordinator =
-                new DataEvolutionCompactCoordinator(table, false, false);
+                new DataEvolutionCompactCoordinator(table, compactBlob, false);
         List messages = new ArrayList<>();
         try {
             List tasks;
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index c2b75f476da7..2014caa781fb 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -462,6 +462,30 @@ run_compact_conflict_test() {
     fi
 }
 
+run_blob_compact_conflict_test() {
+    echo -e "${YELLOW}=== Running Blob Compact Conflict Test (Java Write Base, Python Blob Update + Java Compact) ===${NC}"
+
+    cd "$PROJECT_ROOT"
+
+    echo "Running Maven test for JavaPyE2ETest.testBlobCompactConflictWriteBase..."
+    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testBlobCompactConflictWriteBase -pl paimon-core -q -Drun.e2e.tests=true; then
+        echo -e "${GREEN}✓ Java write base blob files completed successfully${NC}"
+    else
+        echo -e "${RED}✗ Java write base blob files failed${NC}"
+        return 1
+    fi
+
+    cd "$PAIMON_PYTHON_DIR"
+    echo "Running Python test for JavaPyReadWriteTest.test_blob_compact_conflict_update..."
+    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_blob_compact_conflict_update -v; then
+        echo -e "${GREEN}✓ Python blob compact conflict test completed successfully${NC}"
+        return 0
+    else
+        echo -e "${RED}✗ Python blob compact conflict test failed${NC}"
+        return 1
+    fi
+}
+
 run_data_evolution_test() {
     echo -e "${YELLOW}=== Running Data Evolution Test (Java Write, Python Read) ===${NC}"
 
@@ -673,6 +697,7 @@ main() {
     local lumina_vector_result=0
     local lumina_vector_btree_result=0
     local compact_conflict_result=0
+    local blob_compact_conflict_result=0
     local blob_alter_compact_result=0
     local data_evolution_result=0
     local data_evolution_py_write_result=0
@@ -823,6 +848,13 @@ main() {
 
     echo ""
 
+    # Run blob compact conflict test (Java write base, then Python blob update which itself runs the Java compact)
+    if ! run_blob_compact_conflict_test; then
+        blob_compact_conflict_result=1
+    fi
+
+    echo ""
+
     # Run blob alter+compact test (Java write+alter+compact, Python read)
     if ! run_blob_alter_compact_test; then
         blob_alter_compact_result=1
@@ -968,6 +1000,12 @@ main() {
         echo -e "${RED}✗ Compact Conflict Test (Java Write+Compact, Python Read): FAILED${NC}"
     fi
 
+    if [[ $blob_compact_conflict_result -eq 0 ]]; then
+        echo -e "${GREEN}✓ Blob Compact Conflict Test (Java Write+Compact, Python Blob Update): PASSED${NC}"
+    else
+        echo -e "${RED}✗ Blob Compact Conflict Test (Java Write+Compact, Python Blob Update): FAILED${NC}"
+    fi
+
     if [[ $blob_alter_compact_result -eq 0 ]]; then
         echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read): PASSED${NC}"
     else
@@ -1009,7 +1047,7 @@ main() {
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 && $data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq 0 && $py_variant_write_java_read_result -eq 0 && $vector_append_table_result -eq 0 && $vector_dedicated_java_write_result -eq 0 && $vector_dedicated_py_write_result -eq 0 && $multi_vector_dedicated_java_write_result -eq 0 && $multi_vector_dedicated_py_write_result -eq 0 && $row_format_result -eq 0 ]]; then
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 && $blob_compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 && $data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq 0 && $py_variant_write_java_read_result -eq 0 && $vector_append_table_result -eq 0 && $vector_dedicated_java_write_result -eq 0 && $vector_dedicated_py_write_result -eq 0 && $multi_vector_dedicated_java_write_result -eq 0 && $multi_vector_dedicated_py_write_result -eq 0 && $row_format_result -eq 0 ]]; then
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
         return 0
     else
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 0c84a828bca1..4f1df3358580 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -1190,6 +1190,46 @@ def test_compact_conflict_shard_update(self):
                 tc.close()
         print(f"Conflict detected as expected: {ctx.exception}")
 
+    def test_blob_compact_conflict_update(self):
+        import subprocess  # runs the Java compact step through Maven
+
+        table = self.catalog.get_table('default.blob_compact_conflict_test')
+        snapshot_before = table.new_read_builder().new_scan().plan().snapshot_id
+
+        wb = table.new_batch_write_builder()
+        table_update = wb.new_update().with_update_type(['f2'])
+        update_data = pa.Table.from_pydict({
+            '_ROW_ID': pa.array([50], type=pa.int64()),
+            'f2': pa.array([b'blob50-updated'], type=pa.large_binary()),
+        })
+        stale_commit_msgs = table_update.update_by_arrow_with_row_id(update_data)  # not committed yet
+
+        project_root = os.path.join(self.tempdir, '..', '..', '..', '..')
+        result = subprocess.run(
+            ['mvn', 'test',
+             '-pl', 'paimon-core',
+             '-Dtest=org.apache.paimon.JavaPyE2ETest#testBlobCompactConflictRunCompact',
+             '-Drun.e2e.tests=true',
+             '-Dsurefire.failIfNoSpecifiedTests=false',
+             '-q'],
+            cwd=os.path.abspath(project_root),
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+            universal_newlines=True, timeout=300
+        )
+        self.assertEqual(result.returncode, 0,
+                         f"Java compact failed:\n{result.stdout}\n{result.stderr}")
+
+        table = self.catalog.get_table('default.blob_compact_conflict_test')
+        snapshot_after = table.new_read_builder().new_scan().plan().snapshot_id
+        self.assertGreater(snapshot_after, snapshot_before)  # the compact step advanced the snapshot id
+
+        tc = wb.new_commit()  # commits the messages staged before the compact
+        try:
+            with self.assertRaises(RuntimeError):
+                tc.commit(stale_commit_msgs)
+        finally:
+            tc.close()
+
     @parameterized.expand(get_file_format_params())
     def test_read_data_evolution_table(self, file_format):
         """Read data evolution tables written by Java and verify merged results."""
diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py b/paimon-python/pypaimon/tests/partition_predicate_test.py
index abf5212d0e75..dbc50d0585e4 100644
--- a/paimon-python/pypaimon/tests/partition_predicate_test.py
+++ b/paimon-python/pypaimon/tests/partition_predicate_test.py
@@ -345,3 +345,38 @@ def test_passes_partition_predicate_to_file_scanner(self, mock_scanner_cls):
         self.assertIn('partition_predicate', kwargs)
         self.assertIsNotNone(kwargs['partition_predicate'])
         self.assertNotIn('predicate', kwargs)
+
+    @patch('pypaimon.write.commit.commit_scanner.ManifestFileManager')
+    def test_raw_entries_preserve_delete_kind(self, mock_mfm_cls):
+        added = ManifestEntry(
+            kind=0, partition=GenericRow(['p1', 'us'], PARTITION_FIELDS),
+            bucket=0, total_buckets=1, file=Mock())
+        deleted = ManifestEntry(
+            kind=1, partition=GenericRow(['p1', 'us'], PARTITION_FIELDS),
+            bucket=0, total_buckets=1, file=Mock())
+        mock_mfm_cls.return_value.read.return_value = [added, deleted]
+
+        scanner = self._scanner()
+        scanner.manifest_list_manager.read_delta.return_value = [Mock(file_name='m1')]
+        result = scanner.read_incremental_raw_entries_from_changed_partitions(
+            Mock(), [_manifest_entry(['p1', 'us'])])
+
+        self.assertEqual([e.kind for e in result], [0, 1])  # both kinds kept, in read order
+
+    @patch('pypaimon.write.commit.commit_scanner.ManifestFileManager')
+    def test_raw_entries_filter_unmatched_partition(self, mock_mfm_cls):
+        in_part = ManifestEntry(
+            kind=1, partition=GenericRow(['p1', 'us'], PARTITION_FIELDS),
+            bucket=0, total_buckets=1, file=Mock())
+        out_part = ManifestEntry(
+            kind=1, partition=GenericRow(['p2', 'eu'], PARTITION_FIELDS),
+            bucket=0, total_buckets=1, file=Mock())
+        mock_mfm_cls.return_value.read.return_value = [in_part, out_part]
+
+        scanner = self._scanner()
+        scanner.manifest_list_manager.read_delta.return_value = [Mock(file_name='m1')]
+        result = scanner.read_incremental_raw_entries_from_changed_partitions(
+            Mock(), [_manifest_entry(['p1', 'us'])])
+
+        self.assertEqual(len(result), 1)
+        self.assertEqual(tuple(result[0].partition.values), ('p1', 'us'))
diff --git a/paimon-python/pypaimon/tests/write/conflict_detection_test.py b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
index 72ac50f79b66..b035b38056c8 100644
--- a/paimon-python/pypaimon/tests/write/conflict_detection_test.py
+++ b/paimon-python/pypaimon/tests/write/conflict_detection_test.py
@@ -205,6 +205,112 @@ def test_skip_when_next_row_id_is_none(self):
             detection.check_row_id_existence(base, delta, next_row_id=None))
 
 
+class _FakeSnapshot:
+
+    def __init__(self, snapshot_id, commit_kind, next_row_id=None):
+        self.id = snapshot_id
+        self.commit_kind = commit_kind
+        self.next_row_id = next_row_id
+
+
+class _FakeSnapshotManager:
+
+    def __init__(self, snapshots):
+        self._by_id = {s.id: s for s in snapshots}
+
+    def get_snapshot_by_id(self, snapshot_id):
+        return self._by_id.get(snapshot_id)  # None for unknown ids
+
+
+class _FakeCommitScanner:
+
+    def __init__(self, entries_by_snapshot_id, raw_entries_by_snapshot_id=None):
+        self._by_id = entries_by_snapshot_id
+        self._raw_by_id = raw_entries_by_snapshot_id or {}
+
+    def read_incremental_entries_from_changed_partitions(self, snapshot, _):
+        return self._by_id.get(snapshot.id, [])
+
+    def read_incremental_raw_entries_from_changed_partitions(self, snapshot, _):
+        return self._raw_by_id.get(snapshot.id, self._by_id.get(snapshot.id, []))  # raw first, else regular
+
+
+class _FakeTable:
+
+    def __init__(self, schema_manager):
+        self.schema_manager = schema_manager
+
+
+class TestCheckRowIdFromSnapshot(unittest.TestCase):
+
+    def _make_detection(self, snapshots, raw_entries_by_snapshot_id):
+        detection = ConflictDetection(
+            data_evolution_enabled=True,
+            snapshot_manager=_FakeSnapshotManager(snapshots),
+            manifest_list_manager=None,
+            table=_FakeTable(_FakeSchemaManager([_DEFAULT_SCHEMA])),
+            commit_scanner=_FakeCommitScanner({}, raw_entries_by_snapshot_id),
+        )
+        detection._row_id_check_from_snapshot = 1  # snapshot 1 is the check baseline
+        return detection
+
+    def _blob_delta(self):
+        return [_make_entry("d.blob", first_row_id=0, row_count=51,
+                            write_cols=["col_a"])]
+
+    def test_compact_blob_delete_raises_at_first_match(self):
+        check_snap = _FakeSnapshot(1, "APPEND", next_row_id=200)
+        compact1 = _FakeSnapshot(2, "COMPACT", next_row_id=200)
+        compact2 = _FakeSnapshot(3, "COMPACT", next_row_id=200)
+        entries = {
+            2: [_make_entry("first.blob", kind=1, first_row_id=0, row_count=200)],
+            3: [_make_entry("second.blob", kind=1, first_row_id=0, row_count=200)],
+        }
+        detection = self._make_detection(
+            [check_snap, compact1, compact2], entries)
+        result = detection.check_row_id_from_snapshot(compact2, self._blob_delta())
+        self.assertIsNotNone(result)
+        self.assertIn("snapshot 2", str(result))  # the earlier of the two matching compacts
+        self.assertIn("COMPACT", str(result))
+
+    def test_compact_other_file_type_does_not_raise(self):
+        check_snap = _FakeSnapshot(1, "APPEND", next_row_id=200)
+        compact_snap = _FakeSnapshot(2, "COMPACT", next_row_id=200)
+        compact_entries = [
+            _make_entry("old.parquet", kind=1, first_row_id=0, row_count=100),
+            _make_entry("merged.parquet", kind=0, first_row_id=0, row_count=200),
+        ]
+        detection = self._make_detection(
+            [check_snap, compact_snap], {2: compact_entries})
+        self.assertIsNone(
+            detection.check_row_id_from_snapshot(compact_snap, self._blob_delta()))
+
+    def test_compact_no_conflict_when_no_matching_delete(self):
+        check_snap = _FakeSnapshot(1, "APPEND", next_row_id=400)
+        compact_snap = _FakeSnapshot(2, "COMPACT", next_row_id=400)
+        col_a_delta = self._blob_delta()
+        col_b_delta = [_make_entry("d.parquet", first_row_id=0, row_count=51,
+                                   write_cols=["col_b"])]
+        cases = [
+            ("disjoint_range", col_a_delta, [
+                _make_entry("old.blob", kind=1, first_row_id=200, row_count=200),
+            ]),
+            ("add_only", col_a_delta, [
+                _make_entry("merged.blob", kind=0, first_row_id=0, row_count=200),
+            ]),
+            ("other_column_shard", col_b_delta, [
+                _make_entry("old.parquet", kind=1, first_row_id=0, row_count=100,
+                            write_cols=["col_a"]),
+            ]),
+        ]
+        for name, delta, compact_entries in cases:
+            with self.subTest(case=name):
+                detection = self._make_detection(
+                    [check_snap, compact_snap], {2: compact_entries})
+                self.assertIsNone(
+                    detection.check_row_id_from_snapshot(compact_snap, delta))
+
+
 class TestRowIdColumnConflictChecker(unittest.TestCase):
 
     def _make_checker(self, delta_files, schema=None):
diff --git a/paimon-python/pypaimon/write/commit/commit_scanner.py b/paimon-python/pypaimon/write/commit/commit_scanner.py
index 697d95f216a6..95702f1d02b8 100644
--- a/paimon-python/pypaimon/write/commit/commit_scanner.py
+++ b/paimon-python/pypaimon/write/commit/commit_scanner.py
@@ -21,6 +21,7 @@
 from typing import Optional, List
 
 from pypaimon.common.predicate_builder import PredicateBuilder
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.scanner.file_scanner import FileScanner
@@ -95,6 +96,26 @@ def read_incremental_entries_from_changed_partitions(self, snapshot: Snapshot,
             self.table, lambda: ([], None), partition_predicate=partition_filter
         ).read_manifest_entries(delta_manifests)
 
+    def read_incremental_raw_entries_from_changed_partitions(self, snapshot: Snapshot,
+                                                             commit_entries: List[ManifestEntry]):
+        """Like ``read_incremental_entries_from_changed_partitions`` but
+        preserves DELETE entries (kind=1). The regular method funnels through
+        ``read_entries_parallel`` which discards standalone DELETEs.
+        """
+        delta_manifests = self.manifest_list_manager.read_delta(snapshot)
+        if not delta_manifests:
+            return []
+
+        partition_filter = self._build_partition_filter_from_entries(commit_entries)
+        mfm = ManifestFileManager(self.table)
+        entries = []
+        for mf in delta_manifests:
+            for entry in mfm.read(mf.file_name):
+                if partition_filter is not None and not partition_filter.test(entry.partition):
+                    continue  # entry is outside the partitions touched by commit_entries
+                entries.append(entry)
+        return entries
+
     def _build_partition_filter_from_entries(self, entries: List[ManifestEntry]):
         """Build a partition predicate that matches all partitions present in the given entries.
 
diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py
index 0d04f464cee8..0a7ae8e30b60 100644
--- a/paimon-python/pypaimon/write/commit/conflict_detection.py
+++ b/paimon-python/pypaimon/write/commit/conflict_detection.py
@@ -301,13 +301,27 @@ def check_row_id_from_snapshot(self, latest_snapshot, commit_entries):
                     "{snapshot}.".format(snapshot=self._row_id_check_from_snapshot))
         check_next_row_id = check_snapshot.next_row_id
 
+        # Pair each delta with its anchor file type so a parquet-only
+        # compact does not flag a blob delta whose .blob anchor is intact.
+        delta_signatures = []
+        for f in delta_files:
+            r = f.row_id_range()
+            if r is not None:
+                delta_signatures.append(
+                    (DataFileMeta.is_blob_file(f.file_name), r.from_, r.to))
+
         for snapshot_id in range(
                 self._row_id_check_from_snapshot + 1,
                 latest_snapshot.id + 1):
             snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
             if snapshot is None:
                 continue
+
             if snapshot.commit_kind == "COMPACT":
+                err = self._compact_conflicts_with_delta(
+                    snapshot, delta_signatures, column_checker, commit_entries)
+                if err is not None:
+                    return err  # the error object is returned, not raised here
                 continue
 
             incremental_entries = self.commit_scanner.read_incremental_entries_from_changed_partitions(
@@ -325,3 +339,46 @@ def check_row_id_from_snapshot(self, latest_snapshot, commit_entries):
                         "ineffective.")
 
         return None
+
+    def _compact_conflicts_with_delta(self, snapshot, delta_signatures,
+                                      column_checker, commit_entries):
+        """Return RuntimeError if a COMPACT snapshot deleted a same-kind
+        anchor file whose row-id range AND write columns overlap any
+        staged delta; otherwise None.
+
+        File-type match guards against `write_cols=None` ambiguity (an
+        initial full-row parquet does not actually contain blob columns);
+        column_checker guards against unrelated column-write shards
+        (compacting an f1-only parquet must not block an f2 update on
+        the same row range).
+        """
+        if not delta_signatures:
+            return None
+        raw_entries = self.commit_scanner.read_incremental_raw_entries_from_changed_partitions(
+            snapshot, commit_entries)
+        for entry in raw_entries:
+            if entry.kind != 1:  # only DELETE entries (kind=1) are inspected
+                continue
+            file_range = entry.file.row_id_range()
+            if file_range is None:
+                continue
+            deleted_is_blob = DataFileMeta.is_blob_file(entry.file.file_name)
+            for delta_is_blob, from_, to in delta_signatures:
+                if delta_is_blob != deleted_is_blob:
+                    continue
+                if file_range.from_ > to or from_ > file_range.to:  # row-id ranges are disjoint
+                    continue
+                if not column_checker.conflicts_with(entry.file):
+                    continue
+                return RuntimeError(
+                    "Blob/row-id update conflicts with concurrent COMPACT "
+                    "(snapshot {sid}): anchor file {name} [{ff}, {ft}] "
+                    "was compacted away, overlaps staged delta "
+                    "[{df}, {dt}].".format(
+                        sid=snapshot.id,
+                        name=entry.file.file_name,
+                        ff=file_range.from_,
+                        ft=file_range.to,
+                        df=from_,
+                        dt=to))
+        return None
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py b/paimon-python/pypaimon/write/table_update_by_row_id.py
index d5653cce9858..a44bee4e64bd 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -177,7 +177,11 @@ def update_columns(self, data: pa.Table, column_names: List[str]) -> List[Commit
             if col_name not in self.table.field_names:
                 raise ValueError(f"Column {col_name} not found in table schema")
 
-        sorted_data = data.sort_by([(SpecialFields.ROW_ID.name, "ascending")])
+        sort_keys = [(SpecialFields.ROW_ID.name, "ascending")]
+        if hasattr(data, "sort_by"):  # else-branch covers tables without sort_by (presumably older pyarrow)
+            sorted_data = data.sort_by(sort_keys)
+        else:
+            sorted_data = data.take(pc.sort_indices(data, sort_keys=sort_keys))
         data_with_first_row_id = self._calculate_first_row_id(sorted_data)
 
         self._write_by_first_row_id(data_with_first_row_id, column_names)
@@ -311,7 +315,7 @@ def _merge_update_with_original(
 
         # Build a boolean mask: True at positions that need to be updated
         all_indices = pa.array(range(original_data.num_rows), type=pa.int64())
-        mask = pc.is_in(all_indices, relative_indices)
+        mask = pc.is_in(all_indices, value_set=relative_indices)
 
         # Build the merged table column by column
         merged_columns = {}