import getpass import grp import hashlib import os import pathlib import re import shlex import shutil import subprocess import sys import time from argparse import ArgumentParser from collections.abc import Iterator from select import EPOLLHUP, EPOLLIN, epoll from shutil import which from types import TracebackType from typing import Any, Self, override class RequirementError(Exception): pass class ArgumentError(Exception): pass def get_master(interface): master_path = pathlib.Path(f'/sys/class/net/{interface}/master') return master_path.readlink().name if master_path.exists() else None def gray(text): return f'\033[38;5;246m{text}\033[0m' def orange(text): return f'\033[38;5;208m{text}\033[0m' def red(text): return f'\033[31m{text}\033[0m' sudo_password = None # Gets populated later harddrives = {} username = getpass.getuser() groupname = grp.getgrgid(os.getgid()).gr_name # https://stackoverflow.com/a/43627833/929999 _VT100_ESCAPE_REGEX = r'\x1B\[[?0-9;]*[a-zA-Z]' _VT100_ESCAPE_REGEX_BYTES = _VT100_ESCAPE_REGEX.encode() parser = ArgumentParser(description='A set of common parameters for the tooling', add_help=True) # Defaults to the order of which the harddrives are defined. boot_option = parser.add_mutually_exclusive_group() boot_option.add_argument('--uki', help='Boot a UKI (EFI) image') boot_option.add_argument('--kernel', help='Boot a Linux kernel') boot_option.add_argument('--iso', help='Boot a ISO 9660') networking = parser.add_argument_group('Networking', "Disables the default '-net nic -net user' network behavior of Qemu.") networking.add_argument('--tap', nargs='?', help='Configures a TAP interface and passes it in as a virtio-net-pci.', default=None, type=str) networking.add_argument('--tap-mac', nargs='?', help='MAC for the --tap interface', default='52:54:00:00:00:02') networking.add_argument('--bridge', nargs='?', help='Configures a bridge, to which the --tap is added.', default=None, type=str) networking.add_argument('--bridge-mac', nargs='?', help='MAC for the interface', default=None) networking.add_argument('--bridge-master', nargs='?', help="Which interface to set as 'master' on the bridge.", default=None, type=str) hardware = parser.add_argument_group('Hardware', 'General hardware specs for the virtual machine') # To override the use of EFI boot (will not work with --uki for obvious reasons) hardware.add_argument('--bios', action='store_true', help='Disables EFI (edk2/ovmf) and uses BIOS support instead', default=False) hardware.add_argument('--memory', nargs='?', help='Ammount of memory to supply the machine', default=8192) hardware.add_argument('--harddrive', action='append', help='Sets up one or more virtio-scsi-pci, size is defined by --harddrive test.qcow2:15G', type=str) hardware.add_argument('--cpu', help='Sets the number of cores to allocate (default nproc -1)', type=str, default=os.cpu_count() - 1 if os.cpu_count() else 1) hardware.add_argument('--resolution', help="Sets Qemu's VGA resolution", type=str, default='1920x1107') kernel = parser.add_argument_group('Kernel', '--kernel specific arguments') kernel.add_argument('--initrd', nargs='?', help='Defines which ISO to run (skips build all together)', default=None, type=pathlib.Path) args, unknowns = parser.parse_known_args() # pylint: disable=redefined-outer-name if args.bios and args.uki: raise ArgumentError('Cannot boot a --uki image with --bios mode (at least not that I know of).') if args.uki is None and args.kernel is None and args.iso is None and args.harddrive is None: raise ArgumentError('Cannot boot this machine, define at least one of: --uki, --kernel, --iso, --harddrive') if args.bridge is None and args.bridge_master: raise ArgumentError('Cannot use --bridge-master without defining --bridge') if args.bridge is None and args.bridge_mac: raise ArgumentError('Cannot use --bridge-mac without defining --bridge') elif args.bridge and args.bridge_mac is None: args.bridge_mac = '52:54:00:00:00:1' if args.tap and not args.bridge and get_master(args.tap) is None: # We'll allow it, because maybe we're tesing what happens without networking, but the NIC exists. # Or the user has some creative iptables/nftables forwarding. print(orange('--tap does not have a master, consider adding --bridge or manual set a master using ip-link(8).')) if args.tap is None and args.bridge: print(orange("--bridge* arguments will be ignored since there's no --tap defined")) elif args.tap and args.tap_mac is None: args.tap_mac = '52:54:00:00:00:2' class SysCallError(Exception): def __init__(self, message: str, exit_code: int | None = None, worker_log: bytes = b'') -> None: super().__init__(message) self.message = message self.exit_code = exit_code self.worker_log = worker_log def clear_vt100_escape_codes(data: bytes) -> bytes: return re.sub(_VT100_ESCAPE_REGEX_BYTES, b'', data) def locate_binary(name: str) -> str: if path := which(name): return path raise RequirementError(f'Binary {name} does not exist.') def _pid_exists(pid: int) -> bool: try: return any(subprocess.check_output(['ps', '--no-headers', '-o', 'pid', '-p', str(pid)]).strip()) except subprocess.CalledProcessError: return False class SysCommandWorker: def __init__( self, cmd: str | list[str], peek_output: bool | None = False, environment_vars: dict[str, str] | None = None, working_directory: str = './', remove_vt100_escape_codes_from_lines: bool = True, ): if isinstance(cmd, str): cmd = shlex.split(cmd) if cmd and not cmd[0].startswith(('/', './')): # Path() does not work well cmd[0] = locate_binary(cmd[0]) self.cmd = cmd self.peek_output = peek_output # define the standard locale for command outputs. For now the C ascii one. Can be overridden self.environment_vars = {'LC_ALL': 'C'} if environment_vars: self.environment_vars.update(environment_vars) self.working_directory = working_directory self.exit_code: int | None = None self._trace_log = b'' self._trace_log_pos = 0 self.poll_object = epoll() self.child_fd: int | None = None self.started = False self.ended = False self.remove_vt100_escape_codes_from_lines: bool = remove_vt100_escape_codes_from_lines def __contains__(self, key: bytes) -> bool: """ Contains will also move the current buffert position forward. This is to avoid re-checking the same data when looking for output. """ assert isinstance(key, bytes) index = self._trace_log.find(key, self._trace_log_pos) if index >= 0: self._trace_log_pos += index + len(key) return True return False def __iter__(self, *args: str, **kwargs: dict[str, Any]) -> Iterator[bytes]: # pylint: disable=redefined-outer-name last_line = self._trace_log.rfind(b'\n') lines = filter(None, self._trace_log[self._trace_log_pos : last_line].splitlines()) for line in lines: if self.remove_vt100_escape_codes_from_lines: line = clear_vt100_escape_codes(line) yield line + b'\n' self._trace_log_pos = last_line @override def __repr__(self) -> str: self.make_sure_we_are_executing() return str(self._trace_log) @override def __str__(self) -> str: try: return self._trace_log.decode('utf-8') except UnicodeDecodeError: return str(self._trace_log) def __enter__(self) -> Self: return self def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) -> None: # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if self.child_fd: try: os.close(self.child_fd) except Exception: pass if self.peek_output: # To make sure any peaked output didn't leave us hanging # on the same line we were on. sys.stdout.write('\n') sys.stdout.flush() if exc_type is not None: print(gray(str(exc_value))) if self.exit_code != 0: raise SysCallError( f'{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self)[-500:]}', self.exit_code, worker_log=self._trace_log, ) def is_alive(self) -> bool: self.poll() if self.started and not self.ended: return True return False def write(self, data: bytes, line_ending: bool = True) -> int: assert isinstance(data, bytes) # TODO: Maybe we can support str as well and encode it self.make_sure_we_are_executing() if self.child_fd: return os.write(self.child_fd, data + (b'\n' if line_ending else b'')) return 0 def make_sure_we_are_executing(self) -> bool: if not self.started: return self.execute() return True def tell(self) -> int: self.make_sure_we_are_executing() return self._trace_log_pos def seek(self, pos: int) -> None: self.make_sure_we_are_executing() # Safety check to ensure 0 < pos < len(tracelog) self._trace_log_pos = min(max(0, pos), len(self._trace_log)) def peak(self, output: str | bytes) -> bool: if self.peek_output: if isinstance(output, bytes): try: output = output.decode('UTF-8') except UnicodeDecodeError: return False sys.stdout.write(output) sys.stdout.flush() return True def poll(self) -> None: self.make_sure_we_are_executing() if self.child_fd: got_output = False for _fileno, _event in self.poll_object.poll(0.1): try: output = os.read(self.child_fd, 8192) got_output = True self.peak(output) self._trace_log += output except OSError: self.ended = True break if self.ended or (not got_output and not _pid_exists(self.pid)): self.ended = True try: wait_status = os.waitpid(self.pid, 0)[1] self.exit_code = os.waitstatus_to_exitcode(wait_status) except ChildProcessError: try: wait_status = os.waitpid(self.child_fd, 0)[1] self.exit_code = os.waitstatus_to_exitcode(wait_status) except ChildProcessError: self.exit_code = 1 def execute(self) -> bool: import pty if (old_dir := os.getcwd()) != self.working_directory: os.chdir(str(self.working_directory)) # Note: If for any reason, we get a Python exception between here # and until os.close(), the traceback will get locked inside # stdout of the child_fd object. `os.read(self.child_fd, 8192)` is the # only way to get the traceback without losing it. self.pid, self.child_fd = pty.fork() # https://stackoverflow.com/questions/4022600/python-pty-fork-how-does-it-work if not self.pid: try: os.execve(self.cmd[0], list(self.cmd), {**os.environ, **self.environment_vars}) except FileNotFoundError: print(red(f'{self.cmd[0]} does not exist.')) self.exit_code = 1 return False else: # Only parent process moves back to the original working directory os.chdir(old_dir) self.started = True self.poll_object.register(self.child_fd, EPOLLIN | EPOLLHUP) return True def decode(self, encoding: str = 'UTF-8') -> str: return self._trace_log.decode(encoding) def ensure_sudo(): global sudo_password # pylint: disable=global-statement if sudo_password is None: if (sudo_password := getpass.getpass(f'[sudo] password for {username}: ')) == '': raise ValueError('Certain commands need sudo to work and no sudo password was given.') def setup_networking(): if args.tap: if pathlib.Path(f'/sys/class/net/{args.tap}').exists() is False: print(gray(f'Creating {args.tap} for user {username} and group {groupname}')) handle, pw_prompted = SysCommandWorker(f'sudo ip tuntap add dev {args.tap} mode tap user {username} group {groupname}'), False while handle.is_alive(): if b'password for' in handle and pw_prompted is False: ensure_sudo() handle.write(bytes(sudo_password, 'UTF-8')) pw_prompted = True if args.bridge: if pathlib.Path(f'/sys/class/net/{args.bridge}').exists() is False: print(gray(f'Creating {args.bridge}')) handle, pw_prompted = SysCommandWorker(f'sudo ip link add name {args.bridge} type bridge'), False while handle.is_alive(): if b'password for' in handle and pw_prompted is False: ensure_sudo() handle.write(bytes(sudo_password, 'UTF-8')) pw_prompted = True if args.bridge_mac: handle, pw_prompted = SysCommandWorker(f'sudo ip link set dev {args.bridge} address {args.bridge_mac}'), False print(gray(f'Setting bridge {args.bridge} MAC address to {args.bridge_mac}')) while handle.is_alive(): if b'password for' in handle and pw_prompted is False: ensure_sudo() handle.write(bytes(sudo_password, 'UTF-8')) pw_prompted = True if args.bridge_master and get_master(args.bridge) != args.bridge_master: handle, pw_prompted = SysCommandWorker(f'sudo ip link set dev {args.bridge_master} master {args.bridge}'), False print(gray(f'Setting interface {args.bridge_master} master to {args.bridge}')) while handle.is_alive(): if b'password for' in handle and pw_prompted is False: ensure_sudo() handle.write(bytes(sudo_password, 'UTF-8')) pw_prompted = True print(gray(f'Setting interface {args.tap} master to {args.bridge}')) handle, pw_prompted = SysCommandWorker(f'sudo ip link set dev {args.tap} master {args.bridge}'), False while handle.is_alive(): if b'password for' in handle and pw_prompted is False: ensure_sudo() handle.write(bytes(sudo_password, 'UTF-8')) pw_prompted = True print(gray(f'Bringing up bridge {args.bridge}')) handle, pw_prompted = SysCommandWorker(f'sudo ip link set dev {args.bridge} up'), False while handle.is_alive(): if b'password for' in handle and pw_prompted is False: ensure_sudo() handle.write(bytes(sudo_password, 'UTF-8')) pw_prompted = True print(gray(f'Bringing interface {args.tap} up')) handle, pw_prompted = SysCommandWorker(f'sudo ip link set dev {args.tap} up'), False while handle.is_alive(): if b'password for' in handle and pw_prompted is False: ensure_sudo() handle.write(bytes(sudo_password, 'UTF-8')) pw_prompted = True def setup_disks(): if args.harddrive: for harddrive_arg in args.harddrive: path, size = harddrive_arg.split(':') path = pathlib.Path(path.strip()).expanduser().resolve().absolute() harddrives[path] = size.strip() if path.exists() is False: handle = SysCommandWorker(f'qemu-img create -f qcow2 {hdd} {size}') while handle.is_alive(): time.sleep(0.01) if handle.exit_code != 0: raise ValueError(f'Could not create harddrive {hdd}: {handle}') setup_networking() setup_disks() if args.uki or args.bios is False: disk_paths_hash = hashlib.sha1((''.join(sorted([str(x) for x in harddrives.keys()]))).encode()).hexdigest() shutil.copy2('/usr/share/ovmf/x64/OVMF_CODE.secboot.4m.fd', f'./OVMF_CODE.secboot.4m.fd.{disk_paths_hash}') shutil.copy2('/usr/share/ovmf/x64/OVMF_VARS.4m.fd', f'./OVMF_VARS.4m.fd.{disk_paths_hash}') boot_index = 0 qemu = 'qemu-system-x86_64' qemu += ' -cpu host' qemu += ' -enable-kvm' qemu += ' -machine q35,accel=kvm' qemu += ' -object rng-random,filename=/dev/urandom,id=rng0' qemu += ' -device virtio-rng-pci,rng=rng0' qemu += ' -global driver=cfi.pflash01,property=secure,value=on' qemu += f' -smp {args.cpu},sockets=1,dies=1,cores={args.cpu},threads=1' # qemu += f' -vga vga' qemu += f' -device VGA,edid=on,xres={args.resolution.split("x")[0]},yres={args.resolution.split("x")[1]}' qemu += ' -device intel-iommu,device-iotlb=on,caching-mode=on' qemu += f' -m {args.memory}' if args.bios is False: qemu += f' -drive if=pflash,format=raw,readonly=on,file=./OVMF_CODE.secboot.4m.fd.{disk_paths_hash}' qemu += f' -drive if=pflash,format=raw,file=./OVMF_VARS.4m.fd.{disk_paths_hash}' if args.uki: qemu += f' -kernel {args.uki}' boot_index += 1 scsi_index = 0 for scsi_index, hdd in enumerate(harddrives.keys()): # qemu += f' -device virtio-scsi-pci,bus=pcie.0,id=scsi{index}' # qemu += f' -device scsi-hd,drive=hdd{index},bus=scsi{index}.0,id=scsi{index}.0,bootindex={hdd_boot_priority+index}' # qemu += f' -drive file={hdd},if=none,format=qcow2,discard=unmap,aio=native,cache=none,id=hdd{index}' qemu += f' -device virtio-scsi-pci,bus=pcie.0,id=scsi{scsi_index},addr=0x{scsi_index + 8}' qemu += f' -device scsi-hd,drive=libvirt-{scsi_index}-format,bus=scsi{scsi_index}.0,id=scsi{scsi_index}-0-0-0,channel=0,scsi-id=0,lun=0,device_id=drive-scsi0-0-0-0,bootindex={boot_index},write-cache=on' # noqa: E501 qemu += f' -blockdev \'{{"driver":"file","filename":"{hdd}","aio":"threads","node-name":"libvirt-{scsi_index}-storage","cache":{{"direct":false,"no-flush":false}},"auto-read-only":true,"discard":"unmap"}}\'' # noqa: E501 qemu += f' -blockdev \'{{"node-name":"libvirt-{scsi_index}-format","read-only":false,"discard":"unmap","cache":{{"direct":true,"no-flush":false}},"driver":"qcow2","file":"libvirt-{scsi_index}-storage","backing":null}}\'' # noqa: E501 boot_index += 1 if args.iso: qemu += f' -device virtio-scsi-pci,bus=pcie.0,id=scsi{scsi_index + 1}' qemu += f' -device scsi-cd,drive=cdrom0,bus=scsi{scsi_index + 1}.0,bootindex={boot_index}' qemu += f' -drive file={args.iso},media=cdrom,if=none,format=raw,cache=none,id=cdrom0' boot_index += 1 # if args.vfio: # qemu += f' -drive file={args.vfio},index=2,media=cdrom' if args.tap: qemu += f' -device virtio-net-pci,mac={args.tap_mac},id=network0,netdev=network0.0,status=on,bus=pcie.0' qemu += f' -netdev tap,ifname={args.tap},id=network0.0,script=no,downscript=no' print(gray(qemu)) qemu_session = subprocess.run(shlex.split(qemu), check=True, capture_output=True) if qemu_session.stdout: print(qemu_session.stdout.decode()) if qemu_session.returncode != 0: print(red(qemu_session.stderr.decode()))