Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ public interface UpsertWriter extends TableWriter {
*/
CompletableFuture<UpsertResult> upsert(InternalRow record);

    /**
     * Retracts a previous aggregation contribution for the given row. The row must contain the
     * primary key fields and the old aggregation values to retract.
     *
     * <p>This is used when a Flink aggregate operator emits UPDATE_BEFORE (retract old value) for a
     * key. The retract record is sent independently with {@link
     * org.apache.fluss.record.MutationType#RETRACT} mutation type.
     *
     * <p>The default implementation always throws {@link UnsupportedOperationException}; writer
     * implementations that support retraction must override this method.
     *
     * @param row the old aggregation value to retract (UPDATE_BEFORE).
     * @return A {@link CompletableFuture} that returns upsert result when complete normally.
     * @throws UnsupportedOperationException if this writer implementation does not support
     *     retraction.
     */
    default CompletableFuture<UpsertResult> retract(InternalRow row) {
        throw new UnsupportedOperationException(
                "retract() is not supported by this UpsertWriter implementation.");
    }

/**
* Delete a certain record from the Fluss table. The input must contain the primary key fields.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.fluss.client.write.WriteRecord;
import org.apache.fluss.client.write.WriterClient;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryRow;
Expand Down Expand Up @@ -57,6 +58,9 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
/** The merge mode for this writer. This controls how the server handles data merging. */
private final MergeMode mergeMode;

/** Whether this table supports retract (only AGGREGATION merge engine). */
private final boolean supportsRetract;

UpsertWriterImpl(
TablePath tablePath,
TableInfo tableInfo,
Expand Down Expand Up @@ -102,6 +106,12 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {

this.tableInfo = tableInfo;
this.mergeMode = mergeMode;
this.supportsRetract =
tableInfo
.getTableConfig()
.getMergeEngineType()
.filter(t -> t == MergeEngineType.AGGREGATION)
.isPresent();
}

private static void sanityCheck(
Expand Down Expand Up @@ -217,6 +227,31 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
return sendWithResult(record, DeleteResult::new);
}

@Override
public CompletableFuture<UpsertResult> retract(InternalRow row) {
if (!supportsRetract) {
throw new IllegalStateException(
"retract() is only supported for tables with AGGREGATION merge engine. "
+ "Table: "
+ tablePath);
}
checkFieldCount(row);
byte[] key = primaryKeyEncoder.encodeKey(row);
byte[] bucketKey =
bucketKeyEncoder == primaryKeyEncoder ? key : bucketKeyEncoder.encodeKey(row);
WriteRecord record =
WriteRecord.forRetract(
tableInfo,
getPhysicalPath(row),
encodeRow(row),
key,
bucketKey,
writeFormat,
targetColumns,
mergeMode);
return sendWithResult(record, UpsertResult::new);
}

private BinaryRow encodeRow(InternalRow row) {
if (kvFormat == KvFormat.INDEXED && row instanceof IndexedRow) {
return (IndexedRow) row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ public static PutKvRequest makePutKvRequest(
if (targetColumns != null) {
request.setTargetColumns(targetColumns);
}
// Set mergeMode in the request - this is the proper way to pass mergeMode to server
request.setAggMode(mergeMode.getProtoValue());

readyWriteBatches.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ protected AbstractRowLogWriteBatch(
}

@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
public AppendResult tryAppend(WriteRecord writeRecord, WriteCallback callback)
throws Exception {
checkNotNull(callback, "write callback must be not null");
InternalRow rowObj = writeRecord.getRow();
checkNotNull(rowObj, "row must not be null for log record");
Expand All @@ -68,12 +69,12 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws

R row = requireAndCastRow(rowObj);
if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
return false;
return AppendResult.BATCH_FULL;
}
recordsBuilder.append(ChangeType.APPEND_ONLY, row);
recordCount++;
callbacks.add(callback);
return true;
return AppendResult.APPENDED;
}

protected abstract R requireAndCastRow(InternalRow row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public boolean isLogBatch() {
}

@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
public AppendResult tryAppend(WriteRecord writeRecord, WriteCallback callback)
throws Exception {
InternalRow row = writeRecord.getRow();
checkArgument(
writeRecord.getTargetColumns() == null,
Expand All @@ -81,12 +82,12 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
checkNotNull(row != null, "row must not be null for log record");
checkNotNull(callback, "write callback must be not null");
if (recordsBuilder.isClosed() || recordsBuilder.isFull()) {
return false;
return AppendResult.BATCH_FULL;
} else {
recordsBuilder.append(ChangeType.APPEND_ONLY, row);
recordCount++;
callbacks.add(callback);
return true;
return AppendResult.APPENDED;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.record.KvRecordBatchBuilder;
import org.apache.fluss.record.MutationType;
import org.apache.fluss.record.bytesview.BytesView;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.InternalRow;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class KvWriteBatch extends WriteBatch {
private final @Nullable int[] targetColumns;
private final int schemaId;
private final MergeMode mergeMode;
private final boolean v2Format;

public KvWriteBatch(
int bucketId,
Expand All @@ -63,14 +65,16 @@ public KvWriteBatch(
AbstractPagedOutputView outputView,
@Nullable int[] targetColumns,
MergeMode mergeMode,
boolean v2Format,
long createdMs) {
super(bucketId, physicalTablePath, createdMs);
this.outputView = outputView;
this.recordsBuilder =
KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat);
KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat, v2Format);
this.targetColumns = targetColumns;
this.schemaId = schemaId;
this.mergeMode = mergeMode;
this.v2Format = v2Format;
}

@Override
Expand All @@ -79,47 +83,62 @@ public boolean isLogBatch() {
}

@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
public AppendResult tryAppend(WriteRecord writeRecord, WriteCallback callback)
throws Exception {
validateRecordConsistency(writeRecord);

// V0 batch cannot carry RETRACT records — caller must create a new V2 batch.
if (!v2Format && writeRecord.getMutationType() == MutationType.RETRACT) {
return AppendResult.FORMAT_MISMATCH;
}

byte[] key = writeRecord.getKey();
checkNotNull(key, "key must be not null for kv record");
checkNotNull(callback, "write callback must be not null");
BinaryRow row = checkRow(writeRecord.getRow());

if (v2Format) {
if (!recordsBuilder.hasRoomForV2(key, row) || isClosed()) {
return AppendResult.BATCH_FULL;
} else {
recordsBuilder.appendV2(writeRecord.getMutationType(), key, row);
callbacks.add(callback);
recordCount++;
return AppendResult.APPENDED;
}
} else {
if (!recordsBuilder.hasRoomFor(key, row) || isClosed()) {
return AppendResult.BATCH_FULL;
} else {
recordsBuilder.append(key, row);
callbacks.add(callback);
recordCount++;
return AppendResult.APPENDED;
}
}
}

private void validateRecordConsistency(WriteRecord writeRecord) {
if (schemaId != writeRecord.getSchemaId()) {
throw new IllegalStateException(
String.format(
"schema id %d of the write record to append is not the same as the current schema id %d in the batch.",
writeRecord.getSchemaId(), schemaId));
}

// currently, we throw exception directly when the target columns of the write record is
// not the same as the current target columns in the batch.
// this should be quite fast as they should be the same objects.
if (!Arrays.equals(targetColumns, writeRecord.getTargetColumns())) {
throw new IllegalStateException(
String.format(
"target columns %s of the write record to append are not the same as the current target columns %s in the batch.",
Arrays.toString(writeRecord.getTargetColumns()),
Arrays.toString(targetColumns)));
}

// Validate mergeMode consistency - records with different mergeMode cannot be batched
// together
if (writeRecord.getMergeMode() != this.mergeMode) {
throw new IllegalStateException(
String.format(
"Cannot mix records with different mergeMode in the same batch. "
+ "Batch mergeMode: %s, Record mergeMode: %s",
this.mergeMode, writeRecord.getMergeMode()));
}

byte[] key = writeRecord.getKey();
checkNotNull(key, "key must be not null for kv record");
checkNotNull(callback, "write callback must be not null");
BinaryRow row = checkRow(writeRecord.getRow());
if (!recordsBuilder.hasRoomFor(key, row) || isClosed()) {
return false;
} else {
recordsBuilder.append(key, row);
callbacks.add(callback);
recordCount++;
return true;
}
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.fluss.memory.LazyMemorySegmentPool;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.memory.PreAllocatedPagedOutputView;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
Expand Down Expand Up @@ -592,7 +593,14 @@ private RecordAppendResult appendNewBatch(
outputView,
schemaId);

batch.tryAppend(writeRecord, callback);
WriteBatch.AppendResult appendResult2 = batch.tryAppend(writeRecord, callback);
if (appendResult2 != WriteBatch.AppendResult.APPENDED) {
batch.close();
throw new FlussRuntimeException(
"Failed to append record to a freshly created batch (result="
+ appendResult2
+ "). The record may be too large for the configured batch size.");
}
deque.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(deque.size() > 1 || batch.isClosed(), true, false);
Expand All @@ -619,6 +627,11 @@ private WriteBatch createWriteBatch(
outputView,
writeRecord.getTargetColumns(),
writeRecord.getMergeMode(),
tableInfo
.getTableConfig()
.getMergeEngineType()
.filter(t -> t == MergeEngineType.AGGREGATION)
.isPresent(),
clock.milliseconds());

case ARROW_LOG:
Expand Down Expand Up @@ -675,17 +688,17 @@ private RecordAppendResult tryAppend(
}
WriteBatch last = deque.peekLast();
if (last != null) {
boolean success = last.tryAppend(writeRecord, callback);
if (!success) {
// TODO For ArrowLogWriteBatch, close here is a heavy operation (including build
// logic), we need to avoid do that in an lock which locked dq. However, why we not
// remove build logic out of close for ArrowLogWriteBatch is that we want to release
// non-heap memory hold by arrowWriter as soon as possible to avoid OOM. Maybe we
// need to introduce a more reasonable way to solve these two problems.
last.close();
} else {
WriteBatch.AppendResult result = last.tryAppend(writeRecord, callback);
if (result == WriteBatch.AppendResult.APPENDED) {
return new RecordAppendResult(deque.size() > 1 || last.isClosed(), false, false);
}
// BATCH_FULL or FORMAT_MISMATCH — close the current batch so a new one is created.
// TODO For ArrowLogWriteBatch, close here is a heavy operation (including build
// logic), we need to avoid do that in an lock which locked dq. However, why we not
// remove build logic out of close for ArrowLogWriteBatch is that we want to release
// non-heap memory hold by arrowWriter as soon as possible to avoid OOM. Maybe we
// need to introduce a more reasonable way to solve these two problems.
last.close();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,24 @@ public WriteBatch(int bucketId, PhysicalTablePath physicalTablePath, long create
*/
public abstract boolean isLogBatch();

/** Result of a {@link #tryAppend} call. */
public enum AppendResult {
/** Record was successfully appended to the batch. */
APPENDED,
/** Batch has no room for the record or is closed. */
BATCH_FULL,
/** Batch format does not support this record's mutation type (e.g. V0 batch + RETRACT). */
FORMAT_MISMATCH
}

    /**
     * Try to append one write record to the record batch.
     *
     * @param writeRecord the record to write
     * @param callback the callback to send back to writer
     * @return the result of the append attempt
     * @throws Exception if appending the record to the underlying records builder fails
     */
    public abstract AppendResult tryAppend(WriteRecord writeRecord, WriteCallback callback)
            throws Exception;

/**
Expand Down
Loading
Loading