Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,73 @@ public void testCompactConflictRunCompact() throws Exception {
LOG.info("compact_conflict_test: compact done, 5 files merged into 1 (1000 rows)");
}

/**
* Step 1 for blob compact conflict test: write a blob table with 2 data files so compaction
* will merge them. Each file has 100 rows of (id, name, blob_data).
*/
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testBlobCompactConflictWriteBase() throws Exception {
Identifier id = identifier("blob_compact_conflict_test");
try {
catalog.dropTable(id, true);
} catch (Exception ignore) {
}
Schema schema =
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1", DataTypes.STRING())
.column("f2", DataTypes.BLOB())
.option("target-file-size", "100 MB")
.option(ROW_TRACKING_ENABLED.key(), "true")
.option(DATA_EVOLUTION_ENABLED.key(), "true")
.option("compaction.min.file-num", "2")
.option(BUCKET.key(), "-1")
.build();
catalog.createTable(id, schema, false);

byte[] blobBytes = new byte[64];
java.util.Random rng = new java.util.Random(42);

FileStoreTable table = (FileStoreTable) catalog.getTable(id);
BatchWriteBuilder builder = table.newBatchWriteBuilder();
try (BatchTableWrite w = builder.newWrite()) {
for (int i = 0; i < 100; i++) {
rng.nextBytes(blobBytes);
w.write(
GenericRow.of(
i,
BinaryString.fromString("name" + i),
new org.apache.paimon.data.BlobData(blobBytes.clone())));
}
builder.newCommit().commit(w.prepareCommit());
}

table = (FileStoreTable) catalog.getTable(id);
builder = table.newBatchWriteBuilder();
try (BatchTableWrite w = builder.newWrite()) {
for (int i = 100; i < 200; i++) {
rng.nextBytes(blobBytes);
w.write(
GenericRow.of(
i,
BinaryString.fromString("name" + i),
new org.apache.paimon.data.BlobData(blobBytes.clone())));
}
builder.newCommit().commit(w.prepareCommit());
}
LOG.info("blob_compact_conflict_test: 2 base files written (100 rows each, total 200)");
}

