LVM support (#2104)

* Submenu for disk configuration

* Update

* Add LVM manual config

* PV selection

* LVM volume menu

* Update

* Fix mypy

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update LVM

* Update

* Update

* Btrfs support

* Refactor

* LVM on Luks

* Luks on LVM

* Update

* LVM on Luks

* Update

* Update

* mypy

* Update

* Fix bug with LuksOnLvm and Btrfs

* Update

* Update

* Info -> Debug output
This commit is contained in:
Daniel Girtler 2024-04-15 18:49:00 +10:00 committed by GitHub
parent 7d9e9d8ba0
commit b470b16ec9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1713 additions and 413 deletions

View File

@ -11,6 +11,11 @@ from .device_model import (
BDevice,
DiskLayoutType,
DiskLayoutConfiguration,
LvmLayoutType,
LvmConfiguration,
LvmVolumeGroup,
LvmVolume,
LvmVolumeStatus,
PartitionTable,
Unit,
Size,
@ -30,7 +35,7 @@ from .device_model import (
CleanType,
get_lsblk_info,
get_all_lsblk_info,
get_lsblk_by_mountpoint
get_lsblk_by_mountpoint,
)
from .encryption_menu import (
select_encryption_type,
@ -39,3 +44,5 @@ from .encryption_menu import (
select_partitions_to_encrypt,
DiskEncryptionMenu,
)
from .disk_menu import DiskLayoutConfigurationMenu

View File

@ -3,8 +3,9 @@ from __future__ import annotations
import json
import os
import logging
import time
from pathlib import Path
from typing import List, Dict, Any, Optional, TYPE_CHECKING
from typing import List, Dict, Any, Optional, TYPE_CHECKING, Literal, Iterable
from parted import ( # type: ignore
Disk, Geometry, FileSystem,
@ -17,11 +18,12 @@ from .device_model import (
BDevice, _DeviceInfo, _PartitionInfo,
FilesystemType, Unit, PartitionTable,
ModificationStatus, get_lsblk_info, LsblkInfo,
_BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption
_BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption, LvmVolumeGroup, LvmVolume, Size, LvmGroupInfo,
SectorSize, LvmVolumeInfo, LvmPVInfo, SubvolumeModification, BtrfsMountOption
)
from ..exceptions import DiskError, UnknownFilesystemFormat
from ..general import SysCommand, SysCallError, JSON
from ..general import SysCommand, SysCallError, JSON, SysCommandWorker
from ..luks import Luks2
from ..output import debug, error, info, warn, log
from ..utils.util import is_subpath
@ -189,7 +191,7 @@ class DeviceHandler(object):
return subvol_infos
def _perform_formatting(
def format(
self,
fs_type: FilesystemType,
path: Path,
@ -234,7 +236,7 @@ class DeviceHandler(object):
options += additional_parted_options
options_str = ' '.join(options)
info(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
debug(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
try:
SysCommand(f"/usr/bin/{command} {options_str} {path}")
@ -243,7 +245,33 @@ class DeviceHandler(object):
error(msg)
raise DiskError(msg) from err
def _perform_enc_formatting(
def encrypt(
self,
dev_path: Path,
mapper_name: Optional[str],
enc_password: str,
lock_after_create: bool = True
) -> Luks2:
luks_handler = Luks2(
dev_path,
mapper_name=mapper_name,
password=enc_password
)
key_file = luks_handler.encrypt()
luks_handler.unlock(key_file=key_file)
if not luks_handler.mapper_dev:
raise DiskError('Failed to unlock luks device')
if lock_after_create:
debug(f'luks2 locking device: {dev_path}')
luks_handler.lock()
return luks_handler
def format_encrypted(
self,
dev_path: Path,
mapper_name: Optional[str],
@ -258,71 +286,160 @@ class DeviceHandler(object):
key_file = luks_handler.encrypt()
debug(f'Unlocking luks2 device: {dev_path}')
luks_handler.unlock(key_file=key_file)
if not luks_handler.mapper_dev:
raise DiskError('Failed to unlock luks device')
info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}')
self._perform_formatting(fs_type, luks_handler.mapper_dev)
self.format(fs_type, luks_handler.mapper_dev)
info(f'luks2 locking device: {dev_path}')
luks_handler.lock()
def _validate_partitions(self, partitions: List[PartitionModification]):
checks = {
# verify that all partitions have a path set (which implies that they have been created)
lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
# crypto luks is not a valid file system type
lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError('Crypto luks cannot be set as a filesystem type'),
# file system type must be set
lambda x: x.fs_type is None: ValueError('File system type must be set for modification')
}
for check, exc in checks.items():
found = next(filter(check, partitions), None)
if found is not None:
raise exc
def format(
def _lvm_info(
self,
device_mod: DeviceModification,
enc_conf: Optional['DiskEncryption'] = None
):
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
"""
cmd: str,
info_type: Literal['lv', 'vg', 'pvseg']
) -> Optional[Any]:
raw_info = SysCommand(cmd).decode().split('\n')
# only verify partitions that are being created or modified
create_or_modify_parts = [p for p in device_mod.partitions if p.is_create_or_modify()]
# for whatever reason the output sometimes contains
# "File descriptor X leaked leaked on vgs invocation
data = '\n'.join([raw for raw in raw_info if 'File descriptor' not in raw])
self._validate_partitions(create_or_modify_parts)
debug(f'LVM info: {data}')
# make sure all devices are unmounted
self._umount_all_existing(device_mod.device_path)
reports = json.loads(data)
for part_mod in create_or_modify_parts:
# partition will be encrypted
if enc_conf is not None and part_mod in enc_conf.partitions:
self._perform_enc_formatting(
part_mod.safe_dev_path,
part_mod.mapper_name,
part_mod.safe_fs_type,
enc_conf
)
else:
self._perform_formatting(part_mod.safe_fs_type, part_mod.safe_dev_path)
for report in reports['report']:
if len(report[info_type]) != 1:
raise ValueError(f'Report does not contain any entry')
# synchronize with udev before using lsblk
SysCommand('udevadm settle')
entry = report[info_type][0]
lsblk_info = self._fetch_part_info(part_mod.safe_dev_path)
match info_type:
case 'pvseg':
return LvmPVInfo(
pv_name=Path(entry['pv_name']),
lv_name=entry['lv_name'],
vg_name=entry['vg_name'],
)
case 'lv':
return LvmVolumeInfo(
lv_name=entry['lv_name'],
vg_name=entry['vg_name'],
lv_size=Size(int(entry[f'lv_size'][:-1]), Unit.B, SectorSize.default())
)
case 'vg':
return LvmGroupInfo(
vg_uuid=entry['vg_uuid'],
vg_size=Size(int(entry[f'vg_size'][:-1]), Unit.B, SectorSize.default())
)
part_mod.partn = lsblk_info.partn
part_mod.partuuid = lsblk_info.partuuid
part_mod.uuid = lsblk_info.uuid
return None
def _lvm_info_with_retry(self, cmd: str, info_type: Literal['lv', 'vg', 'pvseg']) -> Optional[Any]:
attempts = 3
for attempt_nr in range(attempts):
try:
return self._lvm_info(cmd, info_type)
except ValueError:
time.sleep(attempt_nr + 1)
raise ValueError(f'Failed to fetch {info_type} information')
def lvm_vol_info(self, lv_name: str) -> Optional[LvmVolumeInfo]:
cmd = (
'lvs --reportformat json '
'--unit B '
f'-S lv_name={lv_name}'
)
return self._lvm_info_with_retry(cmd, 'lv')
def lvm_group_info(self, vg_name: str) -> Optional[LvmGroupInfo]:
cmd = (
'vgs --reportformat json '
'--unit B '
'-o vg_name,vg_uuid,vg_size '
f'-S vg_name={vg_name}'
)
return self._lvm_info_with_retry(cmd, 'vg')
def lvm_pvseg_info(self, vg_name: str, lv_name: str) -> Optional[LvmPVInfo]:
cmd = (
'pvs '
'--segments -o+lv_name,vg_name '
f'-S vg_name={vg_name},lv_name={lv_name} '
'--reportformat json '
)
return self._lvm_info_with_retry(cmd, 'pvseg')
def lvm_vol_change(self, vol: LvmVolume, activate: bool):
active_flag = 'y' if activate else 'n'
cmd = f'lvchange -a {active_flag} {vol.safe_dev_path}'
debug(f'lvchange volume: {cmd}')
SysCommand(cmd)
def lvm_export_vg(self, vg: LvmVolumeGroup):
cmd = f'vgexport {vg.name}'
debug(f'vgexport: {cmd}')
SysCommand(cmd)
def lvm_import_vg(self, vg: LvmVolumeGroup):
cmd = f'vgimport {vg.name}'
debug(f'vgimport: {cmd}')
SysCommand(cmd)
def lvm_vol_reduce(self, vol_path: Path, amount: Size):
val = amount.format_size(Unit.B, include_unit=False)
cmd = f'lvreduce -L -{val}B {vol_path}'
debug(f'Reducing LVM volume size: {cmd}')
SysCommand(cmd)
def lvm_pv_create(self, pvs: Iterable[Path]):
cmd = 'pvcreate ' + ' '.join([str(pv) for pv in pvs])
debug(f'Creating LVM PVS: {cmd}')
worker = SysCommandWorker(cmd)
worker.poll()
worker.write(b'y\n', line_ending=False)
def lvm_vg_create(self, pvs: Iterable[Path], vg_name: str):
pvs_str = ' '.join([str(pv) for pv in pvs])
cmd = f'vgcreate --yes {vg_name} {pvs_str}'
debug(f'Creating LVM group: {cmd}')
worker = SysCommandWorker(cmd)
worker.poll()
worker.write(b'y\n', line_ending=False)
def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size] = None):
if offset is not None:
length = volume.length - offset
else:
length = volume.length
length_str = length.format_size(Unit.B, include_unit=False)
cmd = f'lvcreate --yes -L {length_str}B {vg_name} -n {volume.name}'
debug(f'Creating volume: {cmd}')
worker = SysCommandWorker(cmd)
worker.poll()
worker.write(b'y\n', line_ending=False)
volume.vg_name = vg_name
volume.dev_path = Path(f'/dev/{vg_name}/{volume.name}')
def _setup_partition(
self,
@ -385,7 +502,7 @@ class DeviceHandler(object):
# the partition has a path now that it has been added
part_mod.dev_path = Path(partition.path)
def _fetch_part_info(self, path: Path) -> LsblkInfo:
def fetch_part_info(self, path: Path) -> LsblkInfo:
lsblk_info = get_lsblk_info(path)
if not lsblk_info.partn:
@ -404,6 +521,37 @@ class DeviceHandler(object):
return lsblk_info
def create_lvm_btrfs_subvolumes(
self,
path: Path,
btrfs_subvols: List[SubvolumeModification],
mount_options: List[str]
):
info(f'Creating subvolumes: {path}')
self.mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
for sub_vol in btrfs_subvols:
debug(f'Creating subvolume: {sub_vol.name}')
subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name
SysCommand(f"btrfs subvolume create {subvol_path}")
if BtrfsMountOption.nodatacow.value in mount_options:
try:
SysCommand(f'chattr +C {subvol_path}')
except SysCallError as err:
raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}')
if BtrfsMountOption.compress.value in mount_options:
try:
SysCommand(f'chattr +c {subvol_path}')
except SysCallError as err:
raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}')
self.umount(path)
def create_btrfs_volumes(
self,
part_mod: PartitionModification,
@ -468,8 +616,8 @@ class DeviceHandler(object):
return luks_handler
def _umount_all_existing(self, device_path: Path):
info(f'Unmounting all existing partitions: {device_path}')
def umount_all_existing(self, device_path: Path):
debug(f'Unmounting all existing partitions: {device_path}')
existing_partitions = self._devices[device_path].partition_infos
@ -498,7 +646,7 @@ class DeviceHandler(object):
raise DiskError('Too many partitions on disk, MBR disks can only have 3 primary partitions')
# make sure all devices are unmounted
self._umount_all_existing(modification.device_path)
self.umount_all_existing(modification.device_path)
# WARNING: the entire device will be wiped and all data lost
if modification.wipe:

View File

@ -41,6 +41,8 @@ class DiskLayoutType(Enum):
class DiskLayoutConfiguration:
config_type: DiskLayoutType
device_modifications: List[DeviceModification] = field(default_factory=list)
lvm_config: Optional[LvmConfiguration] = None
# used for pre-mounted config
mountpoint: Optional[Path] = None
@ -51,13 +53,18 @@ class DiskLayoutConfiguration:
'mountpoint': str(self.mountpoint)
}
else:
return {
config: Dict[str, Any] = {
'config_type': self.config_type.value,
'device_modifications': [mod.json() for mod in self.device_modifications]
'device_modifications': [mod.json() for mod in self.device_modifications],
}
if self.lvm_config:
config['lvm_config'] = self.lvm_config.json()
return config
@classmethod
def parse_arg(cls, disk_config: Dict[str, List[Dict[str, Any]]]) -> Optional[DiskLayoutConfiguration]:
def parse_arg(cls, disk_config: Dict[str, Any]) -> Optional[DiskLayoutConfiguration]:
from .device_handler import device_handler
device_modifications: List[DeviceModification] = []
@ -124,6 +131,10 @@ class DiskLayoutConfiguration:
device_modification.partitions = device_partitions
device_modifications.append(device_modification)
# Parse LVM configuration from settings
if (lvm_arg := disk_config.get('lvm_config', None)) is not None:
config.lvm_config = LvmConfiguration.parse_arg(lvm_arg, config)
return config
@ -133,24 +144,24 @@ class PartitionTable(Enum):
class Unit(Enum):
B = 1 # byte
kB = 1000**1 # kilobyte
MB = 1000**2 # megabyte
GB = 1000**3 # gigabyte
TB = 1000**4 # terabyte
PB = 1000**5 # petabyte
EB = 1000**6 # exabyte
ZB = 1000**7 # zettabyte
YB = 1000**8 # yottabyte
B = 1 # byte
kB = 1000 ** 1 # kilobyte
MB = 1000 ** 2 # megabyte
GB = 1000 ** 3 # gigabyte
TB = 1000 ** 4 # terabyte
PB = 1000 ** 5 # petabyte
EB = 1000 ** 6 # exabyte
ZB = 1000 ** 7 # zettabyte
YB = 1000 ** 8 # yottabyte
KiB = 1024**1 # kibibyte
MiB = 1024**2 # mebibyte
GiB = 1024**3 # gibibyte
TiB = 1024**4 # tebibyte
PiB = 1024**5 # pebibyte
EiB = 1024**6 # exbibyte
ZiB = 1024**7 # zebibyte
YiB = 1024**8 # yobibyte
KiB = 1024 ** 1 # kibibyte
MiB = 1024 ** 2 # mebibyte
GiB = 1024 ** 3 # gibibyte
TiB = 1024 ** 4 # tebibyte
PiB = 1024 ** 5 # pebibyte
EiB = 1024 ** 6 # exbibyte
ZiB = 1024 ** 7 # zebibyte
YiB = 1024 ** 8 # yobibyte
sectors = 'sectors' # size in sector
@ -575,7 +586,7 @@ class PartitionFlag(Enum):
Which is the way libparted checks for its flags: https://git.savannah.gnu.org/gitweb/?p=parted.git;a=blob;f=libparted/labels/gpt.c;hb=4a0e468ed63fff85a1f9b923189f20945b32f4f1#l183
"""
Boot = _ped.PARTITION_BOOT
XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
ESP = _ped.PARTITION_ESP
@ -658,6 +669,10 @@ class PartitionModification:
flags: List[PartitionFlag] = field(default_factory=list)
btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
# only set when modification was created from an existing
# partition info object to be able to reference it back
part_info: Optional[_PartitionInfo] = None
# only set if the device was created or exists
dev_path: Optional[Path] = None
partn: Optional[int] = None
@ -724,7 +739,8 @@ class PartitionModification:
uuid=partition_info.uuid,
flags=partition_info.flags,
mountpoint=mountpoint,
btrfs_subvols=subvol_mods
btrfs_subvols=subvol_mods,
part_info=partition_info
)
@property
@ -832,6 +848,270 @@ class PartitionModification:
return part_mod
class LvmLayoutType(Enum):
Default = 'default'
# Manual = 'manual_lvm'
def display_msg(self) -> str:
match self:
case LvmLayoutType.Default:
return str(_('Default layout'))
# case LvmLayoutType.Manual:
# return str(_('Manual configuration'))
raise ValueError(f'Unknown type: {self}')
@dataclass
class LvmVolumeGroup:
name: str
pvs: List[PartitionModification]
volumes: List[LvmVolume] = field(default_factory=list)
def json(self) -> Dict[str, Any]:
return {
'name': self.name,
'lvm_pvs': [p.obj_id for p in self.pvs],
'volumes': [vol.json() for vol in self.volumes]
}
@staticmethod
def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmVolumeGroup:
lvm_pvs = []
for mod in disk_config.device_modifications:
for part in mod.partitions:
if part.obj_id in arg.get('lvm_pvs', []):
lvm_pvs.append(part)
return LvmVolumeGroup(
arg['name'],
lvm_pvs,
[LvmVolume.parse_arg(vol) for vol in arg['volumes']]
)
def contains_lv(self, lv: LvmVolume) -> bool:
return lv in self.volumes
class LvmVolumeStatus(Enum):
Exist = 'existing'
Modify = 'modify'
Delete = 'delete'
Create = 'create'
@dataclass
class LvmVolume:
status: LvmVolumeStatus
name: str
fs_type: FilesystemType
length: Size
mountpoint: Optional[Path]
mount_options: List[str] = field(default_factory=list)
btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
# volume group name
vg_name: Optional[str] = None
# mapper device path /dev/<vg>/<vol>
dev_path: Optional[Path] = None
def __post_init__(self):
# needed to use the object as a dictionary key due to hash func
if not hasattr(self, '_obj_id'):
self._obj_id = uuid.uuid4()
def __hash__(self):
return hash(self._obj_id)
@property
def obj_id(self) -> str:
if hasattr(self, '_obj_id'):
return str(self._obj_id)
return ''
@property
def mapper_name(self) -> Optional[str]:
if self.dev_path:
return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.safe_dev_path.name}'
return None
@property
def mapper_path(self) -> Path:
if self.mapper_name:
return Path(f'/dev/mapper/{self.mapper_name}')
raise ValueError('No mapper path set')
@property
def safe_dev_path(self) -> Path:
if self.dev_path:
return self.dev_path
raise ValueError('No device path for volume defined')
@property
def safe_fs_type(self) -> FilesystemType:
if self.fs_type is None:
raise ValueError('File system type is not set')
return self.fs_type
@property
def relative_mountpoint(self) -> Path:
"""
Will return the relative path based on the anchor
e.g. Path('/mnt/test') -> Path('mnt/test')
"""
if self.mountpoint is not None:
return self.mountpoint.relative_to(self.mountpoint.anchor)
raise ValueError('Mountpoint is not specified')
@staticmethod
def parse_arg(arg: Dict[str, Any]) -> LvmVolume:
volume = LvmVolume(
status=LvmVolumeStatus(arg['status']),
name=arg['name'],
fs_type=FilesystemType(arg['fs_type']),
length=Size.parse_args(arg['length']),
mountpoint=Path(arg['mountpoint']) if arg['mountpoint'] else None,
mount_options=arg.get('mount_options', []),
btrfs_subvols=SubvolumeModification.parse_args(arg.get('btrfs', []))
)
setattr(volume, '_obj_id', arg['obj_id'])
return volume
def json(self) -> Dict[str, Any]:
return {
'obj_id': self.obj_id,
'status': self.status.value,
'name': self.name,
'fs_type': self.fs_type.value,
'length': self.length.json(),
'mountpoint': str(self.mountpoint) if self.mountpoint else None,
'mount_options': self.mount_options,
'btrfs': [vol.json() for vol in self.btrfs_subvols]
}
def table_data(self) -> Dict[str, Any]:
part_mod = {
'Type': self.status.value,
'Name': self.name,
'Size': self.length.format_highest(),
'FS type': self.fs_type.value,
'Mountpoint': str(self.mountpoint) if self.mountpoint else '',
'Mount options': ', '.join(self.mount_options),
'Btrfs': '{} {}'.format(str(len(self.btrfs_subvols)), 'vol')
}
return part_mod
def is_modify(self) -> bool:
return self.status == LvmVolumeStatus.Modify
def exists(self) -> bool:
return self.status == LvmVolumeStatus.Exist
def is_exists_or_modify(self) -> bool:
return self.status in [LvmVolumeStatus.Exist, LvmVolumeStatus.Modify]
def is_root(self) -> bool:
if self.mountpoint is not None:
return Path('/') == self.mountpoint
else:
for subvol in self.btrfs_subvols:
if subvol.is_root():
return True
return False
@dataclass
class LvmGroupInfo:
vg_size: Size
vg_uuid: str
@dataclass
class LvmVolumeInfo:
lv_name: str
vg_name: str
lv_size: Size
@dataclass
class LvmPVInfo:
pv_name: Path
lv_name: str
vg_name: str
@dataclass
class LvmConfiguration:
config_type: LvmLayoutType
vol_groups: List[LvmVolumeGroup]
def __post_init__(self):
# make sure all volume groups have unique PVs
pvs = []
for group in self.vol_groups:
for pv in group.pvs:
if pv in pvs:
raise ValueError('A PV cannot be used in multiple volume groups')
pvs.append(pv)
def json(self) -> Dict[str, Any]:
return {
'config_type': self.config_type.value,
'vol_groups': [vol_gr.json() for vol_gr in self.vol_groups]
}
@staticmethod
def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmConfiguration:
lvm_pvs = []
for mod in disk_config.device_modifications:
for part in mod.partitions:
if part.obj_id in arg.get('lvm_pvs', []):
lvm_pvs.append(part)
return LvmConfiguration(
config_type=LvmLayoutType(arg['config_type']),
vol_groups=[LvmVolumeGroup.parse_arg(vol_group, disk_config) for vol_group in arg['vol_groups']],
)
def get_all_pvs(self) -> List[PartitionModification]:
pvs = []
for vg in self.vol_groups:
pvs += vg.pvs
return pvs
def get_all_volumes(self) -> List[LvmVolume]:
volumes = []
for vg in self.vol_groups:
volumes += vg.volumes
return volumes
def get_root_volume(self) -> Optional[LvmVolume]:
for vg in self.vol_groups:
filtered = next(filter(lambda x: x.is_root(), vg.volumes), None)
if filtered:
return filtered
return None
# def get_lv_crypt_uuid(self, lv: LvmVolume, encryption: EncryptionType) -> str:
# """
# Find the LUKS superblock UUID for the device that
# contains the given logical volume
# """
# for vg in self.vol_groups:
# if vg.contains_lv(lv):
@dataclass
class DeviceModification:
device: BDevice
@ -885,11 +1165,16 @@ class DeviceModification:
class EncryptionType(Enum):
NoEncryption = "no_encryption"
Luks = "luks"
LvmOnLuks = 'lvm_on_luks'
LuksOnLvm = 'luks_on_lvm'
@classmethod
def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']:
return {
'Luks': EncryptionType.Luks
str(_('No Encryption')): EncryptionType.NoEncryption,
str(_('LUKS')): EncryptionType.Luks,
str(_('LVM on LUKS')): EncryptionType.LvmOnLuks,
str(_('LUKS on LVM')): EncryptionType.LuksOnLvm
}
@classmethod
@ -906,18 +1191,31 @@ class EncryptionType(Enum):
@dataclass
class DiskEncryption:
encryption_type: EncryptionType = EncryptionType.Luks
encryption_type: EncryptionType = EncryptionType.NoEncryption
encryption_password: str = ''
partitions: List[PartitionModification] = field(default_factory=list)
lvm_volumes: List[LvmVolume] = field(default_factory=list)
hsm_device: Optional[Fido2Device] = None
def should_generate_encryption_file(self, part_mod: PartitionModification) -> bool:
return part_mod in self.partitions and part_mod.mountpoint != Path('/')
def __post_init__(self):
if self.encryption_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and not self.partitions:
raise ValueError('Luks or LvmOnLuks encryption require partitions to be defined')
if self.encryption_type == EncryptionType.LuksOnLvm and not self.lvm_volumes:
raise ValueError('LuksOnLvm encryption require LMV volumes to be defined')
def should_generate_encryption_file(self, dev: PartitionModification | LvmVolume) -> bool:
if isinstance(dev, PartitionModification):
return dev in self.partitions and dev.mountpoint != Path('/')
elif isinstance(dev, LvmVolume):
return dev in self.lvm_volumes and dev.mountpoint != Path('/')
return False
def json(self) -> Dict[str, Any]:
obj: Dict[str, Any] = {
'encryption_type': self.encryption_type.value,
'partitions': [p.obj_id for p in self.partitions]
'partitions': [p.obj_id for p in self.partitions],
'lvm_volumes': [vol.obj_id for vol in self.lvm_volumes]
}
if self.hsm_device:
@ -925,23 +1223,47 @@ class DiskEncryption:
return obj
@classmethod
def validate_enc(cls, disk_config: DiskLayoutConfiguration) -> bool:
partitions = []
for mod in disk_config.device_modifications:
for part in mod.partitions:
partitions.append(part)
if len(partitions) > 2: # assume one boot and at least 2 additional
if disk_config.lvm_config:
return False
return True
@classmethod
def parse_arg(
cls,
disk_config: DiskLayoutConfiguration,
arg: Dict[str, Any],
password: str = ''
) -> 'DiskEncryption':
) -> Optional['DiskEncryption']:
if not cls.validate_enc(disk_config):
return None
enc_partitions = []
for mod in disk_config.device_modifications:
for part in mod.partitions:
if part.obj_id in arg.get('partitions', []):
enc_partitions.append(part)
volumes = []
if disk_config.lvm_config:
for vol in disk_config.lvm_config.get_all_volumes():
if vol.obj_id in arg.get('lvm_volumes', []):
volumes.append(vol)
enc = DiskEncryption(
EncryptionType(arg['encryption_type']),
password,
enc_partitions
enc_partitions,
volumes
)
if hsm := arg.get('hsm_device', None):
@ -992,7 +1314,7 @@ class LsblkInfo:
tran: Optional[str] = None
partn: Optional[int] = None
partuuid: Optional[str] = None
parttype :Optional[str] = None
parttype: Optional[str] = None
uuid: Optional[str] = None
fstype: Optional[str] = None
fsver: Optional[str] = None
@ -1017,7 +1339,7 @@ class LsblkInfo:
'tran': self.tran,
'partn': self.partn,
'partuuid': self.partuuid,
'parttype' : self.parttype,
'parttype': self.parttype,
'uuid': self.uuid,
'fstype': self.fstype,
'fsver': self.fsver,
@ -1102,13 +1424,24 @@ def _clean_field(name: str, clean_type: CleanType) -> str:
return name.replace('_percentage', '%').replace('_', '-')
def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[LsblkInfo]:
def _fetch_lsblk_info(
dev_path: Optional[Union[Path, str]] = None,
reverse: bool = False,
full_dev_path: bool = False,
retry: int = 3
) -> List[LsblkInfo]:
fields = [_clean_field(f, CleanType.Lsblk) for f in LsblkInfo.fields()]
cmd = ['lsblk', '--json', '--bytes', '--output', '+' + ','.join(fields)]
if dev_path:
cmd.append(str(dev_path))
if reverse:
cmd.append('--inverse')
if full_dev_path:
cmd.append('--paths')
try:
result = SysCommand(cmd).decode()
except SysCallError as err:
@ -1132,8 +1465,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[Lsblk
return [LsblkInfo.from_json(device) for device in blockdevices]
def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
if infos := _fetch_lsblk_info(dev_path):
def get_lsblk_info(
dev_path: Union[Path, str],
reverse: bool = False,
full_dev_path: bool = False
) -> LsblkInfo:
if infos := _fetch_lsblk_info(dev_path, reverse=reverse, full_dev_path=full_dev_path):
return infos[0]
raise DiskError(f'lsblk failed to retrieve information for "{dev_path}"')
@ -1142,6 +1479,7 @@ def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
def get_all_lsblk_info() -> List[LsblkInfo]:
return _fetch_lsblk_info()
def get_lsblk_by_mountpoint(mountpoint: Path, as_prefix: bool = False) -> List[LsblkInfo]:
def _check(infos: List[LsblkInfo]) -> List[LsblkInfo]:
devices = []

View File

@ -0,0 +1,140 @@
from typing import Dict, Optional, Any, TYPE_CHECKING, List
from . import DiskLayoutConfiguration, DiskLayoutType
from .device_model import LvmConfiguration
from ..disk import (
DeviceModification
)
from ..interactions import select_disk_config
from ..interactions.disk_conf import select_lvm_config
from ..menu import (
Selector,
AbstractSubMenu
)
from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
class DiskLayoutConfigurationMenu(AbstractSubMenu):
def __init__(
self,
disk_layout_config: Optional[DiskLayoutConfiguration],
data_store: Dict[str, Any],
advanced: bool = False
):
self._disk_layout_config = disk_layout_config
self._advanced = advanced
super().__init__(data_store=data_store, preview_size=0.5)
def setup_selection_menu_options(self):
self._menu_options['disk_config'] = \
Selector(
_('Partitioning'),
lambda x: self._select_disk_layout_config(x),
display_func=lambda x: self._display_disk_layout(x),
preview_func=self._prev_disk_layouts,
default=self._disk_layout_config,
enabled=True
)
self._menu_options['lvm_config'] = \
Selector(
_('Logical Volume Management (LVM)'),
lambda x: self._select_lvm_config(x),
display_func=lambda x: self.defined_text if x else '',
preview_func=self._prev_lvm_config,
default=self._disk_layout_config.lvm_config if self._disk_layout_config else None,
dependencies=[self._check_dep_lvm],
enabled=True
)
def run(self, allow_reset: bool = True) -> Optional[DiskLayoutConfiguration]:
super().run(allow_reset=allow_reset)
disk_layout_config: Optional[DiskLayoutConfiguration] = self._data_store.get('disk_config', None)
if disk_layout_config:
disk_layout_config.lvm_config = self._data_store.get('lvm_config', None)
return disk_layout_config
def _check_dep_lvm(self) -> bool:
disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
if disk_layout_conf and disk_layout_conf.config_type == DiskLayoutType.Default:
return True
return False
def _select_disk_layout_config(
self,
preset: Optional[DiskLayoutConfiguration]
) -> Optional[DiskLayoutConfiguration]:
disk_config = select_disk_config(preset, advanced_option=self._advanced)
if disk_config != preset:
self._menu_options['lvm_config'].set_current_selection(None)
return disk_config
def _select_lvm_config(self, preset: Optional[LvmConfiguration]) -> Optional[LvmConfiguration]:
disk_config: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
if disk_config:
return select_lvm_config(disk_config, preset=preset)
return preset
def _display_disk_layout(self, current_value: Optional[DiskLayoutConfiguration] = None) -> str:
if current_value:
return current_value.config_type.display_msg()
return ''
def _prev_disk_layouts(self) -> Optional[str]:
disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
if disk_layout_conf:
device_mods: List[DeviceModification] = \
list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications))
if device_mods:
output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg())
output_btrfs = ''
for mod in device_mods:
# create partition table
partition_table = FormattedOutput.as_table(mod.partitions)
output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n'
output_partition += partition_table + '\n'
# create btrfs table
btrfs_partitions = list(
filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions)
)
for partition in btrfs_partitions:
output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n'
output = output_partition + output_btrfs
return output.rstrip()
return None
def _prev_lvm_config(self) -> Optional[str]:
lvm_config: Optional[LvmConfiguration] = self._menu_options['lvm_config'].current_selection
if lvm_config:
output = '{}: {}\n'.format(str(_('Configuration')), lvm_config.config_type.display_msg())
for vol_gp in lvm_config.vol_groups:
pv_table = FormattedOutput.as_table(vol_gp.pvs)
output += '{}:\n{}'.format(str(_('Physical volumes')), pv_table)
output += f'\nVolume Group: {vol_gp.name}'
lvm_volumes = FormattedOutput.as_table(vol_gp.volumes)
output += '\n\n{}:\n{}'.format(str(_('Volumes')), lvm_volumes)
return output
return None

View File

@ -1,6 +1,7 @@
from pathlib import Path
from typing import Dict, Optional, Any, TYPE_CHECKING, List
from . import LvmConfiguration, LvmVolume
from ..disk import (
DeviceModification,
DiskLayoutConfiguration,
@ -40,31 +41,41 @@ class DiskEncryptionMenu(AbstractSubMenu):
super().__init__(data_store=data_store)
def setup_selection_menu_options(self):
self._menu_options['encryption_type'] = \
Selector(
_('Encryption type'),
func=lambda preset: select_encryption_type(self._disk_config, preset),
display_func=lambda x: EncryptionType.type_to_text(x) if x else None,
default=self._preset.encryption_type,
enabled=True,
)
self._menu_options['encryption_password'] = \
Selector(
_('Encryption password'),
lambda x: select_encrypted_password(),
dependencies=[self._check_dep_enc_type],
display_func=lambda x: secret(x) if x else '',
default=self._preset.encryption_password,
enabled=True
)
self._menu_options['encryption_type'] = \
Selector(
_('Encryption type'),
func=lambda preset: select_encryption_type(preset),
display_func=lambda x: EncryptionType.type_to_text(x) if x else None,
dependencies=['encryption_password'],
default=self._preset.encryption_type,
enabled=True
)
self._menu_options['partitions'] = \
Selector(
_('Partitions'),
func=lambda preset: select_partitions_to_encrypt(self._disk_config.device_modifications, preset),
display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None,
dependencies=['encryption_password'],
dependencies=[self._check_dep_partitions],
default=self._preset.partitions,
preview_func=self._prev_disk_layouts,
preview_func=self._prev_partitions,
enabled=True
)
self._menu_options['lvm_vols'] = \
Selector(
_('LVM volumes'),
func=lambda preset: self._select_lvm_vols(preset),
display_func=lambda x: f'{len(x)} {_("LVM volumes")}' if x else None,
dependencies=[self._check_dep_lvm_vols],
default=self._preset.lvm_volumes,
preview_func=self._prev_lvm_vols,
enabled=True
)
self._menu_options['HSM'] = \
@ -73,19 +84,54 @@ class DiskEncryptionMenu(AbstractSubMenu):
func=lambda preset: select_hsm(preset),
display_func=lambda x: self._display_hsm(x),
preview_func=self._prev_hsm,
dependencies=['encryption_password'],
dependencies=[self._check_dep_enc_type],
default=self._preset.hsm_device,
enabled=True
)
def _select_lvm_vols(self, preset: List[LvmVolume]) -> List[LvmVolume]:
if self._disk_config.lvm_config:
return select_lvm_vols_to_encrypt(self._disk_config.lvm_config, preset=preset)
return []
def _check_dep_enc_type(self) -> bool:
enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
if enc_type and enc_type != EncryptionType.NoEncryption:
return True
return False
def _check_dep_partitions(self) -> bool:
enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
if enc_type and enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks]:
return True
return False
def _check_dep_lvm_vols(self) -> bool:
enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
if enc_type and enc_type == EncryptionType.LuksOnLvm:
return True
return False
def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]:
super().run(allow_reset=allow_reset)
if self._data_store.get('encryption_password', None):
enc_type = self._data_store.get('encryption_type', None)
enc_password = self._data_store.get('encryption_password', None)
enc_partitions = self._data_store.get('partitions', None)
enc_lvm_vols = self._data_store.get('lvm_vols', None)
if enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and enc_partitions:
enc_lvm_vols = []
if enc_type == EncryptionType.LuksOnLvm:
enc_partitions = []
if enc_type != EncryptionType.NoEncryption and enc_password and (enc_partitions or enc_lvm_vols):
return DiskEncryption(
encryption_password=self._data_store.get('encryption_password', None),
encryption_type=self._data_store['encryption_type'],
partitions=self._data_store.get('partitions', None),
encryption_password=enc_password,
encryption_type=enc_type,
partitions=enc_partitions,
lvm_volumes=enc_lvm_vols,
hsm_device=self._data_store.get('HSM', None)
)
@ -97,7 +143,7 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
def _prev_disk_layouts(self) -> Optional[str]:
def _prev_partitions(self) -> Optional[str]:
partitions: Optional[List[PartitionModification]] = self._menu_options['partitions'].current_selection
if partitions:
output = str(_('Partitions to be encrypted')) + '\n'
@ -106,6 +152,15 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
def _prev_lvm_vols(self) -> Optional[str]:
volumes: Optional[List[PartitionModification]] = self._menu_options['lvm_vols'].current_selection
if volumes:
output = str(_('LVM volumes to be encrypted')) + '\n'
output += FormattedOutput.as_table(volumes)
return output.rstrip()
return None
def _prev_hsm(self) -> Optional[str]:
try:
Fido2.get_fido2_devices()
@ -123,13 +178,19 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
def select_encryption_type(preset: EncryptionType) -> Optional[EncryptionType]:
def select_encryption_type(disk_config: DiskLayoutConfiguration, preset: EncryptionType) -> Optional[EncryptionType]:
title = str(_('Select disk encryption option'))
options = [
EncryptionType.type_to_text(EncryptionType.Luks)
]
if disk_config.lvm_config:
options = [
EncryptionType.type_to_text(EncryptionType.LvmOnLuks),
EncryptionType.type_to_text(EncryptionType.LuksOnLvm)
]
else:
options = [EncryptionType.type_to_text(EncryptionType.Luks)]
preset_value = EncryptionType.type_to_text(preset)
choice = Menu(title, options, preset_values=preset_value).run()
match choice.type_:
@ -197,3 +258,31 @@ def select_partitions_to_encrypt(
case MenuSelectionType.Selection:
return choice.multi_value
return []
def select_lvm_vols_to_encrypt(
lvm_config: LvmConfiguration,
preset: List[LvmVolume]
) -> List[LvmVolume]:
volumes: List[LvmVolume] = lvm_config.get_all_volumes()
if volumes:
title = str(_('Select which LVM volumes to encrypt'))
partition_table = FormattedOutput.as_table(volumes)
choice = TableMenu(
title,
table_data=(volumes, partition_table),
preset=preset,
multi=True
).run()
match choice.type_:
case MenuSelectionType.Reset:
return []
case MenuSelectionType.Skip:
return preset
case MenuSelectionType.Selection:
return choice.multi_value
return []

View File

@ -4,7 +4,7 @@ import getpass
from pathlib import Path
from typing import List
from .device_model import PartitionModification, Fido2Device
from .device_model import Fido2Device
from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
from ..output import error, info
from ..exceptions import SysCallError
@ -72,16 +72,16 @@ class Fido2:
def fido2_enroll(
cls,
hsm_device: Fido2Device,
part_mod: PartitionModification,
dev_path: Path,
password: str
):
worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {part_mod.dev_path}", peek_output=True)
worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {dev_path}", peek_output=True)
pw_inputted = False
pin_inputted = False
while worker.is_alive():
if pw_inputted is False:
if bytes(f"please enter current passphrase for disk {part_mod.dev_path}", 'UTF-8') in worker._trace_log.lower():
if bytes(f"please enter current passphrase for disk {dev_path}", 'UTF-8') in worker._trace_log.lower():
worker.write(bytes(password, 'UTF-8'))
pw_inputted = True
elif pin_inputted is False:

View File

@ -3,13 +3,21 @@ from __future__ import annotations
import signal
import sys
import time
from typing import Any, Optional, TYPE_CHECKING
from pathlib import Path
from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set
from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption
from .device_handler import device_handler
from .device_model import (
DiskLayoutConfiguration, DiskLayoutType, PartitionTable,
FilesystemType, DiskEncryption, LvmVolumeGroup,
Size, Unit, SectorSize, PartitionModification, EncryptionType,
LvmVolume, LvmConfiguration
)
from ..hardware import SysInfo
from ..output import debug
from ..luks import Luks2
from ..menu import Menu
from ..output import debug, info
from ..general import SysCommand
if TYPE_CHECKING:
_: Any
@ -52,13 +60,288 @@ class FilesystemHandler:
for mod in device_mods:
device_handler.partition(mod, partition_table=partition_table)
device_handler.format(mod, enc_conf=self._enc_config)
for part_mod in mod.partitions:
if part_mod.is_create_or_modify():
if self._disk_config.lvm_config:
for mod in device_mods:
if boot_part := mod.get_boot_partition():
debug(f'Formatting boot partition: {boot_part.dev_path}')
self._format_partitions(
[boot_part],
mod.device_path
)
self.perform_lvm_operations()
else:
for mod in device_mods:
self._format_partitions(
mod.partitions,
mod.device_path
)
for part_mod in mod.partitions:
if part_mod.fs_type == FilesystemType.Btrfs:
device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config)
def _format_partitions(
self,
partitions: List[PartitionModification],
device_path: Path
):
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
"""
# don't touch existing partitions
create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()]
self._validate_partitions(create_or_modify_parts)
# make sure all devices are unmounted
device_handler.umount_all_existing(device_path)
for part_mod in create_or_modify_parts:
# partition will be encrypted
if self._enc_config is not None and part_mod in self._enc_config.partitions:
device_handler.format_encrypted(
part_mod.safe_dev_path,
part_mod.mapper_name,
part_mod.safe_fs_type,
self._enc_config
)
else:
device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path)
# synchronize with udev before using lsblk
SysCommand('udevadm settle')
lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path)
part_mod.partn = lsblk_info.partn
part_mod.partuuid = lsblk_info.partuuid
part_mod.uuid = lsblk_info.uuid
def _validate_partitions(self, partitions: List[PartitionModification]):
checks = {
# verify that all partitions have a path set (which implies that they have been created)
lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
# crypto luks is not a valid file system type
lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError(
'Crypto luks cannot be set as a filesystem type'),
# file system type must be set
lambda x: x.fs_type is None: ValueError('File system type must be set for modification')
}
for check, exc in checks.items():
found = next(filter(check, partitions), None)
if found is not None:
raise exc
def perform_lvm_operations(self):
info('Setting up LVM config...')
if not self._disk_config.lvm_config:
return
if self._enc_config:
self._setup_lvm_encrypted(
self._disk_config.lvm_config,
self._enc_config
)
else:
self._setup_lvm(self._disk_config.lvm_config)
self._format_lvm_vols(self._disk_config.lvm_config)
def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption):
if enc_config.encryption_type == EncryptionType.LvmOnLuks:
enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False)
self._setup_lvm(lvm_config, enc_mods)
self._format_lvm_vols(lvm_config)
# export the lvm group safely otherwise the Luks cannot be closed
self._safely_close_lvm(lvm_config)
for luks in enc_mods.values():
luks.lock()
elif enc_config.encryption_type == EncryptionType.LuksOnLvm:
self._setup_lvm(lvm_config)
enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False)
self._format_lvm_vols(lvm_config, enc_vols)
for luks in enc_vols.values():
luks.lock()
self._safely_close_lvm(lvm_config)
def _safely_close_lvm(self, lvm_config: LvmConfiguration):
for vg in lvm_config.vol_groups:
for vol in vg.volumes:
device_handler.lvm_vol_change(vol, False)
device_handler.lvm_export_vg(vg)
def _setup_lvm(
self,
lvm_config: LvmConfiguration,
enc_mods: Dict[PartitionModification, Luks2] = {}
):
self._lvm_create_pvs(lvm_config, enc_mods)
for vg in lvm_config.vol_groups:
pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods)
device_handler.lvm_vg_create(pv_dev_paths, vg.name)
# figure out what the actual available size in the group is
vg_info = device_handler.lvm_group_info(vg.name)
if not vg_info:
raise ValueError('Unable to fetch VG info')
# the actual available LVM Group size will be smaller than the
# total PVs size due to reserved metadata storage etc.
# so we'll have a look at the total avail. size, check the delta
# to the desired sizes and subtract some equally from the actually
# created volume
avail_size = vg_info.vg_size
desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default()))
delta = desired_size - avail_size
max_vol_offset = delta.convert(Unit.B)
max_vol = max(vg.volumes, key=lambda x: x.length)
for lv in vg.volumes:
offset = max_vol_offset if lv == max_vol else None
debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}')
device_handler.lvm_vol_create(vg.name, lv, offset)
while True:
debug('Fetching LVM volume info')
lv_info = device_handler.lvm_vol_info(lv.name)
if lv_info is not None:
break
time.sleep(1)
self._lvm_vol_handle_e2scrub(vg)
def _format_lvm_vols(
self,
lvm_config: LvmConfiguration,
enc_vols: Dict[LvmVolume, Luks2] = {}
):
for vol in lvm_config.get_all_volumes():
if enc_vol := enc_vols.get(vol, None):
if not enc_vol.mapper_dev:
raise ValueError('No mapper device defined')
path = enc_vol.mapper_dev
else:
path = vol.safe_dev_path
# wait a bit otherwise the mkfs will fail as it can't
# find the mapper device yet
device_handler.format(vol.fs_type, path)
if vol.fs_type == FilesystemType.Btrfs:
device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options)
def _lvm_create_pvs(
self,
lvm_config: LvmConfiguration,
enc_mods: Dict[PartitionModification, Luks2] = {}
):
pv_paths: Set[Path] = set()
for vg in lvm_config.vol_groups:
pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods)
device_handler.lvm_pv_create(pv_paths)
def _get_all_pv_dev_paths(
self,
pvs: List[PartitionModification],
enc_mods: Dict[PartitionModification, Luks2] = {}
) -> Set[Path]:
pv_paths: Set[Path] = set()
for pv in pvs:
if enc_pv := enc_mods.get(pv, None):
if mapper := enc_pv.mapper_dev:
pv_paths.add(mapper)
else:
pv_paths.add(pv.safe_dev_path)
return pv_paths
def _encrypt_lvm_vols(
self,
lvm_config: LvmConfiguration,
enc_config: DiskEncryption,
lock_after_create: bool = True
) -> Dict[LvmVolume, Luks2]:
enc_vols: Dict[LvmVolume, Luks2] = {}
for vol in lvm_config.get_all_volumes():
if vol in enc_config.lvm_volumes:
luks_handler = device_handler.encrypt(
vol.safe_dev_path,
vol.mapper_name,
enc_config.encryption_password,
lock_after_create
)
enc_vols[vol] = luks_handler
return enc_vols
def _encrypt_partitions(
self,
enc_config: DiskEncryption,
lock_after_create: bool = True
) -> Dict[PartitionModification, Luks2]:
enc_mods: Dict[PartitionModification, Luks2] = {}
for mod in self._disk_config.device_modifications:
partitions = mod.partitions
# don't touch existing partitions
filtered_part = [p for p in partitions if not p.exists()]
self._validate_partitions(filtered_part)
# make sure all devices are unmounted
device_handler.umount_all_existing(mod.device_path)
enc_mods = {}
for part_mod in filtered_part:
if part_mod in enc_config.partitions:
luks_handler = device_handler.encrypt(
part_mod.safe_dev_path,
part_mod.mapper_name,
enc_config.encryption_password,
lock_after_create=lock_after_create
)
enc_mods[part_mod] = luks_handler
return enc_mods
def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup):
# from arch wiki:
# If a logical volume will be formatted with ext4, leave at least 256 MiB
# free space in the volume group to allow using e2scrub
if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]):
largest_vol = max(vol_gp.volumes, key=lambda x: x.length)
device_handler.lvm_vol_reduce(
largest_vol.safe_dev_path,
Size(256, Unit.MiB, SectorSize.default())
)
def _do_countdown(self) -> bool:
SIG_TRIGGER = False

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import re
from pathlib import Path
from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple
from typing import Any, TYPE_CHECKING, List, Optional, Tuple
from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \
ModificationStatus, DeviceGeometry, SectorSize, BtrfsMountOption
@ -38,21 +38,6 @@ class PartitioningList(ListManager):
display_actions = list(self._actions.values())
super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:])
def reformat(self, data: List[PartitionModification]) -> Dict[str, Optional[PartitionModification]]:
table = FormattedOutput.as_table(data)
rows = table.split('\n')
# these are the header rows of the table and do not map to any User obviously
# we're adding 2 spaces as prefix because the menu selector '> ' will be put before
# the selectable rows so the header has to be aligned
display_data: Dict[str, Optional[PartitionModification]] = {f' {rows[0]}': None, f' {rows[1]}': None}
for row, user in zip(rows[2:], data):
row = row.replace('|', '\\|')
display_data[row] = user
return display_data
def selected_action_display(self, partition: PartitionModification) -> str:
return str(_('Partition'))
@ -258,7 +243,6 @@ class PartitioningList(ListManager):
while True:
value = TextInput(prompt).run().strip()
size: Optional[Size] = None
if not value:
size = default
else:

View File

@ -1,9 +1,8 @@
from pathlib import Path
from typing import Dict, List, Optional, Any, TYPE_CHECKING
from typing import List, Optional, Any, TYPE_CHECKING
from .device_model import SubvolumeModification
from ..menu import TextInput, ListManager
from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@ -18,21 +17,6 @@ class SubvolumeMenu(ListManager):
]
super().__init__(prompt, btrfs_subvols, [self._actions[0]], self._actions[1:])
def reformat(self, data: List[SubvolumeModification]) -> Dict[str, Optional[SubvolumeModification]]:
table = FormattedOutput.as_table(data)
rows = table.split('\n')
# these are the header rows of the table and do not map to any User obviously
# we're adding 2 spaces as prefix because the menu selector '> ' will be put before
# the selectable rows so the header has to be aligned
display_data: Dict[str, Optional[SubvolumeModification]] = {f' {rows[0]}': None, f' {rows[1]}': None}
for row, subvol in zip(rows[2:], data):
row = row.replace('|', '\\|')
display_data[row] = subvol
return display_data
def selected_action_display(self, subvolume: SubvolumeModification) -> str:
return str(subvolume.name)

View File

@ -14,7 +14,6 @@ from .models.audio_configuration import Audio, AudioConfiguration
from .models.users import User
from .output import FormattedOutput
from .profile.profile_menu import ProfileConfiguration
from .storage import storage
from .configuration import save_config
from .interactions import add_number_of_parallel_downloads
from .interactions import ask_additional_packages_to_install
@ -30,7 +29,6 @@ from .interactions import select_additional_repositories
from .interactions import select_kernel
from .utils.util import format_cols
from .interactions import ask_ntp
from .interactions.disk_conf import select_disk_config
if TYPE_CHECKING:
_: Any
@ -38,7 +36,6 @@ if TYPE_CHECKING:
class GlobalMenu(AbstractMenu):
def __init__(self, data_store: Dict[str, Any]):
self._defined_text = str(_('Defined'))
super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3)
def setup_selection_menu_options(self):
@ -54,20 +51,20 @@ class GlobalMenu(AbstractMenu):
_('Locales'),
lambda preset: self._locale_selection(preset),
preview_func=self._prev_locale,
display_func=lambda x: self._defined_text if x else '')
display_func=lambda x: self.defined_text if x else '')
self._menu_options['mirror_config'] = \
Selector(
_('Mirrors'),
lambda preset: self._mirror_configuration(preset),
display_func=lambda x: self._defined_text if x else '',
display_func=lambda x: self.defined_text if x else '',
preview_func=self._prev_mirror_config
)
self._menu_options['disk_config'] = \
Selector(
_('Disk configuration'),
lambda preset: self._select_disk_config(preset),
preview_func=self._prev_disk_layouts,
display_func=lambda x: self._display_disk_layout(x),
preview_func=self._prev_disk_config,
display_func=lambda x: self.defined_text if x else '',
)
self._menu_options['disk_encryption'] = \
Selector(
@ -75,7 +72,8 @@ class GlobalMenu(AbstractMenu):
lambda preset: self._disk_encryption(preset),
preview_func=self._prev_disk_encryption,
display_func=lambda x: self._display_disk_encryption(x),
dependencies=['disk_config'])
dependencies=['disk_config']
)
self._menu_options['swap'] = \
Selector(
_('Swap'),
@ -140,7 +138,7 @@ class GlobalMenu(AbstractMenu):
Selector(
_('Additional packages'),
lambda preset: ask_additional_packages_to_install(preset),
display_func=lambda x: self._defined_text if x else '',
display_func=lambda x: self.defined_text if x else '',
preview_func=self._prev_additional_pkgs,
default=[])
self._menu_options['additional-repositories'] = \
@ -247,14 +245,17 @@ class GlobalMenu(AbstractMenu):
return config.type.display_msg()
def _disk_encryption(self, preset: Optional[disk.DiskEncryption]) -> Optional[disk.DiskEncryption]:
mods: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
disk_config: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
if not mods:
if not disk_config:
# this should not happen as the encryption menu has the disk_config as dependency
raise ValueError('No disk layout specified')
if not disk.DiskEncryption.validate_enc(disk_config):
return None
data_store: Dict[str, Any] = {}
disk_encryption = disk.DiskEncryptionMenu(mods, data_store, preset=preset).run()
disk_encryption = disk.DiskEncryptionMenu(disk_config, data_store, preset=preset).run()
return disk_encryption
def _locale_selection(self, preset: LocaleConfiguration) -> LocaleConfiguration:
@ -287,44 +288,35 @@ class GlobalMenu(AbstractMenu):
return format_cols(packages, None)
return None
def _prev_disk_layouts(self) -> Optional[str]:
def _prev_disk_config(self) -> Optional[str]:
selector = self._menu_options['disk_config']
disk_layout_conf: Optional[disk.DiskLayoutConfiguration] = selector.current_selection
output = ''
if disk_layout_conf:
device_mods: List[disk.DeviceModification] = \
list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications))
output += str(_('Configuration type: {}')).format(disk_layout_conf.config_type.display_msg())
if device_mods:
output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg())
output_btrfs = ''
if disk_layout_conf.lvm_config:
output += '\n{}: {}'.format(str(_('LVM configuration type')), disk_layout_conf.lvm_config.config_type.display_msg())
for mod in device_mods:
# create partition table
partition_table = FormattedOutput.as_table(mod.partitions)
output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n'
output_partition += partition_table + '\n'
# create btrfs table
btrfs_partitions = list(
filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions)
)
for partition in btrfs_partitions:
output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n'
output = output_partition + output_btrfs
return output.rstrip()
if output:
return output
return None
def _display_disk_layout(self, current_value: Optional[disk.DiskLayoutConfiguration] = None) -> str:
def _display_disk_config(self, current_value: Optional[disk.DiskLayoutConfiguration] = None) -> str:
if current_value:
return current_value.config_type.display_msg()
return ''
def _prev_disk_encryption(self) -> Optional[str]:
disk_config: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
if disk_config and not disk.DiskEncryption.validate_enc(disk_config):
return str(_('LVM disk encryption with more than 2 partitions is currently not supported'))
encryption: Optional[disk.DiskEncryption] = self._menu_options['disk_encryption'].current_selection
if encryption:
enc_type = disk.EncryptionType.type_to_text(encryption.encryption_type)
output = str(_('Encryption type')) + f': {enc_type}\n'
@ -332,6 +324,8 @@ class GlobalMenu(AbstractMenu):
if encryption.partitions:
output += 'Partitions: {} selected'.format(len(encryption.partitions)) + '\n'
elif encryption.lvm_volumes:
output += 'LVM volumes: {} selected'.format(len(encryption.lvm_volumes)) + '\n'
if encryption.hsm_device:
output += f'HSM: {encryption.hsm_device.manufacturer}'
@ -425,10 +419,8 @@ class GlobalMenu(AbstractMenu):
self,
preset: Optional[disk.DiskLayoutConfiguration] = None
) -> Optional[disk.DiskLayoutConfiguration]:
disk_config = select_disk_config(
preset,
storage['arguments'].get('advanced', False)
)
data_store: Dict[str, Any] = {}
disk_config = disk.DiskLayoutConfigurationMenu(preset, data_store).run()
if disk_config != preset:
self._menu_options['disk_encryption'].set_current_selection(None)

View File

@ -52,7 +52,7 @@ class Installer:
`Installer()` is the wrapper for most basic installation steps.
It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
"""
self.base_packages = base_packages or __packages__[:3]
self._base_packages = base_packages or __packages__[:3]
self.kernels = kernels or ['linux']
self._disk_config = disk_config
@ -64,11 +64,11 @@ class Installer:
self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None}
for kernel in self.kernels:
self.base_packages.append(kernel)
self._base_packages.append(kernel)
# If using accessibility tools in the live environment, append those to the packages list
if accessibility_tools_in_use():
self.base_packages.extend(__accessibility_packages__)
self._base_packages.extend(__accessibility_packages__)
self.post_base_install: List[Callable] = []
@ -90,6 +90,8 @@ class Installer:
self._fstab_entries: List[str] = []
self._zram_enabled = False
self._disable_fstrim = False
self.pacman = Pacman(self.target, storage['arguments'].get('silent', False))
def __enter__(self) -> 'Installer':
@ -198,31 +200,71 @@ class Installer:
self._verify_service_stop()
def mount_ordered_layout(self):
info('Mounting partitions in order')
debug('Mounting ordered layout')
luks_handlers: Dict[Any, Luks2] = {}
match self._disk_encryption.encryption_type:
case disk.EncryptionType.NoEncryption:
self._mount_lvm_layout()
case disk.EncryptionType.Luks:
luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions)
case disk.EncryptionType.LvmOnLuks:
luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions)
self._import_lvm()
self._mount_lvm_layout(luks_handlers)
case disk.EncryptionType.LuksOnLvm:
self._import_lvm()
luks_handlers = self._prepare_luks_lvm(self._disk_encryption.lvm_volumes)
self._mount_lvm_layout(luks_handlers)
# mount all regular partitions
self._mount_partition_layout(luks_handlers)
def _mount_partition_layout(self, luks_handlers: Dict[Any, Luks2]):
debug('Mounting partition layout')
# do not mount any PVs part of the LVM configuration
pvs = []
if self._disk_config.lvm_config:
pvs = self._disk_config.lvm_config.get_all_pvs()
for mod in self._disk_config.device_modifications:
not_pv_part_mods = list(filter(lambda x: x not in pvs, mod.partitions))
# partitions have to mounted in the right order on btrfs the mountpoint will
# be empty as the actual subvolumes are getting mounted instead so we'll use
# '/' just for sorting
sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint or Path('/'))
enc_partitions = []
if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption:
enc_partitions = list(set(sorted_part_mods) & set(self._disk_encryption.partitions))
# attempt to decrypt all luks partitions
luks_handlers = self._prepare_luks_partitions(enc_partitions)
sorted_part_mods = sorted(not_pv_part_mods, key=lambda x: x.mountpoint or Path('/'))
for part_mod in sorted_part_mods:
if luks_handler := luks_handlers.get(part_mod):
# mount encrypted partition
self._mount_luks_partition(part_mod, luks_handler)
else:
# partition is not encrypted
self._mount_partition(part_mod)
def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[
disk.PartitionModification, Luks2]:
def _mount_lvm_layout(self, luks_handlers: Dict[Any, Luks2] = {}):
lvm_config = self._disk_config.lvm_config
if not lvm_config:
debug('No lvm config defined to be mounted')
return
debug('Mounting LVM layout')
for vg in lvm_config.vol_groups:
sorted_vol = sorted(vg.volumes, key=lambda x: x.mountpoint or Path('/'))
for vol in sorted_vol:
if luks_handler := luks_handlers.get(vol):
self._mount_luks_volume(vol, luks_handler)
else:
self._mount_lvm_vol(vol)
def _prepare_luks_partitions(
self,
partitions: List[disk.PartitionModification]
) -> Dict[disk.PartitionModification, Luks2]:
return {
part_mod: disk.device_handler.unlock_luks2_dev(
part_mod.dev_path,
@ -233,6 +275,33 @@ class Installer:
if part_mod.mapper_name and part_mod.dev_path
}
def _import_lvm(self):
lvm_config = self._disk_config.lvm_config
if not lvm_config:
debug('No lvm config defined to be imported')
return
for vg in lvm_config.vol_groups:
disk.device_handler.lvm_import_vg(vg)
for vol in vg.volumes:
disk.device_handler.lvm_vol_change(vol, True)
def _prepare_luks_lvm(
self,
lvm_volumes: List[disk.LvmVolume]
) -> Dict[disk.LvmVolume, Luks2]:
return {
vol: disk.device_handler.unlock_luks2_dev(
vol.dev_path,
vol.mapper_name,
self._disk_encryption.encryption_password
)
for vol in lvm_volumes
if vol.mapper_name and vol.dev_path
}
def _mount_partition(self, part_mod: disk.PartitionModification):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined
if part_mod.mountpoint and part_mod.dev_path:
@ -246,14 +315,32 @@ class Installer:
part_mod.mount_options
)
def _mount_lvm_vol(self, volume: disk.LvmVolume):
if volume.fs_type != disk.FilesystemType.Btrfs:
if volume.mountpoint and volume.dev_path:
target = self.target / volume.relative_mountpoint
disk.device_handler.mount(volume.dev_path, target, options=volume.mount_options)
if volume.fs_type == disk.FilesystemType.Btrfs and volume.dev_path:
self._mount_btrfs_subvol(volume.dev_path, volume.btrfs_subvols, volume.mount_options)
def _mount_luks_partition(self, part_mod: disk.PartitionModification, luks_handler: Luks2):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined
if part_mod.mountpoint and luks_handler.mapper_dev:
target = self.target / part_mod.relative_mountpoint
disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
if part_mod.fs_type != disk.FilesystemType.Btrfs:
if part_mod.mountpoint and luks_handler.mapper_dev:
target = self.target / part_mod.relative_mountpoint
disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols)
self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols, part_mod.mount_options)
def _mount_luks_volume(self, volume: disk.LvmVolume, luks_handler: Luks2):
if volume.fs_type != disk.FilesystemType.Btrfs:
if volume.mountpoint and luks_handler.mapper_dev:
target = self.target / volume.relative_mountpoint
disk.device_handler.mount(luks_handler.mapper_dev, target, options=volume.mount_options)
if volume.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
self._mount_btrfs_subvol(luks_handler.mapper_dev, volume.btrfs_subvols, volume.mount_options)
def _mount_btrfs_subvol(
self,
@ -262,13 +349,23 @@ class Installer:
mount_options: List[str] = []
):
for subvol in subvolumes:
disk.device_handler.mount(
dev_path,
self.target / subvol.relative_mountpoint,
options=mount_options + [f'subvol={subvol.name}']
)
mountpoint = self.target / subvol.relative_mountpoint
mount_options = mount_options + [f'subvol={subvol.name}']
disk.device_handler.mount(dev_path, mountpoint, options=mount_options)
def generate_key_files(self):
match self._disk_encryption.encryption_type:
case disk.EncryptionType.Luks:
self._generate_key_files_partitions()
case disk.EncryptionType.LuksOnLvm:
self._generate_key_file_lvm_volumes()
case disk.EncryptionType.LvmOnLuks:
# currently LvmOnLuks only supports a single
# partitioning layout (boot + partition)
# so we won't need any keyfile generation atm
pass
def _generate_key_files_partitions(self):
for part_mod in self._disk_encryption.partitions:
gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod)
@ -279,14 +376,36 @@ class Installer:
)
if gen_enc_file and not part_mod.is_root():
info(f'Creating key-file: {part_mod.dev_path}')
debug(f'Creating key-file: {part_mod.dev_path}')
luks_handler.create_keyfile(self.target)
if part_mod.is_root() and not gen_enc_file:
if self._disk_encryption.hsm_device:
disk.Fido2.fido2_enroll(
self._disk_encryption.hsm_device,
part_mod,
part_mod.safe_dev_path,
self._disk_encryption.encryption_password
)
def _generate_key_file_lvm_volumes(self):
for vol in self._disk_encryption.lvm_volumes:
gen_enc_file = self._disk_encryption.should_generate_encryption_file(vol)
luks_handler = Luks2(
vol.safe_dev_path,
mapper_name=vol.mapper_name,
password=self._disk_encryption.encryption_password
)
if gen_enc_file and not vol.is_root():
info(f'Creating key-file: {vol.dev_path}')
luks_handler.create_keyfile(self.target)
if vol.is_root() and not gen_enc_file:
if self._disk_encryption.hsm_device:
disk.Fido2.fido2_enroll(
self._disk_encryption.hsm_device,
vol.safe_dev_path,
self._disk_encryption.encryption_password
)
@ -393,7 +512,7 @@ class Installer:
for entry in self._fstab_entries:
fp.write(f'{entry}\n')
def set_hostname(self, hostname: str, *args: str, **kwargs: str) -> None:
def set_hostname(self, hostname: str):
with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n')
@ -444,7 +563,7 @@ class Installer:
(self.target / 'etc/locale.conf').write_text(f'LANG={lang_value}\n')
return True
def set_timezone(self, zone: str, *args: str, **kwargs: str) -> bool:
def set_timezone(self, zone: str) -> bool:
if not zone:
return True
if not len(zone):
@ -532,7 +651,7 @@ class Installer:
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
self.base_packages.append('iwd')
self._base_packages.append('iwd')
# This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
@ -608,6 +727,79 @@ class Installer:
return vendor.get_ucode()
return None
def _handle_partition_installation(self):
pvs = []
if self._disk_config.lvm_config:
pvs = self._disk_config.lvm_config.get_all_pvs()
for mod in self._disk_config.device_modifications:
for part in mod.partitions:
if part in pvs or part.fs_type is None:
continue
if (pkg := part.fs_type.installation_pkg) is not None:
self._base_packages.append(pkg)
if (module := part.fs_type.installation_module) is not None:
self._modules.append(module)
if (binary := part.fs_type.installation_binary) is not None:
self._binaries.append(binary)
# https://github.com/archlinux/archinstall/issues/1837
if part.fs_type.fs_type_mount == 'btrfs':
self._disable_fstrim = True
# There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target:
if 'fsck' in self._hooks:
self._hooks.remove('fsck')
if part in self._disk_encryption.partitions:
if self._disk_encryption.hsm_device:
# Required by mkinitcpio to add support for fido2-device options
self.pacman.strap('libfido2')
if 'sd-encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
else:
if 'encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
def _handle_lvm_installation(self):
if not self._disk_config.lvm_config:
return
self.add_additional_packages('lvm2')
self._hooks.insert(self._hooks.index('filesystems') - 1, 'lvm2')
for vg in self._disk_config.lvm_config.vol_groups:
for vol in vg.volumes:
if vol.fs_type is not None:
if (pkg := vol.fs_type.installation_pkg) is not None:
self._base_packages.append(pkg)
if (module := vol.fs_type.installation_module) is not None:
self._modules.append(module)
if (binary := vol.fs_type.installation_binary) is not None:
self._binaries.append(binary)
if vol.fs_type.fs_type_mount == 'btrfs':
self._disable_fstrim = True
# There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
if vol.fs_type.fs_type_mount == 'ntfs3' and vol.mountpoint == self.target:
if 'fsck' in self._hooks:
self._hooks.remove('fsck')
if self._disk_encryption.encryption_type in [disk.EncryptionType.LvmOnLuks, disk.EncryptionType.LuksOnLvm]:
if self._disk_encryption.hsm_device:
# Required by mkinitcpio to add support for fido2-device options
self.pacman.strap('libfido2')
if 'sd-encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('lvm2') - 1, 'sd-encrypt')
else:
if 'encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('lvm2') - 1, 'encrypt')
def minimal_installation(
self,
testing: bool = False,
@ -616,43 +808,17 @@ class Installer:
hostname: str = 'archinstall',
locale_config: LocaleConfiguration = LocaleConfiguration.default()
):
_disable_fstrim = False
for mod in self._disk_config.device_modifications:
for part in mod.partitions:
if part.fs_type is not None:
if (pkg := part.fs_type.installation_pkg) is not None:
self.base_packages.append(pkg)
if (module := part.fs_type.installation_module) is not None:
self._modules.append(module)
if (binary := part.fs_type.installation_binary) is not None:
self._binaries.append(binary)
# https://github.com/archlinux/archinstall/issues/1837
if part.fs_type.fs_type_mount == 'btrfs':
_disable_fstrim = True
# There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target:
if 'fsck' in self._hooks:
self._hooks.remove('fsck')
if part in self._disk_encryption.partitions:
if self._disk_encryption.hsm_device:
# Required by mkinitcpio to add support for fido2-device options
self.pacman.strap('libfido2')
if 'sd-encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
else:
if 'encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
if self._disk_config.lvm_config:
self._handle_lvm_installation()
else:
self._handle_partition_installation()
if not SysInfo.has_uefi():
self.base_packages.append('grub')
self._base_packages.append('grub')
if ucode := self._get_microcode():
(self.target / 'boot' / ucode).unlink(missing_ok=True)
self.base_packages.append(ucode.stem)
self._base_packages.append(ucode.stem)
else:
debug('Archinstall will not install any ucode.')
@ -673,7 +839,7 @@ class Installer:
pacman_conf.apply()
self.pacman.strap(self.base_packages)
self.pacman.strap(self._base_packages)
self.helper_flags['base-strapped'] = True
pacman_conf.persist()
@ -685,7 +851,7 @@ class Installer:
# https://github.com/archlinux/archinstall/issues/880
# https://github.com/archlinux/archinstall/issues/1837
# https://github.com/archlinux/archinstall/issues/1841
if not _disable_fstrim:
if not self._disable_fstrim:
self.enable_periodic_trim()
# TODO: Support locale and timezone
@ -742,13 +908,24 @@ class Installer:
return boot
return None
def _get_root_partition(self) -> Optional[disk.PartitionModification]:
for mod in self._disk_config.device_modifications:
if root := mod.get_root_partition():
return root
def _get_root(self) -> Optional[disk.PartitionModification | disk.LvmVolume]:
if self._disk_config.lvm_config:
return self._disk_config.lvm_config.get_root_volume()
else:
for mod in self._disk_config.device_modifications:
if root := mod.get_root_partition():
return root
return None
def _get_kernel_params(
def _get_luks_uuid_from_mapper_dev(self, mapper_dev_path: Path) -> str:
lsblk_info = disk.get_lsblk_info(mapper_dev_path, reverse=True, full_dev_path=True)
if not lsblk_info.children or not lsblk_info.children[0].uuid:
raise ValueError('Unable to determine UUID of luks superblock')
return lsblk_info.children[0].uuid
def _get_kernel_params_partition(
self,
root_partition: disk.PartitionModification,
id_root: bool = True,
@ -784,20 +961,74 @@ class Installer:
debug(f'Identifying root partition by UUID: {root_partition.uuid}')
kernel_parameters.append(f'root=UUID={root_partition.uuid}')
return kernel_parameters
def _get_kernel_params_lvm(
self,
lvm: disk.LvmVolume
) -> List[str]:
kernel_parameters = []
match self._disk_encryption.encryption_type:
case disk.EncryptionType.LvmOnLuks:
if not lvm.vg_name:
raise ValueError(f'Unable to determine VG name for {lvm.name}')
pv_seg_info = disk.device_handler.lvm_pvseg_info(lvm.vg_name, lvm.name)
if not pv_seg_info:
raise ValueError(f'Unable to determine PV segment info for {lvm.vg_name}/{lvm.name}')
uuid = self._get_luks_uuid_from_mapper_dev(pv_seg_info.pv_name)
if self._disk_encryption.hsm_device:
debug(f'LvmOnLuks, encrypted root partition, HSM, identifying by UUID: {uuid}')
kernel_parameters.append(f'rd.luks.name={uuid}=cryptlvm root={lvm.safe_dev_path}')
else:
debug(f'LvmOnLuks, encrypted root partition, identifying by UUID: {uuid}')
kernel_parameters.append(f'cryptdevice=UUID={uuid}:cryptlvm root={lvm.safe_dev_path}')
case disk.EncryptionType.LuksOnLvm:
uuid = self._get_luks_uuid_from_mapper_dev(lvm.mapper_path)
if self._disk_encryption.hsm_device:
debug(f'LuksOnLvm, encrypted root partition, HSM, identifying by UUID: {uuid}')
kernel_parameters.append(f'rd.luks.name={uuid}=root root=/dev/mapper/root')
else:
debug(f'LuksOnLvm, encrypted root partition, identifying by UUID: {uuid}')
kernel_parameters.append(f'cryptdevice=UUID={uuid}:root root=/dev/mapper/root')
case disk.EncryptionType.NoEncryption:
debug(f'Identifying root lvm by mapper device: {lvm.dev_path}')
kernel_parameters.append(f'root={lvm.safe_dev_path}')
return kernel_parameters
def _get_kernel_params(
self,
root: disk.PartitionModification | disk.LvmVolume,
id_root: bool = True,
partuuid: bool = True
) -> List[str]:
kernel_parameters = []
if isinstance(root, disk.LvmVolume):
kernel_parameters = self._get_kernel_params_lvm(root)
else:
kernel_parameters = self._get_kernel_params_partition(root, id_root, partuuid)
# Zswap should be disabled when using zram.
# https://github.com/archlinux/archinstall/issues/881
if self._zram_enabled:
kernel_parameters.append('zswap.enabled=0')
if id_root:
for sub_vol in root_partition.btrfs_subvols:
for sub_vol in root.btrfs_subvols:
if sub_vol.is_root():
kernel_parameters.append(f'rootflags=subvol={sub_vol.name}')
break
kernel_parameters.append('rw')
kernel_parameters.append(f'rootfstype={root_partition.safe_fs_type.fs_type_mount}')
kernel_parameters.append(f'rootfstype={root.safe_fs_type.fs_type_mount}')
kernel_parameters.extend(self._kernel_params)
debug(f'kernel parameters: {" ".join(kernel_parameters)}')
@ -807,10 +1038,12 @@ class Installer:
def _add_systemd_bootloader(
self,
boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification,
root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification],
uki_enabled: bool = False
):
debug('Installing systemd bootloader')
self.pacman.strap('efibootmgr')
if not SysInfo.has_uefi():
@ -882,7 +1115,7 @@ class Installer:
f'# Created on: {self.init_time}'
)
options = 'options ' + ' '.join(self._get_kernel_params(root_partition))
options = 'options ' + ' '.join(self._get_kernel_params(root))
for kernel in self.kernels:
for variant in ("", "-fallback"):
@ -904,15 +1137,17 @@ class Installer:
def _add_grub_bootloader(
self,
boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification,
root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification]
):
debug('Installing grub bootloader')
self.pacman.strap('grub') # no need?
grub_default = self.target / 'etc/default/grub'
config = grub_default.read_text()
kernel_parameters = ' '.join(self._get_kernel_params(root_partition, False, False))
kernel_parameters = ' '.join(self._get_kernel_params(root, False, False))
config = re.sub(r'(GRUB_CMDLINE_LINUX=")("\n)', rf'\1{kernel_parameters}\2', config, 1)
grub_default.write_text(config)
@ -934,7 +1169,7 @@ class Installer:
info(f"GRUB EFI partition: {efi_partition.dev_path}")
self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
boot_dir_arg = []
if boot_partition.mountpoint and boot_partition.mountpoint != boot_dir:
@ -988,8 +1223,10 @@ class Installer:
self,
boot_partition: disk.PartitionModification,
efi_partition: Optional[disk.PartitionModification],
root_partition: disk.PartitionModification
root: disk.PartitionModification | disk.LvmVolume
):
debug('Installing limine bootloader')
self.pacman.strap('limine')
info(f"Limine boot partition: {boot_partition.dev_path}")
@ -1052,7 +1289,7 @@ Exec = /bin/sh -c "{hook_command}"
hook_path = hooks_dir / '99-limine.hook'
hook_path.write_text(hook_contents)
kernel_params = ' '.join(self._get_kernel_params(root_partition))
kernel_params = ' '.join(self._get_kernel_params(root))
config_contents = 'TIMEOUT=5\n'
for kernel in self.kernels:
@ -1075,9 +1312,11 @@ Exec = /bin/sh -c "{hook_command}"
def _add_efistub_bootloader(
self,
boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification,
root: disk.PartitionModification | disk.LvmVolume,
uki_enabled: bool = False
):
debug('Installing efistub bootloader')
self.pacman.strap('efibootmgr')
if not SysInfo.has_uefi():
@ -1092,7 +1331,7 @@ Exec = /bin/sh -c "{hook_command}"
entries = (
'initrd=/initramfs-{kernel}.img',
*self._get_kernel_params(root_partition)
*self._get_kernel_params(root)
)
cmdline = [' '.join(entries)]
@ -1122,7 +1361,7 @@ Exec = /bin/sh -c "{hook_command}"
def _config_uki(
self,
root_partition: disk.PartitionModification,
root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification]
):
if not efi_partition or not efi_partition.mountpoint:
@ -1130,7 +1369,7 @@ Exec = /bin/sh -c "{hook_command}"
# Set up kernel command line
with open(self.target / 'etc/kernel/cmdline', 'w') as cmdline:
kernel_parameters = self._get_kernel_params(root_partition)
kernel_parameters = self._get_kernel_params(root)
cmdline.write(' '.join(kernel_parameters) + '\n')
diff_mountpoint = None
@ -1191,37 +1430,33 @@ Exec = /bin/sh -c "{hook_command}"
efi_partition = self._get_efi_partition()
boot_partition = self._get_boot_partition()
root_partition = self._get_root_partition()
root = self._get_root()
if boot_partition is None:
raise ValueError(f'Could not detect boot at mountpoint {self.target}')
if root_partition is None:
if root is None:
raise ValueError(f'Could not detect root at mountpoint {self.target}')
info(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}')
if uki_enabled:
self._config_uki(root_partition, efi_partition)
self._config_uki(root, efi_partition)
match bootloader:
case Bootloader.Systemd:
self._add_systemd_bootloader(boot_partition, root_partition, efi_partition, uki_enabled)
self._add_systemd_bootloader(boot_partition, root, efi_partition, uki_enabled)
case Bootloader.Grub:
self._add_grub_bootloader(boot_partition, root_partition, efi_partition)
self._add_grub_bootloader(boot_partition, root, efi_partition)
case Bootloader.Efistub:
self._add_efistub_bootloader(boot_partition, root_partition, uki_enabled)
self._add_efistub_bootloader(boot_partition, root, uki_enabled)
case Bootloader.Limine:
self._add_limine_bootloader(boot_partition, efi_partition, root_partition)
self._add_limine_bootloader(boot_partition, efi_partition, root)
def add_additional_packages(self, packages: Union[str, List[str]]) -> bool:
return self.pacman.strap(packages)
def _enable_users(self, service: str, users: List[User]):
for user in users:
self.arch_chroot(f'systemctl enable --user {service}', run_as=user.username)
def enable_sudo(self, entity: str, group :bool = False):
def enable_sudo(self, entity: str, group: bool = False):
info(f'Enabling sudo permissions for {entity}')
sudoers_dir = f"{self.target}/etc/sudoers.d"
@ -1237,7 +1472,7 @@ Exec = /bin/sh -c "{hook_command}"
# We count how many files are there already so we know which number to prefix the file with
num_of_rules_already = len(os.listdir(sudoers_dir))
file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc
file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc
# Guarantees that entity str does not contain invalid characters for a linux file name:
# \ / : * ? " < > |
@ -1293,7 +1528,7 @@ Exec = /bin/sh -c "{hook_command}"
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
def user_set_pw(self, user :str, password :str) -> bool:
def user_set_pw(self, user: str, password: str) -> bool:
info(f'Setting password for {user}')
if user == 'root':
@ -1310,7 +1545,7 @@ Exec = /bin/sh -c "{hook_command}"
except SysCallError:
return False
def user_set_shell(self, user :str, shell :str) -> bool:
def user_set_shell(self, user: str, shell: str) -> bool:
info(f'Setting shell for {user} to {shell}')
try:
@ -1319,7 +1554,7 @@ Exec = /bin/sh -c "{hook_command}"
except SysCallError:
return False
def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
def chown(self, owner: str, path: str, options: List[str] = []) -> bool:
cleaned_path = path.replace('\'', '\\\'')
try:
SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'")

