diff --git a/GEMstack/offboard/calibration/capture_lidar_zed.py b/GEMstack/offboard/calibration/capture_lidar_zed.py index 21814b793..ee11ebb1a 100644 --- a/GEMstack/offboard/calibration/capture_lidar_zed.py +++ b/GEMstack/offboard/calibration/capture_lidar_zed.py @@ -1,99 +1,107 @@ -# ROS Headers +#!/usr/bin/env python3 +""" +Capture ROS messages and save each run in its own time-stamped folder. +""" + import rospy -from sensor_msgs.msg import Image,PointCloud2 +from sensor_msgs.msg import Image, PointCloud2 import sensor_msgs.point_cloud2 as pc2 import ctypes import struct - -# OpenCV and cv2 bridge import cv2 from cv_bridge import CvBridge import numpy as np import os +import datetime lidar_points = None camera_image = None depth_image = None bridge = CvBridge() -def lidar_callback(lidar : PointCloud2): +def lidar_callback(lidar: PointCloud2): global lidar_points lidar_points = lidar -def camera_callback(img : Image): +def camera_callback(img: Image): global camera_image camera_image = img -def depth_callback(img : Image): +def depth_callback(img: Image): global depth_image depth_image = img -def pc2_to_numpy(pc2_msg, want_rgb = False): +def pc2_to_numpy(pc2_msg, want_rgb=False): gen = pc2.read_points(pc2_msg, skip_nans=True) if want_rgb: - xyzpack = np.array(list(gen),dtype=np.float32) + xyzpack = np.array(list(gen), dtype=np.float32) if xyzpack.shape[1] != 4: raise ValueError("PointCloud2 does not have points") - xyzrgb = np.empty((xyzpack.shape[0],6)) - xyzrgb[:,:3] = xyzpack[:,:3] - for i,x in enumerate(xyzpack): - rgb = x[3] - # cast float32 to int so that bitwise operations are possible - s = struct.pack('>f' ,rgb) - i = struct.unpack('>l',s)[0] - # you can get back the float value by the inverse operations - pack = ctypes.c_uint32(i).value - r = (pack & 0x00FF0000)>> 16 - g = (pack & 0x0000FF00)>> 8 + xyzrgb = np.empty((xyzpack.shape[0], 6)) + xyzrgb[:, :3] = xyzpack[:, :3] + for i, x in enumerate(xyzpack): + rgb = x[3] + s = struct.pack('>f', rgb) + i_val = struct.unpack('>l', s)[0] + pack = ctypes.c_uint32(i_val).value + r = (pack & 0x00FF0000) >> 16 + g = (pack & 0x0000FF00) >> 8 b = (pack & 0x000000FF) - #r,g,b values in the 0-255 range - xyzrgb[i,3:] = (r,g,b) + xyzrgb[i, 3:] = (r, g, b) return xyzrgb else: - return np.array(list(gen),dtype=np.float32)[:,:3] + return np.array(list(gen), dtype=np.float32)[:, :3] -def save_scan(lidar_fn,color_fn,depth_fn): - print("Saving scan to",lidar_fn,color_fn,depth_fn) - pc = pc2_to_numpy(lidar_points,want_rgb=False) - np.savez(lidar_fn,pc) - cv2.imwrite(color_fn,bridge.imgmsg_to_cv2(camera_image)) +def save_scan(lidar_fn, color_fn, depth_fn): + print("Saving scan to", lidar_fn, color_fn, depth_fn) + pc = pc2_to_numpy(lidar_points, want_rgb=False) + np.savez(lidar_fn, pc) + cv2.imwrite(color_fn, bridge.imgmsg_to_cv2(camera_image)) dimage = bridge.imgmsg_to_cv2(depth_image) dimage_non_nan = dimage[np.isfinite(dimage)] - print("Depth range",np.min(dimage_non_nan),np.max(dimage_non_nan)) - dimage = np.nan_to_num(dimage,nan=0,posinf=0,neginf=0) - dimage = (dimage/4000*0xffff) - print("Depth pixel range",np.min(dimage),np.max(dimage)) + print("Depth range", np.min(dimage_non_nan), np.max(dimage_non_nan)) + dimage = np.nan_to_num(dimage, nan=0, posinf=0, neginf=0) + dimage = (dimage / 4000 * 0xffff) + print("Depth pixel range", np.min(dimage), np.max(dimage)) dimage = dimage.astype(np.uint16) - cv2.imwrite(depth_fn,dimage) + cv2.imwrite(depth_fn, dimage) + +def main(base_folder='data', start_index=1): + # unique folder for this capture run based on the current date/time. + run_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + run_folder = os.path.join(base_folder, run_timestamp) + os.makedirs(run_folder, exist_ok=True) + print("Capture run folder:", run_folder) + + rospy.init_node("capture_lidar_zed", disable_signals=True) + rospy.Subscriber("/lidar1/velodyne_points", PointCloud2, lidar_callback) + rospy.Subscriber("/zed2/zed_node/rgb/image_rect_color", Image, camera_callback) + rospy.Subscriber("/zed2/zed_node/depth/depth_registered", Image, depth_callback) -def main(folder='data',start_index=1): - rospy.init_node("capture_lidar_zed",disable_signals=True) - lidar_sub = rospy.Subscriber("/lidar1/velodyne_points", PointCloud2, lidar_callback) - camera_sub = rospy.Subscriber("/zed2/zed_node/rgb/image_rect_color", Image, camera_callback) - depth_sub = rospy.Subscriber("/zed2/zed_node/depth/depth_registered", Image, depth_callback) index = start_index print("Press any key to:") - print(" store lidar point clouds as npz") - print(" store color images as png") - print(" store depth images (m scaled by 0xffff/4000) as 16-bit tif") + print(" - store lidar point clouds as npz") + print(" - store color images as png") + print(" - store depth images (m scaled by 0xffff/4000) as 16-bit tif") print("Press Escape or Ctrl+C to quit") - while True: - if camera_image: - cv2.imshow("result",bridge.imgmsg_to_cv2(camera_image)) + + while not rospy.is_shutdown(): + if camera_image is not None: + cv2.imshow("result", bridge.imgmsg_to_cv2(camera_image)) key = cv2.waitKey(30) if key == -1: - #nothing - pass + pass # No key pressed. elif key == 27: - #escape - break + break # Escape key pressed. else: if lidar_points is None or camera_image is None or depth_image is None: print("Missing some messages?") else: - files = [os.path.join(folder,'lidar{}.npz'.format(index)), - os.path.join(folder,'color{}.png'.format(index)), - os.path.join(folder,'depth{}.tif'.format(index))] + files = [ + os.path.join(run_folder, 'lidar{}.npz'.format(index)), + os.path.join(run_folder, 'color{}.png'.format(index)), + os.path.join(run_folder, 'depth{}.tif'.format(index)) + ] save_scan(*files) index += 1 @@ -105,4 +113,4 @@ def main(folder='data',start_index=1): folder = sys.argv[1] if len(sys.argv) >= 3: start_index = int(sys.argv[2]) - main(folder,start_index) + main(folder, start_index) diff --git a/GEMstack/offboard/log_management/s3.py b/GEMstack/offboard/log_management/s3.py new file mode 100644 index 000000000..1a2550d00 --- /dev/null +++ b/GEMstack/offboard/log_management/s3.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +""" +This client interacts with S3 to upload (push) or download (pull) a specific folder. + +Before running this client make sure you have defined .env in root with following values: +AWS_ACCESS_KEY_ID=example +AWS_SECRET_ACCESS_KEY=example +AWS_DEFAULT_REGION=us-east-1 + +Requires: + pip install boto3 python-dotenv + +Example Usage: + + # Push a folder named "2025-02-12_15-30-00" inside "data/" to S3: + python3 GEMStack/offboard/log_management/s3.py \ + --action push \ + --folder 2025-02-12_15-30-00 \ + --bucket cs588 \ + --s3-prefix captures + + # Pull (download) that same folder from S3 into local "download/" directory: + python3 GEMStack/offboard/log_management/s3.py \ + --action pull \ + --folder 2025-02-12_15-30-00 \ + --bucket cs588 \ + --s3-prefix captures \ + --dest-dir download +""" + +import argparse +import boto3 +import os +import sys +from dotenv import load_dotenv + +def get_s3_client(): + """ + Initializes the S3 client using AWS credentials from environment variables. + Expects: + - AWS_ACCESS_KEY_ID + - AWS_SECRET_ACCESS_KEY + - AWS_DEFAULT_REGION + Exits if any of these are missing. + """ + # load environment variables from .env file + # override in case local config exists for AWS + load_dotenv(override=True) + + access_key = os.environ.get('AWS_ACCESS_KEY_ID') + secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') + region = os.environ.get('AWS_DEFAULT_REGION') + + if not access_key or not secret_key or not region: + sys.exit( + "Error: AWS credentials not set. Please set AWS_ACCESS_KEY_ID, " + "AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION environment variables (in .env)." + ) + + print(access_key, secret_key, region) + return boto3.client( + 's3', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region + ) + +def check_s3_connection(s3_client, bucket): + """ + Verifies that we can connect to S3 and access the specified bucket. + """ + try: + s3_client.head_bucket(Bucket=bucket) + print(f"Connection check: Successfully accessed bucket '{bucket}'") + except Exception as e: + sys.exit(f"Error: Could not connect to S3 bucket '{bucket}': {e}") + +def push_folder_to_s3(folder_path, bucket, s3_prefix): + """ + Walks through the folder and uploads each file to S3 under the given prefix. + Files are stored under the key: s3_prefix/folder_name/. + """ + s3_client = get_s3_client() + check_s3_connection(s3_client, bucket) + + folder_name = os.path.basename(folder_path) + files_uploaded = 0 + + for root, _, files in os.walk(folder_path): + for file in files: + local_path = os.path.join(root, file) + # determine the file's path relative to the folder being pushed. + relative_path = os.path.relpath(local_path, folder_path) + s3_key = os.path.join(s3_prefix, folder_name, relative_path) + try: + print(f"Uploading {local_path} to s3://{bucket}/{s3_key}") + s3_client.upload_file(local_path, bucket, s3_key) + files_uploaded += 1 + except Exception as e: + print(f"Error uploading {local_path}: {e}") + + if files_uploaded == 0: + sys.exit(f"Error: No files were uploaded from folder: {folder_path}") + +def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir): + """ + Downloads all objects from S3 whose key begins with s3_prefix/folder_name. + The folder will be recreated under `dest_dir/folder_name`. + """ + s3_client = get_s3_client() + check_s3_connection(s3_client, bucket) + + # this prefix is what we'll look for in the bucket. + prefix = os.path.join(s3_prefix, folder_name) + + paginator = s3_client.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) + + found_files = False + for page in pages: + if 'Contents' not in page: + continue + for obj in page['Contents']: + key = obj['Key'] + # if the object key is exactly the "folder" (a directory placeholder), skip it. + # e.g. sometimes you can have an empty key or just a trailing slash. + if key.endswith("/"): + continue + + found_files = True + # figure out the relative path to recreate the same structure locally. + relative_path = os.path.relpath(key, prefix) + local_path = os.path.join(dest_dir, folder_name, relative_path) + + # ensure the local directory exists. + os.makedirs(os.path.dirname(local_path), exist_ok=True) + + print(f"Downloading s3://{bucket}/{key} to {local_path}") + try: + s3_client.download_file(bucket, key, local_path) + except Exception as e: + print(f"Error downloading s3://{bucket}/{key}: {e}") + + if not found_files: + sys.exit(f"Error: No files found in bucket '{bucket}' with prefix '{prefix}'") + +def main(): + parser = argparse.ArgumentParser( + description="Push or pull a specific folder to/from an S3 bucket." + ) + parser.add_argument("--action", choices=["push", "pull"], required=True, + help="Choose whether to push (upload) or pull (download) a folder.") + parser.add_argument("--folder", required=True, + help="Folder name (e.g. 2025-02-12_15-30-00)") + parser.add_argument("--base-dir", default="data", + help="Local base directory where capture runs are stored (for push).") + parser.add_argument("--dest-dir", default="download", + help="Local directory to place the downloaded folder (for pull).") + parser.add_argument("--bucket", required=True, + help="S3 bucket name.") + parser.add_argument("--s3-prefix", default="captures", + help="S3 prefix (folder) where data is stored or will be uploaded.") + args = parser.parse_args() + + if args.action == "push": + # pushing from local 'base_dir/folder' to S3 + folder_path = os.path.join(args.base_dir, args.folder) + if not os.path.exists(folder_path): + sys.exit(f"Error: Folder does not exist: {folder_path}") + + # check if the folder is empty. + folder_empty = True + for _, _, files in os.walk(folder_path): + if files: + folder_empty = False + break + if folder_empty: + sys.exit(f"Error: Folder is empty: {folder_path}") + + push_folder_to_s3(folder_path, args.bucket, args.s3_prefix) + + elif args.action == "pull": + # pulling from S3 (prefix/folder) to local 'dest_dir/folder' + pull_folder_from_s3(args.bucket, args.s3_prefix, args.folder, args.dest_dir) + +if __name__ == '__main__': + main()