diff --git a/common/src/main/java/com/skyflow/errors/ErrorMessage.java b/common/src/main/java/com/skyflow/errors/ErrorMessage.java index f4b2ce7b..459998ab 100644 --- a/common/src/main/java/com/skyflow/errors/ErrorMessage.java +++ b/common/src/main/java/com/skyflow/errors/ErrorMessage.java @@ -16,6 +16,8 @@ public enum ErrorMessage { EmptyVaultId("%s0 Initialization failed. Invalid vault ID. Vault ID must not be empty."), InvalidClusterId("%s0 Initialization failed. Invalid cluster ID. Specify cluster ID."), EmptyClusterId("%s0 Initialization failed. Invalid cluster ID. Specify a valid cluster ID."), + EmptyVaultUrl("%s0 Initialization failed. Vault URL is empty. Specify a valid vault URL."), + InvalidVaultUrlFormat("%s0 Initialization failed. Vault URL must start with 'https://'."), // Connection config InvalidConnectionId("%s0 Initialization failed. Invalid connection ID. Specify a valid connection ID."), @@ -66,6 +68,7 @@ public enum ErrorMessage { EmptyKeyInTokens("%s0 Validation error. Invalid key tokens. Specify a valid key."), EmptyValueInTokens("%s0 Validation error. Invalid value in tokens. Specify a valid value."), EmptyUpsert("%s0 Validation error. 'upsert' key can't be empty. Specify an upsert column."), + EmptyUpsertValues("%s0 Validation error. Upsert column values can't be empty. Specify at least one upsert column."), HomogenousNotSupportedWithUpsert("%s0 Validation error. 'homogenous' is not supported with 'upsert'. Specify either 'homogenous' or 'upsert'."), TokensPassedForTokenModeDisable("%s0 Validation error. 'tokenMode' wasn't specified. Set 'tokenMode' to 'ENABLE' to insert tokens."), NoTokensWithTokenMode("%s0 Validation error. Tokens weren't specified for records while 'tokenMode' was %s1. Specify tokens."), @@ -73,11 +76,13 @@ public enum ErrorMessage { InsufficientTokensPassedForTokenModeEnableStrict("%s0 Validation error. 'tokenMode' is set to 'ENABLE_STRICT', but some fields are missing tokens. Specify tokens for all fields."), BatchInsertPartialSuccess("%s0 Insert operation completed with partial success."), BatchInsertFailure("%s0 Insert operation failed."), + RecordSizeExceedError("%s0 Maximum number of records exceeded. The limit is 10000."), // Detokenize InvalidDetokenizeData("%s0 Validation error. Invalid detokenize data. Specify valid detokenize data."), EmptyDetokenizeData("%s0 Validation error. Invalid data tokens. Specify at least one data token."), EmptyTokenInDetokenizeData("%s0 Validation error. Invalid data tokens. Specify a valid data token."), + TokensSizeExceedError("%s0 Maximum number of tokens exceeded. The limit is 10000."), // Get IdsKeyError("%s0 Validation error. 'ids' key is missing from the payload. Specify an 'ids' key."), diff --git a/common/src/main/java/com/skyflow/logs/ErrorLogs.java b/common/src/main/java/com/skyflow/logs/ErrorLogs.java index 8f1156fd..fa7bd5dd 100644 --- a/common/src/main/java/com/skyflow/logs/ErrorLogs.java +++ b/common/src/main/java/com/skyflow/logs/ErrorLogs.java @@ -25,6 +25,8 @@ public enum ErrorLogs { EMPTY_ROLES("Invalid credentials. Roles can not be empty."), EMPTY_OR_NULL_ROLE_IN_ROLES("Invalid credentials. Role can not be null or empty in roles at index %s1."), EMPTY_OR_NULL_CONTEXT("Invalid credentials. Context can not be empty."), + EMPTY_VAULT_URL("Invalid vault config. Vault URL can not be empty."), + INVALID_VAULT_URL_FORMAT("Invalid vault config. Vault URL format is incorrect"), // Bearer token generation INVALID_BEARER_TOKEN("Bearer token is invalid or expired."), @@ -49,9 +51,12 @@ public enum ErrorLogs { EMPTY_TABLE_NAME("Invalid %s1 request. Table name can not be empty."), VALUES_IS_REQUIRED("Invalid %s1 request. Values are required."), EMPTY_VALUES("Invalid %s1 request. Values can not be empty."), + RECORD_SIZE_EXCEED("Maximum number of records exceeded. The limit is 10000."), + TOKENS_SIZE_EXCEED("Maximum number of tokens exceeded. The limit is 10000."), EMPTY_OR_NULL_VALUE_IN_VALUES("Invalid %s1 request. Value can not be null or empty in values for key \"%s2\"."), EMPTY_OR_NULL_KEY_IN_VALUES("Invalid %s1 request. Key can not be null or empty in values"), EMPTY_UPSERT("Invalid %s1 request. Upsert can not be empty."), + EMPTY_UPSERT_VALUES("Invalid %s1 request. Upsert values can not be empty."), HOMOGENOUS_NOT_SUPPORTED_WITH_UPSERT("Invalid %s1 request. Homogenous is not supported when upsert is passed."), TOKENS_NOT_ALLOWED_WITH_TOKEN_MODE_DISABLE("Invalid %s1 request. Tokens are not allowed when tokenMode is DISABLE."), TOKENS_REQUIRED_WITH_TOKEN_MODE("Invalid %s1 request. Tokens are required when tokenMode is %s2."), diff --git a/common/src/main/java/com/skyflow/utils/BaseUtils.java b/common/src/main/java/com/skyflow/utils/BaseUtils.java index b7cf6bb0..a13c9ab5 100644 --- a/common/src/main/java/com/skyflow/utils/BaseUtils.java +++ b/common/src/main/java/com/skyflow/utils/BaseUtils.java @@ -11,6 +11,8 @@ import com.skyflow.serviceaccount.util.BearerToken; import com.skyflow.serviceaccount.util.Token; import com.skyflow.utils.logger.LogUtil; +import io.github.cdimascio.dotenv.Dotenv; +import io.github.cdimascio.dotenv.DotenvException; import org.apache.commons.codec.binary.Base64; import java.io.File; @@ -43,6 +45,7 @@ public static String getVaultURL(String clusterId, Env env, String vaultDomain) return sb.toString(); } + public static String generateBearerToken(Credentials credentials) throws SkyflowException { String bearerToken; if (credentials.getPath() != null) { diff --git a/common/src/main/java/com/skyflow/utils/validations/BaseValidations.java b/common/src/main/java/com/skyflow/utils/validations/BaseValidations.java index 8460ec1d..410b99aa 100644 --- a/common/src/main/java/com/skyflow/utils/validations/BaseValidations.java +++ b/common/src/main/java/com/skyflow/utils/validations/BaseValidations.java @@ -34,7 +34,8 @@ public static void validateVaultConfig(VaultConfig vaultConfig) throws SkyflowEx } else if (clusterId.trim().isEmpty()) { LogUtil.printErrorLog(ErrorLogs.EMPTY_CLUSTER_ID.getLog()); throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyClusterId.getMessage()); - } else if (credentials != null) { + } + else if (credentials != null) { validateCredentials(credentials); } } diff --git a/v3/pom.xml b/v3/pom.xml index 4655a067..fb409cb5 100644 --- a/v3/pom.xml +++ b/v3/pom.xml @@ -11,7 +11,7 @@ skyflow-java - 3.0.0-beta.3 + 3.0.0-beta.4 jar ${project.groupId}:${project.artifactId} Skyflow V3 SDK for the Java programming language diff --git a/v3/src/main/java/com/skyflow/Skyflow.java b/v3/src/main/java/com/skyflow/Skyflow.java index 7973c747..ed357bdd 100644 --- a/v3/src/main/java/com/skyflow/Skyflow.java +++ b/v3/src/main/java/com/skyflow/Skyflow.java @@ -56,7 +56,7 @@ public SkyflowClientBuilder() { public SkyflowClientBuilder addVaultConfig(VaultConfig vaultConfig) throws SkyflowException { LogUtil.printInfoLog(InfoLogs.VALIDATING_VAULT_CONFIG.getLog()); - Validations.validateVaultConfig(vaultConfig); + Validations.validateVaultConfiguration(vaultConfig); VaultConfig vaultConfigCopy; try { vaultConfigCopy = (VaultConfig) vaultConfig.clone(); diff --git a/v3/src/main/java/com/skyflow/VaultClient.java b/v3/src/main/java/com/skyflow/VaultClient.java index b43ccd60..41d65e37 100644 --- a/v3/src/main/java/com/skyflow/VaultClient.java +++ b/v3/src/main/java/com/skyflow/VaultClient.java @@ -39,7 +39,7 @@ public class VaultClient { private String token; private String apiKey; - protected VaultClient(VaultConfig vaultConfig, Credentials credentials) { + protected VaultClient(VaultConfig vaultConfig, Credentials credentials) throws SkyflowException { super(); this.vaultConfig = vaultConfig; this.commonCredentials = credentials; @@ -79,8 +79,11 @@ protected void setBearerToken() throws SkyflowException { this.apiClient = this.apiClientBuilder.build(); } - private void updateVaultURL() { - String vaultURL = Utils.getVaultURL(this.vaultConfig.getClusterId(), this.vaultConfig.getEnv()); + private void updateVaultURL() throws SkyflowException { + String vaultURL = Utils.getEnvVaultURL(); + if (vaultURL == null || vaultURL.isEmpty()) { + vaultURL = Utils.getVaultURL(this.vaultConfig.getClusterId(), this.vaultConfig.getEnv()); + } this.apiClientBuilder.url(vaultURL); } @@ -92,11 +95,13 @@ private void prioritiseCredentials() throws SkyflowException { } else if (this.commonCredentials != null) { this.finalCredentials = this.commonCredentials; } else { - Dotenv dotenv = Dotenv.load(); - String sysCredentials = dotenv.get(Constants.ENV_CREDENTIALS_KEY_NAME); + String sysCredentials = System.getenv(Constants.ENV_CREDENTIALS_KEY_NAME); + if (sysCredentials == null) { + Dotenv dotenv = Dotenv.load(); + sysCredentials = dotenv.get(Constants.ENV_CREDENTIALS_KEY_NAME); + } if (sysCredentials == null) { - throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), - ErrorMessage.EmptyCredentials.getMessage()); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyCredentials.getMessage()); } else { this.finalCredentials = new Credentials(); this.finalCredentials.setCredentialsString(sysCredentials); @@ -126,6 +131,7 @@ protected void updateExecutorInHTTP() { .build(); apiClientBuilder.httpClient(httpClient); } + protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRequest request, VaultConfig config) throws SkyflowException { List> values = request.getValues(); List insertRecordDataList = new ArrayList<>(); @@ -137,12 +143,12 @@ protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRe .vaultId(config.getVaultId()) .records(insertRecordDataList) .tableName(request.getTable()); - if(request.getUpsert() != null && !request.getUpsert().isEmpty()){ + if (request.getUpsert() != null && !request.getUpsert().isEmpty()) { if (request.getUpsertType() != null) { EnumUpdateType updateType = null; - if(request.getUpsertType() == UpdateType.REPLACE){ + if (request.getUpsertType() == UpdateType.REPLACE) { updateType = EnumUpdateType.REPLACE; - } else if (request.getUpsertType() == UpdateType.REPLACE) { + } else if (request.getUpsertType() == UpdateType.UPDATE) { updateType = EnumUpdateType.UPDATE; } Upsert upsert = Upsert.builder().uniqueColumns(request.getUpsert()).updateType(updateType).build(); @@ -162,7 +168,7 @@ protected com.skyflow.generated.rest.resources.recordservice.requests.Detokenize com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest.builder() .vaultId(this.vaultConfig.getVaultId()) .tokens(tokens); - if (request.getTokenGroupRedactions() != null){ + if (request.getTokenGroupRedactions() != null) { List tokenGroupRedactionsList = new ArrayList<>(); for (com.skyflow.vault.data.TokenGroupRedactions tokenGroupRedactions : request.getTokenGroupRedactions()) { com.skyflow.generated.rest.types.TokenGroupRedactions redactions = diff --git a/v3/src/main/java/com/skyflow/utils/Constants.java b/v3/src/main/java/com/skyflow/utils/Constants.java index e3c56ad3..cfc29792 100644 --- a/v3/src/main/java/com/skyflow/utils/Constants.java +++ b/v3/src/main/java/com/skyflow/utils/Constants.java @@ -2,7 +2,7 @@ public final class Constants extends BaseConstants { public static final String SDK_NAME = "Skyflow Java SDK "; - public static final String SDK_VERSION = "3.0.0-beta.3"; + public static final String SDK_VERSION = "3.0.0-beta.4"; public static final String VAULT_DOMAIN = ".skyvault."; public static final String SDK_PREFIX = SDK_NAME + SDK_VERSION; public static final Integer INSERT_BATCH_SIZE = 50; diff --git a/v3/src/main/java/com/skyflow/utils/Utils.java b/v3/src/main/java/com/skyflow/utils/Utils.java index f2142bd6..468f3ff6 100644 --- a/v3/src/main/java/com/skyflow/utils/Utils.java +++ b/v3/src/main/java/com/skyflow/utils/Utils.java @@ -2,16 +2,23 @@ import com.google.gson.JsonObject; import com.skyflow.enums.Env; +import com.skyflow.errors.ErrorCode; +import com.skyflow.errors.ErrorMessage; +import com.skyflow.errors.SkyflowException; import com.skyflow.generated.rest.core.ApiClientApiException; import com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest; import com.skyflow.generated.rest.types.InsertRecordData; import com.skyflow.generated.rest.types.InsertResponse; import com.skyflow.generated.rest.types.RecordResponseObject; import com.skyflow.generated.rest.types.TokenGroupRedactions; +import com.skyflow.logs.ErrorLogs; +import com.skyflow.utils.logger.LogUtil; import com.skyflow.vault.data.DetokenizeResponse; import com.skyflow.vault.data.ErrorRecord; import com.skyflow.vault.data.Success; import com.skyflow.vault.data.Token; +import io.github.cdimascio.dotenv.Dotenv; +import io.github.cdimascio.dotenv.DotenvException; import java.util.ArrayList; import java.util.HashMap; @@ -239,4 +246,24 @@ public static com.skyflow.vault.data.InsertResponse formatResponse(InsertRespons return formattedResponse; } + public static String getEnvVaultURL() throws SkyflowException { + try { + String vaultURL = System.getenv("VAULT_URL"); + if (vaultURL == null) { + Dotenv dotenv = Dotenv.load(); + vaultURL = dotenv.get("VAULT_URL"); + } + if (vaultURL != null && vaultURL.trim().isEmpty()) { + LogUtil.printErrorLog(ErrorLogs.EMPTY_VAULT_URL.getLog()); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyVaultUrl.getMessage()); + } else if (vaultURL != null && !vaultURL.startsWith(BaseConstants.SECURE_PROTOCOL)) { + LogUtil.printErrorLog(ErrorLogs.INVALID_VAULT_URL_FORMAT.getLog()); + throw new SkyflowException( ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.InvalidVaultUrlFormat.getMessage()); + } + return vaultURL; + } catch (DotenvException e) { + return null; + } + } + } diff --git a/v3/src/main/java/com/skyflow/utils/validations/Validations.java b/v3/src/main/java/com/skyflow/utils/validations/Validations.java index b7ea4bbd..058a002e 100644 --- a/v3/src/main/java/com/skyflow/utils/validations/Validations.java +++ b/v3/src/main/java/com/skyflow/utils/validations/Validations.java @@ -1,5 +1,7 @@ package com.skyflow.utils.validations; +import com.skyflow.config.Credentials; +import com.skyflow.config.VaultConfig; import com.skyflow.enums.InterfaceName; import com.skyflow.errors.ErrorCode; import com.skyflow.errors.ErrorMessage; @@ -20,7 +22,6 @@ private Validations() { super(); } - // add validations specific to v3 SDK public static void validateInsertRequest(InsertRequest insertRequest) throws SkyflowException { String table = insertRequest.getTable(); ArrayList> values = insertRequest.getValues(); @@ -46,12 +47,15 @@ public static void validateInsertRequest(InsertRequest insertRequest) throws Sky ErrorLogs.EMPTY_VALUES.getLog(), InterfaceName.INSERT.getName() )); throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyValues.getMessage()); - } else if (upsert != null && upsert.isEmpty()){ + } else if (values.size() > 10000) { + LogUtil.printErrorLog(ErrorLogs.RECORD_SIZE_EXCEED.getLog()); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.RecordSizeExceedError.getMessage()); + } else if (upsert != null && upsert.isEmpty()) { LogUtil.printErrorLog(Utils.parameterizedString( - ErrorLogs.EMPTY_UPSERT.getLog(), InterfaceName.INSERT.getName() + ErrorLogs.EMPTY_UPSERT_VALUES.getLog(), InterfaceName.INSERT.getName() )); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyUpsertValues.getMessage()); } - // upsert for (HashMap valuesMap : values) { for (String key : valuesMap.keySet()) { @@ -82,6 +86,10 @@ public static void validateDetokenizeRequest(DetokenizeRequest request) throws S throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.DetokenizeRequestNull.getMessage()); } List tokens = request.getTokens(); + if (tokens.size() > 10000) { + LogUtil.printErrorLog(ErrorLogs.TOKENS_SIZE_EXCEED.getLog()); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.TokensSizeExceedError.getMessage()); + } if (tokens == null || tokens.isEmpty()) { LogUtil.printErrorLog(Utils.parameterizedString( ErrorLogs.EMPTY_DETOKENIZE_DATA.getLog(), InterfaceName.DETOKENIZE.getName() @@ -118,4 +126,26 @@ public static void validateDetokenizeRequest(DetokenizeRequest request) throws S } + public static void validateVaultConfiguration(VaultConfig vaultConfig) throws SkyflowException { + String vaultId = vaultConfig.getVaultId(); + String clusterId = vaultConfig.getClusterId(); + Credentials credentials = vaultConfig.getCredentials(); + if (vaultId == null) { + LogUtil.printErrorLog(ErrorLogs.VAULT_ID_IS_REQUIRED.getLog()); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.InvalidVaultId.getMessage()); + } else if (vaultId.trim().isEmpty()) { + LogUtil.printErrorLog(ErrorLogs.EMPTY_VAULT_ID.getLog()); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyVaultId.getMessage()); + } else if (Utils.getEnvVaultURL() == null) { + if (clusterId == null) { + LogUtil.printErrorLog(ErrorLogs.CLUSTER_ID_IS_REQUIRED.getLog()); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.InvalidClusterId.getMessage()); + } else if (clusterId.trim().isEmpty()) { + LogUtil.printErrorLog(ErrorLogs.EMPTY_CLUSTER_ID.getLog()); + throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyClusterId.getMessage()); + } + } else if (credentials != null) { + validateCredentials(credentials); + } + } } diff --git a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java index 0ac80f4a..6b5af623 100644 --- a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java +++ b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java @@ -9,6 +9,7 @@ import com.skyflow.generated.rest.core.ApiClientApiException; import com.skyflow.generated.rest.types.InsertRecordData; import com.skyflow.generated.rest.types.InsertResponse; +import com.skyflow.generated.rest.types.Upsert; import com.skyflow.logs.ErrorLogs; import com.skyflow.logs.InfoLogs; import com.skyflow.logs.WarningLogs; @@ -23,7 +24,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static com.skyflow.utils.Utils.*; @@ -34,7 +38,7 @@ public final class VaultController extends VaultClient { private int detokenizeBatchSize; private int detokenizeConcurrencyLimit; - public VaultController(VaultConfig vaultConfig, Credentials credentials) { + public VaultController(VaultConfig vaultConfig, Credentials credentials) throws SkyflowException { super(vaultConfig, credentials); this.insertBatchSize = Constants.INSERT_BATCH_SIZE; this.insertConcurrencyLimit = Constants.INSERT_CONCURRENCY_LIMIT; @@ -125,7 +129,7 @@ public DetokenizeResponse bulkDetokenize(DetokenizeRequest detokenizeRequest) th } } - public CompletableFuture bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException{ + public CompletableFuture bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException { LogUtil.printInfoLog(InfoLogs.DETOKENIZE_TRIGGERED.getLog()); ExecutorService executor = Executors.newFixedThreadPool(detokenizeConcurrencyLimit); try { @@ -161,13 +165,14 @@ public CompletableFuture bulkDetokenizeAsync(DetokenizeReque executor.shutdown(); return new DetokenizeResponse(successRecords, errorTokens, detokenizeRequest.getTokens()); }); - } catch (Exception e){ + } catch (Exception e) { LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog()); throw new SkyflowException(e.getMessage()); } finally { executor.shutdown(); } } + private com.skyflow.vault.data.InsertResponse processSync( com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest, ArrayList> originalPayload @@ -207,11 +212,11 @@ private DetokenizeResponse processDetokenizeSync( List batches = Utils.createDetokenizeBatches(detokenizeRequest, detokenizeBatchSize); try { List> futures = this.detokenizeBatchFutures(executor, batches, errorTokens); - try{ + try { CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); - } catch (Exception e){ + } catch (Exception e) { } for (CompletableFuture future : futures) { DetokenizeResponse futureResponse = future.get(); @@ -224,7 +229,7 @@ private DetokenizeResponse processDetokenizeSync( } } } - } catch (Exception e){ + } catch (Exception e) { LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog()); throw new SkyflowException(e.getMessage()); } finally { @@ -240,7 +245,7 @@ private List> detokenizeBatchFutures(Execu try { for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) { - com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch = batches.get(batchIndex); + com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch = batches.get(batchIndex); int batchNumber = batchIndex; CompletableFuture future = CompletableFuture .supplyAsync(() -> processDetokenizeBatch(batch), executor) @@ -251,12 +256,13 @@ private List> detokenizeBatchFutures(Execu }); futures.add(future); } - } catch (Exception e){ + } catch (Exception e) { ErrorRecord errorRecord = new ErrorRecord(0, e.getMessage(), 500); errorTokens.add(errorRecord); } return futures; } + private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBatch(com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch) { return this.getRecordsApi().detokenize(batch); } @@ -270,13 +276,14 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); List> batches = Utils.createBatches(records, insertBatchSize); List> futures = new ArrayList<>(); + Upsert upsert = insertRequest.getUpsert().isPresent() ? insertRequest.getUpsert().get() : null; try { for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) { List batch = batches.get(batchIndex); int batchNumber = batchIndex; CompletableFuture future = CompletableFuture - .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor) + .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get(), upsert), executor) .thenApply(response -> formatResponse(response, batchNumber, insertBatchSize)) .exceptionally(ex -> { errorRecords.addAll(handleBatchException(ex, batch, batchNumber)); @@ -290,11 +297,12 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat return futures; } - private InsertResponse insertBatch(List batch, String tableName) { + private InsertResponse insertBatch(List batch, String tableName, Upsert upsert) { com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest req = com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest.builder() .vaultId(this.getVaultConfig().getVaultId()) .tableName(tableName) .records(batch) + .upsert(upsert) .build(); return this.getRecordsApi().insert(req); } diff --git a/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java b/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java index 7ffa1d3a..811a9891 100644 --- a/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java +++ b/v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java @@ -2,21 +2,30 @@ import com.skyflow.config.Credentials; import com.skyflow.config.VaultConfig; +import com.skyflow.errors.ErrorCode; +import com.skyflow.errors.ErrorMessage; import com.skyflow.errors.SkyflowException; import com.skyflow.utils.Constants; import com.skyflow.utils.validations.Validations; import com.skyflow.vault.data.InsertRequest; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + import java.io.FileWriter; import java.io.IOException; -import java.lang.reflect.Method; import java.lang.reflect.Field; -import java.util.*; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Scanner; + import static org.junit.Assert.*; public class VaultControllerTests { - private static final String ENV_PATH = "/home/saib/skyflow3/skyflow-java/v3/.env"; + private static final String ENV_PATH = "./.env"; private VaultConfig vaultConfig; private Credentials credentials; @@ -42,14 +51,28 @@ public void tearDown() { } private void writeEnv(String content) { - try (FileWriter writer = new FileWriter(ENV_PATH)) { + java.io.File envFile = new java.io.File(ENV_PATH); + java.io.File parentDir = envFile.getParentFile(); + if (parentDir != null && !parentDir.exists()) { + parentDir.mkdirs(); // Create parent directory if it doesn't exist + } + try (FileWriter writer = new FileWriter(envFile)) { writer.write(content); } catch (IOException e) { throw new RuntimeException(e); } + // Print the contents of the .env file + try (Scanner scanner = new Scanner(envFile)) { + System.out.println("Current .env contents:"); + while (scanner.hasNextLine()) { + System.out.println(scanner.nextLine()); + } + } catch (IOException e) { + System.out.println("Could not read .env file: " + e.getMessage()); + } } - private VaultController createController() { + private VaultController createController() throws SkyflowException { return new VaultController(vaultConfig, credentials); } @@ -66,7 +89,7 @@ private void invokeConfigureInsertConcurrencyAndBatchSize(VaultController contro method.invoke(controller, totalRequests); } - private ArrayList> generateValues(int noOfRecords){ + private ArrayList> generateValues(int noOfRecords) { ArrayList> values = new ArrayList<>(); for (int i = 0; i < noOfRecords; i++) { values.add(new HashMap<>()); @@ -121,9 +144,19 @@ public void testValidation_valuesIsEmpty() { @Test public void testValidation_upsertIsEmpty() throws SkyflowException { - InsertRequest req = InsertRequest.builder().table("table1").values(generateValues(1)).upsert(new ArrayList<>()).build(); - // Should not throw, just logs a warning - Validations.validateInsertRequest(req); + try { + InsertRequest req = InsertRequest.builder() + .table("table1") + .values(generateValues(1)) + .upsert(new ArrayList<>()) + .build(); + Validations.validateInsertRequest(req); + } catch (SkyflowException e) { + Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode()); + Assert.assertEquals(ErrorMessage.EmptyUpsertValues.getMessage(), e.getMessage()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } } @@ -285,8 +318,6 @@ public void testConcurrencyZeroOrNegative() throws Exception { assertEquals(min, getPrivateInt(controller, "insertConcurrencyLimit")); - - writeEnv("INSERT_CONCURRENCY_LIMIT=-5"); try { @@ -342,7 +373,8 @@ public void testHighConcurrencyForLowRecords() throws Exception { try { controller.bulkInsert(insertRequest); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } // Only 10 batches needed, so concurrency should be clamped to 10 assertEquals(1000, getPrivateInt(controller, "insertBatchSize")); @@ -354,11 +386,12 @@ public void testHighConcurrencyForLowRecords() throws Exception { public void testFractionalLastBatch() throws Exception { writeEnv("INSERT_BATCH_SIZE=100"); VaultController controller = createController(); - InsertRequest insertRequest = InsertRequest.builder().table("table1").values(generateValues(10050)).build(); + InsertRequest insertRequest = InsertRequest.builder().table("table1").values(generateValues(9050)).build(); try { controller.bulkInsert(insertRequest); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } // Last batch should have 50 records, concurrency should be 101 assertEquals(100, getPrivateInt(controller, "insertBatchSize")); diff --git a/v3/test/java/com/skyflow/vault/data/InsertTests.java b/v3/test/java/com/skyflow/vault/data/InsertTests.java index 7ea17d89..ff71fa19 100644 --- a/v3/test/java/com/skyflow/vault/data/InsertTests.java +++ b/v3/test/java/com/skyflow/vault/data/InsertTests.java @@ -211,11 +211,11 @@ public void testEmptyUpsertInInsertRequestValidations() { InsertRequest request = InsertRequest.builder().table(table).values(values).upsert(new ArrayList<>()).build(); try { Validations.validateInsertRequest(request); -// Assert.fail(EXCEPTION_NOT_THROWN); + Assert.fail(EXCEPTION_NOT_THROWN); } catch (SkyflowException e) { Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode()); Assert.assertEquals( - Utils.parameterizedString(ErrorMessage.EmptyUpsert.getMessage(), Constants.SDK_PREFIX), + Utils.parameterizedString(ErrorMessage.EmptyUpsertValues.getMessage(), Constants.SDK_PREFIX), e.getMessage() ); }