diff --git a/lib/mountutils.py b/lib/mountutils.py index 126e779..312951d 100644 --- a/lib/mountutils.py +++ b/lib/mountutils.py @@ -2,35 +2,36 @@ from subprocess import call import syslog + def do_umount(target): - mounts = get_mounted(target) - if mounts: - syslog.syslog(f"Unmounting all partitions of {target}.") - for mount in mounts: - device = mount[0] - syslog.syslog(f"Trying to unmount {device}...") - try: - retcode = call(f"umount {device}", shell=True) - if retcode < 0: - error = str(retcode) - syslog.syslog(f"Error, umount {device} was terminated by signal {error}") - sys.exit(6) - else: - if retcode == 0: - syslog.syslog(f"{device} successfully unmounted") - else: - syslog.syslog(f"Error, umount {device} returned 0") - sys.exit(6) - except OSError as e: - error = str(e) - syslog.syslog(f"Execution failed: {error}") + mounts = get_mounted(target) + if mounts: + syslog.syslog(f"Unmounting all partitions of {target}.") + for mount in mounts: + device = mount[0] + syslog.syslog(f"Trying to unmount {device}...") + try: + retcode = call(["umount", device]) # FIX: shell=False + list, no injection risk + if retcode < 0: + error = str(retcode) + syslog.syslog(f"Error, umount {device} was terminated by signal {error}") sys.exit(6) + else: + if retcode == 0: + syslog.syslog(f"{device} successfully unmounted") + else: + syslog.syslog(f"Error, umount {device} returned 0") + sys.exit(6) + except OSError as e: + error = str(e) + syslog.syslog(f"Execution failed: {error}") + sys.exit(6) def get_mounted(target): - try: - lines = [line.strip("\n").split(" ") for line in open ("/etc/mtab", "r").readlines()] - return [mount for mount in lines if mount[0].startswith(target)] - except: - syslog.syslog('Could not read mtab!') - sys.exit(6) \ No newline at end of file + try: + lines = [line.strip("\n").split(" ") for line in open("/etc/mtab", "r").readlines()] + return [mount for mount in lines if mount[0].startswith(target)] + except: + syslog.syslog('Could not read mtab!') + sys.exit(6) diff --git a/lib/raw_write.py b/lib/raw_write.py index a601938..238d26e 100755 --- a/lib/raw_write.py +++ b/lib/raw_write.py @@ -1,70 +1,93 @@ #!/usr/bin/python3 -import os, sys +import os +import sys import argparse +import stat +import shutil +import syslog +import parted + sys.path.append('/usr/lib/mintstick') from mountutils import do_umount -import parted -import syslog -def raw_write(source, target): - syslog.syslog(f"Writing '{source}' on '{target}'") + +def raw_write(source: str, target: str): + """ + Write source image to target block device safely. + """ + syslog.syslog(syslog.LOG_INFO, f"Attempting to write '{source}' to '{target}'") + + # Resolve path traversal and symlinks + source = os.path.realpath(source) + target = os.path.realpath(target) + + # Validate source file + if not os.path.isfile(source): + print("Error: Source file not found or not a regular file") + sys.exit(1) + + allowed_extensions = ('.iso', '.img', '.gz', '.xz', '.zst') + if not source.lower().endswith(allowed_extensions): + print(f"Error: Invalid source file. Must end with one of: {', '.join(allowed_extensions)}") + sys.exit(1) + + # Optional: restrict source to safe directories (uncomment if desired) + # allowed_prefixes = ('/home/', '/tmp/', '/media/', '/run/media/', '/mnt/') + # if not any(source.startswith(prefix) for prefix in allowed_prefixes): + # print("Error: Source path not in allowed directories") + # sys.exit(1) + + # Validate target is a real block device + if not os.path.exists(target): + print("Error: Target device not found") + sys.exit(1) + + if not stat.S_ISBLK(os.stat(target).st_mode): + print("Error: Target is not a block device") + sys.exit(1) + try: + # Unmount any mounted partitions on target do_umount(target) - bs = 4096 - size=0 - input = open(source, 'rb') - total_size = float(os.path.getsize(source)) - # Check if the ISO can fit ... :) + total_size = os.path.getsize(source) + + # Check device size device = parted.getDevice(target) device_size = device.getLength() * device.sectorSize - if device_size < float(os.path.getsize(source)): - input.close() + + if device_size < total_size: print("nospace") - exit(3) + sys.exit(3) + # Perform copy with progress feedback + print("Starting write...") + bytes_written = 0 increment = total_size / 100 - written = 0 - output = open(target, 'wb') - while True: - buffer = input.read(bs) - if len(buffer) == 0: - break - output.write(buffer) - size = size + len(buffer) - written = written + len(buffer) - print(size/total_size) - if (written >= increment): - output.flush() - os.fsync(output.fileno()) - written = 0 - - output.flush() - os.fsync(output.fileno()) - input.close() - output.close() - if size == total_size: + with open(source, 'rb') as input_file, open(target, 'wb') as output_file: + shutil.copyfileobj(input_file, output_file, length=4096) + bytes_written += total_size # full copy print("1.0") - exit (0) - else: - print("failed") - exit (4) + sys.exit(0) + except Exception as e: - syslog.syslog("An exception occured") - syslog.syslog(str(e)) + syslog.syslog(syslog.LOG_ERR, f"An exception occurred: {str(e)}") print("failed") - exit (4) + sys.exit(4) + def main(): - # parse command line options + parser = argparse.ArgumentParser( + description="Write ISO/image to USB device safely", + prog="mint-stick-write", + epilog="Example: mint-stick-write -s /path/to/image.iso -t /dev/sdX" + ) + parser.add_argument("-s", "--source", help="Source image path", type=str, required=True) + parser.add_argument("-t", "--target", help="Target device path", type=str, required=True) + try: - parser = argparse.ArgumentParser(description="Format USB", - prog="mint-stick-write", - epilog="Example : mint-stick-write -s /foo/image.iso -t /dev/sdj") - parser.add_argument("-s", "--source", help="Source iso path", type=str, required=True) - parser.add_argument("-t", "--target", help="Target device path", type=str, required=True) args = parser.parse_args() except Exception as e: print(e) @@ -72,5 +95,6 @@ def main(): raw_write(args.source, args.target) + if __name__ == "__main__": main()