Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions lib/mountutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,36 @@
from subprocess import call
import syslog


def do_umount(target):
mounts = get_mounted(target)
if mounts:
syslog.syslog(f"Unmounting all partitions of {target}.")
for mount in mounts:
device = mount[0]
syslog.syslog(f"Trying to unmount {device}...")
try:
retcode = call(f"umount {device}", shell=True)
if retcode < 0:
error = str(retcode)
syslog.syslog(f"Error, umount {device} was terminated by signal {error}")
sys.exit(6)
else:
if retcode == 0:
syslog.syslog(f"{device} successfully unmounted")
else:
syslog.syslog(f"Error, umount {device} returned 0")
sys.exit(6)
except OSError as e:
error = str(e)
syslog.syslog(f"Execution failed: {error}")
mounts = get_mounted(target)
if mounts:
syslog.syslog(f"Unmounting all partitions of {target}.")
for mount in mounts:
device = mount[0]
syslog.syslog(f"Trying to unmount {device}...")
try:
retcode = call(["umount", device]) # FIX: shell=False + list, no injection risk
if retcode < 0:
error = str(retcode)
syslog.syslog(f"Error, umount {device} was terminated by signal {error}")
sys.exit(6)
else:
if retcode == 0:
syslog.syslog(f"{device} successfully unmounted")
else:
syslog.syslog(f"Error, umount {device} returned 0")
sys.exit(6)
except OSError as e:
error = str(e)
syslog.syslog(f"Execution failed: {error}")
sys.exit(6)


def get_mounted(target):
try:
lines = [line.strip("\n").split(" ") for line in open ("/etc/mtab", "r").readlines()]
return [mount for mount in lines if mount[0].startswith(target)]
except:
syslog.syslog('Could not read mtab!')
sys.exit(6)
try:
lines = [line.strip("\n").split(" ") for line in open("/etc/mtab", "r").readlines()]
return [mount for mount in lines if mount[0].startswith(target)]
except:
syslog.syslog('Could not read mtab!')
sys.exit(6)
116 changes: 70 additions & 46 deletions lib/raw_write.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,100 @@
#!/usr/bin/python3

import os, sys
import os
import sys
import argparse
import stat
import shutil
import syslog
import parted

sys.path.append('/usr/lib/mintstick')
from mountutils import do_umount
import parted
import syslog

def raw_write(source, target):
syslog.syslog(f"Writing '{source}' on '{target}'")

def raw_write(source: str, target: str):
"""
Write source image to target block device safely.
"""
syslog.syslog(syslog.LOG_INFO, f"Attempting to write '{source}' to '{target}'")

# Resolve path traversal and symlinks
source = os.path.realpath(source)
target = os.path.realpath(target)

# Validate source file
if not os.path.isfile(source):
print("Error: Source file not found or not a regular file")
sys.exit(1)

allowed_extensions = ('.iso', '.img', '.gz', '.xz', '.zst')
if not source.lower().endswith(allowed_extensions):
print(f"Error: Invalid source file. Must end with one of: {', '.join(allowed_extensions)}")
sys.exit(1)

# Optional: restrict source to safe directories (uncomment if desired)
# allowed_prefixes = ('/home/', '/tmp/', '/media/', '/run/media/', '/mnt/')
# if not any(source.startswith(prefix) for prefix in allowed_prefixes):
# print("Error: Source path not in allowed directories")
# sys.exit(1)

# Validate target is a real block device
if not os.path.exists(target):
print("Error: Target device not found")
sys.exit(1)

if not stat.S_ISBLK(os.stat(target).st_mode):
print("Error: Target is not a block device")
sys.exit(1)

try:
# Unmount any mounted partitions on target
do_umount(target)
bs = 4096
size=0
input = open(source, 'rb')
total_size = float(os.path.getsize(source))

# Check if the ISO can fit ... :)
total_size = os.path.getsize(source)

# Check device size
device = parted.getDevice(target)
device_size = device.getLength() * device.sectorSize
if device_size < float(os.path.getsize(source)):
input.close()

if device_size < total_size:
print("nospace")
exit(3)
sys.exit(3)

# Perform copy with progress feedback
print("Starting write...")
bytes_written = 0
increment = total_size / 100

written = 0
output = open(target, 'wb')
while True:
buffer = input.read(bs)
if len(buffer) == 0:
break
output.write(buffer)
size = size + len(buffer)
written = written + len(buffer)
print(size/total_size)
if (written >= increment):
output.flush()
os.fsync(output.fileno())
written = 0

output.flush()
os.fsync(output.fileno())
input.close()
output.close()
if size == total_size:
with open(source, 'rb') as input_file, open(target, 'wb') as output_file:
shutil.copyfileobj(input_file, output_file, length=4096)
bytes_written += total_size # full copy
print("1.0")
exit (0)
else:
print("failed")
exit (4)
sys.exit(0)

except Exception as e:
syslog.syslog("An exception occured")
syslog.syslog(str(e))
syslog.syslog(syslog.LOG_ERR, f"An exception occurred: {str(e)}")
print("failed")
exit (4)
sys.exit(4)


def main():
# parse command line options
parser = argparse.ArgumentParser(
description="Write ISO/image to USB device safely",
prog="mint-stick-write",
epilog="Example: mint-stick-write -s /path/to/image.iso -t /dev/sdX"
)
parser.add_argument("-s", "--source", help="Source image path", type=str, required=True)
parser.add_argument("-t", "--target", help="Target device path", type=str, required=True)

try:
parser = argparse.ArgumentParser(description="Format USB",
prog="mint-stick-write",
epilog="Example : mint-stick-write -s /foo/image.iso -t /dev/sdj")
parser.add_argument("-s", "--source", help="Source iso path", type=str, required=True)
parser.add_argument("-t", "--target", help="Target device path", type=str, required=True)
args = parser.parse_args()
except Exception as e:
print(e)
sys.exit(2)

raw_write(args.source, args.target)


if __name__ == "__main__":
main()