Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/main/java/com/skyflow/errors/ErrorMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public enum ErrorMessage {
EmptyKeyInTokens("%s0 Validation error. Invalid key tokens. Specify a valid key."),
EmptyValueInTokens("%s0 Validation error. Invalid value in tokens. Specify a valid value."),
EmptyUpsert("%s0 Validation error. 'upsert' key can't be empty. Specify an upsert column."),
EmptyUpsertValues("%s0 Validation error. Upsert column values can't be empty. Specify at least one upsert column."),
HomogenousNotSupportedWithUpsert("%s0 Validation error. 'homogenous' is not supported with 'upsert'. Specify either 'homogenous' or 'upsert'."),
TokensPassedForTokenModeDisable("%s0 Validation error. 'tokenMode' wasn't specified. Set 'tokenMode' to 'ENABLE' to insert tokens."),
NoTokensWithTokenMode("%s0 Validation error. Tokens weren't specified for records while 'tokenMode' was %s1. Specify tokens."),
Expand Down
1 change: 1 addition & 0 deletions common/src/main/java/com/skyflow/logs/ErrorLogs.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public enum ErrorLogs {
EMPTY_OR_NULL_VALUE_IN_VALUES("Invalid %s1 request. Value can not be null or empty in values for key \"%s2\"."),
EMPTY_OR_NULL_KEY_IN_VALUES("Invalid %s1 request. Key can not be null or empty in values"),
EMPTY_UPSERT("Invalid %s1 request. Upsert can not be empty."),
EMPTY_UPSERT_VALUES("Invalid %s1 request. Upsert values can not be empty."),
HOMOGENOUS_NOT_SUPPORTED_WITH_UPSERT("Invalid %s1 request. Homogenous is not supported when upsert is passed."),
TOKENS_NOT_ALLOWED_WITH_TOKEN_MODE_DISABLE("Invalid %s1 request. Tokens are not allowed when tokenMode is DISABLE."),
TOKENS_REQUIRED_WITH_TOKEN_MODE("Invalid %s1 request. Tokens are required when tokenMode is %s2."),
Expand Down
2 changes: 1 addition & 1 deletion v3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</parent>

<artifactId>skyflow-java</artifactId>
<version>2.0.0-beta.4-dev.4b56b3d</version>
<version>2.0.0-beta.4-dev.d3257dc</version>
<packaging>jar</packaging>
<name>${project.groupId}:${project.artifactId}</name>
<description>Skyflow V3 SDK for the Java programming language</description>
Expand Down
9 changes: 5 additions & 4 deletions v3/src/main/java/com/skyflow/VaultClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ protected void updateExecutorInHTTP() {
.build();
apiClientBuilder.httpClient(httpClient);
}

protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRequest request, VaultConfig config) throws SkyflowException {
List<HashMap<String, Object>> values = request.getValues();
List<InsertRecordData> insertRecordDataList = new ArrayList<>();
Expand All @@ -142,12 +143,12 @@ protected InsertRequest getBulkInsertRequestBody(com.skyflow.vault.data.InsertRe
.vaultId(config.getVaultId())
.records(insertRecordDataList)
.tableName(request.getTable());
if(request.getUpsert() != null && !request.getUpsert().isEmpty()){
if (request.getUpsert() != null && !request.getUpsert().isEmpty()) {
if (request.getUpsertType() != null) {
EnumUpdateType updateType = null;
if(request.getUpsertType() == UpdateType.REPLACE){
if (request.getUpsertType() == UpdateType.REPLACE) {
updateType = EnumUpdateType.REPLACE;
} else if (request.getUpsertType() == UpdateType.REPLACE) {
} else if (request.getUpsertType() == UpdateType.UPDATE) {
updateType = EnumUpdateType.UPDATE;
}
Upsert upsert = Upsert.builder().uniqueColumns(request.getUpsert()).updateType(updateType).build();
Expand All @@ -167,7 +168,7 @@ protected com.skyflow.generated.rest.resources.recordservice.requests.Detokenize
com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest.builder()
.vaultId(this.vaultConfig.getVaultId())
.tokens(tokens);
if (request.getTokenGroupRedactions() != null){
if (request.getTokenGroupRedactions() != null) {
List<com.skyflow.generated.rest.types.TokenGroupRedactions> tokenGroupRedactionsList = new ArrayList<>();
for (com.skyflow.vault.data.TokenGroupRedactions tokenGroupRedactions : request.getTokenGroupRedactions()) {
com.skyflow.generated.rest.types.TokenGroupRedactions redactions =
Expand Down
12 changes: 5 additions & 7 deletions v3/src/main/java/com/skyflow/utils/validations/Validations.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.skyflow.errors.ErrorMessage;
import com.skyflow.errors.SkyflowException;
import com.skyflow.logs.ErrorLogs;
import com.skyflow.utils.BaseUtils;
import com.skyflow.utils.Utils;
import com.skyflow.utils.logger.LogUtil;
import com.skyflow.vault.data.DetokenizeRequest;
Expand All @@ -23,7 +22,6 @@ private Validations() {
super();
}

// add validations specific to v3 SDK
public static void validateInsertRequest(InsertRequest insertRequest) throws SkyflowException {
String table = insertRequest.getTable();
ArrayList<HashMap<String, Object>> values = insertRequest.getValues();
Expand All @@ -49,15 +47,15 @@ public static void validateInsertRequest(InsertRequest insertRequest) throws Sky
ErrorLogs.EMPTY_VALUES.getLog(), InterfaceName.INSERT.getName()
));
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyValues.getMessage());
} else if(values.size() > 10000) {
} else if (values.size() > 10000) {
LogUtil.printErrorLog(ErrorLogs.RECORD_SIZE_EXCEED.getLog());
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.RecordSizeExceedError.getMessage());
} else if (upsert != null && upsert.isEmpty()){
} else if (upsert != null && upsert.isEmpty()) {
LogUtil.printErrorLog(Utils.parameterizedString(
ErrorLogs.EMPTY_UPSERT.getLog(), InterfaceName.INSERT.getName()
ErrorLogs.EMPTY_UPSERT_VALUES.getLog(), InterfaceName.INSERT.getName()
));
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyUpsertValues.getMessage());
}
// upsert

for (HashMap<String, Object> valuesMap : values) {
for (String key : valuesMap.keySet()) {
Expand Down Expand Up @@ -88,7 +86,7 @@ public static void validateDetokenizeRequest(DetokenizeRequest request) throws S
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.DetokenizeRequestNull.getMessage());
}
List<String> tokens = request.getTokens();
if(tokens.size() > 10000) {
if (tokens.size() > 10000) {
LogUtil.printErrorLog(ErrorLogs.TOKENS_SIZE_EXCEED.getLog());
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.TokensSizeExceedError.getMessage());
}
Expand Down
28 changes: 18 additions & 10 deletions v3/src/main/java/com/skyflow/vault/controller/VaultController.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.skyflow.generated.rest.core.ApiClientApiException;
import com.skyflow.generated.rest.types.InsertRecordData;
import com.skyflow.generated.rest.types.InsertResponse;
import com.skyflow.generated.rest.types.Upsert;
import com.skyflow.logs.ErrorLogs;
import com.skyflow.logs.InfoLogs;
import com.skyflow.logs.WarningLogs;
Expand All @@ -23,7 +24,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.skyflow.utils.Utils.*;

Expand Down Expand Up @@ -125,7 +129,7 @@ public DetokenizeResponse bulkDetokenize(DetokenizeRequest detokenizeRequest) th
}
}

public CompletableFuture<DetokenizeResponse> bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException{
public CompletableFuture<DetokenizeResponse> bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException {
LogUtil.printInfoLog(InfoLogs.DETOKENIZE_TRIGGERED.getLog());
ExecutorService executor = Executors.newFixedThreadPool(detokenizeConcurrencyLimit);
try {
Expand Down Expand Up @@ -161,13 +165,14 @@ public CompletableFuture<DetokenizeResponse> bulkDetokenizeAsync(DetokenizeReque
executor.shutdown();
return new DetokenizeResponse(successRecords, errorTokens, detokenizeRequest.getTokens());
});
} catch (Exception e){
} catch (Exception e) {
LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
throw new SkyflowException(e.getMessage());
} finally {
executor.shutdown();
}
}

private com.skyflow.vault.data.InsertResponse processSync(
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest,
ArrayList<HashMap<String, Object>> originalPayload
Expand Down Expand Up @@ -207,11 +212,11 @@ private DetokenizeResponse processDetokenizeSync(
List<com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest> batches = Utils.createDetokenizeBatches(detokenizeRequest, detokenizeBatchSize);
try {
List<CompletableFuture<DetokenizeResponse>> futures = this.detokenizeBatchFutures(executor, batches, errorTokens);
try{
try {

CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
} catch (Exception e){
} catch (Exception e) {
}
for (CompletableFuture<DetokenizeResponse> future : futures) {
DetokenizeResponse futureResponse = future.get();
Expand All @@ -224,7 +229,7 @@ private DetokenizeResponse processDetokenizeSync(
}
}
}
} catch (Exception e){
} catch (Exception e) {
LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
throw new SkyflowException(e.getMessage());
} finally {
Expand All @@ -240,7 +245,7 @@ private List<CompletableFuture<DetokenizeResponse>> detokenizeBatchFutures(Execu
try {

for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch = batches.get(batchIndex);
com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch = batches.get(batchIndex);
int batchNumber = batchIndex;
CompletableFuture<DetokenizeResponse> future = CompletableFuture
.supplyAsync(() -> processDetokenizeBatch(batch), executor)
Expand All @@ -251,12 +256,13 @@ private List<CompletableFuture<DetokenizeResponse>> detokenizeBatchFutures(Execu
});
futures.add(future);
}
} catch (Exception e){
} catch (Exception e) {
ErrorRecord errorRecord = new ErrorRecord(0, e.getMessage(), 500);
errorTokens.add(errorRecord);
}
return futures;
}

private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBatch(com.skyflow.generated.rest.resources.recordservice.requests.DetokenizeRequest batch) {
return this.getRecordsApi().detokenize(batch);
}
Expand All @@ -270,13 +276,14 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat
ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit);
List<List<InsertRecordData>> batches = Utils.createBatches(records, insertBatchSize);
List<CompletableFuture<com.skyflow.vault.data.InsertResponse>> futures = new ArrayList<>();
Upsert upsert = insertRequest.getUpsert().isPresent() ? insertRequest.getUpsert().get() : null;

try {
for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
List<InsertRecordData> batch = batches.get(batchIndex);
int batchNumber = batchIndex;
CompletableFuture<com.skyflow.vault.data.InsertResponse> future = CompletableFuture
.supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor)
.supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get(), upsert), executor)
.thenApply(response -> formatResponse(response, batchNumber, insertBatchSize))
.exceptionally(ex -> {
errorRecords.addAll(handleBatchException(ex, batch, batchNumber));
Expand All @@ -290,11 +297,12 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat
return futures;
}

private InsertResponse insertBatch(List<InsertRecordData> batch, String tableName) {
private InsertResponse insertBatch(List<InsertRecordData> batch, String tableName, Upsert upsert) {
com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest req = com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest.builder()
.vaultId(this.getVaultConfig().getVaultId())
.tableName(tableName)
.records(batch)
.upsert(upsert)
.build();
return this.getRecordsApi().insert(req);
}
Expand Down
41 changes: 30 additions & 11 deletions v3/test/java/com/skyflow/vault/controller/VaultControllerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@

import com.skyflow.config.Credentials;
import com.skyflow.config.VaultConfig;
import com.skyflow.errors.ErrorCode;
import com.skyflow.errors.ErrorMessage;
import com.skyflow.errors.SkyflowException;
import com.skyflow.utils.Constants;
import com.skyflow.utils.validations.Validations;
import com.skyflow.vault.data.InsertRequest;
import org.junit.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Field;
import java.util.*;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Scanner;

import static org.junit.Assert.*;


Expand Down Expand Up @@ -80,7 +89,7 @@ private void invokeConfigureInsertConcurrencyAndBatchSize(VaultController contro
method.invoke(controller, totalRequests);
}

private ArrayList<HashMap<String, Object>> generateValues(int noOfRecords){
private ArrayList<HashMap<String, Object>> generateValues(int noOfRecords) {
ArrayList<HashMap<String, Object>> values = new ArrayList<>();
for (int i = 0; i < noOfRecords; i++) {
values.add(new HashMap<>());
Expand Down Expand Up @@ -135,9 +144,19 @@ public void testValidation_valuesIsEmpty() {

@Test
public void testValidation_upsertIsEmpty() throws SkyflowException {
InsertRequest req = InsertRequest.builder().table("table1").values(generateValues(1)).upsert(new ArrayList<>()).build();
// Should not throw, just logs a warning
Validations.validateInsertRequest(req);
try {
InsertRequest req = InsertRequest.builder()
.table("table1")
.values(generateValues(1))
.upsert(new ArrayList<>())
.build();
Validations.validateInsertRequest(req);
} catch (SkyflowException e) {
Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode());
Assert.assertEquals(ErrorMessage.EmptyUpsertValues.getMessage(), e.getMessage());
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}


Expand Down Expand Up @@ -299,8 +318,6 @@ public void testConcurrencyZeroOrNegative() throws Exception {
assertEquals(min, getPrivateInt(controller, "insertConcurrencyLimit"));




writeEnv("INSERT_CONCURRENCY_LIMIT=-5");

try {
Expand Down Expand Up @@ -356,7 +373,8 @@ public void testHighConcurrencyForLowRecords() throws Exception {

try {
controller.bulkInsert(insertRequest);
} catch (Exception ignored) {}
} catch (Exception ignored) {
}

// Only 10 batches needed, so concurrency should be clamped to 10
assertEquals(1000, getPrivateInt(controller, "insertBatchSize"));
Expand All @@ -372,7 +390,8 @@ public void testFractionalLastBatch() throws Exception {

try {
controller.bulkInsert(insertRequest);
} catch (Exception ignored) {}
} catch (Exception ignored) {
}

// Last batch should have 50 records, concurrency should be 101
assertEquals(100, getPrivateInt(controller, "insertBatchSize"));
Expand Down
4 changes: 2 additions & 2 deletions v3/test/java/com/skyflow/vault/data/InsertTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ public void testEmptyUpsertInInsertRequestValidations() {
InsertRequest request = InsertRequest.builder().table(table).values(values).upsert(new ArrayList<>()).build();
try {
Validations.validateInsertRequest(request);
// Assert.fail(EXCEPTION_NOT_THROWN);
Assert.fail(EXCEPTION_NOT_THROWN);
} catch (SkyflowException e) {
Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode());
Assert.assertEquals(
Utils.parameterizedString(ErrorMessage.EmptyUpsert.getMessage(), Constants.SDK_PREFIX),
Utils.parameterizedString(ErrorMessage.EmptyUpsertValues.getMessage(), Constants.SDK_PREFIX),
e.getMessage()
);
}
Expand Down
Loading