diff --git a/common/src/main/java/com/skyflow/errors/ErrorMessage.java b/common/src/main/java/com/skyflow/errors/ErrorMessage.java
index 4998936b..459998ab 100644
--- a/common/src/main/java/com/skyflow/errors/ErrorMessage.java
+++ b/common/src/main/java/com/skyflow/errors/ErrorMessage.java
@@ -68,6 +68,7 @@ public enum ErrorMessage {
EmptyKeyInTokens("%s0 Validation error. Invalid key tokens. Specify a valid key."),
EmptyValueInTokens("%s0 Validation error. Invalid value in tokens. Specify a valid value."),
EmptyUpsert("%s0 Validation error. 'upsert' key can't be empty. Specify an upsert column."),
+ EmptyUpsertValues("%s0 Validation error. Upsert column values can't be empty. Specify at least one upsert column."),
HomogenousNotSupportedWithUpsert("%s0 Validation error. 'homogenous' is not supported with 'upsert'. Specify either 'homogenous' or 'upsert'."),
TokensPassedForTokenModeDisable("%s0 Validation error. 'tokenMode' wasn't specified. Set 'tokenMode' to 'ENABLE' to insert tokens."),
NoTokensWithTokenMode("%s0 Validation error. Tokens weren't specified for records while 'tokenMode' was %s1. Specify tokens."),
diff --git a/common/src/main/java/com/skyflow/logs/ErrorLogs.java b/common/src/main/java/com/skyflow/logs/ErrorLogs.java
index 5cde7d96..fa7bd5dd 100644
--- a/common/src/main/java/com/skyflow/logs/ErrorLogs.java
+++ b/common/src/main/java/com/skyflow/logs/ErrorLogs.java
@@ -56,6 +56,7 @@ public enum ErrorLogs {
EMPTY_OR_NULL_VALUE_IN_VALUES("Invalid %s1 request. Value can not be null or empty in values for key \"%s2\"."),
EMPTY_OR_NULL_KEY_IN_VALUES("Invalid %s1 request. Key can not be null or empty in values"),
EMPTY_UPSERT("Invalid %s1 request. Upsert can not be empty."),
+ EMPTY_UPSERT_VALUES("Invalid %s1 request. Upsert values can not be empty."),
HOMOGENOUS_NOT_SUPPORTED_WITH_UPSERT("Invalid %s1 request. Homogenous is not supported when upsert is passed."),
TOKENS_NOT_ALLOWED_WITH_TOKEN_MODE_DISABLE("Invalid %s1 request. Tokens are not allowed when tokenMode is DISABLE."),
TOKENS_REQUIRED_WITH_TOKEN_MODE("Invalid %s1 request. Tokens are required when tokenMode is %s2."),
diff --git a/v3/pom.xml b/v3/pom.xml
index 166c7d24..7355af08 100644
--- a/v3/pom.xml
+++ b/v3/pom.xml
@@ -11,7 +11,7 @@
skyflow-java
- 2.0.0-beta.4-dev.4b56b3d
+ 2.0.0-beta.4-dev.d3257dc
jar
${project.groupId}:${project.artifactId}
Skyflow V3 SDK for the Java programming language
diff --git a/v3/src/main/java/com/skyflow/VaultClient.java b/v3/src/main/java/com/skyflow/VaultClient.java
index b53143fd..41d65e37 100644
--- a/v3/src/main/java/com/skyflow/VaultClient.java
+++ b/v3/src/main/java/com/skyflow/VaultClient.java
@@ -131,6 +131,7 @@ protected void updateExecutorInHTTP() {
.build();
apiClientBuilder.httpClient(httpClient);
}
+
protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRequest request, VaultConfig config) throws SkyflowException {
List> values = request.getValues();
List insertRecordDataList = new ArrayList<>();
@@ -142,12 +143,12 @@ protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRe
.vaultId(config.getVaultId())
.records(insertRecordDataList)
.tableName(request.getTable());
- if(request.getUpsert() != null && !request.getUpsert().isEmpty()){
+ if (request.getUpsert() != null && !request.getUpsert().isEmpty()) {
if (request.getUpsertType() != null) {
EnumUpdateType updateType = null;
- if(request.getUpsertType() == UpdateType.REPLACE){
+ if (request.getUpsertType() == UpdateType.REPLACE) {
updateType = EnumUpdateType.REPLACE;
- } else if (request.getUpsertType() == UpdateType.REPLACE) {
+ } else if (request.getUpsertType() == UpdateType.UPDATE) {
updateType = EnumUpdateType.UPDATE;
}
Upsert upsert = Upsert.builder().uniqueColumns(request.getUpsert()).updateType(updateType).build();
@@ -167,7 +168,7 @@ protected com.skyflow.generated.rest.resources.recordservice.requests.Detokenize
com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest.builder()
.vaultId(this.vaultConfig.getVaultId())
.tokens(tokens);
- if (request.getTokenGroupRedactions() != null){
+ if (request.getTokenGroupRedactions() != null) {
List tokenGroupRedactionsList = new ArrayList<>();
for (com.skyflow.vault.data.TokenGroupRedactions tokenGroupRedactions : request.getTokenGroupRedactions()) {
com.skyflow.generated.rest.types.TokenGroupRedactions redactions =
diff --git a/v3/src/main/java/com/skyflow/utils/validations/Validations.java b/v3/src/main/java/com/skyflow/utils/validations/Validations.java
index 4e278db3..058a002e 100644
--- a/v3/src/main/java/com/skyflow/utils/validations/Validations.java
+++ b/v3/src/main/java/com/skyflow/utils/validations/Validations.java
@@ -7,7 +7,6 @@
import com.skyflow.errors.ErrorMessage;
import com.skyflow.errors.SkyflowException;
import com.skyflow.logs.ErrorLogs;
-import com.skyflow.utils.BaseUtils;
import com.skyflow.utils.Utils;
import com.skyflow.utils.logger.LogUtil;
import com.skyflow.vault.data.DetokenizeRequest;
@@ -23,7 +22,6 @@ private Validations() {
super();
}
- // add validations specific to v3 SDK
public static void validateInsertRequest(InsertRequest insertRequest) throws SkyflowException {
String table = insertRequest.getTable();
ArrayList> values = insertRequest.getValues();
@@ -49,15 +47,15 @@ public static void validateInsertRequest(InsertRequest insertRequest) throws Sky
ErrorLogs.EMPTY_VALUES.getLog(), InterfaceName.INSERT.getName()
));
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyValues.getMessage());
- } else if(values.size() > 10000) {
+ } else if (values.size() > 10000) {
LogUtil.printErrorLog(ErrorLogs.RECORD_SIZE_EXCEED.getLog());
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.RecordSizeExceedError.getMessage());
- } else if (upsert != null && upsert.isEmpty()){
+ } else if (upsert != null && upsert.isEmpty()) {
LogUtil.printErrorLog(Utils.parameterizedString(
- ErrorLogs.EMPTY_UPSERT.getLog(), InterfaceName.INSERT.getName()
+ ErrorLogs.EMPTY_UPSERT_VALUES.getLog(), InterfaceName.INSERT.getName()
));
+ throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyUpsertValues.getMessage());
}
- // upsert
for (HashMap valuesMap : values) {
for (String key : valuesMap.keySet()) {
@@ -88,7 +86,7 @@ public static void validateDetokenizeRequest(DetokenizeRequest request) throws S
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.DetokenizeRequestNull.getMessage());
}
List tokens = request.getTokens();
- if(tokens.size() > 10000) {
+ if (tokens.size() > 10000) {
LogUtil.printErrorLog(ErrorLogs.TOKENS_SIZE_EXCEED.getLog());
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.TokensSizeExceedError.getMessage());
}
diff --git a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java
index 952cd3ee..6b5af623 100644
--- a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java
+++ b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java
@@ -9,6 +9,7 @@
import com.skyflow.generated.rest.core.ApiClientApiException;
import com.skyflow.generated.rest.types.InsertRecordData;
import com.skyflow.generated.rest.types.InsertResponse;
+import com.skyflow.generated.rest.types.Upsert;
import com.skyflow.logs.ErrorLogs;
import com.skyflow.logs.InfoLogs;
import com.skyflow.logs.WarningLogs;
@@ -23,7 +24,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static com.skyflow.utils.Utils.*;
@@ -125,7 +129,7 @@ public DetokenizeResponse bulkDetokenize(DetokenizeRequest detokenizeRequest) th
}
}
- public CompletableFuture bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException{
+ public CompletableFuture bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException {
LogUtil.printInfoLog(InfoLogs.DETOKENIZE_TRIGGERED.getLog());
ExecutorService executor = Executors.newFixedThreadPool(detokenizeConcurrencyLimit);
try {
@@ -161,13 +165,14 @@ public CompletableFuture bulkDetokenizeAsync(DetokenizeReque
executor.shutdown();
return new DetokenizeResponse(successRecords, errorTokens, detokenizeRequest.getTokens());
});
- } catch (Exception e){
+ } catch (Exception e) {
LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
throw new SkyflowException(e.getMessage());
} finally {
executor.shutdown();
}
}
+
private com.skyflow.vault.data.InsertResponse processSync(
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest,
ArrayList> originalPayload
@@ -207,11 +212,11 @@ private DetokenizeResponse processDetokenizeSync(
List batches = Utils.createDetokenizeBatches(detokenizeRequest, detokenizeBatchSize);
try {
List> futures = this.detokenizeBatchFutures(executor, batches, errorTokens);
- try{
+ try {
CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
- } catch (Exception e){
+ } catch (Exception e) {
}
for (CompletableFuture future : futures) {
DetokenizeResponse futureResponse = future.get();
@@ -224,7 +229,7 @@ private DetokenizeResponse processDetokenizeSync(
}
}
}
- } catch (Exception e){
+ } catch (Exception e) {
LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
throw new SkyflowException(e.getMessage());
} finally {
@@ -240,7 +245,7 @@ private List> detokenizeBatchFutures(Execu
try {
for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
- com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch = batches.get(batchIndex);
+ com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch = batches.get(batchIndex);
int batchNumber = batchIndex;
CompletableFuture future = CompletableFuture
.supplyAsync(() -> processDetokenizeBatch(batch), executor)
@@ -251,12 +256,13 @@ private List> detokenizeBatchFutures(Execu
});
futures.add(future);
}
- } catch (Exception e){
+ } catch (Exception e) {
ErrorRecord errorRecord = new ErrorRecord(0, e.getMessage(), 500);
errorTokens.add(errorRecord);
}
return futures;
}
+
private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBatch(com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch) {
return this.getRecordsApi().detokenize(batch);
}
@@ -270,13 +276,14 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat
ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit);
List> batches = Utils.createBatches(records, insertBatchSize);
List> futures = new ArrayList<>();
+ Upsert upsert = insertRequest.getUpsert().isPresent() ? insertRequest.getUpsert().get() : null;
try {
for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
List batch = batches.get(batchIndex);
int batchNumber = batchIndex;
CompletableFuture future = CompletableFuture
- .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor)
+ .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get(), upsert), executor)
.thenApply(response -> formatResponse(response, batchNumber, insertBatchSize))
.exceptionally(ex -> {
errorRecords.addAll(handleBatchException(ex, batch, batchNumber));
@@ -290,11 +297,12 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat
return futures;
}
- private InsertResponse insertBatch(List batch, String tableName) {
+ private InsertResponse insertBatch(List batch, String tableName, Upsert upsert) {
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest req = com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest.builder()
.vaultId(this.getVaultConfig().getVaultId())
.tableName(tableName)
.records(batch)
+ .upsert(upsert)
.build();
return this.getRecordsApi().insert(req);
}
diff --git a/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java b/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java
index 4467057c..811a9891 100644
--- a/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java
+++ b/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java
@@ -2,16 +2,25 @@
import com.skyflow.config.Credentials;
import com.skyflow.config.VaultConfig;
+import com.skyflow.errors.ErrorCode;
+import com.skyflow.errors.ErrorMessage;
import com.skyflow.errors.SkyflowException;
import com.skyflow.utils.Constants;
import com.skyflow.utils.validations.Validations;
import com.skyflow.vault.data.InsertRequest;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
import java.io.FileWriter;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.lang.reflect.Field;
-import java.util.*;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Scanner;
+
import static org.junit.Assert.*;
@@ -80,7 +89,7 @@ private void invokeConfigureInsertConcurrencyAndBatchSize(VaultController contro
method.invoke(controller, totalRequests);
}
- private ArrayList> generateValues(int noOfRecords){
+ private ArrayList> generateValues(int noOfRecords) {
ArrayList> values = new ArrayList<>();
for (int i = 0; i < noOfRecords; i++) {
values.add(new HashMap<>());
@@ -135,9 +144,19 @@ public void testValidation_valuesIsEmpty() {
@Test
public void testValidation_upsertIsEmpty() throws SkyflowException {
- InsertRequest req = InsertRequest.builder().table("table1").values(generateValues(1)).upsert(new ArrayList<>()).build();
- // Should not throw, just logs a warning
- Validations.validateInsertRequest(req);
+ try {
+ InsertRequest req = InsertRequest.builder()
+ .table("table1")
+ .values(generateValues(1))
+ .upsert(new ArrayList<>())
+ .build();
+ Validations.validateInsertRequest(req);
+ } catch (SkyflowException e) {
+ Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode());
+ Assert.assertEquals(ErrorMessage.EmptyUpsertValues.getMessage(), e.getMessage());
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
}
@@ -299,8 +318,6 @@ public void testConcurrencyZeroOrNegative() throws Exception {
assertEquals(min, getPrivateInt(controller, "insertConcurrencyLimit"));
-
-
writeEnv("INSERT_CONCURRENCY_LIMIT=-5");
try {
@@ -356,7 +373,8 @@ public void testHighConcurrencyForLowRecords() throws Exception {
try {
controller.bulkInsert(insertRequest);
- } catch (Exception ignored) {}
+ } catch (Exception ignored) {
+ }
// Only 10 batches needed, so concurrency should be clamped to 10
assertEquals(1000, getPrivateInt(controller, "insertBatchSize"));
@@ -372,7 +390,8 @@ public void testFractionalLastBatch() throws Exception {
try {
controller.bulkInsert(insertRequest);
- } catch (Exception ignored) {}
+ } catch (Exception ignored) {
+ }
// Last batch should have 50 records, concurrency should be 101
assertEquals(100, getPrivateInt(controller, "insertBatchSize"));
diff --git a/v3/test/java/com/skyflow/vault/data/InsertTests.java b/v3/test/java/com/skyflow/vault/data/InsertTests.java
index 7ea17d89..ff71fa19 100644
--- a/v3/test/java/com/skyflow/vault/data/InsertTests.java
+++ b/v3/test/java/com/skyflow/vault/data/InsertTests.java
@@ -211,11 +211,11 @@ public void testEmptyUpsertInInsertRequestValidations() {
InsertRequest request = InsertRequest.builder().table(table).values(values).upsert(new ArrayList<>()).build();
try {
Validations.validateInsertRequest(request);
-// Assert.fail(EXCEPTION_NOT_THROWN);
+ Assert.fail(EXCEPTION_NOT_THROWN);
} catch (SkyflowException e) {
Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode());
Assert.assertEquals(
- Utils.parameterizedString(ErrorMessage.EmptyUpsert.getMessage(), Constants.SDK_PREFIX),
+ Utils.parameterizedString(ErrorMessage.EmptyUpsertValues.getMessage(), Constants.SDK_PREFIX),
e.getMessage()
);
}