WIP: Add RPS-based load option #65
Conversation
bf19e71 to 439df72
```python
return logging.getLogger("user")

def _user_loop(self, test_end_time):
    while self.stop_q.empty():
```
Just a thought for the future: what if we have the main process send SIGTERM (or SIGUSR1) to the subprocesses as a stop message, and write a custom signal handler to clean up?
Yeah totally, that's long overdue. I was looking into it and should work on it sometime.
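For context, a minimal single-process sketch of the mechanism being proposed (standard `signal` module behavior, not code from this PR): the worker installs a handler for the stop signal, and receiving it triggers cleanup instead of reading a sentinel from `stop_q`.

```python
import os
import signal

cleaned_up = []

def handle_stop(signum, frame):
    # Stand-in for real cleanup: drain queues, close clients, flush logs.
    cleaned_up.append(signum)

# The worker installs the handler; the main process would then send the
# signal (here we signal ourselves to keep the sketch self-contained).
signal.signal(signal.SIGUSR1, handle_stop)
os.kill(os.getpid(), signal.SIGUSR1)
print(cleaned_up == [signal.SIGUSR1])  # True
```

In the real benchmark, `os.kill(p.pid, signal.SIGUSR1)` from the main process would replace pushing a stop message onto each subprocess's queue.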
```python
log_reader_thread = logging_utils.init_logging(args.log_level, logger_q)

# Create processes and their Users
schedule_q = mp_ctx.Queue(1)
```
Suggested change:
```diff
 schedule_q = mp_ctx.Queue(1)
+schedule_q.cancel_join_thread()
```
Call cancel_join_thread() here to avoid the queue blocking on exit.
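For reference, a minimal sketch of what `cancel_join_thread()` changes (standard `multiprocessing` behavior, not code from this PR): a `Queue` normally blocks process exit until its feeder thread has flushed all buffered items; cancelling the join lets the process exit immediately, at the risk of losing unflushed data.

```python
import multiprocessing as mp

ctx = mp.get_context("spawn")
q = ctx.Queue(1)
# Without this, a process exiting with items still buffered in the
# queue's feeder thread can hang at shutdown; with it, exit is
# immediate but any unflushed items may be lost.
q.cancel_join_thread()
q.put("hello")
print(q.get())  # hello
```

Normal put/get traffic is unaffected; only shutdown behavior changes, which is why it suits a queue that exists purely to signal workers.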
```diff
 # Initialize the request_q with 2*concurrency requests
 for query in dataset.get_next_n_queries(2 * concurrency):
-    dataset_q.put(query)
+    request_q.put((None, query))
```
From a clarity perspective I think it would be better to have this be a dict or object. E.g.
Suggested change:
```diff
-request_q.put((None, query))
+request_q.put(dict(query=query, req_time=None))
```
or set a field on the query dict.
Using a field in the query dict is much more elegant!
Yeah, I like the idea of making it a field in the query dict.
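A hypothetical sketch of the field-on-the-query-dict approach (the `req_time` field name and query shape are illustrative, not from the PR):

```python
import time

def schedule_query(query, req_time=None):
    """Attach the scheduled send time directly to the query dict.

    req_time=None means "send immediately" (concurrency mode); rps mode
    would set a concrete timestamp instead of pairing it in a tuple.
    """
    query["req_time"] = req_time
    return query

q = schedule_query({"prompt": "hello"})
print(q["req_time"] is None)                          # True
q = schedule_query({"prompt": "hello"}, time.time() + 0.5)
print(q["req_time"] is not None)                      # True
```

This keeps one object flowing through `request_q`, so consumers read named fields instead of unpacking positional tuples.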
```python
return
```
Drop this return?
Suggested change:
```diff
-return
```
It doesn't, but I wonder whether a dedicated try/except block in this function is worth it. We currently catch all the cascading exceptions with the generic Exception class in the main function, but that's probably not the cleanest way to handle exceptions IMO.
Not suggesting this should be addressed in this PR, but a follow-up PR to clean up our exception handling might be good.
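A sketch of the narrower handling being suggested (the function name and the specific exception types are assumptions for illustration, not this repo's code):

```python
import logging
import queue

def get_next_request(request_q, timeout=1.0):
    # Catch the specific failures this call can raise, instead of relying
    # on a blanket `except Exception` at the top of main().
    try:
        return request_q.get(timeout=timeout)
    except queue.Empty:
        logging.debug("no request scheduled within %.1fs", timeout)
        return None
    except (OSError, ValueError) as exc:
        # Queue closed or in an invalid state: log with context, re-raise.
        logging.error("request queue failed: %s", exc)
        raise
```

Handling `queue.Empty` locally keeps the poll loop quiet, while genuinely unexpected errors still propagate to main() with a useful log line attached.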
npalaska left a comment
Some minor nits and comments but this looks ready to go.
```diff
 load_options:
-  type: constant #Future options: loadgen, stair-step
+  type: rps #Options: concurrency, rps, loadgen, stair-step
   concurrency: 1
```
Any reason you replaced the constant load type with concurrency? IMO, constant sounds closer to "constant load", i.e. a continuous stream of requests.
I was thinking that constant is ambiguous, since RPS can also be constant. My other thought is that we might later add dynamically changing RPS or dynamically changing concurrency, so either RPS or concurrency could be constant or dynamic.
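To make the naming discussion concrete, here is a hedged sketch of the two load types side by side (only `type`, `concurrency`, and the comments appear in the diff above; the `rps` key and its value are assumptions for illustration):

```yaml
# concurrency mode: a fixed pool of users, each sending back-to-back
load_options:
  type: concurrency
  concurrency: 1
---
# rps mode: requests scheduled at a target rate, independent of pool size
load_options:
  type: rps
  rps: 4        # assumed field name
```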
```python
def main_loop_concurrency_mode(dataset, request_q, start_time, end_time):
    """Let all users send requests repeatedly until end_time"""
    logging.info("Test from main process")
```
Do we still need this logging statement here?
No, I'll remove this thanks!
```python
result.output_tokens_before_timeout = result.output_tokens
result.output_text = response

result.calculate_results()
```
I wonder if it's time to deprecate the caikit_client_plugin?
```python
result.end_time = time.time()

result.calculate_results()
```
When we do a cleanup we probably should remove this file.
Yeah, this was originally added with the thought that it could be used in some test cases, but we may want to remove it depending on how we decide to handle testing (unit tests, e2e tests, etc.).
```python
except queue.Empty:
    # if timeout passes, queue.Empty will be thrown
    # User should check if stop_q has been set, else poll again
    # self.debug.info("User waiting for a request to be scheduled")
```
Should this line be uncommented?
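For reference, a minimal sketch of the poll-with-timeout pattern this loop uses (plain `queue.Queue` stand-ins and illustrative names, not the PR's multiprocessing queues):

```python
import queue

def user_loop(schedule_q, stop_q, poll_interval=0.1):
    # Block on the schedule queue with a timeout so the user can
    # periodically re-check stop_q instead of hanging forever.
    while stop_q.empty():
        try:
            req = schedule_q.get(timeout=poll_interval)
        except queue.Empty:
            # Timeout expired with no scheduled request: poll again.
            continue
        return req  # real code would dispatch the request here
    return None    # stop_q was set: exit cleanly

sq, st = queue.Queue(), queue.Queue()
sq.put({"prompt": "hi"})
print(user_loop(sq, st))  # {'prompt': 'hi'}
```

The timeout is what makes the stop check responsive; a bare blocking `get()` could leave an idle user stuck past `stop_q` being set.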