From acfd73be534364a46616cb4191a3c30d6613b314 Mon Sep 17 00:00:00 2001
From: Nikita Mashchenko
Date: Tue, 25 Feb 2025 19:04:55 -0600
Subject: [PATCH] fix: use latest folders locally and on s3

---
Review notes (text in this section is ignored by git am):
- get_latest_local_folder and get_latest_s3_folder now share one timestamp
  parser and skip folder names that are not in "YYYY-MM-DD_HH-MM-SS" format,
  so an unrelated folder is never picked as the latest one.
- get_latest_s3_folder paginates the listing (a single list_objects_v2 call
  returns at most 1000 entries) and tolerates a trailing "/" in --s3-prefix.
- Line breaks and indentation were restored from a whitespace-collapsed copy
  of this patch; check the context lines against the target file, or apply
  with "git apply --ignore-whitespace".
- The post-image blob hash on the "index" line is the one from the original
  revision of this patch.

 GEMstack/offboard/log_management/s3.py | 122 +++++++++++++++++++++-----
 1 file changed, 99 insertions(+), 23 deletions(-)

diff --git a/GEMstack/offboard/log_management/s3.py b/GEMstack/offboard/log_management/s3.py
index 1a2550d00..b8f2ea4c8 100644
--- a/GEMstack/offboard/log_management/s3.py
+++ b/GEMstack/offboard/log_management/s3.py
@@ -1,6 +1,9 @@
 #!/usr/bin/env python3
 """
-This client interacts with S3 to upload (push) or download (pull) a specific folder.
+This client interacts with S3 to upload (push) or download (pull) the latest folder.
+For push, the latest folder in the local base directory (default "logs")
+is determined based on its timestamp (folder name in "YYYY-MM-DD_HH-MM-SS" format).
+For pull, the latest folder under the S3 prefix is selected.
 
 Before running this client make sure you have defined .env in root with following values:
 AWS_ACCESS_KEY_ID=example
@@ -12,17 +15,15 @@
 
 Example Usage:
 
-    # Push a folder named "2025-02-12_15-30-00" inside "data/" to S3:
-    python3 GEMStack/offboard/log_management/s3.py \
+    # Push the latest folder from the local "logs" directory to S3:
+    python3 -m GEMstack.offboard.log_management.s3 \
         --action push \
-        --folder 2025-02-12_15-30-00 \
         --bucket cs588 \
         --s3-prefix captures
 
-    # Pull (download) that same folder from S3 into local "download/" directory:
-    python3 GEMStack/offboard/log_management/s3.py \
+    # Pull (download) the latest folder from S3 into local "download/" directory:
+    python3 -m GEMstack.offboard.log_management.s3 \
         --action pull \
-        --folder 2025-02-12_15-30-00 \
         --bucket cs588 \
         --s3-prefix captures \
         --dest-dir download
@@ -32,6 +33,7 @@
 import boto3
 import os
 import sys
+from datetime import datetime
 from dotenv import load_dotenv
 
 def get_s3_client():
@@ -43,8 +45,7 @@ def get_s3_client():
       - AWS_DEFAULT_REGION
     Exits if any of these are missing.
     """
-    # load environment variables from .env file
-    # override in case local config exists for AWS
+    # load environment variables from .env file (override local config if exists)
     load_dotenv(override=True)
 
     access_key = os.environ.get('AWS_ACCESS_KEY_ID')
@@ -57,7 +58,6 @@ def get_s3_client():
             "AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION environment variables (in .env)."
         )
 
-    print(access_key, secret_key, region)
     return boto3.client(
         's3',
         aws_access_key_id=access_key,
@@ -110,7 +110,6 @@ def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir):
     s3_client = get_s3_client()
     check_s3_connection(s3_client, bucket)
 
-    # this prefix is what we'll look for in the bucket.
     prefix = os.path.join(s3_prefix, folder_name)
 
     paginator = s3_client.get_paginator('list_objects_v2')
@@ -122,17 +121,13 @@ def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir):
             continue
         for obj in page['Contents']:
             key = obj['Key']
-            # if the object key is exactly the "folder" (a directory placeholder), skip it.
-            # e.g. sometimes you can have an empty key or just a trailing slash.
             if key.endswith("/"):
                 continue
 
             found_files = True
-            # figure out the relative path to recreate the same structure locally.
             relative_path = os.path.relpath(key, prefix)
             local_path = os.path.join(dest_dir, folder_name, relative_path)
 
-            # ensure the local directory exists.
             os.makedirs(os.path.dirname(local_path), exist_ok=True)
 
             print(f"Downloading s3://{bucket}/{key} to {local_path}")
@@ -144,18 +139,91 @@ def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir):
     if not found_files:
         sys.exit(f"Error: No files found in bucket '{bucket}' with prefix '{prefix}'")
 
