From ae28287276fc1b63c6c51c3e5cbd26e9768f4ace Mon Sep 17 00:00:00 2001 From: sdc50 Date: Tue, 15 Apr 2025 14:44:35 -0600 Subject: [PATCH 1/6] add get_dir and put_dir methods to async client. --- uit/async_client.py | 74 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/uit/async_client.py b/uit/async_client.py index da74389..8ac73c8 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -396,6 +396,80 @@ async def get_file(self, remote_path, local_path=None, timeout=30): return local_path + async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): + local_dir = Path(local_dir) + remote_dir = PurePosixPath(remote_dir) + # get remote dir stats + try: + remote_dir_stats = await self.call(f"find {remote_dir.as_posix()} -type f -printf '%Ts %s %P\n'") + remote_files = {stats[-1]: {'mtime': float(stats[0]), 'size': int(stats[1])} for stats in + [line.split() for line in remote_dir_stats.splitlines()]} + except Exception as e: + logger.debug(e) + remote_files = {} + if get_dir: + return {}, {}, {}, {} + + # get local dir stats + local_dir = Path(local_dir) + if get_dir: + local_dir.mkdir(parents=True, exist_ok=True) + local_files = {p.relative_to(local_dir).as_posix(): p.stat() for p in local_dir.glob('**/*') if p.is_file()} + + # compare + not_local = {} + not_remote = local_files + for name, remote_stats in remote_files.items(): + # get local stats for same file name if it exists + local_stats = local_files.get(name) + + # compare file size and mtime + if not (local_stats and local_stats.st_size == remote_stats['size'] and local_stats.st_mtime == + remote_stats['mtime']): + not_local[name] = remote_stats + else: + not_remote.pop(name) + + return remote_files, local_files, not_remote, not_local + + @_ensure_connected + @robust() + async def put_dir(self, local_dir, remote_dir): + _, local_files, not_remote, __ = await self._get_dir_stats(local_dir, remote_dir) + # TODO ensure that remote dir exists + + async def put_file_with_stats(local_file_path, remote_file_path): + await self.put_file(local_file_path, remote_file_path) + # mtime = local_files[file_name]['mtime'] + # await self.call() # TODO some command to update the mtime on the remote file + + # transfer files that didn't match those on the hpc + async with asyncio.TaskGroup() as tg: + for file_name in not_remote: + remote_file_path = PurePosixPath(remote_dir) / file_name + local_file_path = local_dir / file_name + local_file_path.parent.mkdir(parents=True, exist_ok=True) + logger.info(f"Transferring {remote_file_path} from the HPC to {local_file_path}") + tg.create_task(put_file_with_stats(remote_file_path, local_file_path)) + + @_ensure_connected + @robust() + async def get_dir(self, remote_dir, local_dir): + remote_files, _, __, not_local = await self._get_dir_stats(local_dir, remote_dir, get_dir=True) + + async def get_file_with_stats(remote_file_path, local_file_path): + await self.get_file(remote_file_path, local_file_path) + mtime = remote_files[file_name]['mtime'] + os.utime(local_file_path, (mtime, mtime)) + + # transfer files that didn't match those on the hpc + for file_name in not_local: + remote_file_path = PurePosixPath(remote_dir) / file_name + local_file_path = local_dir / file_name + local_file_path.parent.mkdir(parents=True, exist_ok=True) + logger.info(f"Transferring {remote_file_path} from the HPC to {local_file_path}") + await get_file_with_stats(remote_file_path, local_file_path) + @_ensure_connected @robust() async def list_dir(self, path=None, parse=True, as_df=False, timeout=30): From 42fb8b8603816e9fc8d8fd7488cf7b7688455853 Mon Sep 17 00:00:00 2001 From: sdc50 Date: Thu, 17 Apr 2025 13:12:16 -0600 Subject: [PATCH 2/6] flake8 --- uit/async_client.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/uit/async_client.py b/uit/async_client.py index 8ac73c8..2d33657 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -27,6 +27,7 @@ encode_pure_posix_path, FG_CYAN, ALL_OFF, + _auth_code, ) from .util import robust, AsyncHpcEnv from .pbs_script import PbsScript @@ -171,7 +172,6 @@ async def get_token(self, auth_code=None): url = urljoin(UIT_API_URL, "token") - global _auth_code self._auth_code = auth_code or _auth_code # check for auth_code @@ -402,8 +402,10 @@ async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): # get remote dir stats try: remote_dir_stats = await self.call(f"find {remote_dir.as_posix()} -type f -printf '%Ts %s %P\n'") - remote_files = {stats[-1]: {'mtime': float(stats[0]), 'size': int(stats[1])} for stats in - [line.split() for line in remote_dir_stats.splitlines()]} + remote_files = { + stats[-1]: {"mtime": float(stats[0]), "size": int(stats[1])} + for stats in [line.split() for line in remote_dir_stats.splitlines()] + } except Exception as e: logger.debug(e) remote_files = {} @@ -414,7 +416,7 @@ async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): local_dir = Path(local_dir) if get_dir: local_dir.mkdir(parents=True, exist_ok=True) - local_files = {p.relative_to(local_dir).as_posix(): p.stat() for p in local_dir.glob('**/*') if p.is_file()} + local_files = {p.relative_to(local_dir).as_posix(): p.stat() for p in local_dir.glob("**/*") if p.is_file()} # compare not_local = {} @@ -424,8 +426,11 @@ async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): local_stats = local_files.get(name) # compare file size and mtime - if not (local_stats and local_stats.st_size == remote_stats['size'] and local_stats.st_mtime == - remote_stats['mtime']): + if not ( + local_stats + and local_stats.st_size == remote_stats["size"] # noqa W503 + and local_stats.st_mtime == remote_stats["mtime"] # noqa W503 + ): not_local[name] = remote_stats else: not_remote.pop(name) @@ -459,7 +464,7 @@ async def get_dir(self, remote_dir, local_dir): async def get_file_with_stats(remote_file_path, local_file_path): await self.get_file(remote_file_path, local_file_path) - mtime = remote_files[file_name]['mtime'] + mtime = remote_files[file_name]["mtime"] os.utime(local_file_path, (mtime, mtime)) # transfer files that didn't match those on the hpc From bcd3b45d01cdd997fed086dddd016c7b73f16612 Mon Sep 17 00:00:00 2001 From: sdc50 Date: Thu, 17 Apr 2025 13:14:20 -0600 Subject: [PATCH 3/6] flake8 --- uit/uit.py | 1 - 1 file changed, 1 deletion(-) diff --git a/uit/uit.py b/uit/uit.py index 41f9e8d..2fcac0b 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -369,7 +369,6 @@ def get_token(self, auth_code=None): url = urljoin(UIT_API_URL, "token") - global _auth_code self._auth_code = auth_code or _auth_code # check for auth_code From 04160baa764720bbd566b09b9d5ce836eac7f358 Mon Sep 17 00:00:00 2001 From: Mark Lugar <14322382+araglu@users.noreply.github.com> Date: Wed, 30 Apr 2025 14:52:54 -0500 Subject: [PATCH 4/6] Swap variables for put_file_with_stats to match definition --- uit/async_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uit/async_client.py b/uit/async_client.py index 2d33657..9273571 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -455,7 +455,7 @@ async def put_file_with_stats(local_file_path, remote_file_path): local_file_path = local_dir / file_name local_file_path.parent.mkdir(parents=True, exist_ok=True) logger.info(f"Transferring {remote_file_path} from the HPC to {local_file_path}") - tg.create_task(put_file_with_stats(remote_file_path, local_file_path)) + tg.create_task(put_file_with_stats(local_file_path, remote_file_path)) @_ensure_connected @robust() From 945c9a39b806a92d857abe2963f3a8a322f98cbd Mon Sep 17 00:00:00 2001 From: Mark Lugar <14322382+araglu@users.noreply.github.com> Date: Mon, 5 May 2025 16:50:22 -0500 Subject: [PATCH 5/6] Create folders and set timestamps in put_dir - Refactor _with_stats() because they referenced 'file_name' outside the function which caused all files to have the same timestamp - Create directories before uploading files - Set timestamps on remote files after uploading - Switch to integer timestamps because the remote filesystem does not show microseconds - Remove extra log lines for each file - Add info log lines about number of files to transfer - Remove TaskGroup from put_dir to avoid sending too many simultaneous requests --- uit/async_client.py | 49 ++++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/uit/async_client.py b/uit/async_client.py index 9273571..a39f7f3 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -8,7 +8,7 @@ import time import uuid from pathlib import PurePosixPath, Path -from urllib.parse import urljoin, urlencode # noqa: F401 +from urllib.parse import urljoin import param import aiohttp @@ -401,9 +401,11 @@ async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): remote_dir = PurePosixPath(remote_dir) # get remote dir stats try: - remote_dir_stats = await self.call(f"find {remote_dir.as_posix()} -type f -printf '%Ts %s %P\n'") + remote_dir_stats = await self.call(f"find {remote_dir.as_posix()} -type f -printf '%Ts %s %P\n'") + # Remote mtime is always an integer, so this ignores decimals for local and remote mtime + # That avoids re-transferring files with microsecond timestamp differences remote_files = { - stats[-1]: {"mtime": float(stats[0]), "size": int(stats[1])} + stats[-1]: {"mtime": int(stats[0]), "size": int(stats[1])} for stats in [line.split() for line in remote_dir_stats.splitlines()] } except Exception as e: @@ -413,7 +415,6 @@ async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): return {}, {}, {}, {} # get local dir stats - local_dir = Path(local_dir) if get_dir: local_dir.mkdir(parents=True, exist_ok=True) local_files = {p.relative_to(local_dir).as_posix(): p.stat() for p in local_dir.glob("**/*") if p.is_file()} @@ -441,39 +442,45 @@ async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): @robust() async def put_dir(self, local_dir, remote_dir): _, local_files, not_remote, __ = await self._get_dir_stats(local_dir, remote_dir) - # TODO ensure that remote dir exists - async def put_file_with_stats(local_file_path, remote_file_path): + async def put_file_with_stats(file_name): + remote_file_path = PurePosixPath(remote_dir) / file_name + local_file_path = Path(local_dir) / file_name await self.put_file(local_file_path, remote_file_path) - # mtime = local_files[file_name]['mtime'] - # await self.call() # TODO some command to update the mtime on the remote file + mtime = int(local_files[file_name].st_mtime) + await self.call(f"touch -d @{mtime} {remote_file_path}") + + remote_dirs_to_create = set() + for file_name in not_remote: + remote_dirs_to_create.add(Path(file_name).parent) + + if remote_dirs_to_create: + logger.info(f"Creating {len(remote_dirs_to_create)} directories before file uploads") + for this_remote_dir in remote_dirs_to_create: + await self.call(f"mkdir -p {PurePosixPath(remote_dir) / this_remote_dir}") # transfer files that didn't match those on the hpc - async with asyncio.TaskGroup() as tg: - for file_name in not_remote: - remote_file_path = PurePosixPath(remote_dir) / file_name - local_file_path = local_dir / file_name - local_file_path.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"Transferring {remote_file_path} from the HPC to {local_file_path}") - tg.create_task(put_file_with_stats(local_file_path, remote_file_path)) + logger.info(f"Uploading {len(not_remote)} files") + for file_name in not_remote: + await put_file_with_stats(file_name) @_ensure_connected @robust() async def get_dir(self, remote_dir, local_dir): remote_files, _, __, not_local = await self._get_dir_stats(local_dir, remote_dir, get_dir=True) - async def get_file_with_stats(remote_file_path, local_file_path): + async def get_file_with_stats(file_name): + remote_file_path = PurePosixPath(remote_dir) / file_name + local_file_path = Path(local_dir) / file_name + local_file_path.parent.mkdir(parents=True, exist_ok=True) await self.get_file(remote_file_path, local_file_path) mtime = remote_files[file_name]["mtime"] os.utime(local_file_path, (mtime, mtime)) # transfer files that didn't match those on the hpc + logger.info(f"Downloading {len(not_local)} files") for file_name in not_local: - remote_file_path = PurePosixPath(remote_dir) / file_name - local_file_path = local_dir / file_name - local_file_path.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"Transferring {remote_file_path} from the HPC to {local_file_path}") - await get_file_with_stats(remote_file_path, local_file_path) + await get_file_with_stats(file_name) @_ensure_connected @robust() From 62ab1d0a3e06cf0b8ecd927481cd975ade1db976 Mon Sep 17 00:00:00 2001 From: Mark Lugar <14322382+araglu@users.noreply.github.com> Date: Mon, 5 May 2025 16:51:19 -0500 Subject: [PATCH 6/6] Hide W503 flake8 warning It conflicts with W504 so we need to pick one, and W503 is what Black formats to. --- tox.ini | 4 +++- uit/async_client.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tox.ini b/tox.ini index 8d875b8..b7a754e 100644 --- a/tox.ini +++ b/tox.ini @@ -1,4 +1,6 @@ [flake8] max-line-length = 120 exclude = .git,build,dist,__pycache__,.eggs,*.egg-info,venv -ignore = E203 +# E203 is for expressions inside a slice, and it conflicts with Black. +# W503 is "line break before binary operator", and fixing that triggers W504. +ignore = E203,W503 diff --git a/uit/async_client.py b/uit/async_client.py index a39f7f3..3328009 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -429,8 +429,8 @@ async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): # compare file size and mtime if not ( local_stats - and local_stats.st_size == remote_stats["size"] # noqa W503 - and local_stats.st_mtime == remote_stats["mtime"] # noqa W503 + and local_stats.st_size == remote_stats["size"] + and local_stats.st_mtime == remote_stats["mtime"] ): not_local[name] = remote_stats else: