Added back key_file support for legacy reasons, but it now acts as an input instead.

This commit is contained in:
Anton Hvornum 2022-02-09 10:17:05 +01:00
parent 47dddfc497
commit d57f29757d
No known key found for this signature in database
GPG Key ID: F1234C5BA67C59DF
2 changed files with 74 additions and 20 deletions

View File

@ -261,9 +261,14 @@ class Partition:
yield result yield result
def partprobe(self) -> bool: def partprobe(self) -> bool:
if self.block_device and SysCommand(f'partprobe {self.block_device.device}').exit_code == 0: if self.block_device
time.sleep(1) try:
return True SysCommand(f'partprobe {self.block_device.device}')
time.sleep(1)
return True
except SysCallError as error:
log(f"Could not run patprobe on {self.block_device.device}: {error}", level=logging.DEBUG)
return False return False
def detect_inner_filesystem(self, password :str) -> Optional[str]: def detect_inner_filesystem(self, password :str) -> Optional[str]:

View File

@ -20,12 +20,14 @@ class luks2:
def __init__(self, def __init__(self,
partition :Partition, partition :Partition,
mountpoint :str, mountpoint :str,
password :str, password :Optional[Union[str, bytes]] = None,
key_file :Optional[str] = None,
auto_unmount :bool = False, auto_unmount :bool = False,
*args :str, *args :str,
**kwargs :str): **kwargs :str):
self.password = password self.password = password
self.key_file = key_file
self.partition = partition self.partition = partition
self.mountpoint = mountpoint self.mountpoint = mountpoint
self.args = args self.args = args
@ -35,6 +37,13 @@ class luks2:
self.mapdev = None self.mapdev = None
def __enter__(self) -> Partition: def __enter__(self) -> Partition:
if self.password is None and (self.key_file is None or pathlib.Path(self.key_file).exists() is False):
raise AssertionError(f"luks2() requires either a `key_file` or a `password` parameter to operate.")
if self.password is None:
with pathlib.Path(self.key_file).resolve().open('rb') as pwfile:
self.password = pwfile.read().rstrip(b'\r\n')
if type(self.password) != bytes: if type(self.password) != bytes:
self.password = bytes(self.password, 'UTF-8') self.password = bytes(self.password, 'UTF-8')
@ -51,7 +60,8 @@ class luks2:
return True return True
def encrypt(self, partition :Partition, def encrypt(self, partition :Partition,
password :Optional[str] = None, password :Optional[Union[str, bytes]] = None,
key_file :Optional[str] = None,
key_size :int = 512, key_size :int = 512,
hash_type :str = 'sha512', hash_type :str = 'sha512',
iter_time :int = 10000) -> bool: iter_time :int = 10000) -> bool:
@ -60,11 +70,24 @@ class luks2:
if not password: if not password:
password = self.password password = self.password
if not key_file:
key_file = self.key_file
if not any(password, key_file):
raise AssertionError(f"luks2().encrypt() requires either a `key_file` or a `password` parameter to operate.")
if password is None and (key_file is None or pathlib.Path(key_file).exists() is False):
raise AssertionError(f"luks2().encrypt() requires either a `key_file` or a `password` parameter to operate.")
if password is None:
with pathlib.Path(key_file).resolve().open('rb') as pwfile:
password = pwfile.read().rstrip(b'\r\n')
if type(password) != bytes: if type(password) != bytes:
password = bytes(password, 'UTF-8') password = bytes(password, 'UTF-8')
partition.partprobe() partition.partprobe()
time.sleep(1)
cryptsetup_args = shlex.join([ cryptsetup_args = shlex.join([
'/usr/bin/cryptsetup', '/usr/bin/cryptsetup',
@ -75,23 +98,20 @@ class luks2:
'--hash', hash_type, '--hash', hash_type,
'--key-size', str(key_size), '--key-size', str(key_size),
'--iter-time', str(iter_time), '--iter-time', str(iter_time),
'--key-file', '/dev/stdin', '--key-file', '/dev/stdin', # Reason: See issue #137
'--use-urandom', '--use-urandom',
'luksFormat', partition.path, 'luksFormat', partition.path,
]) ])
try: try:
# Retry formatting the volume because archinstall can some times be too quick cryptworker = SysCommandWorker(cryptsetup_args)
# which generates a "Device /dev/sdX does not exist or access denied." between
# setting up partitions and us trying to encrypt it. pw_given = False
for i in range(storage['DISK_RETRY_ATTEMPTS']): while cryptworker.is_alive():
if (cmd_handle := SysCommand(cryptsetup_args)).exit_code != 0: if not pw_given:
time.sleep(storage['DISK_TIMEOUTS']) cryptworker.write(password)
else: pw_given = True
break
if cmd_handle.exit_code != 0:
raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}')
except SysCallError as err: except SysCallError as err:
if err.exit_code == 256: if err.exit_code == 256:
log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG) log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
@ -100,7 +120,8 @@ class luks2:
# Get crypt-information about the device by doing a reverse lookup starting with the partition path # Get crypt-information about the device by doing a reverse lookup starting with the partition path
# For instance: /dev/sda # For instance: /dev/sda
SysCommand(f'bash -c "partprobe"') partition.partprobe()
devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0] devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0]
# For each child (sub-partition/sub-device) # For each child (sub-partition/sub-device)
@ -116,13 +137,23 @@ class luks2:
SysCommand(f"cryptsetup close {child['name']}") SysCommand(f"cryptsetup close {child['name']}")
# Then try again to set up the crypt-device # Then try again to set up the crypt-device
cmd_handle = SysCommand(cryptsetup_args) cryptworker = SysCommandWorker(cryptsetup_args)
pw_given = False
while cryptworker.is_alive():
if not pw_given:
cryptworker.write(password)
pw_given = True
else: else:
raise err raise err
return True return True
def unlock(self, partition :Partition, mountpoint :str, password :str) -> Partition: def unlock(self,
partition :Partition,
mountpoint :str,
password :Optional[Union[str, bytes]] = None,
key_file :Optional[str] = None) -> Partition:
""" """
Mounts a luks2 compatible partition to a certain mountpoint. Mounts a luks2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm. Keyfile must be specified as there's no way to interact with the pw-prompt atm.
@ -132,6 +163,24 @@ class luks2:
""" """
from .disk import get_filesystem_type from .disk import get_filesystem_type
if not password:
password = self.password
if not key_file:
key_file = self.key_file
if not any(password, key_file):
raise AssertionError(f"luks2().encrypt() requires either a `key_file` or a `password` parameter to operate.")
if password is None and (key_file is None or pathlib.Path(key_file).exists() is False):
raise AssertionError(f"luks2().encrypt() requires either a `key_file` or a `password` parameter to operate.")
if password is None:
with pathlib.Path(key_file).resolve().open('rb') as pwfile:
password = pwfile.read().rstrip(b'\r\n')
if type(password) != bytes:
password = bytes(password, 'UTF-8')
if '/' in mountpoint: if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead? os.path.basename(mountpoint) # TODO: Raise exception instead?
@ -144,7 +193,7 @@ class luks2:
pw_given = False pw_given = False
while cryptworker.is_alive(): while cryptworker.is_alive():
if not pw_given: if not pw_given:
cryptworker.write(bytes(password, 'UTF-8')) cryptworker.write(password)
pw_given = True pw_given = True
if os.path.islink(f'/dev/mapper/{mountpoint}'): if os.path.islink(f'/dev/mapper/{mountpoint}'):