2929import java .time .Duration ;
3030import java .util .ArrayList ;
3131import java .util .Calendar ;
32+ import java .util .Collection ;
33+ import java .util .LinkedHashSet ;
3234import java .util .List ;
35+ import java .util .ListIterator ;
3336import java .util .TreeMap ;
3437import java .util .concurrent .TimeUnit ;
38+ import java .util .concurrent .atomic .AtomicInteger ;
3539
3640import javax .xml .namespace .QName ;
3741import javax .xml .parsers .ParserConfigurationException ;
@@ -783,8 +787,7 @@ public void testRawCombinedQueryPathIndex() throws Exception
783787 createForest (testMultipleForest [1 ], testMultipleDB );
784788 associateRESTServerWithDB (restServerName , testMultipleDB );
785789 setupAppServicesConstraint (testMultipleDB );
786- Thread .sleep (60000 );
787-
790+
788791 String [] filenames = { "pathindex1.xml" , "pathindex2.xml" };
789792 String combinedQueryFileName = "combinedQueryOptionPathIndex.xml" ;
790793
@@ -812,7 +815,7 @@ public void testRawCombinedQueryPathIndex() throws Exception
812815
813816 wbatcher .flushAndWait ();
814817 wbatcher .awaitCompletion ();
815-
818+
816819 // get the combined query
817820 File file = new File (combQueryFileDir + combinedQueryFileName );
818821
@@ -824,11 +827,18 @@ public void testRawCombinedQueryPathIndex() throws Exception
824827
825828 StringBuilder batchResults = new StringBuilder ();
826829 StringBuilder batchFailResults = new StringBuilder ();
830+
827831
828832 // Run a QueryBatcher on the new URIs.
829833 QueryBatcher queryBatcher1 = dmManagerTmp .newQueryBatcher (querydef );
830834
831835 queryBatcher1 .onUrisReady (batch -> {
836+ try {
837+ Thread .sleep (5000 );
838+ } catch (InterruptedException e ) {
839+ // TODO Auto-generated catch block
840+ e .printStackTrace ();
841+ }
832842 for (String str : batch .getItems ()) {
833843 batchResults .append (str )
834844 .append ('|' );
@@ -851,7 +861,7 @@ public void testRawCombinedQueryPathIndex() throws Exception
851861
852862 // Verify the batch results now.
853863 String [] res = batchResults .toString ().split ("\\ |" );
854- assertEquals ("Number of reults returned is incorrect" , 2 , res .length );
864+ assertEquals ("Number of results returned is incorrect" , 2 , res .length );
855865 assertTrue ("URI returned not correct" , res [0 ].contains ("pathindex1.xml" ) ? true : (res [1 ].contains ("pathindex1.xml" ) ? true : false ));
856866 assertTrue ("URI returned not correct" , res [0 ].contains ("pathindex2.xml" ) ? true : (res [1 ].contains ("pathindex2.xml" ) ? true : false ));
857867 }
@@ -2270,6 +2280,7 @@ public void testMinHostWithHostAvailabilityListener() throws Exception
22702280 public void testProgressListener () throws Exception {
22712281 DatabaseClient clientTmp = null ;
22722282 try {
2283+ System .out .println ("Running testProgressListener" );
22732284 DataMovementManager dmManager = null ;
22742285
22752286 clientTmp = getDatabaseClient ("eval-user" , "x" , getConnType ());
@@ -2411,4 +2422,77 @@ public void testProgressListener() throws Exception {
24112422 clientTmp .release ();
24122423 }
24132424 }
2425+
2426+ @ Test
2427+ public void testQueryBatcherWithIterator () throws Exception {
2428+ DatabaseClient clientTmp = null ;
2429+ try {
2430+ System .out .println ("Running testQueryBatcherWithIterator" );
2431+ DataMovementManager dmManager = null ;
2432+ String uri = null ;
2433+ Collection <String > uris = new LinkedHashSet <String >();
2434+ Collection <String > uris20 = new LinkedHashSet <String >();
2435+ Collection <String > batchValueResults = new LinkedHashSet <String >();
2436+ AtomicInteger count = new AtomicInteger (0 );
2437+
2438+ clientTmp = getDatabaseClient ("eval-user" , "x" , getConnType ());
2439+ dmManager = clientTmp .newDataMovementManager ();
2440+ QueryManager queryMgr = clientTmp .newQueryManager ();
2441+
2442+ String jsonDoc = "{" + "\" employees\" : [" + "{ \" firstName\" :\" Juan\" , \" lastName\" :\" Baptiste\" },"
2443+ + "{ \" firstName\" :\" Tress\" , \" lastName\" :\" Bossmann\" },"
2444+ + "{ \" firstName\" :\" Micheal\" , \" lastName\" :\" Fox\" }]" + "}" ;
2445+ WriteBatcher wbatcher = dmManager .newWriteBatcher ();
2446+ wbatcher .withBatchSize (600 );
2447+ wbatcher .onBatchFailure ((batch , throwable ) -> throwable .printStackTrace ());
2448+ StringHandle handle = new StringHandle ();
2449+ handle .set (jsonDoc );
2450+
2451+ // Insert 6 K documents
2452+ for (int i = 0 ; i < 6000 ; i ++) {
2453+ uri = "/QBIteratorTest" + i + ".json" ;
2454+ wbatcher .add (uri , handle );
2455+ uris .add (uri );
2456+ if (i < 20 ) {
2457+ uris20 .add (uri );
2458+ }
2459+ }
2460+
2461+ wbatcher .flushAndWait ();
2462+ // Read all 6000 docs in a batch using an iterator.
2463+
2464+ QueryBatcher queryBatcher = dmManager .newQueryBatcher (uris .iterator ())
2465+ .withBatchSize (20 )
2466+ .withThreadCount (1 );
2467+
2468+ queryBatcher .onUrisReady (batch -> {
2469+ System .out .println ("Batch results are: " + batch .getJobResultsSoFar ());
2470+ // Get 1 batch only for comparison.
2471+ if (count .get () == 0 ) {
2472+ for (String str : batch .getItems ()) {
2473+ batchValueResults .add (str );
2474+ }
2475+ count .incrementAndGet ();
2476+ }
2477+ }
2478+ )
2479+ .onQueryFailure (throwable -> {
2480+ System .out .println ("Exceptions thrown from callback onQueryFailure in testMinHostWithHostAvailabilityListener" );
2481+ throwable .printStackTrace ();
2482+
2483+ });
2484+
2485+ JobTicket jobTicket = dmManager .startJob (queryBatcher );
2486+ queryBatcher .awaitCompletion ();
2487+ System .out .println ("First batch comprisions" );
2488+
2489+ assertTrue ("Batch 1 incorrect" , batchValueResults .equals (uris20 ));
2490+ }
2491+ catch (Exception ex ) {
2492+ ex .printStackTrace ();
2493+ }
2494+ finally {
2495+ clientTmp .release ();
2496+ }
2497+ }
24142498}
0 commit comments