Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 84 additions & 23 deletions GEMstack/offboard/log_management/s3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/usr/bin/env python3
"""
This client interacts with S3 to upload (push) or download (pull) a specific folder.
This client interacts with S3 to upload (push) or download (pull) the latest folder.
For push, the latest folder in the local base directory (default "logs")
is determined based on its timestamp (folder name in "YYYY-MM-DD_HH-MM-SS" format).
For pull, the latest folder under the S3 prefix is selected.

Before running this client make sure you have defined .env in root with following values:
AWS_ACCESS_KEY_ID=example
Expand All @@ -12,17 +15,15 @@

Example Usage:

# Push a folder named "2025-02-12_15-30-00" inside "data/" to S3:
python3 GEMStack/offboard/log_management/s3.py \
# Push the latest folder from the local "logs" directory to S3:
python3 -m GEMstack.offboard.log_management.s3 \
--action push \
--folder 2025-02-12_15-30-00 \
--bucket cs588 \
--s3-prefix captures

# Pull (download) that same folder from S3 into local "download/" directory:
python3 GEMStack/offboard/log_management/s3.py \
# Pull (download) the latest folder from S3 into local "download/" directory:
python3 -m GEMstack.offboard.log_management.s3 \
--action pull \
--folder 2025-02-12_15-30-00 \
--bucket cs588 \
--s3-prefix captures \
--dest-dir download
Expand All @@ -32,6 +33,7 @@
import boto3
import os
import sys
from datetime import datetime
from dotenv import load_dotenv

def get_s3_client():
Expand All @@ -43,8 +45,7 @@ def get_s3_client():
- AWS_DEFAULT_REGION
Exits if any of these are missing.
"""
# load environment variables from .env file
# override in case local config exists for AWS
# load environment variables from .env file (override local config if exists)
load_dotenv(override=True)

access_key = os.environ.get('AWS_ACCESS_KEY_ID')
Expand All @@ -57,7 +58,6 @@ def get_s3_client():
"AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION environment variables (in .env)."
)

print(access_key, secret_key, region)
return boto3.client(
's3',
aws_access_key_id=access_key,
Expand Down Expand Up @@ -110,7 +110,6 @@ def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir):
s3_client = get_s3_client()
check_s3_connection(s3_client, bucket)

# this prefix is what we'll look for in the bucket.
prefix = os.path.join(s3_prefix, folder_name)

paginator = s3_client.get_paginator('list_objects_v2')
Expand All @@ -122,17 +121,13 @@ def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir):
continue
for obj in page['Contents']:
key = obj['Key']
# if the object key is exactly the "folder" (a directory placeholder), skip it.
# e.g. sometimes you can have an empty key or just a trailing slash.
if key.endswith("/"):
continue

found_files = True
# figure out the relative path to recreate the same structure locally.
relative_path = os.path.relpath(key, prefix)
local_path = os.path.join(dest_dir, folder_name, relative_path)

# ensure the local directory exists.
os.makedirs(os.path.dirname(local_path), exist_ok=True)

print(f"Downloading s3://{bucket}/{key} to {local_path}")
Expand All @@ -144,26 +139,87 @@ def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir):
if not found_files:
sys.exit(f"Error: No files found in bucket '{bucket}' with prefix '{prefix}'")

def get_latest_local_folder(base_dir):
    """
    Return the name of the most recent capture folder inside *base_dir*.

    Folder names are expected to follow the "YYYY-MM-DD_HH-MM-SS" timestamp
    format; names that do not parse are treated as older than any valid
    timestamp, so they are never preferred over a real capture folder.

    Args:
        base_dir: Local directory containing one subdirectory per capture run.

    Returns:
        The subdirectory name (not the full path) with the latest timestamp.

    Exits:
        Via sys.exit() if base_dir does not exist or has no subdirectories.
    """
    try:
        subdirs = [d for d in os.listdir(base_dir)
                   if os.path.isdir(os.path.join(base_dir, d))]
    except FileNotFoundError:
        sys.exit(f"Error: Base directory does not exist: {base_dir}")

    if not subdirs:
        sys.exit(f"Error: No subdirectories found in base directory: {base_dir}")

    def parse_timestamp(folder):
        # strptime raises ValueError on a malformed name; map those to
        # datetime.min so they sort before every genuine timestamp.
        try:
            return datetime.strptime(folder, "%Y-%m-%d_%H-%M-%S")
        except ValueError:
            return datetime.min

    # max() finds the latest folder in one pass; no need to sort the list.
    return max(subdirs, key=parse_timestamp)