View File

@ -58,7 +58,7 @@ def select_devices(preset: List[disk.BDevice] = []) -> List[disk.BDevice]:
case MenuSelectionType.Reset: return []
case MenuSelectionType.Skip: return preset
case MenuSelectionType.Selection:
selected_device_info: List[disk._DeviceInfo] = choice.value # type: ignore
selected_device_info: List[disk._DeviceInfo] = choice.single_value
selected_devices = []
for device in devices:
@ -73,7 +73,6 @@ def get_default_partition_layout(
filesystem_type: Optional[disk.FilesystemType] = None,
advanced_option: bool = False
) -> List[disk.DeviceModification]:
if len(devices) == 1:
device_modification = suggest_single_disk_layout(
devices[0],
@ -133,7 +132,7 @@ def select_disk_config(
case MenuSelectionType.Reset: return None
case MenuSelectionType.Selection:
if choice.single_value == pre_mount_mode:
output = "You will use whatever drive-setup is mounted at the specified directory\n"
output = 'You will use whatever drive-setup is mounted at the specified directory\n'
output += "WARNING: Archinstall won't check the suitability of this setup\n"
try:
@ -151,7 +150,6 @@ def select_disk_config(
)
preset_devices = [mod.device for mod in preset.device_modifications] if preset else []
devices = select_devices(preset_devices)
if not devices:
@ -177,6 +175,36 @@ def select_disk_config(
return None
def select_lvm_config(
disk_config: disk.DiskLayoutConfiguration,
preset: Optional[disk.LvmConfiguration] = None,
) -> Optional[disk.LvmConfiguration]:
default_mode = disk.LvmLayoutType.Default.display_msg()
options = [default_mode]
preset_value = preset.config_type.display_msg() if preset else None
warning = str(_('Are you sure you want to reset this setting?'))
choice = Menu(
_('Select a LVM option'),
options,
allow_reset=True,
allow_reset_warning_msg=warning,
sort=False,
preview_size=0.2,
preset_values=preset_value
).run()
match choice.type_:
case MenuSelectionType.Skip: return preset
case MenuSelectionType.Reset: return None
case MenuSelectionType.Selection:
if choice.single_value == default_mode:
return suggest_lvm_layout(disk_config)
return preset
def _boot_partition(sector_size: disk.SectorSize, using_gpt: bool) -> disk.PartitionModification:
flags = [disk.PartitionFlag.Boot]
if using_gpt:
@ -199,7 +227,7 @@ def _boot_partition(sector_size: disk.SectorSize, using_gpt: bool) -> disk.Parti
)
def select_main_filesystem_format(advanced_options=False) -> disk.FilesystemType:
def select_main_filesystem_format(advanced_options: bool = False) -> disk.FilesystemType:
options = {
'btrfs': disk.FilesystemType.Btrfs,
'ext4': disk.FilesystemType.Ext4,
@ -250,7 +278,6 @@ def suggest_single_disk_layout(
prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?'))
choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
using_subvolumes = choice.value == Menu.yes()
mount_options = select_mount_options()
device_modification = disk.DeviceModification(device, wipe=True)
@ -288,7 +315,11 @@ def suggest_single_disk_layout(
root_start = boot_partition.start + boot_partition.length
# Set a size for / (/root)
if using_subvolumes or device_size_gib < min_size_to_allow_home_part or not using_home_partition:
if (
using_subvolumes
or device_size_gib < min_size_to_allow_home_part
or not using_home_partition
):
root_length = device.device_info.total_size - root_start
else:
root_length = min(device.device_info.total_size, root_partition_size)
@ -305,6 +336,7 @@ def suggest_single_disk_layout(
fs_type=filesystem_type,
mount_options=mount_options
)
device_modification.add_partition(root_partition)
if using_subvolumes:
@ -388,9 +420,9 @@ def suggest_multi_disk_layout(
device_paths = ', '.join([str(d.device_info.path) for d in devices])
debug(f"Suggesting multi-disk-layout for devices: {device_paths}")
debug(f"/root: {root_device.device_info.path}")
debug(f"/home: {home_device.device_info.path}")
debug(f'Suggesting multi-disk-layout for devices: {device_paths}')
debug(f'/root: {root_device.device_info.path}')
debug(f'/home: {home_device.device_info.path}')
root_device_modification = disk.DeviceModification(root_device, wipe=True)
home_device_modification = disk.DeviceModification(home_device, wipe=True)
@ -444,3 +476,85 @@ def suggest_multi_disk_layout(
home_device_modification.add_partition(home_partition)
return [root_device_modification, home_device_modification]
def suggest_lvm_layout(
disk_config: disk.DiskLayoutConfiguration,
filesystem_type: Optional[disk.FilesystemType] = None,
vg_grp_name: str = 'ArchinstallVg',
) -> disk.LvmConfiguration:
if disk_config.config_type != disk.DiskLayoutType.Default:
raise ValueError('LVM suggested volumes are only available for default partitioning')
using_subvolumes = False
btrfs_subvols = []
home_volume = True
mount_options = []
if not filesystem_type:
filesystem_type = select_main_filesystem_format()
if filesystem_type == disk.FilesystemType.Btrfs:
prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?'))
choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
using_subvolumes = choice.value == Menu.yes()
mount_options = select_mount_options()
if using_subvolumes:
btrfs_subvols = [
disk.SubvolumeModification(Path('@'), Path('/')),
disk.SubvolumeModification(Path('@home'), Path('/home')),
disk.SubvolumeModification(Path('@log'), Path('/var/log')),
disk.SubvolumeModification(Path('@pkg'), Path('/var/cache/pacman/pkg')),
disk.SubvolumeModification(Path('@.snapshots'), Path('/.snapshots')),
]
home_volume = False
boot_part: Optional[disk.PartitionModification] = None
other_part: List[disk.PartitionModification] = []
for mod in disk_config.device_modifications:
for part in mod.partitions:
if part.is_boot():
boot_part = part
else:
other_part.append(part)
if not boot_part:
raise ValueError('Unable to find boot partition in partition modifications')
total_vol_available = sum(
[p.length for p in other_part],
disk.Size(0, disk.Unit.B, disk.SectorSize.default()),
)
root_vol_size = disk.Size(20, disk.Unit.GiB, disk.SectorSize.default())
home_vol_size = total_vol_available - root_vol_size
lvm_vol_group = disk.LvmVolumeGroup(vg_grp_name, pvs=other_part, )
root_vol = disk.LvmVolume(
status=disk.LvmVolumeStatus.Create,
name='root',
fs_type=filesystem_type,
length=root_vol_size,
mountpoint=Path('/'),
btrfs_subvols=btrfs_subvols,
mount_options=mount_options
)
lvm_vol_group.volumes.append(root_vol)
if home_volume:
home_vol = disk.LvmVolume(
status=disk.LvmVolumeStatus.Create,
name='home',
fs_type=filesystem_type,
length=home_vol_size,
mountpoint=Path('/home'),
)
lvm_vol_group.volumes.append(home_vol)
return disk.LvmConfiguration(disk.LvmLayoutType.Default, [lvm_vol_group])

View File

@ -1,12 +1,11 @@
from __future__ import annotations
import re
from typing import Any, Dict, TYPE_CHECKING, List, Optional
from typing import Any, TYPE_CHECKING, List, Optional
from .utils import get_password
from ..menu import Menu, ListManager
from ..models.users import User
from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@ -26,21 +25,6 @@ class UserList(ListManager):
]
super().__init__(prompt, lusers, [self._actions[0]], self._actions[1:])
def reformat(self, data: List[User]) -> Dict[str, Any]:
table = FormattedOutput.as_table(data)
rows = table.split('\n')
# these are the header rows of the table and do not map to any User obviously
# we're adding 2 spaces as prefix because the menu selector '> ' will be put before
# the selectable rows so the header has to be aligned
display_data: Dict[str, Optional[User]] = {f' {rows[0]}': None, f' {rows[1]}': None}
for row, user in zip(rows[2:], data):
row = row.replace('|', '\\|')
display_data[row] = user
return display_data
def selected_action_display(self, user: User) -> str:
return user.username

View File

@ -60,7 +60,7 @@ class Luks2:
iter_time: int = 10000,
key_file: Optional[Path] = None
) -> Path:
info(f'Luks2 encrypting: {self.luks_dev_path}')
debug(f'Luks2 encrypting: {self.luks_dev_path}')
byte_password = self._password_bytes()
@ -87,12 +87,15 @@ class Luks2:
'luksFormat', str(self.luks_dev_path),
])
debug(f'cryptsetup format: {cryptsetup_args}')
# Retry formatting the volume because archinstall can some times be too quick
# which generates a "Device /dev/sdX does not exist or access denied." between
# setting up partitions and us trying to encrypt it.
for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS'] + 1):
try:
SysCommand(cryptsetup_args)
result = SysCommand(cryptsetup_args).decode()
debug(f'cryptsetup luksFormat output: {result}')
break
except SysCallError as err:
time.sleep(storage['DISK_TIMEOUTS'])
@ -106,10 +109,13 @@ class Luks2:
self.lock()
# Then try again to set up the crypt-device
SysCommand(cryptsetup_args)
result = SysCommand(cryptsetup_args).decode()
debug(f'cryptsetup luksFormat output: {result}')
else:
raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {err}')
self.key_file = key_file
return key_file
def _get_luks_uuid(self) -> str:
@ -152,7 +158,15 @@ class Luks2:
while Path(self.luks_dev_path).exists() is False and time.time() - wait_timer < 10:
time.sleep(0.025)
SysCommand(f'/usr/bin/cryptsetup open {self.luks_dev_path} {self.mapper_name} --key-file {key_file} --type luks2')
result = SysCommand(
'/usr/bin/cryptsetup open '
f'{self.luks_dev_path} '
f'{self.mapper_name} '
f'--key-file {key_file} '
f'--type luks2'
).decode()
debug(f'cryptsetup open output: {result}')
if not self.mapper_dev or not self.mapper_dev.is_symlink():
raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}')
@ -199,8 +213,8 @@ class Luks2:
key_file.parent.mkdir(parents=True, exist_ok=True)
with open(key_file, "w") as keyfile:
keyfile.write(generate_password(length=512))
pwd = generate_password(length=512)
key_file.write_text(pwd)
key_file.chmod(0o400)
@ -208,7 +222,7 @@ class Luks2:
self._crypttab(crypttab_path, kf_path, options=["luks", "key-slot=1"])
def _add_key(self, key_file: Path):
info(f'Adding additional key-file {key_file}')
debug(f'Adding additional key-file {key_file}')
command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}'
worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'})
@ -228,7 +242,7 @@ class Luks2:
key_file: Path,
options: List[str]
) -> None:
info(f'Adding crypttab entry for key {key_file}')
debug(f'Adding crypttab entry for key {key_file}')
with open(crypttab_path, 'a') as crypttab:
opt = ','.join(options)

