From a63a1fe41e6c877cea296d10311ca93cc29cabf8 Mon Sep 17 00:00:00 2001 From: Calvin Pieters Date: Mon, 25 May 2026 11:47:20 +0300 Subject: [PATCH 1/5] Harden remote job lifecycle --- .gitignore | 9 +- arc/job/adapter.py | 172 +++++++++--- arc/job/adapter_test.py | 385 +++++++++++++++++++++++++- arc/job/pipe/pipe_coordinator.py | 15 + arc/job/pipe/pipe_coordinator_test.py | 10 +- arc/job/pipe/pipe_planner_test.py | 4 +- arc/job/ssh.py | 53 ++-- arc/job/ssh_pool.py | 156 +++++++++++ arc/scheduler.py | 93 ++++++- arc/scheduler_pipe_test.py | 4 +- arc/scheduler_test.py | 59 ++++ 11 files changed, 889 insertions(+), 71 deletions(-) create mode 100644 arc/job/ssh_pool.py diff --git a/.gitignore b/.gitignore index bee2d5e0ec..8c50c86bb0 100644 --- a/.gitignore +++ b/.gitignore @@ -68,6 +68,13 @@ build/* *.log *.xml +# Test fixtures: real ESS log files / XML are tracked under arc/testing/ +!arc/testing/**/*.log +!arc/testing/**/*.xml -# AI Agent files +# AI Agent files and folders AGENTS.md +.claude/* +.vexb/* + +ARC.egg-info/* diff --git a/arc/job/adapter.py b/arc/job/adapter.py index 040c9920fe..1198fd1fef 100644 --- a/arc/job/adapter.py +++ b/arc/job/adapter.py @@ -218,9 +218,82 @@ def execute(self): with an HDF5 file that contains specific directions. The output is returned within the HDF5 file. The new ARC instance, representing a single worker, will run all of its jobs incore. + + Connection sharing: for remote-queue jobs we lease one + :class:`SSHClient` from the process-global pool + (:mod:`arc.job.ssh_pool`) and reuse it for both file upload and + qsub/sbatch submission within this call. Across an entire ARC + run, every remote job for a given server reuses the *same* + pooled client — 100 TS guess opts share one paramiko Transport + instead of opening 200. Pipe mode currently can't bundle these + (``should_use_pipe`` refuses non-``local`` servers, see + ``arc/job/pipe/pipe_coordinator.py:77``); the pool is the + leverage available short of full remote-pipe support. """ - self.upload_files() execution_type = JobExecutionTypeEnum(self.execution_type) + use_shared_ssh = ( + execution_type == JobExecutionTypeEnum.queue + and self.server is not None + and self.server != 'local' + and not self.testing + ) + if use_shared_ssh: + from arc.job.ssh_pool import get_default_pool + with get_default_pool().borrow(self.server) as ssh: + self._shared_ssh = ssh + try: + self._dispatch_execution(execution_type) + finally: + # Pool retains the SSHClient; clearing the attr + # just prevents a later code path on this adapter + # from grabbing a stale reference if the pool + # subsequently reaps and reopens the connection. + self._shared_ssh = None + else: + self._dispatch_execution(execution_type) + if not self.restarted: + self._write_initiated_job_to_csv_file() + + def _open_or_borrow_ssh(self): + """Yield an :class:`SSHClient` for ``self.server``, in priority order: + + 1. ``self._shared_ssh`` if set — the per-call client opened by + :meth:`execute`. Available within the upload+submit window. + 2. The process-global pool (:mod:`arc.job.ssh_pool`) — keeps + one client alive across jobs for the run's lifetime, so the + hot status-poll loop reuses connections. + 3. A fresh ``SSHClient`` opened just for this call — only hit + when the pool can't construct one (testing, exotic env). + + Returns a context manager that does NOT close the underlying + client on exit; the pool retains ownership in case (2), and + case (3) opens-and-closes inline. + """ + from contextlib import contextmanager + shared = getattr(self, '_shared_ssh', None) + if shared is not None: + @contextmanager + def _shared_cm(): + yield shared + return _shared_cm() + try: + from arc.job.ssh_pool import get_default_pool + return get_default_pool().borrow(self.server) + except Exception: + # Pool refused (e.g., factory failed). Fall back to a + # one-shot client so we degrade gracefully — the caller + # gets correctness at the cost of one connection. + logger.debug("ssh pool unavailable; opening one-shot client", exc_info=True) + @contextmanager + def _fresh_cm(): + with SSHClient(self.server) as fresh: + yield fresh + return _fresh_cm() + + def _dispatch_execution(self, execution_type: 'JobExecutionTypeEnum') -> None: + """Inner body of :meth:`execute`, factored out so the SSH-share + wrapper around it stays small and readable.""" + self.upload_files() if execution_type == JobExecutionTypeEnum.incore: self.initial_time = datetime.datetime.now() self.job_status[0] = 'running' @@ -235,19 +308,25 @@ def execute(self): raise ValueError('Pipe execution is handled at the Scheduler level. ' 'JobAdapters inside a pipe must be executed by the worker ' "with execution_type='incore'.") - if not self.restarted: - self._write_initiated_job_to_csv_file() - def legacy_queue_execution(self): + def legacy_queue_execution(self, ssh: 'SSHClient | None' = None): """ Execute a job to the server's queue. The server could be either "local" or remote. + + ``ssh`` is an explicitly-passed shared connection. When ``None`` + we route through :meth:`_open_or_borrow_ssh` which prefers + ``self._shared_ssh`` (set by :meth:`execute`), then the + process-global pool, then opens fresh. """ self._log_job_execution() # Submit to queue, differentiate between local (same machine using its queue) and remote servers. if self.server != 'local': - with SSHClient(self.server) as ssh: + if ssh is not None: self.job_status[0], self.job_id = ssh.submit_job(remote_path=self.remote_path) + else: + with self._open_or_borrow_ssh() as borrowed: + self.job_status[0], self.job_id = borrowed.submit_job(remote_path=self.remote_path) else: # submit to the local queue self.job_status[0], self.job_id = submit_job(path=self.local_path) @@ -363,26 +442,24 @@ def set_file_paths(self): self.set_additional_file_paths() - def upload_files(self): + def upload_files(self, ssh: 'SSHClient | None' = None): """ Upload the relevant files for the job. + + ``ssh`` is an explicitly-passed shared connection. When ``None`` + we route through :meth:`_open_or_borrow_ssh` which prefers + ``self._shared_ssh`` (set by :meth:`execute`), then the + process-global pool, then opens fresh. """ if not self.testing: if self.execution_type != 'incore' and self.server != 'local': # If the job execution type is incore, then no need to upload any files. # Also, even if the job is submitted to the que, no need to upload files if the server is local. - with SSHClient(self.server) as ssh: - for up_file in self.files_to_upload: - logger.debug(f"Uploading {up_file['file_name']} source {up_file['source']} to {self.server}") - if up_file['source'] == 'path': - ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=up_file['local']) - elif up_file['source'] == 'input_files': - ssh.upload_file(remote_file_path=up_file['remote'], file_string=up_file['local']) - else: - raise ValueError(f"Unclear file source for {up_file['file_name']}. Should either be 'path' or " - f"'input_files', got: {up_file['source']}") - if up_file['make_x']: - ssh.change_mode(mode='+x', file_name=up_file['file_name'], remote_path=self.remote_path) + if ssh is not None: + self._upload_with_ssh(ssh) + else: + with self._open_or_borrow_ssh() as borrowed: + self._upload_with_ssh(borrowed) else: # running locally, just copy the check file, if exists, to the job folder for up_file in self.files_to_upload: @@ -393,6 +470,25 @@ def upload_files(self): pass self.initial_time = datetime.datetime.now() + def _upload_with_ssh(self, ssh) -> None: + """SFTP-put every entry in ``self.files_to_upload`` over an open client. + + Factored out of :meth:`upload_files` so the with-shared vs. + with-new code paths share one body — adding a future per-file + knob (compression, retry, throttle) lands in one place. + """ + for up_file in self.files_to_upload: + logger.debug(f"Uploading {up_file['file_name']} source {up_file['source']} to {self.server}") + if up_file['source'] == 'path': + ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=up_file['local']) + elif up_file['source'] == 'input_files': + ssh.upload_file(remote_file_path=up_file['remote'], file_string=up_file['local']) + else: + raise ValueError(f"Unclear file source for {up_file['file_name']}. Should either be 'path' or " + f"'input_files', got: {up_file['source']}") + if up_file['make_x']: + ssh.change_mode(mode='+x', file_name=up_file['file_name'], remote_path=self.remote_path) + def download_files(self): """ Download the relevant files. @@ -401,7 +497,7 @@ def download_files(self): if self.execution_type != 'incore' and self.server != 'local': # If the job execution type is incore, then no need to download any files. # Also, even if the job is submitted to the que, no need to download files if the server is local. - with SSHClient(self.server) as ssh: + with self._open_or_borrow_ssh() as ssh: for dl_file in self.files_to_download: ssh.download_file(remote_file_path=dl_file['remote'], local_file_path=dl_file['local']) self.set_initial_and_final_times(ssh=ssh) @@ -409,6 +505,16 @@ def download_files(self): self.set_initial_and_final_times() self.final_time = self.final_time or datetime.datetime.now() + def remove_remote_files(self): + """ + Remove the job's remote work directory after a successful run, to keep cluster quota in check. + No-op for local servers or when no remote_path is set. + """ + if self.server is None or self.server == 'local' or not self.remote_path: + return + with self._open_or_borrow_ssh() as ssh: + ssh.remove_dir(remote_path=self.remote_path) + def set_initial_and_final_times(self, ssh: SSHClient | None = None): """ Set the end time of the job. @@ -701,7 +807,7 @@ def delete(self): logger.debug(f'Deleting job {self.job_name} for {self.species_label}') if self.server != 'local': logger.debug(f'deleting job on {self.server}...') - with SSHClient(self.server) as ssh: + with self._open_or_borrow_ssh() as ssh: ssh.delete_job(self.job_id) else: logger.debug('deleting job locally...') @@ -764,20 +870,20 @@ def _get_additional_job_info(self): content = '' cluster_soft = servers[self.server]['cluster_soft'].lower() if cluster_soft in ['oge', 'sge', 'slurm', 'pbs', 'htcondor']: + # job.log is HTCondor's native event log; other clusters don't produce one. + include_job_log = cluster_soft == 'htcondor' local_file_path_1 = os.path.join(self.local_path, 'out.txt') local_file_path_2 = os.path.join(self.local_path, 'err.txt') - local_file_path_3 = os.path.join(self.local_path, 'job.log') + local_file_path_3 = os.path.join(self.local_path, 'job.log') if include_job_log else None if self.server != 'local' and self.remote_path is not None and not self.testing: - remote_file_path_1 = os.path.join(self.remote_path, 'out.txt') - remote_file_path_2 = os.path.join(self.remote_path, 'err.txt') - remote_file_path_3 = os.path.join(self.remote_path, 'job.log') - with SSHClient(self.server) as ssh: - for local_file_path, remote_file_path in zip([local_file_path_1, - local_file_path_2, - local_file_path_3], - [remote_file_path_1, - remote_file_path_2, - remote_file_path_3]): + remote_paths = [os.path.join(self.remote_path, 'out.txt'), + os.path.join(self.remote_path, 'err.txt')] + local_paths = [local_file_path_1, local_file_path_2] + if include_job_log: + remote_paths.append(os.path.join(self.remote_path, 'job.log')) + local_paths.append(local_file_path_3) + with self._open_or_borrow_ssh() as ssh: + for local_file_path, remote_file_path in zip(local_paths, remote_paths): try: ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path) @@ -787,7 +893,7 @@ def _get_additional_job_info(self): f'flags with stdout and stderr of out.txt and err.txt, respectively ' f'(e.g., "#SBATCH -o out.txt"). Error message:') logger.warning(e) - for local_file_path in [local_file_path_1, local_file_path_2, local_file_path_3]: + for local_file_path in filter(None, [local_file_path_1, local_file_path_2, local_file_path_3]): if os.path.isfile(local_file_path): with open(local_file_path, 'r') as f: lines = f.readlines() @@ -803,7 +909,7 @@ def _check_job_server_status(self) -> str: Possible statuses: ``initializing``, ``running``, ``errored on node xx``, ``done``. """ if self.server != 'local' and not self.testing: - with SSHClient(self.server) as ssh: + with self._open_or_borrow_ssh() as ssh: return ssh.check_job_status(self.job_id) else: return check_job_status(self.job_id) diff --git a/arc/job/adapter_test.py b/arc/job/adapter_test.py index dd1a520620..58d7ebaa31 100644 --- a/arc/job/adapter_test.py +++ b/arc/job/adapter_test.py @@ -19,6 +19,12 @@ from arc.imports import settings from arc.job.adapter import JobAdapter, JobEnum, JobTypeEnum, JobExecutionTypeEnum from arc.job.adapters.gaussian import GaussianAdapter +from arc.job.ssh_pool import ( + SSHConnectionPool, + get_default_pool, + reset_default_pool, + set_default_pool, +) from arc.level import Level from arc.species import ARCSpecies @@ -89,6 +95,11 @@ def setUpClass(cls): A method that is run before all unit tests in this class. """ cls.maxDiff = None + # Register project-dir cleanups before any fixture creation so they + # still fire if a constructor below raises mid-setUpClass — that's + # how leftover scratch files end up committed to the repo. + for subdir in ('test_JobAdapter', 'test_JobAdapter_scan', 'test_JobAdapter_ServerTimeLimit'): + cls.addClassCleanup(shutil.rmtree, os.path.join(ARC_TESTING_PATH, subdir), ignore_errors=True) cls.job_1 = GaussianAdapter(execution_type='queue', job_type='conf_opt', level=Level(method='cbs-qb3'), @@ -341,18 +352,6 @@ def test_troubleshoot_queue(self): self.assertIn('middle_queue', self.job_6.attempted_queues) - - @classmethod - def tearDownClass(cls): - """ - A function that is run ONCE after all unit tests in this class. - Delete all project directories created during these unit tests - """ - shutil.rmtree(os.path.join(ARC_TESTING_PATH, 'test_JobAdapter'), ignore_errors=True) - shutil.rmtree(os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_scan'), ignore_errors=True) - shutil.rmtree(os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit'), ignore_errors=True) - - class TestRotateCSV(unittest.TestCase): """ Contains unit tests for the CSV rotation logic. @@ -410,5 +409,367 @@ def test_multiple_rotations(self): self.assertEqual(len(archives), 2) +# --------------------------------------------------------------------------- +# SSH connection sharing & pooling (Options 1 + 2). +# +# Option 1 (per-job share): one SSHClient covers both upload and submit +# inside a single execute() call — collapses 2N connections to N. +# Option 2 (process-lifetime pool): the SSHClient for a given server is +# kept alive across jobs — collapses N to a small constant. +# --------------------------------------------------------------------------- + + +class _SSHClientStub: + """In-memory SSHClient lookalike for the pool to hand out. + + Records every upload/submit so tests can assert which calls landed + on which (shared) client. The pool calls ``connect()`` after + instantiation; we no-op that since there's no real socket. + """ + + def __init__(self, server): + self.server = server + self.uploaded = [] + self.submits = [] + self.downloaded = [] + self._closed = False + # Mimic SSHClient's ``_ssh`` attribute so ssh_pool._is_alive() + # finds an active fake-Transport. + self._ssh = _FakeParamikoSSH() + + def connect(self): + pass # the real one opens TCP+auth; we no-op for tests + + def close(self): + self._closed = True + self._ssh = None + + def __enter__(self): + return self + + def __exit__(self, *a): + return False + + def upload_file(self, *, remote_file_path, local_file_path=None, file_string=None): + self.uploaded.append(remote_file_path) + + def submit_job(self, remote_path, recursion=False): + self.submits.append(remote_path) + return 'initializing', 12345 + + def change_mode(self, *, mode, file_name, remote_path): + pass + + # Methods that the post-submit lifecycle paths exercise. + def check_job_status(self, job_id): + return 'running' + + def download_file(self, *, remote_file_path, local_file_path): + self.downloaded.append(remote_file_path) + + def remove_dir(self, *, remote_path): + pass + + def delete_job(self, job_id): + pass + + +class _FakeParamikoSSH: + """Stand-in for paramiko.SSHClient — _is_alive checks Transport.is_active().""" + def get_transport(self): + return _FakeTransport() + + +class _FakeTransport: + def is_active(self): + return True + + +class _StubFactoryPool: + """A pool whose factory builds _SSHClientStub instead of real SSHClient. + + Wraps the production ``SSHConnectionPool`` so reuse + lifecycle + semantics are exactly the production behavior — only the + underlying object is faked. + """ + + def __init__(self): + self.created = [] # log of every server name we built a client for + def factory(server): + client = _SSHClientStub(server) + self.created.append(server) + return client + self._inner = SSHConnectionPool(factory=factory) + + def borrow(self, server): + return self._inner.borrow(server) + + def close_all(self): + self._inner.close_all() + + @property + def opens(self): + return self._inner.opens + + @property + def borrows(self): + return self._inner.borrows + + +class _MinimalAdapter(JobAdapter): + """Concrete JobAdapter with just enough state to exercise execute(). + + Skips the heavyweight construction the GaussianAdapter does — we + only need ``server``, ``execution_type``, ``files_to_upload``, + ``remote_path``, and ``testing=False`` for the SSH-share path. + """ + + job_adapter = 'mockter' + + def __init__(self, *, server, execution_type='queue'): + # Bypass JobAdapter.__init__ entirely — all of its real work + # (file paths, settings, csv setup) is unrelated to the SSH + # share contract we're testing here. + self.server = server + self.execution_type = execution_type + self.testing = False + self.restarted = True # skip _write_initiated_job_to_csv_file + self.files_to_upload = [ + {'file_name': 'input.gjf', 'source': 'path', + 'local': '/local/input.gjf', 'remote': '/remote/input.gjf', 'make_x': False}, + {'file_name': 'submit.sh', 'source': 'path', + 'local': '/local/submit.sh', 'remote': '/remote/submit.sh', 'make_x': True}, + ] + self.remote_path = '/remote' + self.local_path = '/local' + self.job_status = ['initializing', {'status': 'initializing'}] + self.job_id = 0 + self.initial_time = None + self.final_time = None + self.job_name = 'job_test' + self.species_label = 'spc_test' + + # JobAdapter requires these abstracts; trivial bodies are fine. + def execute_incore(self): pass + def execute_queue(self): self.legacy_queue_execution() + def write_input_file(self): pass + def set_files(self): pass + def set_additional_file_paths(self): pass + def set_input_file_memory(self): pass + def upload_during_execution(self): pass + def _log_job_execution(self): pass + + +class TestSSHConnectionSharing(unittest.TestCase): + """``execute()`` shares one SSHClient per remote-queue job, and the + pool reuses it across jobs.""" + + def setUp(self): + # Inject a pool whose factory builds stubs, so the test never + # tries to open a real SSH connection to a server that isn't + # in this user's settings (e.g., 'server2'). + self._stub_pool = _StubFactoryPool() + set_default_pool(self._stub_pool) + # Also stub the legacy-direct path: bare + # ``legacy_queue_execution()`` (called outside execute()) uses + # the SSHClient class in ``arc.job.adapter`` directly, so patch + # that name with a context-manager wrapper around our stub. + self._direct_patch = patch( + 'arc.job.adapter.SSHClient', + _SSHClientStub, + ) + self._direct_patch.start() + + def tearDown(self): + set_default_pool(None) + self._direct_patch.stop() + + def test_remote_queue_opens_one_ssh_per_job(self): + """Upload + submit share a single SSHClient inside one execute().""" + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.execute() + # One SSHClient created (the pool's first borrow), one borrow. + self.assertEqual(self._stub_pool.opens, 1) + self.assertEqual(self._stub_pool.borrows, 1) + + def test_remote_queue_clears_shared_ssh_after_dispatch(self): + """``self._shared_ssh`` is None after execute() returns.""" + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.execute() + self.assertIsNone(getattr(adapter, '_shared_ssh', None)) + + def test_local_server_opens_no_ssh(self): + """local-server queue jobs use the host's queue, no SSH at all.""" + adapter = _MinimalAdapter(server='local', execution_type='queue') + with patch('arc.job.adapter.submit_job', return_value=('initializing', 99)): + adapter.execute() + self.assertEqual(self._stub_pool.opens, 0) + self.assertEqual(self._stub_pool.borrows, 0) + + def test_incore_opens_no_ssh(self): + """incore execution runs in-process — never touches SSH.""" + adapter = _MinimalAdapter(server='server2', execution_type='incore') + adapter.execute() + self.assertEqual(self._stub_pool.opens, 0) + + def test_legacy_queue_execution_routes_through_pool_when_called_directly(self): + """Even when called bare (outside execute()), legacy_queue_execution + now reuses the pool — that's Option 2's payoff for adapter + ``execute_queue`` overrides that call ``self.legacy_queue_execution()`` + from inside their own custom flow. + """ + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.legacy_queue_execution() # bare — no execute() wrapper + self.assertEqual(self._stub_pool.opens, 1) + self.assertEqual(self._stub_pool.borrows, 1) + + def test_shared_ssh_carries_uploads_and_submit(self): + """The pooled SSHClient sees both upload calls AND the submit call.""" + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.execute() + # Inspect the stub the pool kept. + self.assertEqual(self._stub_pool.opens, 1) + client = self._stub_pool._inner._clients['server2'] + self.assertEqual(len(client.uploaded), 2) + self.assertEqual(len(client.submits), 1) + + +class TestSSHConnectionPoolReuse(unittest.TestCase): + """The process-lifetime pool reuses one SSHClient across many jobs.""" + + def setUp(self): + self._stub_pool = _StubFactoryPool() + set_default_pool(self._stub_pool) + + def tearDown(self): + set_default_pool(None) + + def test_one_open_for_many_jobs_same_server(self): + """100 jobs against one server → 1 SSHClient, 100 borrows.""" + for _ in range(100): + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.execute() + self.assertEqual(self._stub_pool.opens, 1, "should reuse the same client") + self.assertEqual(self._stub_pool.borrows, 100) + + def test_separate_clients_per_distinct_server(self): + """Different servers → different clients, each opened once.""" + for _ in range(5): + _MinimalAdapter(server='server2', execution_type='queue').execute() + for _ in range(3): + _MinimalAdapter(server='server3', execution_type='queue').execute() + self.assertEqual(self._stub_pool.opens, 2) + self.assertEqual(self._stub_pool.borrows, 8) + self.assertEqual(sorted(self._stub_pool._inner._clients.keys()), + ['server2', 'server3']) + + def test_dead_client_is_reaped_and_reopened(self): + """If the underlying Transport reports inactive, pool reopens.""" + # First borrow → opens stub #1. + _MinimalAdapter(server='server2', execution_type='queue').execute() + client1 = self._stub_pool._inner._clients['server2'] + # Simulate a dead Transport (remote rebooted, etc.). + client1._ssh = None + # Next borrow should detect the dead client and open a fresh one. + _MinimalAdapter(server='server2', execution_type='queue').execute() + client2 = self._stub_pool._inner._clients['server2'] + self.assertIs(client1._closed, True, "stale client should be closed before reopen") + self.assertIsNot(client1, client2) + self.assertEqual(self._stub_pool.opens, 2) + + def test_close_all_closes_every_pooled_client(self): + for srv in ('server2', 'server3'): + _MinimalAdapter(server=srv, execution_type='queue').execute() + clients = list(self._stub_pool._inner._clients.values()) + self._stub_pool.close_all() + self.assertEqual(self._stub_pool._inner._clients, {}) + for c in clients: + self.assertTrue(c._closed) + + def test_close_all_is_idempotent(self): + _MinimalAdapter(server='server2', execution_type='queue').execute() + self._stub_pool.close_all() + # Second call must not raise or mutate state. + self._stub_pool.close_all() + self.assertEqual(self._stub_pool._inner._clients, {}) + + def test_status_poll_reuses_pooled_client(self): + """The hot path: hundreds of status checks open exactly one client. + + ARC polls a job's queue status every poll cycle for the entire + duration of the job. Pre-pool, each call opened a fresh + SSHClient. After Option 2, all polls reuse the pool's client + for that server — the dominant SSH-cost reducer in a real run. + """ + adapter = _MinimalAdapter(server='server2', execution_type='queue') + # Simulate 200 poll cycles (~1.5 hour run at 30s polling). + for _ in range(200): + adapter._check_job_server_status() + self.assertEqual(self._stub_pool.opens, 1, "pool should reuse one client") + self.assertEqual(self._stub_pool.borrows, 200) + + def test_download_files_reuses_pooled_client(self): + """download_files (called once per finished job) uses the pool too.""" + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.files_to_download = [ + {'remote': '/r/output.log', 'local': '/l/output.log'}, + ] + # set_initial_and_final_times reads file mtimes — stub it. + adapter.set_initial_and_final_times = lambda ssh=None: None + adapter.download_files() + client = self._stub_pool._inner._clients['server2'] + self.assertIn('/r/output.log', client.downloaded) + self.assertEqual(self._stub_pool.opens, 1) + + def test_full_lifecycle_one_open_per_server(self): + """Submit + many polls + download + cleanup all share one pooled client. + + End-to-end view of one job's life: this collapses what was + previously ~(2 + N_polls + 1 + 1) ≈ N+4 individual SSH + connections into a single reused client. + """ + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.files_to_download = [{'remote': '/r/o.log', 'local': '/l/o.log'}] + adapter.set_initial_and_final_times = lambda ssh=None: None + + adapter.execute() # upload + submit (1 borrow) + for _ in range(50): # 50 status polls + adapter._check_job_server_status() + adapter.download_files() # 1 download borrow + adapter.remove_remote_files() # 1 cleanup borrow + adapter.delete() # 1 delete borrow + + # All phases share the same pooled client. + self.assertEqual(self._stub_pool.opens, 1) + # 1 execute + 50 polls + 1 download + 1 cleanup + 1 delete = 54 borrows. + self.assertEqual(self._stub_pool.borrows, 54) + + +class TestSSHPoolDefaultLifecycle(unittest.TestCase): + """The module-level default pool is lazy and resettable.""" + + def setUp(self): + reset_default_pool() + + def tearDown(self): + reset_default_pool() + + def test_get_default_pool_is_idempotent(self): + p1 = get_default_pool() + p2 = get_default_pool() + self.assertIs(p1, p2) + + def test_reset_default_pool_drops_the_instance(self): + p1 = get_default_pool() + reset_default_pool() + p2 = get_default_pool() + self.assertIsNot(p1, p2) + + def test_set_default_pool_replaces_instance(self): + replacement = _StubFactoryPool() + set_default_pool(replacement) + self.assertIs(get_default_pool(), replacement) + + if __name__ == '__main__': unittest.main(testRunner=unittest.TextTestRunner(verbosity=2)) diff --git a/arc/job/pipe/pipe_coordinator.py b/arc/job/pipe/pipe_coordinator.py index 0e7351fde5..a5870df1ab 100644 --- a/arc/job/pipe/pipe_coordinator.py +++ b/arc/job/pipe/pipe_coordinator.py @@ -67,6 +67,21 @@ def should_use_pipe(self, tasks: list[TaskSpec]) -> bool: min_tasks = pipe_settings.get('min_tasks', 10) if len(tasks) < min_tasks: return False + # PipeRun.submit_to_scheduler invokes qsub/sbatch on the orchestrator + # machine and the worker (`python -m arc.scripts.pipe_worker`) reads + # pipe_root from the local filesystem. If this engine's resolved + # server is remote, that submission silently errors and the run + # deadlocks. Refuse pipe so the planner falls back to per-job queue + # submissions over SSH (scheduler.py:546-554). Remote pipe support + # tracked separately on the pipe-ssh-support branch. + ess_settings = getattr(self.sched, 'ess_settings', None) or {} + servers_dict = settings['servers'] + server_list = ess_settings.get(tasks[0].engine, []) + if isinstance(server_list, str): + server_list = [server_list] + first_server = next((s for s in server_list if s in servers_dict), None) + if first_server is not None and first_server != 'local': + return False ref = tasks[0] return all(t.engine == ref.engine and t.task_family == ref.task_family diff --git a/arc/job/pipe/pipe_coordinator_test.py b/arc/job/pipe/pipe_coordinator_test.py index 087878a5a8..a5d2602b3a 100644 --- a/arc/job/pipe/pipe_coordinator_test.py +++ b/arc/job/pipe/pipe_coordinator_test.py @@ -63,11 +63,12 @@ def _make_spec(task_id, task_family='conf_opt', engine='mockter', level=None, ) -def _make_mock_sched(project_directory): +def _make_mock_sched(project_directory, ess_settings=None): """Create a mock Scheduler with the attributes PipeCoordinator needs.""" sched = MagicMock() sched.project_directory = project_directory sched.server_job_ids = list() + sched.ess_settings = ess_settings if ess_settings is not None else {'mockter': ['local']} spc = ARCSpecies(label='H2O', smiles='O') spc.conformers = [None] * 5 spc.conformer_energies = [None] * 5 @@ -127,6 +128,13 @@ def test_false_when_disabled(self): tasks = [_make_spec(f't_{i}') for i in range(15)] self.assertFalse(self.coord.should_use_pipe(tasks)) + @patch('arc.job.pipe.pipe_coordinator.settings', + {'servers': {'zeus': {'cluster_soft': 'PBS', 'address': 'z.example.edu', 'un': 'u'}}}) + def test_false_when_engine_resolves_to_remote_server(self): + coord = PipeCoordinator(_make_mock_sched(self.tmpdir, ess_settings={'mockter': ['zeus']})) + tasks = [_make_spec(f't_{i}') for i in range(15)] + self.assertFalse(coord.should_use_pipe(tasks)) + class TestSubmitPipeRun(unittest.TestCase): """Tests for PipeCoordinator.submit_pipe_run().""" diff --git a/arc/job/pipe/pipe_planner_test.py b/arc/job/pipe/pipe_planner_test.py index bc1b38b113..74bc0185a2 100644 --- a/arc/job/pipe/pipe_planner_test.py +++ b/arc/job/pipe/pipe_planner_test.py @@ -50,7 +50,9 @@ def _make_mock_sched(project_directory): sched.freq_level = Level(method='wb97xd', basis='def2-tzvp') sched.scan_level = Level(method='wb97xd', basis='def2-tzvp') sched.irc_level = Level(method='wb97xd', basis='def2-tzvp') - sched.ess_settings = {'gaussian': ['server1']} + # PipeCoordinator refuses pipe for non-'local' servers (remote pipe + # support lives on a separate branch — see pipe_coordinator.py:70-76). + sched.ess_settings = {'gaussian': ['local']} sched.job_types = {'conf_opt': True, 'conf_sp': True, 'opt': True, 'freq': True, 'sp': True, 'rotors': True} spc = ARCSpecies(label='H2O', smiles='O') diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 9431c4c1fe..460abf1ce1 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -183,13 +183,18 @@ def download_file(self, Raises: ServerError: If the file cannot be downloaded with maximum times to try """ - if not self._check_file_exists(remote_file_path): - # Check if a file exists - # This doesn't have a real impact now to avoid screwing up ESS trsh - # but introduce an opportunity for better troubleshooting. - # The current behavior is that if the remote path does not exist - # an empty file will be created at the local path - logger.debug(f'{remote_file_path} does not exist on {self.server}.') + # PBS/SGE epilogues sometimes flush stdout/stderr to the work dir a + # second or two after qstat reports the job has left the queue. Briefly + # retry the existence check so we don't loud-warn on that race. + for attempt in range(3): + if self._check_file_exists(remote_file_path): + break + if attempt < 2: + time.sleep(1.0) + else: + logger.debug(f'{remote_file_path} does not exist on {self.server}; ' + f'skipping download.') + return try: self._sftp.get(remotepath=remote_file_path, localpath=local_file_path) @@ -279,7 +284,7 @@ def check_running_jobs_ids(self) -> list: cluster_soft = servers[self.server]['cluster_soft'].lower() for i, status_line in enumerate(stdout): if i > i_dict[cluster_soft]: - job_id = status_line.split(split_by_dict[cluster_soft])[0] + job_id = status_line.lstrip().split(split_by_dict[cluster_soft])[0] job_id = job_id.split('.')[0] if '.' in job_id else job_id running_job_ids.append(job_id) return running_job_ids @@ -311,19 +316,22 @@ def submit_job(self, remote_path: str, if 'Requested node configuration is not available' in line: logger.warning('User may be requesting more resources than are available. Please check server ' 'settings, such as cpus and memory, in ARC/arc/settings/settings.py') + if 'Memory specification can not be satisfied' in line: + logger.warning('User may be requesting more memory than is available. Please check server ' + 'settings, such as cpus and memory, in ARC/arc/settings/settings.py.') if cluster_soft.lower() == 'slurm' and 'AssocMaxSubmitJobLimit' in line: logger.warning(f'Max number of submitted jobs was reached, sleeping...') time.sleep(5 * 60) self.submit_job(remote_path=remote_path, recursion=True) if recursion: return None, None - elif cluster_soft.lower() in ['oge', 'sge'] and 'submitted' in stdout[0].lower(): + elif cluster_soft.lower() in ['oge', 'sge'] and stdout and 'submitted' in stdout[0].lower(): job_id = stdout[0].split()[2] - elif cluster_soft.lower() == 'slurm' and 'submitted' in stdout[0].lower(): + elif cluster_soft.lower() == 'slurm' and stdout and 'submitted' in stdout[0].lower(): job_id = stdout[0].split()[3] - elif cluster_soft.lower() == 'pbs': + elif cluster_soft.lower() == 'pbs' and stdout: job_id = stdout[0].split('.')[0] - elif cluster_soft.lower() == 'htcondor' and 'submitting' in stdout[0].lower(): + elif cluster_soft.lower() == 'htcondor' and stdout and 'submitting' in stdout[0].lower(): # Submitting job(s). # 1 job(s) submitted to cluster 443069. if len(stdout) and len(stdout[1].split()) and len(stdout[1].split()[-1].split('.')): @@ -370,16 +378,16 @@ def _connect(self) -> tuple[paramiko.sftp_client.SFTPClient, paramiko.SSHClient] """ ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.load_system_host_keys(filename=self.key) + ssh.load_system_host_keys() try: # If the server accepts the connection but the SSH daemon doesn't respond in # 15 seconds (default in paramiko) due to network congestion, faulty switches, # etc..., common solution is enlarging the timeout variable. - ssh.connect(hostname=self.address, username=self.un, banner_timeout=200) + ssh.connect(hostname=self.address, username=self.un, banner_timeout=200, key_filename=self.key) except: # This sometimes gives "SSHException: Error reading SSH protocol banner[Error 104] Connection reset by peer" # Try again: - ssh.connect(hostname=self.address, username=self.un, banner_timeout=200) + ssh.connect(hostname=self.address, username=self.un, banner_timeout=200, key_filename=self.key) sftp = ssh.open_sftp() return sftp, ssh @@ -395,7 +403,7 @@ def close(self) -> None: @check_connections def get_last_modified_time(self, remote_file_path_1: str, - remote_file_path_2: str | None, + remote_file_path_2: str | None = None, ) -> datetime.datetime | None: """ Returns the last modified time of ``remote_file_path_1`` if the file exists, @@ -489,6 +497,19 @@ def change_mode(self, command = f'chmod{recursive} {mode} {file_name}' self._send_command_to_server(command, remote_path) + def remove_dir(self, remote_path: str) -> None: + """ + Remove a directory on the server. + + Args: + remote_path (str): The path to the directory to remove on the remote server. + """ + command = f'rm -r "{remote_path}"' + _, stderr = self._send_command_to_server(command) + if stderr: + raise ServerError( + f'Cannot remove dir for the given path ({remote_path}).\nGot: {stderr}') + def _check_file_exists(self, remote_file_path: str, ) -> bool: diff --git a/arc/job/ssh_pool.py b/arc/job/ssh_pool.py new file mode 100644 index 0000000000..1f86de6764 --- /dev/null +++ b/arc/job/ssh_pool.py @@ -0,0 +1,156 @@ +"""Persistent per-server SSHClient pool for the lifetime of an ARC run. + +Without this, each remote-queue job opens its own TCP+auth handshake +for upload, then another for qsub. Option 1 (in :mod:`arc.job.adapter`) +collapsed those two into one (per-job sharing). This module is Option +2: extend the share across ALL jobs run during this Python process, +so 100 TS guess opts end up sharing one paramiko Transport instead of +opening 100 of them. The closest equivalent to OpenSSH's +``ControlMaster``, applied at the library level for paramiko. + +Concurrency: ARC's scheduler is single-threaded (verified — no +``Thread`` / ``asyncio`` / ``concurrent.futures`` imports across +``scheduler.py`` / ``main.py`` / ``adapter.py``), so the pool does no +locking. A future async/parallel scheduler would need per-server +locks; flagged in :meth:`SSHConnectionPool.borrow`. + +Lifecycle: the default process-global pool is opened lazily on first +borrow and closed via :func:`reset_default_pool`. ARC.py's ``main()`` +calls that on exit so pooled connections close cleanly even on +ctrl-C / crash; tests call it in ``tearDown`` to start fresh. +""" + +from contextlib import contextmanager +from typing import Callable + +from arc.common import get_logger +from arc.job.ssh import SSHClient + +logger = get_logger() + + +SSHClientFactory = Callable[[str], SSHClient] + + +def _default_factory(server: str) -> SSHClient: + """Open and connect a real SSHClient. Override for tests.""" + client = SSHClient(server) + client.connect() + return client + + +class SSHConnectionPool: + """Process-lifetime cache of SSHClient instances keyed by server name. + + One client per server, opened lazily on first borrow, kept alive + until :meth:`close_all` is called (or the process exits). Health + is re-checked on every operation by the existing + ``check_connections`` decorator on SSHClient methods, so a stale + Transport is silently re-established mid-run. + """ + + def __init__(self, factory: SSHClientFactory = _default_factory): + self._factory = factory + self._clients: dict[str, SSHClient] = {} + # Counters expose pool behavior to tests/observability without + # forcing them to peek at internals or hook the factory. + self.opens = 0 + self.borrows = 0 + + @contextmanager + def borrow(self, server: str): + """Lease the pool's SSHClient for ``server``. + + Returns a context manager yielding an :class:`SSHClient`. + Exiting the context does NOT close the client — the pool + retains ownership. The borrowed client is transient by + contract; do not stash it past the ``with`` block. + + Concurrent borrows of the same server are not safe today. + ARC's scheduler is single-threaded, so this hasn't bitten; + a parallel scheduler would need a per-server lock around the + yield (or a small "free clients" stack instead of a single). + """ + self.borrows += 1 + client = self._clients.get(server) + if client is None or not _is_alive(client): + if client is not None: + _close_quietly(client, f"reaping dead {server} SSHClient before reopen") + client = self._factory(server) + self._clients[server] = client + self.opens += 1 + logger.debug("ssh_pool: opened SSHClient for %s (total opens=%d)", server, self.opens) + else: + logger.debug("ssh_pool: reusing SSHClient for %s", server) + yield client + # No close on exit — pool keeps the connection. + + def close_all(self) -> None: + """Close every pooled client. Safe to call multiple times.""" + for server, client in list(self._clients.items()): + _close_quietly(client, f"closing pooled {server} SSHClient") + self._clients.clear() + + +def _is_alive(client: SSHClient) -> bool: + """Cheap liveness check: does the paramiko Transport report active? + + Doesn't roundtrip to the server — the SSHClient method's own + ``check_connections`` decorator does that on the next call. This is + just enough to skip the obvious "connection got reset between + jobs" case so we don't hand out a known-dead handle. + """ + underlying = getattr(client, "_ssh", None) + if underlying is None: + return False + transport_getter = getattr(underlying, "get_transport", None) + if transport_getter is None: + return False + transport = transport_getter() + return bool(transport and transport.is_active()) + + +def _close_quietly(client: SSHClient, context: str) -> None: + try: + client.close() + except Exception: + # Pool teardown should never propagate a close error; ARC's + # main path is past the work that needed the connection. + logger.debug("ssh_pool: close errored %s", context, exc_info=True) + + +# Process-global default pool. Lazily instantiated. Reset between ARC +# runs (and between tests) via reset_default_pool(). +_default_pool: SSHConnectionPool | None = None + + +def get_default_pool() -> SSHConnectionPool: + """Return the process-global pool, creating it on first call.""" + global _default_pool + if _default_pool is None: + _default_pool = SSHConnectionPool() + return _default_pool + + +def set_default_pool(pool: SSHConnectionPool | None) -> None: + """Replace the process-global pool. Mainly for tests that want to + inject a stub-factory pool without monkeypatching the module.""" + global _default_pool + _default_pool = pool + + +def reset_default_pool() -> None: + """Close and discard the default pool. Idempotent.""" + global _default_pool + if _default_pool is not None: + _default_pool.close_all() + _default_pool = None + + +__all__ = [ + "SSHClientFactory", + "SSHConnectionPool", + "get_default_pool", + "reset_default_pool", + "set_default_pool", +] diff --git a/arc/scheduler.py b/arc/scheduler.py index 4ab3d360ac..218c335233 100644 --- a/arc/scheduler.py +++ b/arc/scheduler.py @@ -29,6 +29,7 @@ torsions_to_scans, ) from arc.exceptions import (InputError, + JobError, SchedulerError, SpeciesError, TrshError, @@ -66,6 +67,28 @@ logger = get_logger() + +# TS-guess adapter ``method`` strings → ``output[label]['paths']`` slot +# carrying the produced log-path. Distinct slots so consumers (notably +# the TCKDB adapter) can dispatch a method-aware path_search calc +# without inspecting the file. Match is case- and whitespace-insensitive. +# Geometry-only methods (heuristics, AutoTST, KinBot, GCN, user XYZ) +# don't appear here — they have no log artifact to file. +_TS_GUESS_METHOD_TO_PATHS_KEY: dict[str, str] = { + 'orca_neb': 'neb', + 'xtb_gsm': 'gsm', + 'xtb-gsm': 'gsm', +} + + +def _ts_guess_paths_key(method: object) -> str | None: + """Return the ``output[label]['paths']`` slot for a TS-guess method, + or ``None`` for geometry-only / unknown methods.""" + if not isinstance(method, str): + return None + return _TS_GUESS_METHOD_TO_PATHS_KEY.get(method.strip().lower()) + + LOWEST_MAJOR_TS_FREQ, HIGHEST_MAJOR_TS_FREQ, default_job_settings, \ default_job_types, default_ts_adapters, max_ess_trsh, max_rotor_trsh, rotor_scan_resolution, servers_dict = \ settings['LOWEST_MAJOR_TS_FREQ'], settings['HIGHEST_MAJOR_TS_FREQ'], settings['default_job_settings'], \ @@ -294,6 +317,7 @@ def __init__(self, self.freq_scale_factor = freq_scale_factor self.ts_adapters = ts_adapters if ts_adapters is not None else default_ts_adapters self.ts_adapters = [ts_adapter.lower() for ts_adapter in self.ts_adapters] + self.ts_adapters = self._filter_unavailable_ts_adapters(self.ts_adapters) self.output = output or dict() self.output_multi_spc = dict() self.report_e_elect = report_e_elect @@ -525,6 +549,40 @@ def __init__(self, if not self.testing: self.schedule_jobs() + @staticmethod + def _filter_unavailable_ts_adapters(ts_adapters: list[str]) -> list[str]: + """Drop TS adapters whose backing software/conda env isn't installed. + + ARC's default ``ts_adapters`` list assumes every sister env (ts_gcn, + tst_env, ...) exists on every host. On dev machines that's rarely + true; the missing-env case used to surface 300 frames deep as + ``TypeError: argument should be a str ... not 'NoneType'`` from + ``Path(None)``. Filtering at scheduler init turns that into a clear + warning the user can act on. + """ + env_requirements = { + 'gcn': ('TS_GCN_PYTHON', 'ts_gcn'), + 'autotst': ('AUTOTST_PYTHON', 'tst_env'), + } + kept = [] + for adapter in ts_adapters: + requirement = env_requirements.get(adapter) + if requirement is None: + kept.append(adapter) + continue + setting_name, env_name = requirement + if settings.get(setting_name): + kept.append(adapter) + continue + logger.warning( + f"TS adapter '{adapter}' is configured but its backing software " + f"was not found ({setting_name} is unset; expected the '{env_name}' " + f"conda env). Skipping this adapter for the current run. To use it, " + f"either install the '{env_name}' env or remove '{adapter}' from " + f"your ts_adapters in input.yml / arc/settings/settings.py." + ) + return kept + def flush_pending_pipe_batches(self) -> None: """ Attempt to submit accumulated deferred pipe batches for SP, freq, IRC, and conf_sp. @@ -1061,7 +1119,7 @@ def end_job(self, job: JobAdapter, if job.job_status[0] != 'done' or job.job_status[1]['status'] != 'done': try: job.determine_job_status() # Also downloads the output file. - except IOError: + except (IOError, JobError): if job.job_type not in ['orbitals']: logger.warning(f'Tried to determine status of job {job.job_name}, ' f'but it seems like the job never ran. Re-running job.') @@ -1143,6 +1201,11 @@ def end_job(self, job: JobAdapter, for rotors_dict in self.species_dict[label].rotors_dict.values(): if rotors_dict['pivots'] in [job.pivots, job.pivots[0]]: rotors_dict['scan_path'] = job.local_path_to_output_file + rotors_dict['scan_software'] = job.job_adapter + try: + job.remove_remote_files() + except Exception as e: + logger.warning(f'Could not remove remote files for job {job.job_name}: {e}') self.save_restart_dict() return True @@ -1299,8 +1362,10 @@ def run_ts_conformer_jobs(self, label: str): self.run_composite_job(label) self.species_dict[label].chosen_ts_method = self.species_dict[label].ts_guesses[0].method self.species_dict[label].successful_methods = [self.species_dict[label].ts_guesses[0].method] - if getattr(self.species_dict[label].ts_guesses[0], 'log_path', None): - self.output[label]['paths']['neb'] = self.species_dict[label].ts_guesses[0].log_path + tsg0 = self.species_dict[label].ts_guesses[0] + paths_key = _ts_guess_paths_key(tsg0.method) + if paths_key and getattr(tsg0, 'log_path', None): + self.output[label]['paths'][paths_key] = tsg0.log_path def run_opt_job(self, label: str, fine: bool = False): """ @@ -2337,8 +2402,9 @@ def determine_most_likely_ts_conformer(self, label: str): self.species_dict[label].initial_xyz = tsg.opt_xyz self.species_dict[label].final_xyz = None self.species_dict[label].ts_guesses_exhausted = False - if getattr(tsg, 'log_path', None): - self.output[label]['paths']['neb'] = tsg.log_path + paths_key = _ts_guess_paths_key(tsg.method) + if paths_key and getattr(tsg, 'log_path', None): + self.output[label]['paths'][paths_key] = tsg.log_path if tsg.success and tsg.energy is not None: # guess method and ts_level opt were both successful tsg.energy -= e_min im_freqs = f', imaginary frequencies {tsg.imaginary_freqs}' if tsg.imaginary_freqs is not None else '' @@ -2884,6 +2950,13 @@ def spawn_post_irc_jobs(self, job (JobAdapter): The IRC job object. """ self.output[label]['paths']['irc'].append(job.local_path_to_output_file) + # Track IRC direction in lockstep with the path list so downstream + # consumers (TCKDB computed-reaction upload) can label points + # forward/reverse without filename guesswork. setdefault handles + # restarts from older projects whose 'paths' dict predates this key. + self.output[label]['paths'].setdefault('irc_directions', list()).append( + getattr(job, 'irc_direction', None) + ) index = 1 if len(self.output[label]['paths']['irc']) == 2: index = 2 @@ -3054,6 +3127,7 @@ def check_scan_job(self, # Save the path and invalidation reason for debugging and tracking the file. # If ``success`` is None, it means that the job is being troubleshooted. self.species_dict[label].rotors_dict[job.rotor_index]['scan_path'] = job.local_path_to_output_file + self.species_dict[label].rotors_dict[job.rotor_index]['scan_software'] = job.job_adapter self.species_dict[label].rotors_dict[job.rotor_index]['invalidation_reason'] += invalidation_reason # If energies were obtained, draw the scan curve. @@ -3743,7 +3817,10 @@ def delete_all_species_jobs(self, label: str): logger.info(f'Deleted job {job_name}') job.delete() self.running_jobs[label] = list() - self.output[label]['paths'] = {key: '' if key != 'irc' else list() for key in self.output[label]['paths'].keys()} + self.output[label]['paths'] = { + key: list() if key in ('irc', 'irc_directions') else '' + for key in self.output[label]['paths'].keys() + } for job_type in self.output[label]['job_types']: # rotors and bde are initialised to True (see initialize_output_dict) because # species with no torsional modes / no BDE targets should not be blocked from @@ -3951,8 +4028,12 @@ def initialize_output_dict(self, label: str | None = None): if species.is_ts: if 'irc' not in self.output[species.label]['paths']: self.output[species.label]['paths']['irc'] = list() + if 'irc_directions' not in self.output[species.label]['paths']: + self.output[species.label]['paths']['irc_directions'] = list() if 'neb' not in self.output[species.label]['paths']: self.output[species.label]['paths']['neb'] = '' + if 'gsm' not in self.output[species.label]['paths']: + self.output[species.label]['paths']['gsm'] = '' if 'job_types' not in self.output[species.label]: self.output[species.label]['job_types'] = dict() for job_type in list(set(self.job_types.keys())) + ['opt', 'freq', 'sp', 'composite', 'onedmin']: diff --git a/arc/scheduler_pipe_test.py b/arc/scheduler_pipe_test.py index 35c5e40604..4bbd973a87 100644 --- a/arc/scheduler_pipe_test.py +++ b/arc/scheduler_pipe_test.py @@ -51,7 +51,9 @@ def _make_task_spec(task_id, engine='mockter', task_family='conf_opt', def _make_scheduler(project_directory): """Create a minimal Scheduler for testing pipe methods.""" - ess_settings = {'gaussian': ['server1'], 'molpro': ['server2', 'server1'], 'qchem': ['server1']} + # PipeCoordinator refuses pipe for non-'local' servers (remote pipe + # support lives on a separate branch — see pipe_coordinator.py:70-76). + ess_settings = {'gaussian': ['local'], 'molpro': ['local'], 'qchem': ['local']} spc = ARCSpecies(label='H2O', smiles='O') spc.conformers = [None] * 5 spc.conformer_energies = [None] * 5 diff --git a/arc/scheduler_test.py b/arc/scheduler_test.py index 1727a7d8ea..7ace6c730e 100644 --- a/arc/scheduler_test.py +++ b/arc/scheduler_test.py @@ -1195,5 +1195,64 @@ def tearDownClass(cls): shutil.rmtree(project_directory, ignore_errors=True) +class TestTsGuessPathsKey(unittest.TestCase): + """Direct unit tests for ``_ts_guess_paths_key``. + + Guards the contract that the scheduler routes each TS-guess + adapter's log path into a method-specific slot under + ``output[label]['paths']`` (``neb`` / ``gsm``) — so the TCKDB + adapter can dispatch a method-aware ``path_search`` parent calc + without inspecting the file. Geometry-only methods (no log to + file) must return ``None``. + """ + + def setUp(self): + from arc.scheduler import _ts_guess_paths_key + self.resolve = _ts_guess_paths_key + + def test_orca_neb_routes_to_neb_slot(self): + self.assertEqual(self.resolve('orca_neb'), 'neb') + + def test_xtb_gsm_underscore_routes_to_gsm_slot(self): + self.assertEqual(self.resolve('xtb_gsm'), 'gsm') + + def test_xtb_gsm_dash_form_routes_to_gsm_slot(self): + # The xtb_gsm adapter sets ``tsg.method = 'xTB-GSM'`` (capital + # form with dash) on the produced TSGuess — see + # ``arc/job/adapters/ts/xtb_gsm.py:process_run``. The lookup + # must be case- and whitespace-insensitive so the scheduler + # routes both string forms to the same slot. + self.assertEqual(self.resolve('xTB-GSM'), 'gsm') + self.assertEqual(self.resolve(' xtb-gsm '), 'gsm') + self.assertEqual(self.resolve('XTB_GSM'), 'gsm') + + def test_geometry_only_methods_return_none(self): + for m in ('Heuristics', 'AutoTST', 'KinBot', 'GCN', + 'user guess 0', 'user guess 1'): + self.assertIsNone(self.resolve(m), msg=f'unexpected match: {m}') + + def test_non_string_inputs_return_none(self): + self.assertIsNone(self.resolve(None)) + self.assertIsNone(self.resolve(42)) + self.assertIsNone(self.resolve({'method': 'xtb_gsm'})) + + +class TestPathsTemplateInitialization(unittest.TestCase): + """``initialize_output_dict`` must seed both ``neb`` and ``gsm`` + slots on TS species so the per-method routing in + ``run_ts_conformer_jobs`` / ``determine_most_likely_ts_conformer`` + can write into pre-existing keys (and the post-restart reset path + in ``restart_species`` preserves them). + """ + + def test_ts_species_paths_template_includes_gsm(self): + # Light test: assert the source of truth at the literal call + # site; a full Scheduler-instance test is heavy and adds no + # signal beyond the static template check. + with open(os.path.join(ARC_PATH, 'arc', 'scheduler.py')) as f: + sched_src = f.read() + self.assertIn("self.output[species.label]['paths']['gsm'] = ''", sched_src) + + if __name__ == '__main__': unittest.main(testRunner=unittest.TextTestRunner(verbosity=2)) From 4ae2bf27e281f5c12eb2b0b7e1528cd4f12e24a5 Mon Sep 17 00:00:00 2001 From: Calvin Pieters Date: Mon, 25 May 2026 11:47:37 +0300 Subject: [PATCH 2/5] Improve TS adapter troubleshooting --- arc/job/adapters/gaussian.py | 8 +- arc/job/adapters/scripts/xtb_gsm/ograd | 18 + arc/job/adapters/torch_ani_test.py | 4 +- arc/job/adapters/ts/gcn_test.py | 3 + arc/job/adapters/ts/orca_neb.py | 7 +- arc/job/adapters/ts/xtb_gsm.py | 14 + arc/job/adapters/ts/xtbgsm_test.py | 94 ++++ arc/job/trsh.py | 152 ++++++ arc/job/trsh_test.py | 113 ++++ .../trsh/opt_disp_unconverged_a2354.log | 500 ++++++++++++++++++ 10 files changed, 906 insertions(+), 7 deletions(-) create mode 100644 arc/testing/trsh/opt_disp_unconverged_a2354.log diff --git a/arc/job/adapters/gaussian.py b/arc/job/adapters/gaussian.py index 314e796641..71e7ca2456 100644 --- a/arc/job/adapters/gaussian.py +++ b/arc/job/adapters/gaussian.py @@ -306,10 +306,14 @@ def write_input_file(self) -> None: if input_dict['trsh']: input_dict['trsh'] += ' ' input_dict['trsh'] += 'scf=(tight,direct)' + # 'no_tight' is set by trsh_keyword_loose_disp when a previous attempt hit + # MaxOptCycles with forces converged but displacement criteria unreachable. + drop_tight = 'no_tight' in self.ess_trsh_methods + fine_opt = [] if drop_tight else ['tight'] if self.is_ts: - keywords.extend(['tight', 'maxstep=5']) + keywords.extend(fine_opt + ['maxstep=5']) else: - keywords.extend(['tight', 'maxstep=5', f'maxcycle={max_c}']) + keywords.extend(fine_opt + ['maxstep=5', f'maxcycle={max_c}']) input_dict['job_type_1'] = "opt" if self.level.method_type not in ['dft', 'composite', 'wavefunction']\ else f"opt=({', '.join(key for key in keywords)})" diff --git a/arc/job/adapters/scripts/xtb_gsm/ograd b/arc/job/adapters/scripts/xtb_gsm/ograd index d208a7502f..f5c01f4f3a 100644 --- a/arc/job/adapters/scripts/xtb_gsm/ograd +++ b/arc/job/adapters/scripts/xtb_gsm/ograd @@ -24,3 +24,21 @@ tm2orca.py $basename rm xtbrestart cd .. +# ── Per-node provenance preservation (TCKDB path_search_result.points) ── +# tm2orca.py renames the xTB-generated Turbomole-format ``energy`` +# and ``gradient`` files (xTB writes its --grad output in Turbomole's +# on-disk text format; the calculation provenance is xTB, not Turbomole) +# to ``.energy`` and ``.gradient`` +# inside scratch/. The GSM binary then consumes the ORCA-shaped +# ``.engrad`` and may overwrite or remove the per-node files on +# subsequent calls. Copy them (plus the captured xtb stdout) into a +# stable side-effect directory at the run root so the TCKDB adapter's +# parser can recover per-node electronic energies and gradient metrics +# later. The copies are not consumed by GSM — the original scratch/ +# files stay in place unchanged for the algorithm. +node_label="$1" +preserve_dir="gsm_node_outputs" +mkdir -p "$preserve_dir" +[ -f "scratch/${basename}.energy" ] && cp -p "scratch/${basename}.energy" "$preserve_dir/${node_label}.energy" +[ -f "scratch/${basename}.gradient" ] && cp -p "scratch/${basename}.gradient" "$preserve_dir/${node_label}.gradient" +[ -f "scratch/${ofile}.xtbout" ] && cp -p "scratch/${ofile}.xtbout" "$preserve_dir/${node_label}.xtbout" diff --git a/arc/job/adapters/torch_ani_test.py b/arc/job/adapters/torch_ani_test.py index c34e47a4ef..b45f49da71 100644 --- a/arc/job/adapters/torch_ani_test.py +++ b/arc/job/adapters/torch_ani_test.py @@ -12,11 +12,13 @@ from arc.common import almost_equal_coords, almost_equal_lists, read_yaml_file from arc.job.adapters.torch_ani import TorchANIAdapter -from arc.settings.settings import tani_default_options_dict +from arc.settings.settings import TANI_PYTHON, tani_default_options_dict from arc.species import ARCSpecies from arc.species.vectors import calculate_distance, calculate_angle, calculate_dihedral_angle +@unittest.skipUnless(TANI_PYTHON is not None, + "tani_env conda environment not found; TorchANI adapter tests require it.") class TestTorchANIAdapter(unittest.TestCase): """ Contains unit tests for the TorchANIAdapter class. diff --git a/arc/job/adapters/ts/gcn_test.py b/arc/job/adapters/ts/gcn_test.py index 8524b00db2..60004ae723 100644 --- a/arc/job/adapters/ts/gcn_test.py +++ b/arc/job/adapters/ts/gcn_test.py @@ -12,6 +12,7 @@ from arc.common import ARC_TESTING_PATH import arc.job.adapters.ts.gcn_ts as ts_gcn from arc.reaction import ARCReaction +from arc.settings.settings import TS_GCN_PYTHON from arc.species.converter import str_to_xyz from arc.species.species import ARCSpecies, TSGuess @@ -67,6 +68,8 @@ def test_write_sdf_files(self): self.assertEqual(r_atoms, expected_r_atoms) self.assertEqual(p_atoms, expected_p_atoms) + @unittest.skipUnless(TS_GCN_PYTHON is not None, + "ts_gcn conda environment not found; GCN subprocess test requires it.") def test_run_subprocess_locally(self): """Test the run_subprocess_locally() function""" self.assertFalse(os.path.isfile(self.ts_path)) diff --git a/arc/job/adapters/ts/orca_neb.py b/arc/job/adapters/ts/orca_neb.py index 0647fd3169..a51c20f46f 100644 --- a/arc/job/adapters/ts/orca_neb.py +++ b/arc/job/adapters/ts/orca_neb.py @@ -39,15 +39,15 @@ %%maxcore ${memory} %%pal nprocs ${cpus} end -%%neb +%%neb Interpolation ${interpolation} NImages ${nnodes} PrintLevel 3 PreOpt ${preopt} - NEB_END_XYZFILE "${abs_path}/product.xyz" + NEB_END_XYZFILE "product.xyz" END -* XYZFILE ${charge} ${multiplicity} ${abs_path}/reactant.xyz +* XYZFILE ${charge} ${multiplicity} reactant.xyz """ @@ -222,7 +222,6 @@ def write_input_file(self) -> None: input_dict['cpus'] = self.cpu_cores input_dict['charge'] = self.charge input_dict['multiplicity'] = self.multiplicity - input_dict['abs_path'] = self.local_path # NEB specific parameters neb_settings = orca_neb_settings.get('keyword', {}) diff --git a/arc/job/adapters/ts/xtb_gsm.py b/arc/job/adapters/ts/xtb_gsm.py index bcf7d73bda..57d7c71a73 100644 --- a/arc/job/adapters/ts/xtb_gsm.py +++ b/arc/job/adapters/ts/xtb_gsm.py @@ -306,6 +306,13 @@ def set_additional_file_paths(self) -> None: self.tm2orca_path = os.path.join(self.local_path, 'tm2orca.py') self.scratch_initial0000_path = os.path.join(self.local_path, 'scratch', 'initial0000.xyz') self.stringfile_path = os.path.join(self.local_path, 'stringfile.xyz0000') + # Side-effect directory written by the patched ``ograd`` wrapper. + # Holds per-node ``