Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,12 @@ public void testPartialUpdateWithTwoWriters() throws Exception {
for (ScanRecord record : scanRecords.records(bucket)) {
InternalRow row = record.getRow();
String address = row.getString(3) != null ? row.getString(3).toString() : null;
int amount = row.isNullAt(2) ? -1 : row.getInt(2);
TestOrder order =
new TestOrder(
row.getLong(0),
row.getLong(1),
row.getInt(2),
amount,
address,
toFlinkRowKind(record.getChangeType()));
actual.add(order);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,13 @@ private long processUpsert(

byte[] oldValueBytes = getFromBufferOrKv(key);
if (oldValueBytes == null) {
// For partial updates on first insert, we need to merge to null out
// non-target columns. Without this, the full row (including non-target
// column values) would be stored as-is.
BinaryValue valueToInsert = currentMerger.mergeInsert(currentValue);
return applyInsert(
key,
currentValue,
valueToInsert,
walBuilder,
latestSchemaRow,
logOffset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
return partialUpdater.updateRow(oldValue, newValue);
}

@Override
public BinaryValue mergeInsert(BinaryValue newValue) {
    // First insert for a key with no pre-existing row: run the partial updater
    // against a null old row so that every column outside the target set is
    // written as null instead of whatever value the incoming row carried.
    BinaryValue merged = partialUpdater.updateRow(null, newValue);
    return merged;
}

@Nullable
@Override
public BinaryValue delete(BinaryValue oldRow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ public interface RowMerger {
*/
BinaryValue merge(BinaryValue oldValue, BinaryValue newValue);

/**
 * Processes a new value being inserted for a key that has no existing row.
 *
 * <p>The default implementation returns {@code newValue} unchanged, which is correct for
 * mergers that store the full incoming row as-is. Partial-update mergers override this to
 * null out the columns that are not in the update's target column set, so a first insert
 * does not persist stray values for non-target columns.
 *
 * @param newValue the new row being inserted
 * @return the value to insert
 */
default BinaryValue mergeInsert(BinaryValue newValue) {
    return newValue;
}

/**
* Merge the old row with a delete row.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,85 @@ void testPartialUpdateAndDelete() throws Exception {
checkEqual(actualLogRecords, expectedLogs, rowType);
}

@Test
void testPartialUpdateFirstInsertNullsNonTargetColumns() throws Exception {
    initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>());
    RowType schemaRowType = DATA2_SCHEMA.getRowType();
    KvRecordTestUtils.KvRecordFactory recordFactory =
            KvRecordTestUtils.KvRecordFactory.of(schemaRowType);

    // Reproduce the first-insert bug: the incoming row carries non-null values for
    // every column, but only columns 0 (a) and 1 (b) are update targets. With no
    // pre-existing row for the key, non-target column c must be stored as null
    // rather than the value supplied in the record.
    KvRecordBatch batch =
            kvRecordBatchFactory.ofRecords(
                    recordFactory.ofRecord(
                            "k1".getBytes(), new Object[] {1, "v1", "should_be_null"}));
    kvTablet.putAsLeader(batch, new int[] {0, 1});

    // Stored value: column c (index 2) is null because it is not a target column.
    assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes())))
            .isEqualTo(valueOf(compactedRow(schemaRowType, new Object[] {1, "v1", null})));

    // The CDC log must likewise emit null for the non-target column.
    LogRecords actualLogRecords = readLogRecords();
    List<MemoryLogRecords> expectedLogs =
            Collections.singletonList(
                    logRecords(
                            schemaRowType,
                            0,
                            Collections.singletonList(ChangeType.INSERT),
                            Collections.singletonList(new Object[] {1, "v1", null})));
    checkEqual(actualLogRecords, expectedLogs, schemaRowType);
}

@Test
void testPartialUpdateFirstInsertThenUpdate() throws Exception {
    initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>());
    RowType schemaRowType = DATA2_SCHEMA.getRowType();
    KvRecordTestUtils.KvRecordFactory recordFactory =
            KvRecordTestUtils.KvRecordFactory.of(schemaRowType);

    // Step 1: first insert targeting columns a and b; non-target column c must
    // come out null even though the record supplies a value for it.
    KvRecordBatch firstBatch =
            kvRecordBatchFactory.ofRecords(
                    recordFactory.ofRecord(
                            "k1".getBytes(), new Object[] {1, "v1", "ignored"}));
    kvTablet.putAsLeader(firstBatch, new int[] {0, 1});
    long endOffset = logTablet.localLogEndOffset();

    // The stored row after the first insert has null in the non-target column.
    assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes())))
            .isEqualTo(valueOf(compactedRow(schemaRowType, new Object[] {1, "v1", null})));

    // Step 2: partial update targeting columns a and c; column b must keep the
    // "v1" written in step 1, while c is set to "c1".
    KvRecordBatch secondBatch =
            kvRecordBatchFactory.ofRecords(
                    recordFactory.ofRecord(
                            "k1".getBytes(), new Object[] {1, "ignored2", "c1"}));
    kvTablet.putAsLeader(secondBatch, new int[] {0, 2});

    assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes())))
            .isEqualTo(valueOf(compactedRow(schemaRowType, new Object[] {1, "v1", "c1"})));

    // The CDC log for the update pairs UPDATE_BEFORE (the merged row from step 1)
    // with UPDATE_AFTER (the row after merging in step 2's target columns).
    LogRecords actualLogRecords = readLogRecords(endOffset);
    List<MemoryLogRecords> expectedLogs =
            Collections.singletonList(
                    logRecords(
                            schemaRowType,
                            endOffset,
                            Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
                            Arrays.asList(
                                    new Object[] {1, "v1", null},
                                    new Object[] {1, "v1", "c1"})));
    checkEqual(actualLogRecords, expectedLogs, schemaRowType);
}

@Test
void testPutWithMultiThread() throws Exception {
initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
Expand Down
Loading