diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java index 1a5e98eebb..6135342a41 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java @@ -383,11 +383,12 @@ public void testPartialUpdateWithTwoWriters() throws Exception { for (ScanRecord record : scanRecords.records(bucket)) { InternalRow row = record.getRow(); String address = row.getString(3) != null ? row.getString(3).toString() : null; + int amount = row.isNullAt(2) ? -1 : row.getInt(2); TestOrder order = new TestOrder( row.getLong(0), row.getLong(1), - row.getInt(2), + amount, address, toFlinkRowKind(record.getChangeType())); actual.add(order); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index e72428a02f..52a9bae3f9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -562,9 +562,13 @@ private long processUpsert( byte[] oldValueBytes = getFromBufferOrKv(key); if (oldValueBytes == null) { + // For partial updates on first insert, we need to merge to null out + // non-target columns. Without this, the full row (including non-target + // column values) would be stored as-is. + BinaryValue valueToInsert = currentMerger.mergeInsert(currentValue); return applyInsert( key, - currentValue, + valueToInsert, walBuilder, latestSchemaRow, logOffset, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java index 3717850fa8..88640fa603 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java @@ -102,6 +102,11 @@ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) { return partialUpdater.updateRow(oldValue, newValue); } + @Override + public BinaryValue mergeInsert(BinaryValue newValue) { + return partialUpdater.updateRow(null, newValue); + } + @Nullable @Override public BinaryValue delete(BinaryValue oldRow) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java index ed8b37a658..ebc92e158f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java @@ -43,6 +43,18 @@ public interface RowMerger { */ BinaryValue merge(BinaryValue oldValue, BinaryValue newValue); + /** + * Process a new value being inserted for the first time (no existing row). Most mergers simply + * return the new value unchanged. Partial update mergers override this to null out non-target + * columns. + * + * @param newValue the new row being inserted + * @return the value to insert + */ + default BinaryValue mergeInsert(BinaryValue newValue) { + return newValue; + } + /** * Merge the old row with a delete row. * diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index da918f45e8..87a3058a20 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -463,6 +463,85 @@ void testPartialUpdateAndDelete() throws Exception { checkEqual(actualLogRecords, expectedLogs, rowType); } + @Test + void testPartialUpdateFirstInsertNullsNonTargetColumns() throws Exception { + initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>()); + RowType rowType = DATA2_SCHEMA.getRowType(); + KvRecordTestUtils.KvRecordFactory data2kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(rowType); + + // Bug reproduction: partial update with targetColumns={0,1} (a, b) but the row + // contains non-null values for ALL columns including non-target column c. + // On first insert (no existing row), non-target columns should be set to null. + KvRecordBatch kvRecordBatch = + kvRecordBatchFactory.ofRecords( + data2kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, "v1", "should_be_null"})); + + int[] targetColumns = new int[] {0, 1}; + kvTablet.putAsLeader(kvRecordBatch, targetColumns); + + // The stored value should have column c (index 2) set to null, + // NOT "should_be_null", because c is not in targetColumns. + assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes()))) + .isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", null}))); + + // Also verify CDC log emits the correct row with null for non-target column + LogRecords actualLogRecords = readLogRecords(); + List expectedLogs = + Collections.singletonList( + logRecords( + rowType, + 0, + Collections.singletonList(ChangeType.INSERT), + Collections.singletonList(new Object[] {1, "v1", null}))); + checkEqual(actualLogRecords, expectedLogs, rowType); + } + + @Test + void testPartialUpdateFirstInsertThenUpdate() throws Exception { + initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>()); + RowType rowType = DATA2_SCHEMA.getRowType(); + KvRecordTestUtils.KvRecordFactory data2kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(rowType); + + // First insert: partial update columns a and b, column c should be null + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + data2kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, "v1", "ignored"})); + kvTablet.putAsLeader(batch1, new int[] {0, 1}); + long endOffset = logTablet.localLogEndOffset(); + + // Verify first insert stored correctly with null for non-target column + assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes()))) + .isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", null}))); + + // Second update: partial update columns a and c, column b should retain "v1" + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + data2kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, "ignored2", "c1"})); + kvTablet.putAsLeader(batch2, new int[] {0, 2}); + + // Verify: b should retain "v1" from first insert, c should be updated to "c1" + assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes()))) + .isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", "c1"}))); + + // Verify CDC log for the second update + LogRecords actualLogRecords = readLogRecords(endOffset); + List expectedLogs = + Collections.singletonList( + logRecords( + rowType, + endOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, "v1", null}, + new Object[] {1, "v1", "c1"}))); + checkEqual(actualLogRecords, expectedLogs, rowType); + } + @Test void testPutWithMultiThread() throws Exception { initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());