From d57f29757d9f6edc24d261c83ea0195af31aa900 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Wed, 9 Feb 2022 10:17:05 +0100 Subject: [PATCH] Added back key_file support for legacy reasons, but it now acts as an input instead. --- archinstall/lib/disk/partition.py | 11 ++-- archinstall/lib/luks.py | 83 ++++++++++++++++++++++++------- 2 files changed, 74 insertions(+), 20 deletions(-) diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 708edd29..a86a5ccb 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -261,9 +261,14 @@ class Partition: yield result def partprobe(self) -> bool: - if self.block_device and SysCommand(f'partprobe {self.block_device.device}').exit_code == 0: - time.sleep(1) - return True + if self.block_device + try: + SysCommand(f'partprobe {self.block_device.device}') + time.sleep(1) + return True + except SysCallError as error: + log(f"Could not run patprobe on {self.block_device.device}: {error}", level=logging.DEBUG) + return False def detect_inner_filesystem(self, password :str) -> Optional[str]: diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index e25a1b61..2bb0e3db 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -20,12 +20,14 @@ class luks2: def __init__(self, partition :Partition, mountpoint :str, - password :str, + password :Optional[Union[str, bytes]] = None, + key_file :Optional[str] = None, auto_unmount :bool = False, *args :str, **kwargs :str): self.password = password + self.key_file = key_file self.partition = partition self.mountpoint = mountpoint self.args = args @@ -35,6 +37,13 @@ class luks2: self.mapdev = None def __enter__(self) -> Partition: + if self.password is None and (self.key_file is None or pathlib.Path(self.key_file).exists() is False): + raise AssertionError(f"luks2() requires either a `key_file` or a `password` parameter to operate.") + + if self.password is None: + with pathlib.Path(self.key_file).resolve().open('rb') as pwfile: + self.password = pwfile.read().rstrip(b'\r\n') + if type(self.password) != bytes: self.password = bytes(self.password, 'UTF-8') @@ -51,7 +60,8 @@ class luks2: return True def encrypt(self, partition :Partition, - password :Optional[str] = None, + password :Optional[Union[str, bytes]] = None, + key_file :Optional[str] = None, key_size :int = 512, hash_type :str = 'sha512', iter_time :int = 10000) -> bool: @@ -60,11 +70,24 @@ class luks2: if not password: password = self.password + if not key_file: + key_file = self.key_file + + if not any(password, key_file): + raise AssertionError(f"luks2().encrypt() requires either a `key_file` or a `password` parameter to operate.") + + if password is None and (key_file is None or pathlib.Path(key_file).exists() is False): + raise AssertionError(f"luks2().encrypt() requires either a `key_file` or a `password` parameter to operate.") + + if password is None: + with pathlib.Path(key_file).resolve().open('rb') as pwfile: + password = pwfile.read().rstrip(b'\r\n') if type(password) != bytes: password = bytes(password, 'UTF-8') partition.partprobe() + time.sleep(1) cryptsetup_args = shlex.join([ '/usr/bin/cryptsetup', @@ -75,23 +98,20 @@ class luks2: '--hash', hash_type, '--key-size', str(key_size), '--iter-time', str(iter_time), - '--key-file', '/dev/stdin', + '--key-file', '/dev/stdin', # Reason: See issue #137 '--use-urandom', 'luksFormat', partition.path, ]) try: - # Retry formatting the volume because archinstall can some times be too quick - # which generates a "Device /dev/sdX does not exist or access denied." between - # setting up partitions and us trying to encrypt it. - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if (cmd_handle := SysCommand(cryptsetup_args)).exit_code != 0: - time.sleep(storage['DISK_TIMEOUTS']) - else: - break + cryptworker = SysCommandWorker(cryptsetup_args) + + pw_given = False + while cryptworker.is_alive(): + if not pw_given: + cryptworker.write(password) + pw_given = True - if cmd_handle.exit_code != 0: - raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}') except SysCallError as err: if err.exit_code == 256: log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG) @@ -100,7 +120,8 @@ class luks2: # Get crypt-information about the device by doing a reverse lookup starting with the partition path # For instance: /dev/sda - SysCommand(f'bash -c "partprobe"') + partition.partprobe() + devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0] # For each child (sub-partition/sub-device) @@ -116,13 +137,23 @@ class luks2: SysCommand(f"cryptsetup close {child['name']}") # Then try again to set up the crypt-device - cmd_handle = SysCommand(cryptsetup_args) + cryptworker = SysCommandWorker(cryptsetup_args) + + pw_given = False + while cryptworker.is_alive(): + if not pw_given: + cryptworker.write(password) + pw_given = True else: raise err return True - def unlock(self, partition :Partition, mountpoint :str, password :str) -> Partition: + def unlock(self, + partition :Partition, + mountpoint :str, + password :Optional[Union[str, bytes]] = None, + key_file :Optional[str] = None) -> Partition: """ Mounts a luks2 compatible partition to a certain mountpoint. Keyfile must be specified as there's no way to interact with the pw-prompt atm. @@ -132,6 +163,24 @@ class luks2: """ from .disk import get_filesystem_type + if not password: + password = self.password + if not key_file: + key_file = self.key_file + + if not any(password, key_file): + raise AssertionError(f"luks2().encrypt() requires either a `key_file` or a `password` parameter to operate.") + + if password is None and (key_file is None or pathlib.Path(key_file).exists() is False): + raise AssertionError(f"luks2().encrypt() requires either a `key_file` or a `password` parameter to operate.") + + if password is None: + with pathlib.Path(key_file).resolve().open('rb') as pwfile: + password = pwfile.read().rstrip(b'\r\n') + + if type(password) != bytes: + password = bytes(password, 'UTF-8') + if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead? @@ -144,7 +193,7 @@ class luks2: pw_given = False while cryptworker.is_alive(): if not pw_given: - cryptworker.write(bytes(password, 'UTF-8')) + cryptworker.write(password) pw_given = True if os.path.islink(f'/dev/mapper/{mountpoint}'):