From 61d26c70661fb38aab4a8e2d01f6c1761e565e40 Mon Sep 17 00:00:00 2001 From: skyflow-vivek Date: Wed, 10 Sep 2025 13:37:13 +0530 Subject: [PATCH] SK-2289 Fix upsert operation not working issue --- .../java/com/skyflow/errors/ErrorMessage.java | 1 + .../main/java/com/skyflow/logs/ErrorLogs.java | 1 + v3/src/main/java/com/skyflow/VaultClient.java | 9 ++-- .../utils/validations/Validations.java | 12 +++--- .../vault/controller/VaultController.java | 28 ++++++++----- .../controller/VaultControllerTests.java | 41 ++++++++++++++----- .../com/skyflow/vault/data/InsertTests.java | 4 +- 7 files changed, 62 insertions(+), 34 deletions(-) diff --git a/common/src/main/java/com/skyflow/errors/ErrorMessage.java b/common/src/main/java/com/skyflow/errors/ErrorMessage.java index 4998936b..459998ab 100644 --- a/common/src/main/java/com/skyflow/errors/ErrorMessage.java +++ b/common/src/main/java/com/skyflow/errors/ErrorMessage.java @@ -68,6 +68,7 @@ public enum ErrorMessage { EmptyKeyInTokens("%s0 Validation error. Invalid key tokens. Specify a valid key."), EmptyValueInTokens("%s0 Validation error. Invalid value in tokens. Specify a valid value."), EmptyUpsert("%s0 Validation error. 'upsert' key can't be empty. Specify an upsert column."), + EmptyUpsertValues("%s0 Validation error. Upsert column values can't be empty. Specify at least one upsert column."), HomogenousNotSupportedWithUpsert("%s0 Validation error. 'homogenous' is not supported with 'upsert'. Specify either 'homogenous' or 'upsert'."), TokensPassedForTokenModeDisable("%s0 Validation error. 'tokenMode' wasn't specified. Set 'tokenMode' to 'ENABLE' to insert tokens."), NoTokensWithTokenMode("%s0 Validation error. Tokens weren't specified for records while 'tokenMode' was %s1. Specify tokens."), diff --git a/common/src/main/java/com/skyflow/logs/ErrorLogs.java b/common/src/main/java/com/skyflow/logs/ErrorLogs.java index 5cde7d96..fa7bd5dd 100644 --- a/common/src/main/java/com/skyflow/logs/ErrorLogs.java +++ b/common/src/main/java/com/skyflow/logs/ErrorLogs.java @@ -56,6 +56,7 @@ public enum ErrorLogs { EMPTY_OR_NULL_VALUE_IN_VALUES("Invalid %s1 request. Value can not be null or empty in values for key \"%s2\"."), EMPTY_OR_NULL_KEY_IN_VALUES("Invalid %s1 request. Key can not be null or empty in values"), EMPTY_UPSERT("Invalid %s1 request. Upsert can not be empty."), + EMPTY_UPSERT_VALUES("Invalid %s1 request. Upsert values can not be empty."), HOMOGENOUS_NOT_SUPPORTED_WITH_UPSERT("Invalid %s1 request. Homogenous is not supported when upsert is passed."), TOKENS_NOT_ALLOWED_WITH_TOKEN_MODE_DISABLE("Invalid %s1 request. Tokens are not allowed when tokenMode is DISABLE."), TOKENS_REQUIRED_WITH_TOKEN_MODE("Invalid %s1 request. Tokens are required when tokenMode is %s2."), diff --git a/v3/src/main/java/com/skyflow/VaultClient.java b/v3/src/main/java/com/skyflow/VaultClient.java index b53143fd..41d65e37 100644 --- a/v3/src/main/java/com/skyflow/VaultClient.java +++ b/v3/src/main/java/com/skyflow/VaultClient.java @@ -131,6 +131,7 @@ protected void updateExecutorInHTTP() { .build(); apiClientBuilder.httpClient(httpClient); } + protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRequest request, VaultConfig config) throws SkyflowException { List> values = request.getValues(); List insertRecordDataList = new ArrayList<>(); @@ -142,12 +143,12 @@ protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRe .vaultId(config.getVaultId()) .records(insertRecordDataList) .tableName(request.getTable()); - if(request.getUpsert() != null && !request.getUpsert().isEmpty()){ + if (request.getUpsert() != null && !request.getUpsert().isEmpty()) { if (request.getUpsertType() != null) { EnumUpdateType updateType = null; - if(request.getUpsertType() == UpdateType.REPLACE){ + if (request.getUpsertType() == UpdateType.REPLACE) { updateType = EnumUpdateType.REPLACE; - } else if (request.getUpsertType() == UpdateType.REPLACE) { + } else if (request.getUpsertType() == UpdateType.UPDATE) { updateType = EnumUpdateType.UPDATE; } Upsert upsert = Upsert.builder().uniqueColumns(request.getUpsert()).updateType(updateType).build(); @@ -167,7 +168,7 @@ protected com.skyflow.generated.rest.resources.recordservice.requests.Detokenize com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest.builder() .vaultId(this.vaultConfig.getVaultId()) .tokens(tokens); - if (request.getTokenGroupRedactions() != null){ + if (request.getTokenGroupRedactions() != null) { List tokenGroupRedactionsList = new ArrayList<>(); for (com.skyflow.vault.data.TokenGroupRedactions tokenGroupRedactions : request.getTokenGroupRedactions()) { com.skyflow.generated.rest.types.TokenGroupRedactions redactions = diff --git a/v3/src/main/java/com/skyflow/utils/validations/Validations.java b/v3/src/main/java/com/skyflow/utils/validations/Validations.java index 4e278db3..058a002e 100644 --- a/v3/src/main/java/com/skyflow/utils/validations/Validations.java +++ b/v3/src/main/java/com/skyflow/utils/validations/Validations.java @@ -7,7 +7,6 @@ import com.skyflow.errors.ErrorMessage; import com.skyflow.errors.SkyflowException; import com.skyflow.logs.ErrorLogs; -import com.skyflow.utils.BaseUtils; import com.skyflow.utils.Utils; import com.skyflow.utils.logger.LogUtil; import com.skyflow.vault.data.DetokenizeRequest; @@ -23,7 +22,6 @@ private Validations() { super(); } - // add validations specific to v3 SDK public static void validateInsertRequest(InsertRequest insertRequest) throws SkyflowException { String table = insertRequest.getTable(); ArrayList> values = insertRequest.getValues(); @@ -49,15 +47,15 @@ public static void validateInsertRequest(InsertRequest insertRequest) throws Sky ErrorLogs.EMPTY_VALUES.getLog(), InterfaceName.INSERT.getName() )); throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyValues.getMessage()); - } else if(values.size() > 10000) { + } else if (values.size() > 10000) { LogUtil.printErrorLog(ErrorLogs.RECORD_SIZE_EXCEED.getLog()); throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.RecordSizeExceedError.getMessage()); - } else if (upsert != null && upsert.isEmpty()){ + } else if (upsert != null && upsert.isEmpty()) { LogUtil.printErrorLog(Utils.parameterizedString( - ErrorLogs.EMPTY_UPSERT.getLog(), InterfaceName.INSERT.getName() + ErrorLogs.EMPTY_UPSERT_VALUES.getLog(), InterfaceName.INSERT.getName() )); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyUpsertValues.getMessage()); } - // upsert for (HashMap valuesMap : values) { for (String key : valuesMap.keySet()) { @@ -88,7 +86,7 @@ public static void validateDetokenizeRequest(DetokenizeRequest request) throws S throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.DetokenizeRequestNull.getMessage()); } List tokens = request.getTokens(); - if(tokens.size() > 10000) { + if (tokens.size() > 10000) { LogUtil.printErrorLog(ErrorLogs.TOKENS_SIZE_EXCEED.getLog()); throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.TokensSizeExceedError.getMessage()); } diff --git a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java index 952cd3ee..6b5af623 100644 --- a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java +++ b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java @@ -9,6 +9,7 @@ import com.skyflow.generated.rest.core.ApiClientApiException; import com.skyflow.generated.rest.types.InsertRecordData; import com.skyflow.generated.rest.types.InsertResponse; +import com.skyflow.generated.rest.types.Upsert; import com.skyflow.logs.ErrorLogs; import com.skyflow.logs.InfoLogs; import com.skyflow.logs.WarningLogs; @@ -23,7 +24,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static com.skyflow.utils.Utils.*; @@ -125,7 +129,7 @@ public DetokenizeResponse bulkDetokenize(DetokenizeRequest detokenizeRequest) th } } - public CompletableFuture bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException{ + public CompletableFuture bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException { LogUtil.printInfoLog(InfoLogs.DETOKENIZE_TRIGGERED.getLog()); ExecutorService executor = Executors.newFixedThreadPool(detokenizeConcurrencyLimit); try { @@ -161,13 +165,14 @@ public CompletableFuture bulkDetokenizeAsync(DetokenizeReque executor.shutdown(); return new DetokenizeResponse(successRecords, errorTokens, detokenizeRequest.getTokens()); }); - } catch (Exception e){ + } catch (Exception e) { LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog()); throw new SkyflowException(e.getMessage()); } finally { executor.shutdown(); } } + private com.skyflow.vault.data.InsertResponse processSync( com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest, ArrayList> originalPayload @@ -207,11 +212,11 @@ private DetokenizeResponse processDetokenizeSync( List batches = Utils.createDetokenizeBatches(detokenizeRequest, detokenizeBatchSize); try { List> futures = this.detokenizeBatchFutures(executor, batches, errorTokens); - try{ + try { CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); - } catch (Exception e){ + } catch (Exception e) { } for (CompletableFuture future : futures) { DetokenizeResponse futureResponse = future.get(); @@ -224,7 +229,7 @@ private DetokenizeResponse processDetokenizeSync( } } } - } catch (Exception e){ + } catch (Exception e) { LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog()); throw new SkyflowException(e.getMessage()); } finally { @@ -240,7 +245,7 @@ private List> detokenizeBatchFutures(Execu try { for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) { - com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch = batches.get(batchIndex); + com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch = batches.get(batchIndex); int batchNumber = batchIndex; CompletableFuture future = CompletableFuture .supplyAsync(() -> processDetokenizeBatch(batch), executor) @@ -251,12 +256,13 @@ private List> detokenizeBatchFutures(Execu }); futures.add(future); } - } catch (Exception e){ + } catch (Exception e) { ErrorRecord errorRecord = new ErrorRecord(0, e.getMessage(), 500); errorTokens.add(errorRecord); } return futures; } + private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBatch(com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch) { return this.getRecordsApi().detokenize(batch); } @@ -270,13 +276,14 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); List> batches = Utils.createBatches(records, insertBatchSize); List> futures = new ArrayList<>(); + Upsert upsert = insertRequest.getUpsert().isPresent() ? insertRequest.getUpsert().get() : null; try { for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) { List batch = batches.get(batchIndex); int batchNumber = batchIndex; CompletableFuture future = CompletableFuture - .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor) + .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get(), upsert), executor) .thenApply(response -> formatResponse(response, batchNumber, insertBatchSize)) .exceptionally(ex -> { errorRecords.addAll(handleBatchException(ex, batch, batchNumber)); @@ -290,11 +297,12 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat return futures; } - private InsertResponse insertBatch(List batch, String tableName) { + private InsertResponse insertBatch(List batch, String tableName, Upsert upsert) { com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest req = com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest.builder() .vaultId(this.getVaultConfig().getVaultId()) .tableName(tableName) .records(batch) + .upsert(upsert) .build(); return this.getRecordsApi().insert(req); } diff --git a/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java b/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java index 4467057c..811a9891 100644 --- a/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java +++ b/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java @@ -2,16 +2,25 @@ import com.skyflow.config.Credentials; import com.skyflow.config.VaultConfig; +import com.skyflow.errors.ErrorCode; +import com.skyflow.errors.ErrorMessage; import com.skyflow.errors.SkyflowException; import com.skyflow.utils.Constants; import com.skyflow.utils.validations.Validations; import com.skyflow.vault.data.InsertRequest; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + import java.io.FileWriter; import java.io.IOException; -import java.lang.reflect.Method; import java.lang.reflect.Field; -import java.util.*; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Scanner; + import static org.junit.Assert.*; @@ -80,7 +89,7 @@ private void invokeConfigureInsertConcurrencyAndBatchSize(VaultController contro method.invoke(controller, totalRequests); } - private ArrayList> generateValues(int noOfRecords){ + private ArrayList> generateValues(int noOfRecords) { ArrayList> values = new ArrayList<>(); for (int i = 0; i < noOfRecords; i++) { values.add(new HashMap<>()); @@ -135,9 +144,19 @@ public void testValidation_valuesIsEmpty() { @Test public void testValidation_upsertIsEmpty() throws SkyflowException { - InsertRequest req = InsertRequest.builder().table("table1").values(generateValues(1)).upsert(new ArrayList<>()).build(); - // Should not throw, just logs a warning - Validations.validateInsertRequest(req); + try { + InsertRequest req = InsertRequest.builder() + .table("table1") + .values(generateValues(1)) + .upsert(new ArrayList<>()) + .build(); + Validations.validateInsertRequest(req); + } catch (SkyflowException e) { + Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode()); + Assert.assertEquals(ErrorMessage.EmptyUpsertValues.getMessage(), e.getMessage()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } } @@ -299,8 +318,6 @@ public void testConcurrencyZeroOrNegative() throws Exception { assertEquals(min, getPrivateInt(controller, "insertConcurrencyLimit")); - - writeEnv("INSERT_CONCURRENCY_LIMIT=-5"); try { @@ -356,7 +373,8 @@ public void testHighConcurrencyForLowRecords() throws Exception { try { controller.bulkInsert(insertRequest); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } // Only 10 batches needed, so concurrency should be clamped to 10 assertEquals(1000, getPrivateInt(controller, "insertBatchSize")); @@ -372,7 +390,8 @@ public void testFractionalLastBatch() throws Exception { try { controller.bulkInsert(insertRequest); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } // Last batch should have 50 records, concurrency should be 101 assertEquals(100, getPrivateInt(controller, "insertBatchSize")); diff --git a/v3/test/java/com/skyflow/vault/data/InsertTests.java b/v3/test/java/com/skyflow/vault/data/InsertTests.java index 7ea17d89..ff71fa19 100644 --- a/v3/test/java/com/skyflow/vault/data/InsertTests.java +++ b/v3/test/java/com/skyflow/vault/data/InsertTests.java @@ -211,11 +211,11 @@ public void testEmptyUpsertInInsertRequestValidations() { InsertRequest request = InsertRequest.builder().table(table).values(values).upsert(new ArrayList<>()).build(); try { Validations.validateInsertRequest(request); -// Assert.fail(EXCEPTION_NOT_THROWN); + Assert.fail(EXCEPTION_NOT_THROWN); } catch (SkyflowException e) { Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode()); Assert.assertEquals( - Utils.parameterizedString(ErrorMessage.EmptyUpsert.getMessage(), Constants.SDK_PREFIX), + Utils.parameterizedString(ErrorMessage.EmptyUpsertValues.getMessage(), Constants.SDK_PREFIX), e.getMessage() ); }