diff --git a/tox.ini b/tox.ini index 8d875b8..b7a754e 100644 --- a/tox.ini +++ b/tox.ini @@ -1,4 +1,6 @@ [flake8] max-line-length = 120 exclude = .git,build,dist,__pycache__,.eggs,*.egg-info,venv -ignore = E203 +# E203 is for expressions inside a slice, and it conflicts with Black. +# W503 is "line break before binary operator", and fixing that triggers W504. +ignore = E203,W503 diff --git a/uit/async_client.py b/uit/async_client.py index da74389..3328009 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -8,7 +8,7 @@ import time import uuid from pathlib import PurePosixPath, Path -from urllib.parse import urljoin, urlencode # noqa: F401 +from urllib.parse import urljoin import param import aiohttp @@ -27,6 +27,7 @@ encode_pure_posix_path, FG_CYAN, ALL_OFF, + _auth_code, ) from .util import robust, AsyncHpcEnv from .pbs_script import PbsScript @@ -171,7 +172,6 @@ async def get_token(self, auth_code=None): url = urljoin(UIT_API_URL, "token") - global _auth_code self._auth_code = auth_code or _auth_code # check for auth_code @@ -396,6 +396,92 @@ async def get_file(self, remote_path, local_path=None, timeout=30): return local_path + async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False): + local_dir = Path(local_dir) + remote_dir = PurePosixPath(remote_dir) + # get remote dir stats + try: + remote_dir_stats = await self.call(f"find {remote_dir.as_posix()} -type f -printf '%Ts %s %P\n'") + # Remote mtime is always an integer, so this ignores decimals for local and remote mtime + # That avoids re-transferring files with microsecond timestamp differences + remote_files = { + stats[-1]: {"mtime": int(stats[0]), "size": int(stats[1])} + for stats in [line.split() for line in remote_dir_stats.splitlines()] + } + except Exception as e: + logger.debug(e) + remote_files = {} + if get_dir: + return {}, {}, {}, {} + + # get local dir stats + if get_dir: + local_dir.mkdir(parents=True, exist_ok=True) + local_files = {p.relative_to(local_dir).as_posix(): p.stat() for p in local_dir.glob("**/*") if p.is_file()} + + # compare + not_local = {} + not_remote = local_files + for name, remote_stats in remote_files.items(): + # get local stats for same file name if it exists + local_stats = local_files.get(name) + + # compare file size and mtime + if not ( + local_stats + and local_stats.st_size == remote_stats["size"] + and local_stats.st_mtime == remote_stats["mtime"] + ): + not_local[name] = remote_stats + else: + not_remote.pop(name) + + return remote_files, local_files, not_remote, not_local + + @_ensure_connected + @robust() + async def put_dir(self, local_dir, remote_dir): + _, local_files, not_remote, __ = await self._get_dir_stats(local_dir, remote_dir) + + async def put_file_with_stats(file_name): + remote_file_path = PurePosixPath(remote_dir) / file_name + local_file_path = Path(local_dir) / file_name + await self.put_file(local_file_path, remote_file_path) + mtime = int(local_files[file_name].st_mtime) + await self.call(f"touch -d @{mtime} {remote_file_path}") + + remote_dirs_to_create = set() + for file_name in not_remote: + remote_dirs_to_create.add(Path(file_name).parent) + + if remote_dirs_to_create: + logger.info(f"Creating {len(remote_dirs_to_create)} directories before file uploads") + for this_remote_dir in remote_dirs_to_create: + await self.call(f"mkdir -p {PurePosixPath(remote_dir) / this_remote_dir}") + + # transfer files that didn't match those on the hpc + logger.info(f"Uploading {len(not_remote)} files") + for file_name in not_remote: + await put_file_with_stats(file_name) + + @_ensure_connected + @robust() + async def get_dir(self, remote_dir, local_dir): + remote_files, _, __, not_local = await self._get_dir_stats(local_dir, remote_dir, get_dir=True) + + async def get_file_with_stats(file_name): + remote_file_path = PurePosixPath(remote_dir) / file_name + local_file_path = Path(local_dir) / file_name + local_file_path.parent.mkdir(parents=True, exist_ok=True) + await self.get_file(remote_file_path, local_file_path) + mtime = remote_files[file_name]["mtime"] + os.utime(local_file_path, (mtime, mtime)) + + # transfer files that didn't match those on the hpc + logger.info(f"Downloading {len(not_local)} files") + for file_name in not_local: + await get_file_with_stats(file_name) + @_ensure_connected @robust() async def list_dir(self, path=None, parse=True, as_df=False, timeout=30): diff --git a/uit/uit.py b/uit/uit.py index 41f9e8d..2fcac0b 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -369,7 +369,6 @@ def get_token(self, auth_code=None): url = urljoin(UIT_API_URL, "token") - global _auth_code self._auth_code = auth_code or _auth_code # check for auth_code