Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
[flake8]
max-line-length = 120
exclude = .git,build,dist,__pycache__,.eggs,*.egg-info,venv
ignore = E203
# E203 is for expressions inside a slice, and it conflicts with Black.
# W503 is "line break before binary operator", and fixing that triggers W504.
ignore = E203,W503
90 changes: 88 additions & 2 deletions uit/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
import uuid
from pathlib import PurePosixPath, Path
from urllib.parse import urljoin, urlencode # noqa: F401
from urllib.parse import urljoin

import param
import aiohttp
Expand All @@ -27,6 +27,7 @@
encode_pure_posix_path,
FG_CYAN,
ALL_OFF,
_auth_code,
)
from .util import robust, AsyncHpcEnv
from .pbs_script import PbsScript
Expand Down Expand Up @@ -171,7 +172,6 @@ async def get_token(self, auth_code=None):

url = urljoin(UIT_API_URL, "token")

global _auth_code
self._auth_code = auth_code or _auth_code

# check for auth_code
Expand Down Expand Up @@ -396,6 +396,92 @@ async def get_file(self, remote_path, local_path=None, timeout=30):

return local_path

async def _get_dir_stats(self, local_dir, remote_dir, get_dir=False):
    """Compare the files under a local directory with those under a remote directory.

    The remote listing is obtained by running ``find ... -printf`` through ``self.call``;
    the local listing is obtained by globbing ``local_dir`` recursively. Files are
    considered identical when their size and whole-second modification time match.

    Args:
        local_dir: Local directory (anything accepted by ``pathlib.Path``).
        remote_dir: Remote directory (anything accepted by ``pathlib.PurePosixPath``).
        get_dir: When True, ``local_dir`` is created if missing, and a failure to
            list the remote directory short-circuits with four empty dicts.

    Returns:
        A 4-tuple ``(remote_files, local_files, not_remote, not_local)``:

        * ``remote_files``: ``{relative_posix_name: {"mtime": int, "size": int}}``
        * ``local_files``: ``{relative_posix_name: os.stat_result}`` for every local file
        * ``not_remote``: the subset of ``local_files`` with no matching remote file
        * ``not_local``: the subset of ``remote_files`` with no matching local file
    """
    local_dir = Path(local_dir)
    remote_dir = PurePosixPath(remote_dir)

    # get remote dir stats
    # NOTE(review): remote_dir is interpolated into the shell command unquoted, so a
    # remote directory containing spaces or shell metacharacters will not list correctly.
    try:
        remote_dir_stats = await self.call(f"find {remote_dir.as_posix()} -type f -printf '%Ts %s %P\n'")
        remote_files = {}
        for line in remote_dir_stats.splitlines():
            if not line.strip():
                # Tolerate blank lines instead of discarding the whole listing.
                continue
            # Split on the first two spaces only so file names containing spaces stay intact.
            mtime, size, name = line.split(" ", 2)
            remote_files[name] = {"mtime": int(mtime), "size": int(size)}
    except Exception as e:
        # Best effort: an unreadable/missing remote directory is treated as empty.
        logger.debug(e)
        remote_files = {}
        if get_dir:
            return {}, {}, {}, {}

    # get local dir stats
    if get_dir:
        local_dir.mkdir(parents=True, exist_ok=True)
    local_files = {p.relative_to(local_dir).as_posix(): p.stat() for p in local_dir.glob("**/*") if p.is_file()}

    # compare
    not_local = {}
    # Work on a copy so that removing matched names does not also strip them from local_files.
    not_remote = dict(local_files)
    for name, remote_stats in remote_files.items():
        # get local stats for same file name if it exists
        local_stats = local_files.get(name)

        # Remote mtime is always an integer, so truncate the local mtime to whole seconds
        # as well. That avoids re-transferring files with sub-second timestamp differences.
        if (
            local_stats is not None
            and local_stats.st_size == remote_stats["size"]
            and int(local_stats.st_mtime) == remote_stats["mtime"]
        ):
            not_remote.pop(name)
        else:
            not_local[name] = remote_stats

    return remote_files, local_files, not_remote, not_local

@_ensure_connected
@robust()
async def put_dir(self, local_dir, remote_dir):
    """Upload the files in ``local_dir`` that have no matching file in ``remote_dir``.

    The set of files to send is the ``not_remote`` mapping returned by
    ``_get_dir_stats``. The needed remote parent directories are created first;
    then each file is uploaded and its remote modification time is set to the
    local file's whole-second mtime via ``touch -d``.

    Args:
        local_dir: Local directory to upload from.
        remote_dir: Remote directory to upload into.
    """
    _, local_files, not_remote, __ = await self._get_dir_stats(local_dir, remote_dir)
    remote_root = PurePosixPath(remote_dir)
    local_root = Path(local_dir)

    # Collect each distinct parent directory once so it is only created once.
    parent_dirs = {Path(rel_name).parent for rel_name in not_remote}
    if parent_dirs:
        logger.info(f"Creating {len(parent_dirs)} directories before file uploads")
    for parent in parent_dirs:
        await self.call(f"mkdir -p {remote_root / parent}")

    # transfer files that didn't match those on the hpc
    logger.info(f"Uploading {len(not_remote)} files")
    for rel_name in not_remote:
        target = remote_root / rel_name
        await self.put_file(local_root / rel_name, target)
        # Stamp the remote copy with the local mtime (truncated to seconds).
        mtime = int(local_files[rel_name].st_mtime)
        await self.call(f"touch -d @{mtime} {target}")

@_ensure_connected
@robust()
async def get_dir(self, remote_dir, local_dir):
    """Download the files in ``remote_dir`` that have no matching file in ``local_dir``.

    The set of files to fetch is the ``not_local`` mapping returned by
    ``_get_dir_stats`` (called with ``get_dir=True``). Each file's local parent
    directory is created as needed, the file is downloaded, and its local access
    and modification times are set to the remote mtime.

    Args:
        remote_dir: Remote directory to download from.
        local_dir: Local directory to download into.
    """
    remote_files, _, __, not_local = await self._get_dir_stats(local_dir, remote_dir, get_dir=True)
    remote_root = PurePosixPath(remote_dir)
    local_root = Path(local_dir)

    # transfer files that didn't match those on the hpc
    logger.info(f"Downloading {len(not_local)} files")
    for rel_name in not_local:
        destination = local_root / rel_name
        destination.parent.mkdir(parents=True, exist_ok=True)
        await self.get_file(remote_root / rel_name, destination)
        # Mirror the remote timestamp locally.
        stamp = remote_files[rel_name]["mtime"]
        os.utime(destination, (stamp, stamp))

@_ensure_connected
@robust()
async def list_dir(self, path=None, parse=True, as_df=False, timeout=30):
Expand Down
1 change: 0 additions & 1 deletion uit/uit.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ def get_token(self, auth_code=None):

url = urljoin(UIT_API_URL, "token")

global _auth_code
self._auth_code = auth_code or _auth_code

# check for auth_code
Expand Down