def get_latest_s3_folder(s3_client, bucket, s3_prefix):
    """
    Return the name of the most recent capture folder under *s3_prefix* in S3.

    Lists the top-level "folders" (common prefixes) below the prefix using a
    list_objects_v2 paginator with Delimiter="/". Pagination matters: a single
    list_objects_v2 call returns at most 1000 entries, so an unpaginated call
    could silently miss the latest folder in a large bucket. (This also keeps
    the listing style consistent with pull_folder_from_s3.)

    Folder names are expected to follow "YYYY-MM-DD_HH-MM-SS"; names that do
    not parse are treated as older than any valid timestamp.

    Args:
        s3_client: A boto3 S3 client.
        bucket:    S3 bucket name.
        s3_prefix: Key prefix (without trailing slash) containing the folders.

    Returns:
        The latest folder name (without the prefix or trailing slash).

    Exits:
        Via sys.exit() if no folders (or no usable folder names) are found.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(
        Bucket=bucket,
        Prefix=s3_prefix + "/",
        Delimiter="/"
    )

    saw_any_prefix = False
    folders = []
    for page in pages:
        for cp in page.get('CommonPrefixes', []):
            saw_any_prefix = True
            # A prefix looks like "captures/2025-02-12_15-30-00/"; the folder
            # name is the second-to-last component after splitting on "/".
            parts = cp.get('Prefix').split('/')
            if len(parts) >= 2:
                folders.append(parts[-2])

    if not saw_any_prefix:
        sys.exit(f"Error: No folders found in S3 bucket '{bucket}' with prefix '{s3_prefix}'")
    if not folders:
        sys.exit(f"Error: No valid folder names found in S3 bucket '{bucket}' with prefix '{s3_prefix}'")

    def parse_timestamp(folder):
        # Malformed names sort before every genuine timestamp.
        try:
            return datetime.strptime(folder, "%Y-%m-%d_%H-%M-%S")
        except ValueError:
            return datetime.min

    return max(folders, key=parse_timestamp)

def main():
parser = argparse.ArgumentParser(
description="Push or pull a specific folder to/from an S3 bucket."
description="Push or pull the latest folder to/from an S3 bucket."
)
parser.add_argument("--action", choices=["push", "pull"], required=True,
help="Choose whether to push (upload) or pull (download) a folder.")
parser.add_argument("--folder", required=True,
help="Folder name (e.g. 2025-02-12_15-30-00)")
parser.add_argument("--base-dir", default="data",
help="Local base directory where capture runs are stored (for push).")
# The --folder argument is now optional. If not provided, the latest folder is auto-detected.
parser.add_argument("--folder", default=None,
help="(Optional) Folder name (e.g. 2025-02-12_15-30-00). If omitted, the latest folder is selected.")
parser.add_argument("--base-dir", default="logs",
help="Local base directory where capture runs are stored (used for push).")
parser.add_argument("--dest-dir", default="download",
help="Local directory to place the downloaded folder (for pull).")
help="Local directory to place the downloaded folder (used for pull).")
parser.add_argument("--bucket", required=True,
help="S3 bucket name.")
parser.add_argument("--s3-prefix", default="captures",
help="S3 prefix (folder) where data is stored or will be uploaded.")
args = parser.parse_args()

if args.action == "push":
# pushing from local 'base_dir/folder' to S3
# If no folder is provided, determine the latest local folder from the base directory.
if args.folder is None:
args.folder = get_latest_local_folder(args.base_dir)
print(f"Auto-detected latest local folder: {args.folder}")
folder_path = os.path.join(args.base_dir, args.folder)
if not os.path.exists(folder_path):
sys.exit(f"Error: Folder does not exist: {folder_path}")
Expand All @@ -180,7 +236,12 @@ def main():
push_folder_to_s3(folder_path, args.bucket, args.s3_prefix)

elif args.action == "pull":
# pulling from S3 (prefix/folder) to local 'dest_dir/folder'
# If no folder is provided, query S3 for the latest folder under the specified prefix.
if args.folder is None:
s3_client = get_s3_client()
check_s3_connection(s3_client, args.bucket)
args.folder = get_latest_s3_folder(s3_client, args.bucket, args.s3_prefix)
print(f"Auto-detected latest folder on S3: {args.folder}")
pull_folder_from_s3(args.bucket, args.s3_prefix, args.folder, args.dest_dir)

if __name__ == '__main__':
Expand Down