11/*
22 * Copyright 2020 by OLTPBenchmark Project
33 *
4- * Licensed under the Apache License, Version 2.0 (the "License");
5- * you may not use this file except in compliance with the License.
6- * You may obtain a copy of the License at
4+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+ * in compliance with the License. You may obtain a copy of the License at
76 *
8- * http://www.apache.org/licenses/LICENSE-2.0
7+ * http://www.apache.org/licenses/LICENSE-2.0
98 *
10- * Unless required by applicable law or agreed to in writing, software
11- * distributed under the License is distributed on an "AS IS" BASIS,
12- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13- * See the License for the specific language governing permissions and
14- * limitations under the License.
9+ * Unless required by applicable law or agreed to in writing, software distributed under the License
10+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+ * or implied. See the License for the specific language governing permissions and limitations under
12+ * the License.
1513 *
1614 */
1715
2119import com .oltpbenchmark .api .BenchmarkModule ;
2220import com .oltpbenchmark .api .TransactionType ;
2321import com .oltpbenchmark .api .Worker ;
22+ import com .oltpbenchmark .api .collectors .monitoring .Monitor ;
23+ import com .oltpbenchmark .api .collectors .monitoring .MonitorGen ;
2424import com .oltpbenchmark .types .State ;
25+ import com .oltpbenchmark .util .MonitorInfo ;
2526import com .oltpbenchmark .util .StringUtil ;
2627import java .util .*;
2728import org .apache .commons .collections4 .map .ListOrderedMap ;
3031
3132public class ThreadBench implements Thread .UncaughtExceptionHandler {
3233 private static final Logger LOG = LoggerFactory .getLogger (ThreadBench .class );
34+ // Determines how long (in ms) to wait until monitoring thread rejoins the
35+ // main thread.
36+ private static final int MONITOR_REJOIN_TIME = 60000 ;
3337
3438 private final BenchmarkState testState ;
3539 private final List <? extends Worker <? extends BenchmarkModule >> workers ;
3640 private final ArrayList <Thread > workerThreads ;
3741 private final List <WorkloadConfiguration > workConfs ;
3842 private final ArrayList <LatencyRecord .Sample > samples = new ArrayList <>();
39- private final int intervalMonitor ;
43+ private final MonitorInfo monitorInfo ;
44+
45+ private Monitor monitor = null ;
4046
4147 private ThreadBench (
4248 List <? extends Worker <? extends BenchmarkModule >> workers ,
4349 List <WorkloadConfiguration > workConfs ,
44- int intervalMonitoring ) {
50+ MonitorInfo monitorInfo ) {
4551 this .workers = workers ;
4652 this .workConfs = workConfs ;
4753 this .workerThreads = new ArrayList <>(workers .size ());
48- this .intervalMonitor = intervalMonitoring ;
54+ this .monitorInfo = monitorInfo ;
4955 this .testState = new BenchmarkState (workers .size () + 1 );
5056 }
5157
5258 public static Results runRateLimitedBenchmark (
5359 List <Worker <? extends BenchmarkModule >> workers ,
5460 List <WorkloadConfiguration > workConfs ,
55- int intervalMonitoring ) {
56- ThreadBench bench = new ThreadBench (workers , workConfs , intervalMonitoring );
61+ MonitorInfo monitorInfo ) {
62+ ThreadBench bench = new ThreadBench (workers , workConfs , monitorInfo );
5763 return bench .runRateLimitedMultiPhase ();
5864 }
5965
@@ -88,10 +94,9 @@ private int finalizeWorkers(ArrayList<Thread> workerThreads) throws InterruptedE
8894 // to terminate... hands otherwise
8995
9096 /*
91- * // CARLO: Maybe we might want to do this to kill threads that are
92- * hanging... if (workerThreads.get(i).isAlive()) {
93- * workerThreads.get(i).kill(); try { workerThreads.get(i).join(); }
94- * catch (InterruptedException e) { } }
97+ * // CARLO: Maybe we might want to do this to kill threads that are hanging... if
98+ * (workerThreads.get(i).isAlive()) { workerThreads.get(i).kill(); try {
99+ * workerThreads.get(i).join(); } catch (InterruptedException e) { } }
95100 */
96101
97102 requests += workers .get (i ).getRequests ();
@@ -116,17 +121,11 @@ private Results runRateLimitedMultiPhase() {
116121 this .createWorkerThreads ();
117122
118123 // long measureStart = start;
124+ Phase phase = null ;
119125
120- long startTs = System .currentTimeMillis ();
121- long start = System .nanoTime ();
122- long warmupStart = System .nanoTime ();
123- long warmup = warmupStart ;
124- long measureEnd = -1 ;
125126 // used to determine the longest sleep interval
126127 double lowestRate = Double .MAX_VALUE ;
127128
128- Phase phase = null ;
129-
130129 for (WorkloadState workState : workStates ) {
131130 workState .switchToNextPhase ();
132131 phase = workState .getCurrentPhase ();
@@ -145,6 +144,12 @@ private Results runRateLimitedMultiPhase() {
145144 }
146145 }
147146
147+ long startTs = System .currentTimeMillis ();
148+ long start = System .nanoTime ();
149+ long warmupStart = System .nanoTime ();
150+ long warmup = warmupStart ;
151+ long measureEnd = -1 ;
152+
148153 long intervalNs = getInterval (lowestRate , phase .getArrival ());
149154
150155 long nextInterval = start + intervalNs ;
@@ -157,8 +162,11 @@ private Results runRateLimitedMultiPhase() {
157162 boolean lastEntry = false ;
158163
159164 // Initialize the Monitor
160- if (this .intervalMonitor > 0 ) {
161- new MonitorThread (this .intervalMonitor ).start ();
165+ if (this .monitorInfo .getMonitoringInterval () > 0 ) {
166+ this .monitor =
167+ MonitorGen .getMonitor (
168+ this .monitorInfo , this .testState , this .workers , this .workConfs .get (0 ));
169+ this .monitor .start ();
162170 }
163171
164172 // Allow workers to start work.
@@ -301,6 +309,18 @@ private Results runRateLimitedMultiPhase() {
301309 }
302310 }
303311
312+ // Stop the monitoring thread separately from cleanup all the workers so we can ignore errors
313+ // from these threads (including possible SQLExceptions), but not the others.
314+ try {
315+ if (this .monitor != null ) {
316+ this .monitor .interrupt ();
317+ this .monitor .join (MONITOR_REJOIN_TIME );
318+ this .monitor .tearDown ();
319+ }
320+ } catch (Exception e ) {
321+ LOG .error (e .getMessage (), e );
322+ }
323+
304324 try {
305325 int requests = finalizeWorkers (this .workerThreads );
306326
@@ -528,42 +548,4 @@ public void run() {
528548 }
529549 }
530550 }
531-
532- private class MonitorThread extends Thread {
533- private final int intervalMonitor ;
534-
535- {
536- this .setDaemon (true );
537- }
538-
539- /**
540- * @param interval How long to wait between polling in milliseconds
541- */
542- MonitorThread (int interval ) {
543- this .intervalMonitor = interval ;
544- }
545-
546- @ Override
547- public void run () {
548- LOG .info ("Starting MonitorThread Interval [{}ms]" , this .intervalMonitor );
549- while (true ) {
550- try {
551- Thread .sleep (this .intervalMonitor );
552- } catch (InterruptedException ex ) {
553- return ;
554- }
555-
556- // Compute the last throughput
557- long measuredRequests = 0 ;
558- synchronized (testState ) {
559- for (Worker <?> w : workers ) {
560- measuredRequests += w .getAndResetIntervalRequests ();
561- }
562- }
563- double seconds = this .intervalMonitor / 1000d ;
564- double tps = (double ) measuredRequests / seconds ;
565- LOG .info ("Throughput: {} txn/sec" , tps );
566- }
567- }
568- }
569551}
0 commit comments