From a5b2e302c9162512bf18a4d225e0a5d0a0aa95f0 Mon Sep 17 00:00:00 2001 From: Mo Chen Date: Wed, 4 Mar 2026 14:17:34 -0600 Subject: [PATCH] thread_config: add startup polling for thread checks and skip on macOS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit check_threads.py now uses a short bounded poll/retry window so thread-count validation doesn’t fail on startup races; the test is also skipped on macOS because per-thread introspection used by this check is not reliably available there. --- .../gold_tests/thread_config/check_threads.py | 204 +++++++++++------- .../thread_config/thread_config.test.py | 1 + 2 files changed, 122 insertions(+), 83 deletions(-) diff --git a/tests/gold_tests/thread_config/check_threads.py b/tests/gold_tests/thread_config/check_threads.py index 61a5414d875..448b8a5a3a8 100755 --- a/tests/gold_tests/thread_config/check_threads.py +++ b/tests/gold_tests/thread_config/check_threads.py @@ -19,92 +19,130 @@ import psutil import argparse +import os import sys - - -def count_threads(ts_path, etnet_threads, accept_threads, task_threads, aio_threads): - - for p in psutil.process_iter(['name', 'cwd', 'threads']): - - # Find the pid corresponding to the ats process we started in autest. - # It needs to match the process name and the binary path. - # If autest can expose the pid of the process this is not needed anymore. - if p.name() == '[TS_MAIN]' and p.cwd() == ts_path: - - etnet_check = set() - accept_check = set() - task_check = set() - aio_check = set() - - for t in p.threads(): - +import time + +COUNT_THREAD_WAIT_SECONDS = 10.0 +COUNT_THREAD_POLL_SECONDS = 0.1 + + +def _count_threads_once(ts_path, etnet_threads, accept_threads, task_threads, aio_threads): + """ + Return (code, message) for a single snapshot of ATS thread state. + """ + for p in psutil.process_iter(): + try: + # Find the pid corresponding to the ats process we started in autest. + # It needs to match the process name and the binary path. + # If autest can expose the pid of the process this is not needed anymore. + process_name = p.name() + process_cwd = p.cwd() + process_exe = p.exe() + if process_cwd != ts_path: + continue + if process_name != '[TS_MAIN]' and process_name != 'traffic_server' and os.path.basename( + process_exe) != 'traffic_server': + continue + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + continue + + etnet_check = set() + accept_check = set() + task_check = set() + aio_check = set() + + try: + threads = p.threads() + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + return 1, 'Could not inspect ATS process threads.' + + for t in threads: + try: # Get the name of the thread. thread_name = psutil.Process(t.id).name() - - if thread_name.startswith('[ET_NET'): - - # Get the id of this thread and check if it's in range. - etnet_id = int(thread_name.split(' ')[1][:-1]) - if etnet_id >= etnet_threads: - sys.stderr.write('Too many ET_NET threads created.\n') - return 2 - elif etnet_id in etnet_check: - sys.stderr.write('ET_NET thread with duplicate thread id created.\n') - return 3 - else: - etnet_check.add(etnet_id) - - elif thread_name.startswith('[ACCEPT'): - - # Get the id of this thread and check if it's in range. - accept_id = int(thread_name.split(' ')[1].split(':')[0]) - if accept_id >= accept_threads: - sys.stderr.write('Too many ACCEPT threads created.\n') - return 5 - else: - accept_check.add(accept_id) - - elif thread_name.startswith('[ET_TASK'): - - # Get the id of this thread and check if it's in range. - task_id = int(thread_name.split(' ')[1][:-1]) - if task_id >= task_threads: - sys.stderr.write('Too many ET_TASK threads created.\n') - return 7 - elif task_id in task_check: - sys.stderr.write('ET_TASK thread with duplicate thread id created.\n') - return 8 - else: - task_check.add(task_id) - - elif thread_name.startswith('[ET_AIO'): - - # Get the id of this thread and check if it's in range. - aio_id = int(thread_name.split(' ')[1].split(':')[0]) - if aio_id >= aio_threads: - sys.stderr.write('Too many ET_AIO threads created.\n') - return 10 - else: - aio_check.add(aio_id) - - # Check the size of the sets, must be equal to the expected size. - if len(etnet_check) != etnet_threads: - sys.stderr.write('Expected ET_NET threads: {0}, found: {1}.\n'.format(etnet_threads, len(etnet_check))) - return 4 - elif len(accept_check) != accept_threads: - sys.stderr.write('Expected ACCEPT threads: {0}, found: {1}.\n'.format(accept_threads, len(accept_check))) - return 6 - elif len(task_check) != task_threads: - sys.stderr.write('Expected ET_TASK threads: {0}, found: {1}.\n'.format(task_threads, len(task_check))) - return 9 - elif len(aio_check) != aio_threads: - sys.stderr.write('Expected ET_AIO threads: {0}, found: {1}.\n'.format(aio_threads, len(aio_check))) - return 11 - else: - return 0 - - # Return 1 if no pid is found to match the ats process. - return 1 + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + # A thread can disappear while we inspect; treat as transient. + continue + + if thread_name.startswith('[ET_NET'): + + # Get the id of this thread and check if it's in range. + etnet_id = int(thread_name.split(' ')[1][:-1]) + if etnet_id >= etnet_threads: + return 2, 'Too many ET_NET threads created.' + elif etnet_id in etnet_check: + return 3, 'ET_NET thread with duplicate thread id created.' + else: + etnet_check.add(etnet_id) + + elif thread_name.startswith('[ACCEPT'): + + # Get the id of this thread and check if it's in range. + accept_id = int(thread_name.split(' ')[1].split(':')[0]) + if accept_id >= accept_threads: + return 5, 'Too many ACCEPT threads created.' + else: + accept_check.add(accept_id) + + elif thread_name.startswith('[ET_TASK'): + + # Get the id of this thread and check if it's in range. + task_id = int(thread_name.split(' ')[1][:-1]) + if task_id >= task_threads: + return 7, 'Too many ET_TASK threads created.' + elif task_id in task_check: + return 8, 'ET_TASK thread with duplicate thread id created.' + else: + task_check.add(task_id) + + elif thread_name.startswith('[ET_AIO'): + + # Get the id of this thread and check if it's in range. + aio_id = int(thread_name.split(' ')[1].split(':')[0]) + if aio_id >= aio_threads: + return 10, 'Too many ET_AIO threads created.' + else: + aio_check.add(aio_id) + + # Check the size of the sets, must be equal to the expected size. + if len(etnet_check) != etnet_threads: + return 4, 'Expected ET_NET threads: {0}, found: {1}.'.format(etnet_threads, len(etnet_check)) + elif len(accept_check) != accept_threads: + return 6, 'Expected ACCEPT threads: {0}, found: {1}.'.format(accept_threads, len(accept_check)) + elif len(task_check) != task_threads: + return 9, 'Expected ET_TASK threads: {0}, found: {1}.'.format(task_threads, len(task_check)) + elif len(aio_check) != aio_threads: + return 11, 'Expected ET_AIO threads: {0}, found: {1}.'.format(aio_threads, len(aio_check)) + else: + return 0, '' + + return 1, 'Expected ATS process [TS_MAIN] with cwd {0}, but it was not found.'.format(ts_path) + + +def count_threads( + ts_path, + etnet_threads, + accept_threads, + task_threads, + aio_threads, + wait_seconds=COUNT_THREAD_WAIT_SECONDS, + poll_seconds=COUNT_THREAD_POLL_SECONDS): + deadline = time.monotonic() + wait_seconds + + # Retry on startup/transient states: + # 1 : ATS process not found yet + # 4/6/9/11: expected thread count not reached yet + retry_codes = {1, 4, 6, 9, 11} + + while True: + code, message = _count_threads_once(ts_path, etnet_threads, accept_threads, task_threads, aio_threads) + if code == 0: + return 0 + if code not in retry_codes or time.monotonic() >= deadline: + sys.stderr.write(message + '\n') + return code + time.sleep(poll_seconds) def main(): diff --git a/tests/gold_tests/thread_config/thread_config.test.py b/tests/gold_tests/thread_config/thread_config.test.py index 8adc59814bf..d583ec301fa 100644 --- a/tests/gold_tests/thread_config/thread_config.test.py +++ b/tests/gold_tests/thread_config/thread_config.test.py @@ -20,6 +20,7 @@ Test.Summary = 'Test that Trafficserver starts with different thread configurations.' Test.ContinueOnFail = True +Test.SkipUnless(Condition.IsPlatform("linux")) ts = Test.MakeATSProcess('ts-1_exec-0_accept-1_task-1_aio') ts.Disk.records_config.update(