@@ -110,11 +110,11 @@ public function execute(string $endpoint, string $path, array $options)
110110 $ readTimeout = (int )($ options ["-read-timeout " ] ?? Defaults::READ_TIMEOUT );
111111 $ writeForks = ((int )($ options ["-write-rps " ] ?? Defaults::WRITE_RPS )) / Defaults::RPS_PER_WRITE_FORK ;
112112 $ writeTimeout = (int )($ options ["-write-timeout " ] ?? Defaults::WRITE_TIMEOUT );
113- $ time = (int )($ options ["-time " ] ?? Defaults::READ_TIME ) - 5 ;
113+ $ time = (int )($ options ["-time " ] ?? Defaults::READ_TIME );
114114 $ shutdownTime = (int )($ options ["-shutdown-time " ] ?? Defaults::SHUTDOWN_TIME );
115115
116116 $ this ->queueId = ftok (__FILE__ , 'm ' );
117- $ msgQueue = msg_get_queue ($ this ->queueId );
117+ msg_remove_queue ( msg_get_queue ($ this ->queueId ) );
118118
119119 $ pIds = [];
120120
@@ -128,8 +128,8 @@ public function execute(string $endpoint, string $path, array $options)
128128 }, $ readForks );
129129 $ pIds = array_merge ($ pIds , $ readPIds );
130130
131- $ writePIds = $ this ->forkJob (function (int $ i ) use ($ endpoint , $ path , $ tableName , $ initialDataCount , $ time , $ readTimeout , $ shutdownTime , $ startTime ) {
132- $ this ->writeJob ($ endpoint , $ path , $ tableName , $ initialDataCount , $ time , $ readTimeout , $ i , $ shutdownTime , $ startTime );
131+ $ writePIds = $ this ->forkJob (function (int $ i ) use ($ endpoint , $ path , $ tableName , $ initialDataCount , $ time , $ writeTimeout , $ shutdownTime , $ startTime ) {
132+ $ this ->writeJob ($ endpoint , $ path , $ tableName , $ initialDataCount , $ time , $ writeTimeout , $ i , $ shutdownTime , $ startTime );
133133 }, $ writeForks );
134134 $ pIds = array_merge ($ pIds , $ writePIds );
135135
@@ -209,13 +209,13 @@ protected function readJob(string $endpoint, string $path, $tableName, int $init
209209 Utils::retriedError ($ this ->queueId , 'write ' , get_class ($ e ));
210210 }
211211 ]);
212- Utils::metricDone ("read " , $ this ->queueId , $ attemps , $ this ->getLatency ($ begin ));
212+ Utils::metricDone ("read " , $ this ->queueId , $ attemps , $ this ->getLatencyMilliseconds ($ begin ));
213213 } catch (\Exception $ e ) {
214214 $ table ->getLogger ()->error ($ e ->getMessage ());
215- Utils::metricFail ("read " , $ this ->queueId , $ attemps , get_class ($ e ), $ this ->getLatency ($ begin ));
215+ Utils::metricFail ("read " , $ this ->queueId , $ attemps , get_class ($ e ), $ this ->getLatencyMilliseconds ($ begin ));
216216 } finally {
217217 $ i ++;
218- $ delay = $ this ->getDelay ($ startTime , Defaults::RPS_PER_READ_FORK , $ i );
218+ $ delay = $ this ->getDelayMicroseconds ($ startTime , Defaults::RPS_PER_READ_FORK , $ i );
219219 usleep ($ delay > 0 ? $ delay : 1 );
220220 }
221221 }
@@ -244,13 +244,13 @@ protected function writeJob(string $endpoint, string $path, $tableName, int $ini
244244 Utils::retriedError ($ this ->queueId , 'write ' , get_class ($ e ));
245245 }
246246 ]);
247- Utils::metricDone ("write " , $ this ->queueId , $ attemps , $ this ->getLatency ($ begin ));
247+ Utils::metricDone ("write " , $ this ->queueId , $ attemps , $ this ->getLatencyMilliseconds ($ begin ));
248248 } catch (\Exception $ e ) {
249249 $ table ->getLogger ()->error ($ e ->getMessage ());
250- Utils::metricFail ("write " , $ this ->queueId , $ attemps , get_class ($ e ), $ this ->getLatency ($ begin ));
250+ Utils::metricFail ("write " , $ this ->queueId , $ attemps , get_class ($ e ), $ this ->getLatencyMilliseconds ($ begin ));
251251 } finally {
252252 $ i ++;
253- $ delay = $ this ->getDelay ($ startTime , Defaults::RPS_PER_WRITE_FORK , $ i );
253+ $ delay = $ this ->getDelayMicroseconds ($ startTime , Defaults::RPS_PER_WRITE_FORK , $ i );
254254 usleep ($ delay > 0 ? $ delay : 1 );
255255 }
256256 }
@@ -288,7 +288,7 @@ protected function metricsJob(int $reportPeriod, float $startTime, int $time, st
288288 ]);
289289
290290 while (microtime (true ) <= $ startTime + $ time ) {
291- msg_receive ($ msgQueue , Utils::MSG_TYPE , $ msgType , 1024 , $ message );
291+ msg_receive ($ msgQueue , Utils::MSG_TYPE , $ msgType , Utils:: MESSAGE_SIZE_LIMIT_BYTES , $ message );
292292 $ queryLatencies ->observe ($ this ->getLatency ($ message ["sent " ]));
293293 switch ($ message ['type ' ]) {
294294 case 'reset ' :
@@ -325,14 +325,15 @@ protected function metricsJob(int $reportPeriod, float $startTime, int $time, st
325325 $ lastPushTime = microtime (true );
326326 }
327327 }
328+ msg_remove_queue ($ msgQueue );
328329 }
329330
330- protected function getLatency ( $ begin )
331+ protected function getLatencyMilliseconds ( float $ begin ): float
331332 {
332333 return (microtime (true ) - $ begin ) * 1000 ;
333334 }
334335
335- protected function getDelay (float $ startTime , int $ rps , int $ i )
336+ protected function getDelayMicroseconds (float $ startTime , int $ rps , int $ i ): float
336337 {
337338 return $ startTime * 1000000 + $ i * 1000000 / $ rps - microtime (true ) * 1000000 ;
338339 }
0 commit comments