Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions v3/src/main/java/com/skyflow/utils/Utils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package com.skyflow.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.gson.JsonObject;
import com.skyflow.enums.Env;
import com.skyflow.errors.ErrorCode;
Expand All @@ -17,14 +22,10 @@
import com.skyflow.vault.data.ErrorRecord;
import com.skyflow.vault.data.Success;
import com.skyflow.vault.data.Token;

import io.github.cdimascio.dotenv.Dotenv;
import io.github.cdimascio.dotenv.DotenvException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class Utils extends BaseUtils {

public static String getVaultURL(String clusterId, Env env) {
Expand Down Expand Up @@ -92,14 +93,14 @@ public static ErrorRecord createErrorRecord(Map<String, Object> recordMap, int i
}

public static List<ErrorRecord> handleBatchException(
Throwable ex, List<InsertRecordData> batch, int batchNumber
Throwable ex, List<InsertRecordData> batch, int batchNumber, int batchSize
) {
List<ErrorRecord> errorRecords = new ArrayList<>();
Throwable cause = ex.getCause();
if (cause instanceof ApiClientApiException) {
ApiClientApiException apiException = (ApiClientApiException) cause;
Map<String, Object> responseBody = (Map<String, Object>) apiException.body();
int indexNumber = batchNumber > 0 ? batchNumber * batch.size() : 0;
int indexNumber = batchNumber > 0 ? batchNumber * batchSize : 0;
if (responseBody != null) {
if (responseBody.containsKey("records")) {
Object recordss = responseBody.get("records");
Expand All @@ -124,7 +125,7 @@ public static List<ErrorRecord> handleBatchException(
}
}
} else {
int indexNumber = batchNumber > 0 ? batchNumber * batch.size() : 0;
int indexNumber = batchNumber > 0 ? batchNumber * batchSize: 0;
for (int j = 0; j < batch.size(); j++) {
ErrorRecord err = new ErrorRecord(indexNumber, ex.getMessage(), 500);
errorRecords.add(err);
Expand Down
75 changes: 26 additions & 49 deletions v3/src/main/java/com/skyflow/vault/controller/VaultController.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package com.skyflow.vault.controller;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.skyflow.VaultClient;
Expand All @@ -17,20 +25,15 @@
import com.skyflow.utils.Utils;
import com.skyflow.utils.logger.LogUtil;
import com.skyflow.utils.validations.Validations;
import com.skyflow.vault.data.*;
import io.github.cdimascio.dotenv.Dotenv;
import io.github.cdimascio.dotenv.DotenvException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.skyflow.vault.data.DetokenizeRequest;
import com.skyflow.vault.data.DetokenizeResponse;
import com.skyflow.vault.data.DetokenizeResponseObject;
import com.skyflow.vault.data.ErrorRecord;
import com.skyflow.vault.data.InsertRecord;
import com.skyflow.vault.data.InsertRequest;
import com.skyflow.vault.data.Success;

import static com.skyflow.utils.Utils.*;
import io.github.cdimascio.dotenv.Dotenv;

public final class VaultController extends VaultClient {
private static final Gson gson = new GsonBuilder().serializeNulls().create();
Expand Down Expand Up @@ -250,9 +253,9 @@ private List<CompletableFuture<DetokenizeResponse>> detokenizeBatchFutures(Execu
int batchNumber = batchIndex;
CompletableFuture<DetokenizeResponse> future = CompletableFuture
.supplyAsync(() -> processDetokenizeBatch(batch), executor)
.thenApply(response -> formatDetokenizeResponse(response, batchNumber, detokenizeBatchSize))
.thenApply(response -> Utils.formatDetokenizeResponse(response, batchNumber, detokenizeBatchSize))
.exceptionally(ex -> {
errorTokens.addAll(handleDetokenizeBatchException(ex, batch, batchNumber, detokenizeBatchSize));
errorTokens.addAll(Utils.handleDetokenizeBatchException(ex, batch, batchNumber, detokenizeBatchSize));
return null;
});
futures.add(future);
Expand Down Expand Up @@ -285,9 +288,9 @@ private com.skyflow.generated.rest.types.DetokenizeResponse processDetokenizeBat
int batchNumber = batchIndex;
CompletableFuture<com.skyflow.vault.data.InsertResponse> future = CompletableFuture
.supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().isPresent() ? insertRequest.getTableName().get() : null, upsert), executor)
.thenApply(response -> formatResponse(response, batchNumber, insertBatchSize))
.thenApply(response -> Utils.formatResponse(response, batchNumber, insertBatchSize))
.exceptionally(ex -> {
errorRecords.addAll(handleBatchException(ex, batch, batchNumber));
errorRecords.addAll(Utils.handleBatchException(ex, batch, batchNumber, insertBatchSize));
return null;
});
futures.add(future);
Expand All @@ -313,22 +316,9 @@ private InsertResponse insertBatch(List<InsertRecordData> batch, String tableNam

private void configureInsertConcurrencyAndBatchSize(int totalRequests) {
try {
String userProvidedBatchSize = System.getenv("INSERT_BATCH_SIZE");
String userProvidedConcurrencyLimit = System.getenv("INSERT_CONCURRENCY_LIMIT");

Dotenv dotenv = null;
try {
dotenv = Dotenv.load();
} catch (DotenvException ignored) {
// ignore the case if .env file is not found
}

if (userProvidedBatchSize == null && dotenv != null) {
userProvidedBatchSize = dotenv.get("INSERT_BATCH_SIZE");
}
if (userProvidedConcurrencyLimit == null && dotenv != null) {
userProvidedConcurrencyLimit = dotenv.get("INSERT_CONCURRENCY_LIMIT");
}
Dotenv dotenv = Dotenv.load();
String userProvidedBatchSize = dotenv.get("INSERT_BATCH_SIZE");
String userProvidedConcurrencyLimit = dotenv.get("INSERT_CONCURRENCY_LIMIT");

if (userProvidedBatchSize != null) {
try {
Expand Down Expand Up @@ -382,22 +372,9 @@ private void configureInsertConcurrencyAndBatchSize(int totalRequests) {

private void configureDetokenizeConcurrencyAndBatchSize(int totalRequests) {
try {
String userProvidedBatchSize = System.getenv("DETOKENIZE_BATCH_SIZE");
String userProvidedConcurrencyLimit = System.getenv("DETOKENIZE_BATCH_SIZE");

Dotenv dotenv = null;
try {
dotenv = Dotenv.load();
} catch (DotenvException ignored) {
// ignore the case if .env file is not found
}

if (userProvidedBatchSize == null && dotenv != null) {
userProvidedBatchSize = dotenv.get("DETOKENIZE_BATCH_SIZE");
}
if (userProvidedConcurrencyLimit == null && dotenv != null) {
userProvidedConcurrencyLimit = dotenv.get("DETOKENIZE_BATCH_SIZE");
}
Dotenv dotenv = Dotenv.load();
String userProvidedBatchSize = dotenv.get("DETOKENIZE_BATCH_SIZE");
String userProvidedConcurrencyLimit = dotenv.get("DETOKENIZE_CONCURRENCY_LIMIT");

if (userProvidedBatchSize != null) {
try {
Expand Down
Loading
Loading