Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,12 @@ default Lookup lookupBy(String... lookupColumnNames) {
* @return the typed lookuper
*/
<T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);

/**
* Enables insert-if-not-exists behavior for the lookup operation. When enabled, if a lookup
* does not find a matching row, a new row will be inserted with the lookup key values.
*
* @return a new Lookup instance with insert-if-not-exists enabled
*/
Lookup enableInsertIfNotExists();
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ private ExecutorService createThreadPool() {
}

public CompletableFuture<byte[]> lookup(
TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes);
TablePath tablePath,
TableBucket tableBucket,
byte[] keyBytes,
boolean insertIfNotExists) {
LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes, insertIfNotExists);
lookupQueue.appendLookup(lookup);
return lookup.future();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,21 @@ public class LookupQuery extends AbstractLookupQuery<byte[]> {

private final CompletableFuture<byte[]> future;

private final boolean insertIfNotExists;

LookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) {
this(tablePath, tableBucket, key, false);
}

LookupQuery(
TablePath tablePath, TableBucket tableBucket, byte[] key, boolean insertIfNotExists) {
super(tablePath, tableBucket, key);
this.future = new CompletableFuture<>();
this.insertIfNotExists = insertIfNotExists;
}

public boolean insertIfNotExists() {
return insertIfNotExists;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class PrimaryKeyLookuper extends AbstractLookuper implements Lookuper {

private final BucketingFunction bucketingFunction;
private final int numBuckets;
private final boolean insertIfNotExists;

/** a getter to extract partition from lookup key row, null when it's not a partitioned. */
private @Nullable final PartitionGetter partitionGetter;
Expand All @@ -60,13 +61,15 @@ public PrimaryKeyLookuper(
TableInfo tableInfo,
SchemaGetter schemaGetter,
MetadataUpdater metadataUpdater,
LookupClient lookupClient) {
LookupClient lookupClient,
boolean insertIfNotExists) {
super(tableInfo, metadataUpdater, lookupClient, schemaGetter);
checkArgument(
tableInfo.hasPrimaryKey(),
"Log table %s doesn't support lookup",
tableInfo.getTablePath());
this.numBuckets = tableInfo.getNumBuckets();
this.insertIfNotExists = insertIfNotExists;

// the row type of the input lookup row
RowType lookupRowType = tableInfo.getRowType().project(tableInfo.getPrimaryKeys());
Expand Down Expand Up @@ -114,7 +117,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
lookupClient
.lookup(tableInfo.getTablePath(), tableBucket, pkBytes)
.lookup(tableInfo.getTablePath(), tableBucket, pkBytes, insertIfNotExists)
.whenComplete(
(result, error) -> {
if (error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ public class TableLookup implements Lookup {

@Nullable private final List<String> lookupColumnNames;

private final boolean insertIfNotExists;

public TableLookup(
TableInfo tableInfo,
SchemaGetter schemaGetter,
MetadataUpdater metadataUpdater,
LookupClient lookupClient) {
this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null);
this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null, false);
}

private TableLookup(
Expand All @@ -49,23 +51,71 @@ private TableLookup(
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
@Nullable List<String> lookupColumnNames) {
this(tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames, false);
}

private TableLookup(
TableInfo tableInfo,
SchemaGetter schemaGetter,
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
boolean insertIfNotExists) {
this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null, insertIfNotExists);
}

private TableLookup(
TableInfo tableInfo,
SchemaGetter schemaGetter,
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
@Nullable List<String> lookupColumnNames,
boolean insertIfNotExists) {
this.tableInfo = tableInfo;
this.schemaGetter = schemaGetter;
this.metadataUpdater = metadataUpdater;
this.lookupClient = lookupClient;
this.lookupColumnNames = lookupColumnNames;
this.insertIfNotExists = insertIfNotExists;
}

@Override
public Lookup enableInsertIfNotExists() {
if (lookupColumnNames != null) {
throw new IllegalArgumentException(
"insertIfNotExists can not be used with prefix lookup");
}

if (tableInfo.getSchema().getColumns().stream()
.filter(column -> column.getDataType().isNullable())
.filter(column -> !tableInfo.getPrimaryKeys().contains(column.getName()))
.anyMatch(
column ->
!tableInfo
.getSchema()
.getAutoIncrementColumnNames()
.contains(column.getName()))) {
throw new IllegalArgumentException(
"insertIfNotExists cannot be enabled for tables with nullable columns besides primary key and auto increment columns.");
}

return new TableLookup(tableInfo, schemaGetter, metadataUpdater, lookupClient, true);
}

@Override
public Lookup lookupBy(List<String> lookupColumnNames) {
if (insertIfNotExists) {
throw new IllegalArgumentException(
"insertIfNotExists can not be used with prefix lookup");
}
return new TableLookup(
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
}

@Override
public Lookuper createLookuper() {
if (lookupColumnNames == null) {
return new PrimaryKeyLookuper(tableInfo, schemaGetter, metadataUpdater, lookupClient);
return new PrimaryKeyLookuper(
tableInfo, schemaGetter, metadataUpdater, lookupClient, insertIfNotExists);
} else {
return new PrefixKeyLookuper(
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ public static LookupRequest makeLookupRequest(
pbLookupReqForBucket.setPartitionId(tb.getPartitionId());
}
batch.lookups().forEach(get -> pbLookupReqForBucket.addKey(get.key()));
batch.lookups()
.forEach(
get ->
pbLookupReqForBucket.setInsertIfNotExists(
get.insertIfNotExists()));
});
return request;
}
Expand Down
1 change: 1 addition & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ message PbLookupReqForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
repeated bytes keys = 3;
optional bool insert_if_not_exists = 4;
}

message PbLookupRespForBucket {
Expand Down