/** Step 3 for blob compact conflict test: run compact. */
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testBlobCompactConflictRunCompact() throws Exception {
Identifier id = identifier("blob_compact_conflict_test");
doDataEvolutionCompact((FileStoreTable) catalog.getTable(id), true);
LOG.info("blob_compact_conflict_test: compact done (compactBlob=true)");
}

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testDataEvolutionWrite() throws Exception {
Expand Down Expand Up @@ -1373,8 +1440,13 @@ private void setFirstRowId(List<CommitMessage> messages, long firstRowId) {
}

private void doDataEvolutionCompact(FileStoreTable table) throws Exception {
doDataEvolutionCompact(table, false);
}

private void doDataEvolutionCompact(FileStoreTable table, boolean compactBlob)
throws Exception {
DataEvolutionCompactCoordinator coordinator =
new DataEvolutionCompactCoordinator(table, false, false);
new DataEvolutionCompactCoordinator(table, compactBlob, false);
List<CommitMessage> messages = new ArrayList<>();
try {
List<DataEvolutionCompactTask> tasks;
Expand Down
40 changes: 39 additions & 1 deletion paimon-python/dev/run_mixed_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,30 @@ run_compact_conflict_test() {
fi
}

run_blob_compact_conflict_test() {
echo -e "${YELLOW}=== Running Blob Compact Conflict Test (Java Write Base, Python Blob Update + Java Compact) ===${NC}"

cd "$PROJECT_ROOT"

echo "Running Maven test for JavaPyE2ETest.testBlobCompactConflictWriteBase..."
if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testBlobCompactConflictWriteBase -pl paimon-core -q -Drun.e2e.tests=true; then
echo -e "${GREEN}✓ Java write base blob files completed successfully${NC}"
else
echo -e "${RED}✗ Java write base blob files failed${NC}"
return 1
fi

cd "$PAIMON_PYTHON_DIR"
echo "Running Python test for JavaPyReadWriteTest.test_blob_compact_conflict_update..."
if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_blob_compact_conflict_update -v; then
echo -e "${GREEN}✓ Python blob compact conflict test completed successfully${NC}"
return 0
else
echo -e "${RED}✗ Python blob compact conflict test failed${NC}"
return 1
fi
}

run_data_evolution_test() {
echo -e "${YELLOW}=== Running Data Evolution Test (Java Write, Python Read) ===${NC}"

Expand Down Expand Up @@ -673,6 +697,7 @@ main() {
local lumina_vector_result=0
local lumina_vector_btree_result=0
local compact_conflict_result=0
local blob_compact_conflict_result=0
local blob_alter_compact_result=0
local data_evolution_result=0
local data_evolution_py_write_result=0
Expand Down Expand Up @@ -823,6 +848,13 @@ main() {

echo ""

# Run blob compact conflict test (Java write base + Python blob update + Java compact)
if ! run_blob_compact_conflict_test; then
blob_compact_conflict_result=1
fi

echo ""

# Run blob alter+compact test (Java write+alter+compact, Python read)
if ! run_blob_alter_compact_test; then
blob_alter_compact_result=1
Expand Down Expand Up @@ -968,6 +1000,12 @@ main() {
echo -e "${RED}✗ Compact Conflict Test (Java Write+Compact, Python Read): FAILED${NC}"
fi

if [[ $blob_compact_conflict_result -eq 0 ]]; then
echo -e "${GREEN}✓ Blob Compact Conflict Test (Java Write+Compact, Python Blob Update): PASSED${NC}"
else
echo -e "${RED}✗ Blob Compact Conflict Test (Java Write+Compact, Python Blob Update): FAILED${NC}"
fi

if [[ $blob_alter_compact_result -eq 0 ]]; then
echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read): PASSED${NC}"
else
Expand Down Expand Up @@ -1009,7 +1047,7 @@ main() {
# Clean up warehouse directory after all tests
cleanup_warehouse

if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 && $data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq 0 && $py_variant_write_java_read_result -eq 0 && $vector_append_table_result -eq 0 && $vector_dedicated_java_write_result -eq 0 && $vector_dedicated_py_write_result -eq 0 && $multi_vector_dedicated_java_write_result -eq 0 && $multi_vector_dedicated_py_write_result -eq 0 && $row_format_result -eq 0 ]]; then
if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 && $blob_compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 && $data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq 0 && $py_variant_write_java_read_result -eq 0 && $vector_append_table_result -eq 0 && $vector_dedicated_java_write_result -eq 0 && $vector_dedicated_py_write_result -eq 0 && $multi_vector_dedicated_java_write_result -eq 0 && $multi_vector_dedicated_py_write_result -eq 0 && $row_format_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
return 0
else
Expand Down
40 changes: 40 additions & 0 deletions paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,46 @@ def test_compact_conflict_shard_update(self):
tc.close()
print(f"Conflict detected as expected: {ctx.exception}")

def test_blob_compact_conflict_update(self):
import subprocess

table = self.catalog.get_table('default.blob_compact_conflict_test')
snapshot_before = table.new_read_builder().new_scan().plan().snapshot_id

wb = table.new_batch_write_builder()
table_update = wb.new_update().with_update_type(['f2'])
update_data = pa.Table.from_pydict({
'_ROW_ID': pa.array([50], type=pa.int64()),
'f2': pa.array([b'blob50-updated'], type=pa.large_binary()),
})
stale_commit_msgs = table_update.update_by_arrow_with_row_id(update_data)

project_root = os.path.join(self.tempdir, '..', '..', '..', '..')
result = subprocess.run(
['mvn', 'test',
'-pl', 'paimon-core',
'-Dtest=org.apache.paimon.JavaPyE2ETest#testBlobCompactConflictRunCompact',
'-Drun.e2e.tests=true',
'-Dsurefire.failIfNoSpecifiedTests=false',
'-q'],
cwd=os.path.abspath(project_root),
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, timeout=300
)
self.assertEqual(result.returncode, 0,
f"Java compact failed:\n{result.stdout}\n{result.stderr}")

table = self.catalog.get_table('default.blob_compact_conflict_test')
snapshot_after = table.new_read_builder().new_scan().plan().snapshot_id
self.assertGreater(snapshot_after, snapshot_before)

tc = wb.new_commit()
try:
with self.assertRaises(RuntimeError):
tc.commit(stale_commit_msgs)
finally:
tc.close()

@parameterized.expand(get_file_format_params())
def test_read_data_evolution_table(self, file_format):
"""Read data evolution tables written by Java and verify merged results."""
Expand Down
35 changes: 35 additions & 0 deletions paimon-python/pypaimon/tests/partition_predicate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,38 @@ def test_passes_partition_predicate_to_file_scanner(self, mock_scanner_cls):
self.assertIn('partition_predicate', kwargs)
self.assertIsNotNone(kwargs['partition_predicate'])
self.assertNotIn('predicate', kwargs)

@patch('pypaimon.write.commit.commit_scanner.ManifestFileManager')
def test_raw_entries_preserve_delete_kind(self, mock_mfm_cls):
added = ManifestEntry(
kind=0, partition=GenericRow(['p1', 'us'], PARTITION_FIELDS),
bucket=0, total_buckets=1, file=Mock())
deleted = ManifestEntry(
kind=1, partition=GenericRow(['p1', 'us'], PARTITION_FIELDS),
bucket=0, total_buckets=1, file=Mock())
mock_mfm_cls.return_value.read.return_value = [added, deleted]

scanner = self._scanner()
scanner.manifest_list_manager.read_delta.return_value = [Mock(file_name='m1')]
result = scanner.read_incremental_raw_entries_from_changed_partitions(
Mock(), [_manifest_entry(['p1', 'us'])])

self.assertEqual([e.kind for e in result], [0, 1])

@patch('pypaimon.write.commit.commit_scanner.ManifestFileManager')
def test_raw_entries_filter_unmatched_partition(self, mock_mfm_cls):
in_part = ManifestEntry(
kind=1, partition=GenericRow(['p1', 'us'], PARTITION_FIELDS),
bucket=0, total_buckets=1, file=Mock())
out_part = ManifestEntry(
kind=1, partition=GenericRow(['p2', 'eu'], PARTITION_FIELDS),
bucket=0, total_buckets=1, file=Mock())
mock_mfm_cls.return_value.read.return_value = [in_part, out_part]

scanner = self._scanner()
scanner.manifest_list_manager.read_delta.return_value = [Mock(file_name='m1')]
result = scanner.read_incremental_raw_entries_from_changed_partitions(
Mock(), [_manifest_entry(['p1', 'us'])])

self.assertEqual(len(result), 1)
self.assertEqual(tuple(result[0].partition.values), ('p1', 'us'))
106 changes: 106 additions & 0 deletions paimon-python/pypaimon/tests/write/conflict_detection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,112 @@ def test_skip_when_next_row_id_is_none(self):
detection.check_row_id_existence(base, delta, next_row_id=None))


class _FakeSnapshot:

def __init__(self, snapshot_id, commit_kind, next_row_id=None):
self.id = snapshot_id
self.commit_kind = commit_kind
self.next_row_id = next_row_id


class _FakeSnapshotManager:

def __init__(self, snapshots):
self._by_id = {s.id: s for s in snapshots}

def get_snapshot_by_id(self, snapshot_id):
return self._by_id.get(snapshot_id)


class _FakeCommitScanner:

def __init__(self, entries_by_snapshot_id, raw_entries_by_snapshot_id=None):
self._by_id = entries_by_snapshot_id
self._raw_by_id = raw_entries_by_snapshot_id or {}

def read_incremental_entries_from_changed_partitions(self, snapshot, _):
return self._by_id.get(snapshot.id, [])

def read_incremental_raw_entries_from_changed_partitions(self, snapshot, _):
return self._raw_by_id.get(snapshot.id, self._by_id.get(snapshot.id, []))


class _FakeTable:

def __init__(self, schema_manager):
self.schema_manager = schema_manager


class TestCheckRowIdFromSnapshot(unittest.TestCase):

def _make_detection(self, snapshots, raw_entries_by_snapshot_id):
detection = ConflictDetection(
data_evolution_enabled=True,
snapshot_manager=_FakeSnapshotManager(snapshots),
manifest_list_manager=None,
table=_FakeTable(_FakeSchemaManager([_DEFAULT_SCHEMA])),
commit_scanner=_FakeCommitScanner({}, raw_entries_by_snapshot_id),
)
detection._row_id_check_from_snapshot = 1
return detection

def _blob_delta(self):
return [_make_entry("d.blob", first_row_id=0, row_count=51,
write_cols=["col_a"])]

def test_compact_blob_delete_raises_at_first_match(self):
check_snap = _FakeSnapshot(1, "APPEND", next_row_id=200)
compact1 = _FakeSnapshot(2, "COMPACT", next_row_id=200)
compact2 = _FakeSnapshot(3, "COMPACT", next_row_id=200)
entries = {
2: [_make_entry("first.blob", kind=1, first_row_id=0, row_count=200)],
3: [_make_entry("second.blob", kind=1, first_row_id=0, row_count=200)],
}
detection = self._make_detection(
[check_snap, compact1, compact2], entries)
result = detection.check_row_id_from_snapshot(compact2, self._blob_delta())
self.assertIsNotNone(result)
self.assertIn("snapshot 2", str(result))
self.assertIn("COMPACT", str(result))

def test_compact_other_file_type_does_not_raise(self):
check_snap = _FakeSnapshot(1, "APPEND", next_row_id=200)
compact_snap = _FakeSnapshot(2, "COMPACT", next_row_id=200)
compact_entries = [
_make_entry("old.parquet", kind=1, first_row_id=0, row_count=100),
_make_entry("merged.parquet", kind=0, first_row_id=0, row_count=200),
]
detection = self._make_detection(
[check_snap, compact_snap], {2: compact_entries})
self.assertIsNone(
detection.check_row_id_from_snapshot(compact_snap, self._blob_delta()))

def test_compact_no_conflict_when_no_matching_delete(self):
check_snap = _FakeSnapshot(1, "APPEND", next_row_id=400)
compact_snap = _FakeSnapshot(2, "COMPACT", next_row_id=400)
col_a_delta = self._blob_delta()
col_b_delta = [_make_entry("d.parquet", first_row_id=0, row_count=51,
write_cols=["col_b"])]
cases = [
("disjoint_range", col_a_delta, [
_make_entry("old.blob", kind=1, first_row_id=200, row_count=200),
]),
("add_only", col_a_delta, [
_make_entry("merged.blob", kind=0, first_row_id=0, row_count=200),
]),
("other_column_shard", col_b_delta, [
_make_entry("old.parquet", kind=1, first_row_id=0, row_count=100,
write_cols=["col_a"]),
]),
]
for name, delta, compact_entries in cases:
with self.subTest(case=name):
detection = self._make_detection(
[check_snap, compact_snap], {2: compact_entries})
self.assertIsNone(
detection.check_row_id_from_snapshot(compact_snap, delta))


class TestRowIdColumnConflictChecker(unittest.TestCase):

def _make_checker(self, delta_files, schema=None):
Expand Down
21 changes: 21 additions & 0 deletions paimon-python/pypaimon/write/commit/commit_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Optional, List

from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.scanner.file_scanner import FileScanner
Expand Down Expand Up @@ -95,6 +96,26 @@ def read_incremental_entries_from_changed_partitions(self, snapshot: Snapshot,
self.table, lambda: ([], None), partition_predicate=partition_filter
).read_manifest_entries(delta_manifests)

def read_incremental_raw_entries_from_changed_partitions(self, snapshot: Snapshot,
commit_entries: List[ManifestEntry]):
"""Like ``read_incremental_entries_from_changed_partitions`` but
preserves DELETE entries (kind=1). The regular method funnels through
``read_entries_parallel`` which discards standalone DELETEs.
"""
delta_manifests = self.manifest_list_manager.read_delta(snapshot)
if not delta_manifests:
return []

partition_filter = self._build_partition_filter_from_entries(commit_entries)
mfm = ManifestFileManager(self.table)
entries = []
for mf in delta_manifests:
for entry in mfm.read(mf.file_name):
if partition_filter is not None and not partition_filter.test(entry.partition):
continue
entries.append(entry)
return entries

def _build_partition_filter_from_entries(self, entries: List[ManifestEntry]):
"""Build a partition predicate that matches all partitions present in the given entries.

Expand Down
Loading
Loading