diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java index e64685b9a0..0850b3f5be 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java @@ -111,4 +111,12 @@ default Lookup lookupBy(String... lookupColumnNames) { * @return the typed lookuper */ TypedLookuper createTypedLookuper(Class pojoClass); + + /** + * Enables insert-if-not-exists behavior for the lookup operation. When enabled, if a lookup + * does not find a matching row, a new row will be inserted with the lookup key values. + * + * @return a new Lookup instance with insert-if-not-exists enabled + */ + Lookup enableInsertIfNotExists(); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java index bcea2302c3..1478a0a221 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java @@ -80,8 +80,11 @@ private ExecutorService createThreadPool() { } public CompletableFuture lookup( - TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) { - LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes); + TablePath tablePath, + TableBucket tableBucket, + byte[] keyBytes, + boolean insertIfNotExists) { + LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes, insertIfNotExists); lookupQueue.appendLookup(lookup); return lookup.future(); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java index cc6e70e345..3122c7e38b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java @@ -32,9 +32,21 @@ public class LookupQuery extends AbstractLookupQuery { private final CompletableFuture future; + private final boolean insertIfNotExists; + LookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) { + this(tablePath, tableBucket, key, false); + } + + LookupQuery( + TablePath tablePath, TableBucket tableBucket, byte[] key, boolean insertIfNotExists) { super(tablePath, tableBucket, key); this.future = new CompletableFuture<>(); + this.insertIfNotExists = insertIfNotExists; + } + + public boolean insertIfNotExists() { + return insertIfNotExists; } @Override diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java index d2cd949630..5ef245087b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java @@ -52,6 +52,7 @@ class PrimaryKeyLookuper extends AbstractLookuper implements Lookuper { private final BucketingFunction bucketingFunction; private final int numBuckets; + private final boolean insertIfNotExists; /** a getter to extract partition from lookup key row, null when it's not a partitioned. */ private @Nullable final PartitionGetter partitionGetter; @@ -60,13 +61,15 @@ public PrimaryKeyLookuper( TableInfo tableInfo, SchemaGetter schemaGetter, MetadataUpdater metadataUpdater, - LookupClient lookupClient) { + LookupClient lookupClient, + boolean insertIfNotExists) { super(tableInfo, metadataUpdater, lookupClient, schemaGetter); checkArgument( tableInfo.hasPrimaryKey(), "Log table %s doesn't support lookup", tableInfo.getTablePath()); this.numBuckets = tableInfo.getNumBuckets(); + this.insertIfNotExists = insertIfNotExists; // the row type of the input lookup row RowType lookupRowType = tableInfo.getRowType().project(tableInfo.getPrimaryKeys()); @@ -114,7 +117,7 @@ public CompletableFuture lookup(InternalRow lookupKey) { TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId); CompletableFuture lookupFuture = new CompletableFuture<>(); lookupClient - .lookup(tableInfo.getTablePath(), tableBucket, pkBytes) + .lookup(tableInfo.getTablePath(), tableBucket, pkBytes, insertIfNotExists) .whenComplete( (result, error) -> { if (error != null) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java index e0c92661f1..b2401ad4d6 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java @@ -35,12 +35,14 @@ public class TableLookup implements Lookup { @Nullable private final List lookupColumnNames; + private final boolean insertIfNotExists; + public TableLookup( TableInfo tableInfo, SchemaGetter schemaGetter, MetadataUpdater metadataUpdater, LookupClient lookupClient) { - this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null); + this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null, false); } private TableLookup( @@ -49,15 +51,62 @@ private TableLookup( MetadataUpdater metadataUpdater, LookupClient lookupClient, @Nullable List lookupColumnNames) { + this(tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames, false); + } + + private TableLookup( + TableInfo tableInfo, + SchemaGetter schemaGetter, + MetadataUpdater metadataUpdater, + LookupClient lookupClient, + boolean insertIfNotExists) { + this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null, insertIfNotExists); + } + + private TableLookup( + TableInfo tableInfo, + SchemaGetter schemaGetter, + MetadataUpdater metadataUpdater, + LookupClient lookupClient, + @Nullable List lookupColumnNames, + boolean insertIfNotExists) { this.tableInfo = tableInfo; this.schemaGetter = schemaGetter; this.metadataUpdater = metadataUpdater; this.lookupClient = lookupClient; this.lookupColumnNames = lookupColumnNames; + this.insertIfNotExists = insertIfNotExists; + } + + @Override + public Lookup enableInsertIfNotExists() { + if (lookupColumnNames != null) { + throw new IllegalArgumentException( + "insertIfNotExists can not be used with prefix lookup"); + } + + if (tableInfo.getSchema().getColumns().stream() + .filter(column -> column.getDataType().isNullable()) + .filter(column -> !tableInfo.getPrimaryKeys().contains(column.getName())) + .anyMatch( + column -> + !tableInfo + .getSchema() + .getAutoIncrementColumnNames() + .contains(column.getName()))) { + throw new IllegalArgumentException( + "insertIfNotExists cannot be enabled for tables with nullable columns besides primary key and auto increment columns."); + } + + return new TableLookup(tableInfo, schemaGetter, metadataUpdater, lookupClient, true); } @Override public Lookup lookupBy(List lookupColumnNames) { + if (insertIfNotExists) { + throw new IllegalArgumentException( + "insertIfNotExists can not be used with prefix lookup"); + } return new TableLookup( tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames); } @@ -65,7 +114,8 @@ public Lookup lookupBy(List lookupColumnNames) { @Override public Lookuper createLookuper() { if (lookupColumnNames == null) { - return new PrimaryKeyLookuper(tableInfo, schemaGetter, metadataUpdater, lookupClient); + return new PrimaryKeyLookuper( + tableInfo, schemaGetter, metadataUpdater, lookupClient, insertIfNotExists); } else { return new PrefixKeyLookuper( tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 2fae90d4b5..9e14d631db 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -176,6 +176,11 @@ public static LookupRequest makeLookupRequest( pbLookupReqForBucket.setPartitionId(tb.getPartitionId()); } batch.lookups().forEach(get -> pbLookupReqForBucket.addKey(get.key())); + batch.lookups() + .forEach( + get -> + pbLookupReqForBucket.setInsertIfNotExists( + get.insertIfNotExists())); }); return request; } diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index db9d614354..3d51e7985d 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -744,6 +744,7 @@ message PbLookupReqForBucket { optional int64 partition_id = 1; required int32 bucket_id = 2; repeated bytes keys = 3; + optional bool insert_if_not_exists = 4; } message PbLookupRespForBucket {