Skip to content

Commit 232405d

Browse files
georgeajitgeorgeajit
authored andcommitted
#1095 - New tests for setMaxBatches methods.
1 parent 3d6f026 commit 232405d

File tree

1 file changed

+156
-113
lines changed

1 file changed

+156
-113
lines changed

marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/datamovement/functionaltests/QueryBatcherJobReportTest.java

Lines changed: 156 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616
package com.marklogic.client.datamovement.functionaltests;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920

2021
import java.io.File;
2122
import java.io.IOException;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Calendar;
26+
import java.util.Collection;
2527
import java.util.Collections;
2628
import java.util.HashMap;
2729
import java.util.HashSet;
2830
import java.util.Iterator;
31+
import java.util.LinkedHashSet;
2932
import java.util.List;
3033
import java.util.Map;
3134
import java.util.Set;
@@ -41,13 +44,10 @@
4144
import org.junit.Assert;
4245
import org.junit.Before;
4346
import org.junit.BeforeClass;
44-
import org.junit.Ignore;
4547
import org.junit.Test;
4648

4749
import com.fasterxml.jackson.databind.JsonNode;
4850
import com.marklogic.client.DatabaseClient;
49-
import com.marklogic.client.DatabaseClientFactory;
50-
import com.marklogic.client.DatabaseClientFactory.SecurityContext;
5151
import com.marklogic.client.admin.ExtensionMetadata;
5252
import com.marklogic.client.admin.TransformExtensionsManager;
5353
import com.marklogic.client.datamovement.ApplyTransformListener;
@@ -78,7 +78,6 @@ public class QueryBatcherJobReportTest extends BasicJavaClientREST {
7878
private static final String TEST_DIR_PREFIX = "/WriteHostBatcher-testdata/";
7979

8080
private static DatabaseClient dbClient;
81-
private static String host = null;
8281
private static String user = "admin";
8382
private static int port = 8000;
8483
private static String password = "admin";
@@ -107,7 +106,7 @@ public static void setUpBeforeClass() throws Exception {
107106
server = getRestAppServerName();
108107
port = getRestAppServerPort();
109108

110-
host = getRestAppServerHostName();
109+
getRestAppServerHostName();
111110
dbClient = getDatabaseClient(user, password, getConnType());
112111
dmManager = dbClient.newDataMovementManager();
113112
hostNames = getHosts();
@@ -349,6 +348,7 @@ public void queryFailures() throws Exception {
349348
e.printStackTrace();
350349
}
351350
throwable.getBatcher().retry(throwable);
351+
// We need an NullPointerException. Hence these statements.
352352
String s = null;
353353
s.length();
354354

@@ -600,135 +600,178 @@ public void stopTransformJobTest() throws Exception {
600600
Assert.assertEquals(successCount.get(), successBatch.size());
601601
}
602602

603-
// Remove Ignore and add setMaxUris()
604-
/* Test 1 setMaxUris(1895) - Complete this test.
605-
* Test 2 setMaxUris() -- reset?
606-
* Test 3 setMaxUris() on non iterator based QB.
607-
* Test 4 setMaxUris() called after dmManager start - negative
608-
* Test 5 setMaxUris() called from multiple Threads on the diff QueryBatchers on same Manager
603+
/* Test 1 setMaxBatches(2035) - maximum specified in advance
604+
* Test 2 setMaxBatches() -- the uris collected thus far during a job
609605
*
610606
*/
611-
@Ignore
607+
@Test
612608
public void testStopBeforeListenerisComplete() throws Exception {
613-
System.out.println("In testStopBeforeListenerisComplete method");
609+
try {
610+
clearDB(port);
611+
System.out.println("In testStopBeforeListenerisComplete method");
614612

615-
final String query1 = "fn:count(fn:doc())";
616-
final AtomicInteger count = new AtomicInteger(0);
617-
final AtomicInteger failedBatch = new AtomicInteger(0);
618-
final AtomicInteger successBatch = new AtomicInteger(0);
619-
620-
ArrayList<String> urisList = new ArrayList<String>();
613+
final String query1 = "fn:count(fn:doc())";
614+
final AtomicInteger count = new AtomicInteger(0);
615+
final AtomicInteger failedBatch = new AtomicInteger(0);
616+
final AtomicInteger successBatch = new AtomicInteger(0);
617+
618+
final AtomicInteger failedBatch2 = new AtomicInteger(0);
619+
final AtomicInteger successBatch2 = new AtomicInteger(0);
621620

622-
WriteBatcher batcher = dmManager.newWriteBatcher();
623-
batcher.withBatchSize(99);
624-
batcher.withThreadCount(10);
621+
ArrayList<String> urisList = new ArrayList<String>();
625622

626-
batcher.onBatchSuccess(batch -> {
623+
WriteBatcher batcher = dmManager.newWriteBatcher();
624+
batcher.withBatchSize(99);
625+
batcher.withThreadCount(10);
627626

628-
}).onBatchFailure((batch, throwable) -> {
629-
throwable.printStackTrace();
627+
batcher.onBatchSuccess(batch -> {
630628

631-
});
632-
dmManager.startJob(batcher);
629+
}).onBatchFailure((batch, throwable) -> {
630+
throwable.printStackTrace();
631+
632+
});
633+
dmManager.startJob(batcher);
633634

634-
class writeDocsThread implements Runnable {
635+
class writeDocsThread implements Runnable {
635636

636-
@Override
637-
public void run() {
637+
@Override
638+
public void run() {
638639

639-
for (int j = 0; j < 50000; j++) {
640-
String uri = "/local/json-" + j + "-" + Thread.currentThread().getId();
641-
System.out.println("Thread name: " + Thread.currentThread().getName() + " URI:" + uri);
642-
urisList.add(uri);
643-
batcher.add(uri, fileHandle);
640+
for (int j = 0; j < 50000; j++) {
641+
String uri = "/local/json-" + j + "-" + Thread.currentThread().getId();
642+
System.out.println("Thread name: " + Thread.currentThread().getName() + " URI:" + uri);
643+
urisList.add(uri);
644+
batcher.add(uri, fileHandle);
645+
}
646+
batcher.flushAndWait();
644647
}
645-
batcher.flushAndWait();
646648
}
647-
}
648649

649-
class CountRunnable implements Runnable {
650+
class CountRunnable implements Runnable {
650651

651-
@Override
652-
public void run() {
653-
try {
654-
Thread.currentThread().sleep(15000L);
655-
} catch (InterruptedException e) {
656-
// TODO Auto-generated catch block
657-
e.printStackTrace();
658-
}
659-
Set threads = Thread.getAllStackTraces().keySet();
660-
Iterator<Thread> iter = threads.iterator();
661-
while (iter.hasNext()) {
662-
Thread t = iter.next();
663-
if (t.getName().contains("pool-1-thread-"))
664-
System.out.println(t.getName());
665-
count.incrementAndGet();
652+
@Override
653+
public void run() {
654+
try {
655+
Thread.currentThread().sleep(15000L);
656+
} catch (InterruptedException e) {
657+
// TODO Auto-generated catch block
658+
e.printStackTrace();
659+
}
660+
Set<Thread> threads = Thread.getAllStackTraces().keySet();
661+
Iterator<Thread> iter = threads.iterator();
662+
while (iter.hasNext()) {
663+
Thread t = iter.next();
664+
if (t.getName().contains("pool-1-thread-"))
665+
System.out.println(t.getName());
666+
count.incrementAndGet();
667+
}
666668
}
667669
}
668-
}
669-
Thread countT;
670-
countT = new Thread(new CountRunnable());
670+
Thread countT;
671+
countT = new Thread(new CountRunnable());
672+
673+
Thread t1;
674+
t1 = new Thread(new writeDocsThread());
675+
676+
countT.start();
677+
t1.start();
678+
countT.join();
679+
680+
t1.join();
681+
int docCnt = dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue();
682+
System.out.println("Doc count is " + docCnt);
683+
Assert.assertTrue( docCnt == 50000);
684+
685+
Collection<String> batchResults = new LinkedHashSet<String>();
686+
QueryBatcher qb = dmManager.newQueryBatcher(urisList.iterator())
687+
.withBatchSize(12)
688+
.withThreadCount(1)
689+
.withJobId("ListenerCompletionTest")
690+
.onUrisReady((QueryBatch batch) -> {
691+
692+
for (String str : batch.getItems()) {
693+
batchResults.add(str);
694+
}
695+
successBatch.addAndGet(1);
696+
})
697+
.onQueryFailure(throwable-> {
698+
failedBatch.addAndGet(1);
699+
});
700+
// Test 1 - Set max uris that can be collected in advance of the job.
701+
qb.setMaxBatches(2035);
702+
703+
class MaxBatchesThread implements Runnable {
704+
705+
@Override
706+
public void run() {
707+
try {
708+
Thread.currentThread().sleep(3000);
709+
} catch (InterruptedException e) {
710+
// TODO Auto-generated catch block
711+
e.printStackTrace();
712+
}
713+
dmManager.stopJob(qb);
714+
}
715+
}
671716

672-
Thread t1;
673-
t1 = new Thread(new writeDocsThread());
674-
675-
countT.start();
676-
t1.start();
677-
countT.join();
717+
JobTicket jobTicket = dmManager.startJob(qb);
678718

679-
t1.join();
680-
681-
Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue() == 5000);
682-
QueryBatcher qb = dmManager.newQueryBatcher(urisList.iterator())
683-
.withBatchSize(5)
684-
.withThreadCount(2)
685-
.withJobId("ListenerCompletionTest")
686-
// TODO Test 1 setMaxUris(1895)
687-
.onUrisReady((QueryBatch batch) -> {
688-
StringBuffer batchResults = new StringBuffer();
689-
for (String str : batch.getItems()) {
690-
691-
batchResults.append(str);
692-
batchResults.append("|");
693-
}
694-
System.out.println(batchResults.toString());
695-
batchResults = null;
696-
batchResults = new StringBuffer();
697-
successBatch.addAndGet(1);
698-
})
699-
.onQueryFailure(throwable-> {
700-
failedBatch.addAndGet(1);
701-
});
702-
703-
class stopDMManager implements Runnable {
704-
705-
@Override
706-
public void run() {
707-
try {
708-
Thread.currentThread().sleep(5000);
709-
} catch (InterruptedException e) {
710-
// TODO Auto-generated catch block
711-
e.printStackTrace();
712-
}
713-
714-
dmManager.stopJob(qb);
719+
Thread tMBStop;
720+
tMBStop = new Thread(new MaxBatchesThread());
721+
722+
tMBStop.start();
723+
tMBStop.join();
724+
725+
// Validate Test 1 setMaxBatches(2035)
726+
System.out.println("Max URIs size is : " + batchResults.size());
727+
/* 12 times 2035 equals 24420 with one Thread on QueryBatcher.
728+
* With thread count > 1 on QueryBatcher, batchResults size is less than 24420.
729+
*/
730+
assertTrue("Stop QueryBatcher with setMaxBatches set to 2035 is incorrect", batchResults.size() == 24420);
731+
732+
/* Test 2 setMaxBatches()
733+
734+
*/
735+
Collection<String> batchResults2 = new LinkedHashSet<String>();
736+
QueryBatcher qb2 = dmManager.newQueryBatcher(urisList.iterator())
737+
.withBatchSize(12)
738+
.withThreadCount(1)
739+
.withJobId("ListenerCompletionTest2")
740+
.onUrisReady((QueryBatch batch) -> {
741+
742+
for (String str : batch.getItems()) {
743+
batchResults2.add(str);
744+
}
745+
successBatch2.addAndGet(1);
746+
})
747+
.onQueryFailure(throwable-> {
748+
failedBatch2.addAndGet(1);
749+
});
750+
751+
class BatchesSoFarThread implements Runnable {
752+
753+
@Override
754+
public void run() {
755+
// Test 2
756+
qb2.setMaxBatches();
715757
}
758+
}
759+
760+
JobTicket jobTicket2 = dmManager.startJob(qb2);
761+
762+
Thread tMBStop2;
763+
tMBStop2 = new Thread(new BatchesSoFarThread());
764+
765+
tMBStop2.start();
766+
tMBStop2.join();
767+
System.out.println("URI size so far is : " + batchResults2.size());
768+
assertTrue("Batches of URIs collected so far", batchResults.size() > 0);
716769
}
717-
718-
JobTicket jobTicket = dmManager.startJob(qb);
719-
qb.awaitCompletion();
720-
Thread tQBStop;
721-
tQBStop = new Thread(new stopDMManager());
722-
723-
tQBStop.start();
724-
tQBStop.join();
725-
// Validate Test 1 setMaxUris(1895)
726-
727-
/* Test 2 setMaxUris() -- reset?
728-
* Test 3 setMaxUris() on non iterator based QB.
729-
* Test 4 setMaxUris() called after dmManager start - negative
730-
* Test 5 setMaxUris() called from multiple Threads on the diff QueryBatchers on same Manager
731-
*/
770+
catch (Exception ex) {
771+
ex.printStackTrace();
772+
}
773+
finally {
732774
clearDB(port);
775+
}
733776
}
734777
}

0 commit comments

Comments
 (0)