@@ -809,71 +809,6 @@ public void testProcessDetokenizeSyncErrorPath() throws Exception {
809809 executor .shutdownNow ();
810810 }
811811
812- // ── Custom header options validation via VaultController ──────────────────
813-
814- @ Test
815- public void bulkInsert_nullOptions_doesNotThrowForHeaderValidation () throws SkyflowException {
816- VaultController controller = createController ();
817- InsertRequest request = InsertRequest .builder ()
818- .table ("tbl" )
819- .records (generateValues (1 ))
820- .build ();
821- try {
822- controller .bulkInsert (request , (InsertOptions ) null );
823- } catch (SkyflowException e ) {
824- assertFalse ("Should not throw EmptyValueInCustomHeaders for null options" ,
825- e .getMessage ().equals (ErrorMessage .EmptyValueInCustomHeaders .getMessage ()));
826- assertFalse ("Should not throw NullCustomHeaderKey for null options" ,
827- e .getMessage ().equals (ErrorMessage .NullCustomHeaderKey .getMessage ()));
828- }
829- }
830-
831- @ Test
832- public void bulkDetokenize_nullOptions_doesNotThrowForHeaderValidation () throws SkyflowException {
833- VaultController controller = createController ();
834- DetokenizeRequest request = DetokenizeRequest .builder ()
835- .tokens (getTokens (1 ))
836- .build ();
837- try {
838- controller .bulkDetokenize (request , (DetokenizeOptions ) null );
839- } catch (SkyflowException e ) {
840- assertFalse ("Should not throw header error for null options" ,
841- e .getMessage ().equals (ErrorMessage .EmptyValueInCustomHeaders .getMessage ()));
842- }
843- }
844-
845- @ Test
846- public void bulkDeleteTokens_nullOptions_doesNotThrowForHeaderValidation () throws SkyflowException {
847- VaultController controller = createController ();
848- DeleteTokensRequest request = DeleteTokensRequest .builder ()
849- .tokens (getTokens (1 ))
850- .build ();
851- try {
852- controller .bulkDeleteTokens (request , (DeleteTokensOptions ) null );
853- } catch (SkyflowException e ) {
854- assertFalse ("Should not throw header error for null options" ,
855- e .getMessage ().equals (ErrorMessage .EmptyValueInCustomHeaders .getMessage ()));
856- assertFalse ("Should not throw NullCustomHeaderKey for null options" ,
857- e .getMessage ().equals (ErrorMessage .NullCustomHeaderKey .getMessage ()));
858- }
859- }
860-
861- @ Test
862- public void bulkTokenize_nullOptions_doesNotThrowForHeaderValidation () throws SkyflowException {
863- VaultController controller = createController ();
864- ArrayList <TokenizeRecord > data = new ArrayList <>();
865- data .add (TokenizeRecord .builder ().value ("val" ).build ());
866- TokenizeRequest request = TokenizeRequest .builder ().data (data ).build ();
867- try {
868- controller .bulkTokenize (request , (TokenizeOptions ) null );
869- } catch (SkyflowException e ) {
870- assertFalse ("Should not throw header error for null options" ,
871- e .getMessage ().equals (ErrorMessage .EmptyValueInCustomHeaders .getMessage ()));
872- assertFalse ("Should not throw NullCustomHeaderKey for null options" ,
873- e .getMessage ().equals (ErrorMessage .NullCustomHeaderKey .getMessage ()));
874- }
875- }
876-
877812 // ── configureDeleteTokensConcurrencyAndBatchSize ──────────────────────────
878813
879814 @ Test
@@ -1172,6 +1107,153 @@ public void interceptor_contextHasCorrectBatchIndexAndTotal() throws Exception {
11721107 executor .shutdownNow ();
11731108 }
11741109
1110+ @ Test
1111+ public void interceptor_operationIsTokenize_inTokenizeBatchFutures () throws Exception {
1112+ VaultController controller = createController ();
1113+ setPrivateField (controller , "tokenizeBatchSize" , 1 );
1114+
1115+ java .util .List <String > capturedOps = java .util .Collections .synchronizedList (new ArrayList <>());
1116+ RequestInterceptor interceptor = ctx -> capturedOps .add (ctx .getOperation ());
1117+
1118+ ArrayList <com .skyflow .vault .data .TokenizeRecord > data = new ArrayList <>();
1119+ data .add (com .skyflow .vault .data .TokenizeRecord .builder ().value ("v1" ).tokenGroupNames (Collections .singletonList ("g1" )).build ());
1120+ data .add (com .skyflow .vault .data .TokenizeRecord .builder ().value ("v2" ).tokenGroupNames (Collections .singletonList ("g1" )).build ());
1121+ com .skyflow .vault .data .TokenizeRequest tokenizeRequest = com .skyflow .vault .data .TokenizeRequest .builder ().data (data ).build ();
1122+
1123+ Method getRequestBody = VaultController .class .getSuperclass ()
1124+ .getDeclaredMethod ("getTokenizeRequestBody" , com .skyflow .vault .data .TokenizeRequest .class );
1125+ getRequestBody .setAccessible (true );
1126+ com .skyflow .generated .rest .resources .flowservice .requests .V1FlowTokenizeRequest requestObj =
1127+ (com .skyflow .generated .rest .resources .flowservice .requests .V1FlowTokenizeRequest )
1128+ getRequestBody .invoke (controller , tokenizeRequest );
1129+
1130+ List <com .skyflow .generated .rest .resources .flowservice .requests .V1FlowTokenizeRequest > batches =
1131+ com .skyflow .utils .Utils .createTokenizeBatches (requestObj , 1 );
1132+
1133+ Method method = VaultController .class .getDeclaredMethod (
1134+ "tokenizeBatchFutures" , ExecutorService .class , List .class , RequestInterceptor .class );
1135+ method .setAccessible (true );
1136+ ExecutorService executor = Executors .newFixedThreadPool (1 );
1137+ method .invoke (controller , executor , batches , interceptor );
1138+
1139+ Assert .assertEquals (batches .size (), capturedOps .size ());
1140+ for (String op : capturedOps ) {
1141+ Assert .assertEquals ("TOKENIZE" , op );
1142+ }
1143+ executor .shutdownNow ();
1144+ }
1145+
1146+ @ Test
1147+ public void interceptor_batchIndexAndTotal_inDeleteTokensBatchFutures () throws Exception {
1148+ VaultController controller = createController ();
1149+ setPrivateField (controller , "deleteTokensBatchSize" , 1 );
1150+
1151+ java .util .List <Integer > capturedIndices = java .util .Collections .synchronizedList (new ArrayList <>());
1152+ java .util .concurrent .atomic .AtomicInteger capturedTotal = new java .util .concurrent .atomic .AtomicInteger (-1 );
1153+ RequestInterceptor interceptor = ctx -> {
1154+ capturedIndices .add (ctx .getBatchIndex ());
1155+ capturedTotal .set (ctx .getTotalBatches ());
1156+ };
1157+
1158+ List <String > tokens = Arrays .asList ("t1" , "t2" , "t3" );
1159+ DeleteTokensRequest deleteRequest = DeleteTokensRequest .builder ().tokens (tokens ).build ();
1160+ Method getRequestBody = VaultController .class .getSuperclass ()
1161+ .getDeclaredMethod ("getDeleteTokensRequestBody" , DeleteTokensRequest .class );
1162+ getRequestBody .setAccessible (true );
1163+ com .skyflow .generated .rest .resources .flowservice .requests .V1FlowDeleteTokenRequest requestObj =
1164+ (com .skyflow .generated .rest .resources .flowservice .requests .V1FlowDeleteTokenRequest )
1165+ getRequestBody .invoke (controller , deleteRequest );
1166+
1167+ List <com .skyflow .generated .rest .resources .flowservice .requests .V1FlowDeleteTokenRequest > batches =
1168+ com .skyflow .utils .Utils .createDeleteTokensBatches (requestObj , 1 );
1169+
1170+ Method method = VaultController .class .getDeclaredMethod (
1171+ "deleteTokensBatchFutures" , ExecutorService .class , List .class , RequestInterceptor .class );
1172+ method .setAccessible (true );
1173+ ExecutorService executor = Executors .newFixedThreadPool (1 );
1174+ method .invoke (controller , executor , batches , interceptor );
1175+
1176+ Assert .assertEquals (Arrays .asList (0 , 1 , 2 ), capturedIndices );
1177+ Assert .assertEquals (3 , capturedTotal .get ());
1178+ executor .shutdownNow ();
1179+ }
1180+
1181+ @ Test
1182+ public void interceptor_isCalledOncePerBatch_inDetokenizeBatchFutures () throws Exception {
1183+ VaultController controller = createController ();
1184+ setPrivateField (controller , "detokenizeBatchSize" , 2 );
1185+
1186+ java .util .concurrent .atomic .AtomicInteger callCount = new java .util .concurrent .atomic .AtomicInteger (0 );
1187+ java .util .List <String > capturedOps = java .util .Collections .synchronizedList (new ArrayList <>());
1188+ RequestInterceptor interceptor = ctx -> {
1189+ callCount .incrementAndGet ();
1190+ capturedOps .add (ctx .getOperation ());
1191+ };
1192+
1193+ DetokenizeRequest request = DetokenizeRequest .builder ().tokens (getTokens (4 )).build ();
1194+ Method getRequestBody = VaultController .class .getSuperclass ()
1195+ .getDeclaredMethod ("getDetokenizeRequestBody" , DetokenizeRequest .class );
1196+ getRequestBody .setAccessible (true );
1197+ com .skyflow .generated .rest .resources .flowservice .requests .V1FlowDetokenizeRequest requestObj =
1198+ (com .skyflow .generated .rest .resources .flowservice .requests .V1FlowDetokenizeRequest )
1199+ getRequestBody .invoke (controller , request );
1200+
1201+ List <com .skyflow .generated .rest .resources .flowservice .requests .V1FlowDetokenizeRequest > batches =
1202+ com .skyflow .utils .Utils .createDetokenizeBatches (requestObj , 2 );
1203+
1204+ Method method = VaultController .class .getDeclaredMethod (
1205+ "detokenizeBatchFutures" , ExecutorService .class , List .class , List .class , RequestInterceptor .class );
1206+ method .setAccessible (true );
1207+ ExecutorService executor = Executors .newFixedThreadPool (1 );
1208+ List <ErrorRecord > errors = new ArrayList <>();
1209+ method .invoke (controller , executor , batches , errors , interceptor );
1210+
1211+ Assert .assertEquals ("Interceptor called once per batch" , batches .size (), callCount .get ());
1212+ for (String op : capturedOps ) {
1213+ Assert .assertEquals ("DETOKENIZE" , op );
1214+ }
1215+ executor .shutdownNow ();
1216+ }
1217+
1218+ @ Test
1219+ public void interceptor_isCalledOncePerBatch_inInsertBatchFutures () throws Exception {
1220+ VaultController controller = createController ();
1221+ setPrivateField (controller , "insertBatchSize" , 2 );
1222+
1223+ java .util .concurrent .atomic .AtomicInteger callCount = new java .util .concurrent .atomic .AtomicInteger (0 );
1224+ java .util .List <String > capturedOps = java .util .Collections .synchronizedList (new ArrayList <>());
1225+ RequestInterceptor interceptor = ctx -> {
1226+ callCount .incrementAndGet ();
1227+ capturedOps .add (ctx .getOperation ());
1228+ };
1229+
1230+ InsertRequest insertRequest = InsertRequest .builder ()
1231+ .table ("test-table" )
1232+ .records (generateValues (4 ))
1233+ .build ();
1234+ Method getRequestBody = VaultController .class .getSuperclass ()
1235+ .getDeclaredMethod ("getBulkInsertRequestBody" , InsertRequest .class , VaultConfig .class );
1236+ getRequestBody .setAccessible (true );
1237+ com .skyflow .generated .rest .resources .flowservice .requests .V1InsertRequest requestObj =
1238+ (com .skyflow .generated .rest .resources .flowservice .requests .V1InsertRequest )
1239+ getRequestBody .invoke (controller , insertRequest , vaultConfig );
1240+
1241+ Method method = VaultController .class .getDeclaredMethod (
1242+ "insertBatchFutures" ,
1243+ com .skyflow .generated .rest .resources .flowservice .requests .V1InsertRequest .class ,
1244+ List .class ,
1245+ RequestInterceptor .class );
1246+ method .setAccessible (true );
1247+ List <ErrorRecord > errorRecords = new ArrayList <>();
1248+ method .invoke (controller , requestObj , errorRecords , interceptor );
1249+
1250+ // 4 records / batchSize 2 = 2 batches
1251+ Assert .assertEquals (2 , callCount .get ());
1252+ for (String op : capturedOps ) {
1253+ Assert .assertEquals ("INSERT" , op );
1254+ }
1255+ }
1256+
11751257 // ── Private helpers ───────────────────────────────────────────────────────
11761258
11771259 private void invokeConfigureDeleteTokensConcurrencyAndBatchSize (VaultController controller , int totalRequests ) throws Exception {
0 commit comments