+def _parse_folder_timestamp(folder_name):
+    """
+    Parses a folder name in "YYYY-MM-DD_HH-MM-SS" format.
+    Returns the datetime, or None if the name does not match the format.
+    """
+    try:
+        return datetime.strptime(folder_name, "%Y-%m-%d_%H-%M-%S")
+    except ValueError:
+        return None
+
+def _pick_latest_folder(folder_names):
+    """
+    Returns the folder name with the latest timestamp, or None if no name is in
+    "YYYY-MM-DD_HH-MM-SS" format. Other names are ignored so that an unrelated
+    folder (e.g. "__pycache__") is never selected as the latest one.
+    """
+    timestamps = {}
+    for name in folder_names:
+        timestamp = _parse_folder_timestamp(name)
+        if timestamp is not None:
+            timestamps[name] = timestamp
+    if not timestamps:
+        return None
+    return max(timestamps, key=timestamps.get)
+
+def get_latest_local_folder(base_dir):
+    """
+    Scans the base directory for subdirectories and returns the one with the latest timestamp.
+    Only folder names in the format "YYYY-MM-DD_HH-MM-SS" are considered.
+    Exits if the base directory is missing or holds no such folder.
+    """
+    try:
+        subdirs = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]
+    except FileNotFoundError:
+        sys.exit(f"Error: Base directory does not exist: {base_dir}")
+
+    if not subdirs:
+        sys.exit(f"Error: No subdirectories found in base directory: {base_dir}")
+
+    latest = _pick_latest_folder(subdirs)
+    if latest is None:
+        sys.exit(f"Error: No timestamped folders found in base directory: {base_dir}")
+    return latest
+
+def get_latest_s3_folder(s3_client, bucket, s3_prefix):
+    """
+    Retrieves the latest folder name from S3 under the given prefix.
+    It lists common prefixes (folders) and returns the one with the latest timestamp.
+    Only folder names in the format "YYYY-MM-DD_HH-MM-SS" are considered.
+    """
+    # one list_objects_v2 call returns at most 1000 entries, so paginate
+    paginator = s3_client.get_paginator('list_objects_v2')
+    pages = paginator.paginate(
+        Bucket=bucket,
+        Prefix=s3_prefix.rstrip("/") + "/",
+        Delimiter="/"
+    )
+
+    folders = []
+    for page in pages:
+        for cp in page.get('CommonPrefixes', []):
+            # Expect prefix like "captures/2025-02-12_15-30-00/"
+            folders.append(cp['Prefix'].rstrip('/').split('/')[-1])
+
+    if not folders:
+        sys.exit(f"Error: No folders found in S3 bucket '{bucket}' with prefix '{s3_prefix}'")
+
+    latest = _pick_latest_folder(folders)
+    if latest is None:
+        sys.exit(f"Error: No valid folder names found in S3 bucket '{bucket}' with prefix '{s3_prefix}'")
+    return latest
+
 def main():
     parser = argparse.ArgumentParser(
-        description="Push or pull a specific folder to/from an S3 bucket."
+        description="Push or pull the latest folder to/from an S3 bucket."
     )
     parser.add_argument("--action", choices=["push", "pull"], required=True,
                         help="Choose whether to push (upload) or pull (download) a folder.")
-    parser.add_argument("--folder", required=True,
-                        help="Folder name (e.g. 2025-02-12_15-30-00)")
-    parser.add_argument("--base-dir", default="data",
-                        help="Local base directory where capture runs are stored (for push).")
+    # The --folder argument is now optional. If not provided, the latest folder is auto-detected.
+    parser.add_argument("--folder", default=None,
+                        help="(Optional) Folder name (e.g. 2025-02-12_15-30-00). If omitted, the latest folder is selected.")
+    parser.add_argument("--base-dir", default="logs",
+                        help="Local base directory where capture runs are stored (used for push).")
     parser.add_argument("--dest-dir", default="download",
-                        help="Local directory to place the downloaded folder (for pull).")
+                        help="Local directory to place the downloaded folder (used for pull).")
     parser.add_argument("--bucket", required=True,
                         help="S3 bucket name.")
     parser.add_argument("--s3-prefix", default="captures",
@@ -163,7 +231,10 @@ def main():
     args = parser.parse_args()
 
     if args.action == "push":
-        # pushing from local 'base_dir/folder' to S3
+        # If no folder is provided, determine the latest local folder from the base directory.
+        if args.folder is None:
+            args.folder = get_latest_local_folder(args.base_dir)
+            print(f"Auto-detected latest local folder: {args.folder}")
         folder_path = os.path.join(args.base_dir, args.folder)
         if not os.path.exists(folder_path):
             sys.exit(f"Error: Folder does not exist: {folder_path}")
@@ -180,7 +251,12 @@ def main():
         push_folder_to_s3(folder_path, args.bucket, args.s3_prefix)
 
     elif args.action == "pull":
-        # pulling from S3 (prefix/folder) to local 'dest_dir/folder'
+        # If no folder is provided, query S3 for the latest folder under the specified prefix.
+        if args.folder is None:
+            s3_client = get_s3_client()
+            check_s3_connection(s3_client, args.bucket)
+            args.folder = get_latest_s3_folder(s3_client, args.bucket, args.s3_prefix)
+            print(f"Auto-detected latest folder on S3: {args.folder}")
         pull_folder_from_s3(args.bucket, args.s3_prefix, args.folder, args.dest_dir)
 
 if __name__ == '__main__':