View File

@ -10,6 +10,7 @@ from ..translationhandler import TranslationHandler, Language
if TYPE_CHECKING:
_: Any
class Selector:
def __init__(
self,
@ -68,42 +69,19 @@ class Selector:
:param no_store: A boolean which determines that the field should or shouldn't be stored in the data storage
:type no_store: bool
"""
self._description = description
self.func = func
self._display_func = display_func
self._current_selection = default
self.enabled = enabled
self._dependencies = dependencies
self._dependencies_not = dependencies_not
self.exec_func = exec_func
self._preview_func = preview_func
self.mandatory = mandatory
self._no_store = no_store
self._default = default
@property
def default(self) -> Any:
return self._default
@property
def description(self) -> str:
return self._description
@property
def dependencies(self) -> List:
return self._dependencies
@property
def dependencies_not(self) -> List:
return self._dependencies_not
@property
def current_selection(self) -> Optional[Any]:
return self._current_selection
@property
def preview_func(self):
return self._preview_func
self.description = description
self.func = func
self.current_selection = default
self.enabled = enabled
self.dependencies = dependencies
self.dependencies_not = dependencies_not
self.exec_func = exec_func
self.preview_func = preview_func
self.mandatory = mandatory
self.default = default
def do_store(self) -> bool:
return self._no_store is False
@ -112,45 +90,45 @@ class Selector:
self.enabled = status
def update_description(self, description: str):
self._description = description
self.description = description
def menu_text(self, padding: int = 0) -> str:
if self._description == '': # special menu option for __separator__
if self.description == '': # special menu option for __separator__
return ''
current = ''
if self._display_func:
current = self._display_func(self._current_selection)
current = self._display_func(self.current_selection)
else:
if self._current_selection is not None:
current = str(self._current_selection)
if self.current_selection is not None:
current = str(self.current_selection)
if current:
padding += 5
description = unicode_ljust(str(self._description), padding, ' ')
description = unicode_ljust(str(self.description), padding, ' ')
current = current
else:
description = self._description
description = self.description
current = ''
return f'{description} {current}'
def set_current_selection(self, current: Optional[Any]):
self._current_selection = current
self.current_selection = current
def has_selection(self) -> bool:
if not self._current_selection:
if not self.current_selection:
return False
return True
def get_selection(self) -> Any:
return self._current_selection
return self.current_selection
def is_empty(self) -> bool:
if self._current_selection is None:
if self.current_selection is None:
return True
elif isinstance(self._current_selection, (str, list, dict)) and len(self._current_selection) == 0:
elif isinstance(self.current_selection, (str, list, dict)) and len(self.current_selection) == 0:
return True
return False
@ -197,6 +175,8 @@ class AbstractMenu:
self._sync_all()
self._populate_default_values()
self.defined_text = str(_('Defined'))
@property
def last_choice(self):
return self._last_choice
@ -382,9 +362,10 @@ class AbstractMenu:
result = None
if selector.func is not None:
presel_val = self.option(config_name).get_selection()
result = selector.func(presel_val)
cur_value = self.option(config_name).get_selection()
result = selector.func(cur_value)
self._menu_options[config_name].set_current_selection(result)
if selector.do_store():
self._data_store[config_name] = result
@ -398,19 +379,23 @@ class AbstractMenu:
return True
def _verify_selection_enabled(self, selection_name: str) -> bool:
""" general """
if selection := self._menu_options.get(selection_name, None):
if not selection.enabled:
return False
if len(selection.dependencies) > 0:
for d in selection.dependencies:
if not self._verify_selection_enabled(d) or self._menu_options[d].is_empty():
return False
for dep in selection.dependencies:
if isinstance(dep, str):
if not self._verify_selection_enabled(dep) or self._menu_options[dep].is_empty():
return False
elif callable(dep): # callable dependency eval
return dep()
else:
raise ValueError(f'Unsupported dependency: {selection_name}')
if len(selection.dependencies_not) > 0:
for d in selection.dependencies_not:
if not self._menu_options[d].is_empty():
for dep in selection.dependencies_not:
if not self._menu_options[dep].is_empty():
return False
return True
@ -454,8 +439,8 @@ class AbstractMenu:
class AbstractSubMenu(AbstractMenu):
def __init__(self, data_store: Dict[str, Any] = {}):
super().__init__(data_store=data_store)
def __init__(self, data_store: Dict[str, Any] = {}, preview_size: float = 0.2):
super().__init__(data_store=data_store, preview_size=preview_size)
self._menu_options['__separator__'] = Selector('')
self._menu_options['back'] = \

View File

@ -3,6 +3,7 @@ from os import system
from typing import Any, TYPE_CHECKING, Dict, Optional, Tuple, List
from .menu import Menu
from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@ -127,6 +128,25 @@ class ListManager:
if choice.value and choice.value != self._cancel_action:
self._data = self.handle_action(choice.value, entry, self._data)
def reformat(self, data: List[Any]) -> Dict[str, Optional[Any]]:
"""
Default implementation of the table to be displayed.
Override if any custom formatting is needed
"""
table = FormattedOutput.as_table(data)
rows = table.split('\n')
# these are the header rows of the table and do not map to any User obviously
# we're adding 2 spaces as prefix because the menu selector '> ' will be put before
# the selectable rows so the header has to be aligned
display_data: Dict[str, Optional[Any]] = {f' {rows[0]}': None, f' {rows[1]}': None}
for row, entry in zip(rows[2:], data):
row = row.replace('|', '\\|')
display_data[row] = entry
return display_data
def selected_action_display(self, selection: Any) -> str:
"""
this will return the value to be displayed in the
@ -134,14 +154,6 @@ class ListManager:
"""
raise NotImplementedError('Please implement me in the child class')
def reformat(self, data: List[Any]) -> Dict[str, Optional[Any]]:
"""
this should return a dictionary of display string to actual data entry
mapping; if the value for a given display string is None it will be used
in the header value (useful when displaying tables)
"""
raise NotImplementedError('Please implement me in the child class')
def handle_action(self, action: Any, entry: Optional[Any], data: List[Any]) -> List[Any]:
"""
this function is called when a base action or

View File

@ -66,7 +66,7 @@ class Menu(TerminalMenu):
sort: bool = True,
preset_values: Optional[Union[str, List[str]]] = None,
cursor_index: Optional[int] = None,
preview_command: Optional[Callable] = None,
preview_command: Optional[Callable[[Any], str | None]] = None,
preview_size: float = 0.0,
preview_title: str = 'Info',
header: Union[List[str], str] = [],
@ -228,7 +228,11 @@ class Menu(TerminalMenu):
default_str = str(_('(default)'))
return f'{self._default_option} {default_str}'
def _show_preview(self, preview_command: Optional[Callable], selection: str) -> Optional[str]:
def _show_preview(
self,
preview_command: Optional[Callable[[Any], str | None]],
selection: str
) -> Optional[str]:
if selection == self.back():
return None

View File

@ -19,6 +19,7 @@ class TableMenu(Menu):
preview_size: float = 0.0,
allow_reset: bool = True,
allow_reset_warning_msg: Optional[str] = None,
skip: bool = True
):
"""
param title: Text that will be displayed above the menu
@ -81,7 +82,8 @@ class TableMenu(Menu):
preview_title=preview_title,
extra_bottom_space=extra_bottom_space,
allow_reset=allow_reset,
allow_reset_warning_msg=allow_reset_warning_msg
allow_reset_warning_msg=allow_reset_warning_msg,
skip=skip
)
def _preset_values(self, preset: List[Any]) -> List[str]:

View File

@ -121,21 +121,6 @@ class CustomMirrorList(ListManager):
]
super().__init__(prompt, custom_mirrors, [self._actions[0]], self._actions[1:])
def reformat(self, data: List[CustomMirror]) -> Dict[str, Any]:
table = FormattedOutput.as_table(data)
rows = table.split('\n')
# these are the header rows of the table and do not map to any User obviously
# we're adding 2 spaces as prefix because the menu selector '> ' will be put before
# the selectable rows so the header has to be aligned
display_data: Dict[str, Optional[CustomMirror]] = {f' {rows[0]}': None, f' {rows[1]}': None}
for row, user in zip(rows[2:], data):
row = row.replace('|', '\\|')
display_data[row] = user
return display_data
def selected_action_display(self, mirror: CustomMirror) -> str:
return mirror.name

View File

@ -104,7 +104,7 @@ def perform_installation(mountpoint: Path):
Only requirement is that the block devices are
formatted and setup prior to entering this function.
"""
info('Starting installation')
info('Starting installation...')
disk_config: disk.DiskLayoutConfiguration = archinstall.arguments['disk_config']
# Retrieve list of additional repositories and set boolean values appropriately

View File

@ -82,7 +82,7 @@ def perform_installation(mountpoint: Path):
Only requirement is that the block devices are
formatted and setup prior to entering this function.
"""
info('Starting installation')
info('Starting installation...')
disk_config: disk.DiskLayoutConfiguration = archinstall.arguments['disk_config']
# Retrieve list of additional repositories and set boolean values appropriately