luks2().unlock() now returns a MapperDev(). MapperDev() now have a .format() function, which relies on Partition.format() but with a custom path for now. It's not ideal, as it's a bit slow to resolve the Partition() object, but it works.

This commit is contained in:
Anton Hvornum 2022-02-09 11:33:51 +01:00
parent 32d6c1e635
commit 369daa43fc
No known key found for this signature in database
GPG Key ID: F1234C5BA67C59DF
4 changed files with 59 additions and 52 deletions

View File

@ -5,3 +5,5 @@ from .filesystem import Filesystem, MBR, GPT
from .partition import *
from .user_guides import *
from .validators import *
from .mapperdev import MapperDev
from .dmcryptdev import DMCryptDev

View File

@ -80,4 +80,8 @@ class MapperDev:
for mountpoint in self.mount_information:
for result in get_subvolumes_from_findmnt(mountpoint):
yield result
yield result
def format(self, filesystem :str, options :List[str] = []) -> bool:
# TODO: Create a format() helper function rather than relying on a dummy Partition().format() call:
self.partition.format(filesystem=filesystem, options = options, path=self.path)

View File

@ -29,8 +29,8 @@ class Partition:
part_id = os.path.basename(path)
self.block_device = block_device
if type(self.block_device) is str:
raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
if type(self.block_device) != BlockDevice:
raise AssertionError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
self.path = path
self.part_id = part_id

View File

@ -10,7 +10,7 @@ from typing import Optional, List, Union, TYPE_CHECKING
if TYPE_CHECKING:
from .installer import Installer
from .disk import Partition, convert_device_to_uuid
from .disk import Partition, convert_device_to_uuid, MapperDev
from .general import SysCommand, SysCommandWorker
from .output import log
from .exceptions import SysCallError, DiskError
@ -97,12 +97,43 @@ class luks2:
'--hash', hash_type,
'--key-size', str(key_size),
'--iter-time', str(iter_time),
'--key-file', '/dev/stdin', # Reason: See issue #137
# '--key-file', '/tmp/x.pw', # Reason: See issue #137
'--use-urandom',
'luksFormat', partition.path,
])
try:
cryptworker = SysCommandWorker(cryptsetup_args)
pw_given = False
while cryptworker.is_alive():
if bytes(f'Enter passphrase for {partition.path}', 'UTF-8') in cryptworker and pw_given is False:
cryptworker.write(password)
pw_given = True
if cryptworker.exit_code == 256:
log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device: {cryptworker}', level=logging.INFO)
# Partition was in use, unmount it and try again
partition.unmount()
# Get crypt-information about the device by doing a reverse lookup starting with the partition path
# For instance: /dev/sda
partition.partprobe()
devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0]
# For each child (sub-partition/sub-device)
if len(children := devinfo.get('children', [])):
for child in children:
# Unmount the child location
if child_mountpoint := child.get('mountpoint', None):
log(f'Unmounting {child_mountpoint}', level=logging.DEBUG)
SysCommand(f"umount -R {child_mountpoint}")
# And close it if possible.
log(f"Closing crypt device {child['name']}", level=logging.DEBUG)
SysCommand(f"cryptsetup close {child['name']}")
# Then try again to set up the crypt-device
cryptworker = SysCommandWorker(cryptsetup_args)
pw_given = False
@ -110,41 +141,8 @@ class luks2:
if not pw_given:
cryptworker.write(password)
pw_given = True
except SysCallError as err:
if err.exit_code == 256:
log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
# Partition was in use, unmount it and try again
partition.unmount()
# Get crypt-information about the device by doing a reverse lookup starting with the partition path
# For instance: /dev/sda
partition.partprobe()
devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0]
# For each child (sub-partition/sub-device)
if len(children := devinfo.get('children', [])):
for child in children:
# Unmount the child location
if child_mountpoint := child.get('mountpoint', None):
log(f'Unmounting {child_mountpoint}', level=logging.DEBUG)
SysCommand(f"umount -R {child_mountpoint}")
# And close it if possible.
log(f"Closing crypt device {child['name']}", level=logging.DEBUG)
SysCommand(f"cryptsetup close {child['name']}")
# Then try again to set up the crypt-device
cryptworker = SysCommandWorker(cryptsetup_args)
pw_given = False
while cryptworker.is_alive():
if not pw_given:
cryptworker.write(password)
pw_given = True
else:
raise err
elif cryptworker.exit_code > 0:
raise DiskError(f"Could not encrypt {partition}: {cryptworker}")
return True
@ -183,30 +181,33 @@ class luks2:
if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead?
wait_timer = time.time()
while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10:
time.sleep(0.025)
cryptworker = SysCommandWorker(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file /dev/stdin --type luks2')
cryptworker = SysCommandWorker(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --type luks2')
pw_given = False
while cryptworker.is_alive():
if not pw_given:
if bytes(f'Enter passphrase for {partition.path}', 'UTF-8') in cryptworker and pw_given is False:
cryptworker.write(password)
pw_given = True
if not cryptworker.exit_code == 0:
raise DiskError(f"Could not unlock {partition}: {cryptworker}")
if os.path.islink(f'/dev/mapper/{mountpoint}'):
self.mapdev = f'/dev/mapper/{mountpoint}'
# TODO: Return MapperDev instead of Partition
unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
return unlocked_partition
return MapperDev(mountpoint)
def close(self, mountpoint :Optional[str] = None) -> bool:
if not mountpoint:
mountpoint = self.mapdev
SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
return os.path.islink(self.mapdev) is False
if mountpoint:
SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
if not mountpoint:
return False
return os.path.islink(mountpoint) is False
def format(self, path :str) -> None:
if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: