Fix most of the mypy errors in archinstall/lib/disk/ (#2639)

This commit is contained in:
correctmost 2024-08-27 15:40:51 -04:00 committed by GitHub
parent 1e7f1194d7
commit 63b4184f70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 58 additions and 58 deletions

View File

@ -36,7 +36,7 @@ if TYPE_CHECKING:
class DeviceHandler(object):
_TMP_BTRFS_MOUNT = Path('/mnt/arch_btrfs')
def __init__(self):
def __init__(self) -> None:
self._devices: Dict[Path, BDevice] = {}
self.load_devices()
@ -44,7 +44,7 @@ class DeviceHandler(object):
def devices(self) -> List[BDevice]:
return list(self._devices.values())
def load_devices(self):
def load_devices(self) -> None:
block_devices = {}
SysCommand('udevadm settle')
@ -216,7 +216,7 @@ class DeviceHandler(object):
fs_type: FilesystemType,
path: Path,
additional_parted_options: List[str] = []
):
) -> None:
mkfs_type = fs_type.value
options = []
@ -282,7 +282,7 @@ class DeviceHandler(object):
mapper_name: Optional[str],
fs_type: FilesystemType,
enc_conf: DiskEncryption
):
) -> None:
luks_handler = Luks2(
dev_path,
mapper_name=mapper_name,
@ -384,33 +384,33 @@ class DeviceHandler(object):
return self._lvm_info_with_retry(cmd, 'pvseg')
def lvm_vol_change(self, vol: LvmVolume, activate: bool):
def lvm_vol_change(self, vol: LvmVolume, activate: bool) -> None:
active_flag = 'y' if activate else 'n'
cmd = f'lvchange -a {active_flag} {vol.safe_dev_path}'
debug(f'lvchange volume: {cmd}')
SysCommand(cmd)
def lvm_export_vg(self, vg: LvmVolumeGroup):
def lvm_export_vg(self, vg: LvmVolumeGroup) -> None:
cmd = f'vgexport {vg.name}'
debug(f'vgexport: {cmd}')
SysCommand(cmd)
def lvm_import_vg(self, vg: LvmVolumeGroup):
def lvm_import_vg(self, vg: LvmVolumeGroup) -> None:
cmd = f'vgimport {vg.name}'
debug(f'vgimport: {cmd}')
SysCommand(cmd)
def lvm_vol_reduce(self, vol_path: Path, amount: Size):
def lvm_vol_reduce(self, vol_path: Path, amount: Size) -> None:
val = amount.format_size(Unit.B, include_unit=False)
cmd = f'lvreduce -L -{val}B {vol_path}'
debug(f'Reducing LVM volume size: {cmd}')
SysCommand(cmd)
def lvm_pv_create(self, pvs: Iterable[Path]):
def lvm_pv_create(self, pvs: Iterable[Path]) -> None:
cmd = 'pvcreate ' + ' '.join([str(pv) for pv in pvs])
debug(f'Creating LVM PVS: {cmd}')
@ -418,7 +418,7 @@ class DeviceHandler(object):
worker.poll()
worker.write(b'y\n', line_ending=False)
def lvm_vg_create(self, pvs: Iterable[Path], vg_name: str):
def lvm_vg_create(self, pvs: Iterable[Path], vg_name: str) -> None:
pvs_str = ' '.join([str(pv) for pv in pvs])
cmd = f'vgcreate --yes {vg_name} {pvs_str}'
@ -428,7 +428,7 @@ class DeviceHandler(object):
worker.poll()
worker.write(b'y\n', line_ending=False)
def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size] = None):
def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size] = None) -> None:
if offset is not None:
length = volume.length - offset
else:
@ -452,7 +452,7 @@ class DeviceHandler(object):
block_device: BDevice,
disk: Disk,
requires_delete: bool
):
) -> None:
# when we require a delete and the partition to be (re)created
# already exists then we have to delete it first
if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]:
@ -535,7 +535,7 @@ class DeviceHandler(object):
path: Path,
btrfs_subvols: List[SubvolumeModification],
mount_options: List[str]
):
) -> None:
info(f'Creating subvolumes: {path}')
self.mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
@ -565,7 +565,7 @@ class DeviceHandler(object):
self,
part_mod: PartitionModification,
enc_conf: Optional['DiskEncryption'] = None
):
) -> None:
info(f'Creating subvolumes: {part_mod.safe_dev_path}')
# unlock the partition first if it's encrypted
@ -617,7 +617,7 @@ class DeviceHandler(object):
return luks_handler
def umount_all_existing(self, device_path: Path):
def umount_all_existing(self, device_path: Path) -> None:
debug(f'Unmounting all existing partitions: {device_path}')
existing_partitions = self._devices[device_path].partition_infos
@ -635,7 +635,7 @@ class DeviceHandler(object):
self,
modification: DeviceModification,
partition_table: Optional[PartitionTable] = None
):
) -> None:
"""
Create a partition table on the block device and create all partitions.
"""
@ -675,7 +675,7 @@ class DeviceHandler(object):
mount_fs: Optional[str] = None,
create_target_mountpoint: bool = True,
options: List[str] = []
):
) -> None:
if create_target_mountpoint and not target_mountpoint.exists():
target_mountpoint.mkdir(parents=True, exist_ok=True)
@ -705,7 +705,7 @@ class DeviceHandler(object):
except SysCallError as err:
raise DiskError(f'Could not mount {dev_path}: {command}\n{err.message}')
def umount(self, mountpoint: Path, recursive: bool = False):
def umount(self, mountpoint: Path, recursive: bool = False) -> None:
lsblk_info = get_lsblk_info(mountpoint)
if not lsblk_info.mountpoints:
@ -748,7 +748,7 @@ class DeviceHandler(object):
return device_mods
def partprobe(self, path: Optional[Path] = None):
def partprobe(self, path: Optional[Path] = None) -> None:
if path is not None:
command = f'partprobe {path}'
else:
@ -763,7 +763,7 @@ class DeviceHandler(object):
else:
error(f'"{command}" failed to run (continuing anyway): {err}')
def _wipe(self, dev_path: Path):
def _wipe(self, dev_path: Path) -> None:
"""
Wipe a device (partition or otherwise) of meta-data, be it file system, LVM, etc.
@param dev_path: Device path of the partition to be wiped.
@ -772,7 +772,7 @@ class DeviceHandler(object):
with open(dev_path, 'wb') as p:
p.write(bytearray(1024))
def wipe_dev(self, block_device: BDevice):
def wipe_dev(self, block_device: BDevice) -> None:
"""
Wipe the block device of meta-data, be it file system, LVM, etc.
This is not intended to be secure, but rather to ensure that

View File

@ -178,7 +178,7 @@ class SectorSize:
value: int
unit: Unit
def __post_init__(self):
def __post_init__(self) -> None:
match self.unit:
case Unit.sectors:
raise ValueError('Unit type sector not allowed for SectorSize')
@ -204,7 +204,7 @@ class SectorSize:
"""
will normalize the value of the unit to Byte
"""
return int(self.value * self.unit.value) # type: ignore
return int(self.value * self.unit.value)
@dataclass
@ -213,7 +213,7 @@ class Size:
unit: Unit
sector_size: SectorSize
def __post_init__(self):
def __post_init__(self) -> None:
if not isinstance(self.sector_size, SectorSize):
raise ValueError('sector size must be of type SectorSize')
@ -253,7 +253,7 @@ class Size:
sectors = math.ceil(norm / sector_size.value)
return Size(sectors, Unit.sectors, sector_size)
else:
value = int(self._normalize() / target_unit.value) # type: ignore
value = int(self._normalize() / target_unit.value)
return Size(value, target_unit, self.sector_size)
def as_text(self) -> str:
@ -293,7 +293,7 @@ class Size:
"""
if self.unit == Unit.sectors and self.sector_size is not None:
return self.value * self.sector_size.normalize()
return int(self.value * self.unit.value) # type: ignore
return int(self.value * self.unit.value)
def __sub__(self, other: Size) -> Size:
src_norm = self._normalize()
@ -305,22 +305,22 @@ class Size:
dest_norm = other._normalize()
return Size(abs(src_norm + dest_norm), Unit.B, self.sector_size)
def __lt__(self, other):
def __lt__(self, other: Size) -> bool:
return self._normalize() < other._normalize()
def __le__(self, other):
def __le__(self, other: Size) -> bool:
return self._normalize() <= other._normalize()
def __eq__(self, other):
def __eq__(self, other) -> bool:
return self._normalize() == other._normalize()
def __ne__(self, other):
def __ne__(self, other) -> bool:
return self._normalize() != other._normalize()
def __gt__(self, other):
def __gt__(self, other: Size) -> bool:
return self._normalize() > other._normalize()
def __ge__(self, other):
def __ge__(self, other: Size) -> bool:
return self._normalize() >= other._normalize()
@ -556,7 +556,7 @@ class BDevice:
device_info: _DeviceInfo
partition_infos: List[_PartitionInfo]
def __hash__(self):
def __hash__(self) -> int:
return hash(self.disk.device.path)
@ -680,7 +680,7 @@ class PartitionModification:
_efi_indicator_flags = (PartitionFlag.Boot, PartitionFlag.ESP)
_boot_indicator_flags = (PartitionFlag.Boot, PartitionFlag.XBOOTLDR)
def __post_init__(self):
def __post_init__(self) -> None:
# needed to use the object as a dictionary key due to hash func
if not hasattr(self, '_obj_id'):
self._obj_id = uuid.uuid4()
@ -691,7 +691,7 @@ class PartitionModification:
if self.fs_type is None and self.status == ModificationStatus.Modify:
raise ValueError('FS type must not be empty on modifications with status type modify')
def __hash__(self):
def __hash__(self) -> int:
return hash(self._obj_id)
@property
@ -796,11 +796,11 @@ class PartitionModification:
return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.dev_path.name}'
return None
def set_flag(self, flag: PartitionFlag):
def set_flag(self, flag: PartitionFlag) -> None:
if flag not in self.flags:
self.flags.append(flag)
def invert_flag(self, flag: PartitionFlag):
def invert_flag(self, flag: PartitionFlag) -> None:
if flag in self.flags:
self.flags = [f for f in self.flags if f != flag]
else:
@ -915,12 +915,12 @@ class LvmVolume:
# mapper device path /dev/<vg>/<vol>
dev_path: Optional[Path] = None
def __post_init__(self):
def __post_init__(self) -> None:
# needed to use the object as a dictionary key due to hash func
if not hasattr(self, '_obj_id'):
self._obj_id = uuid.uuid4()
def __hash__(self):
def __hash__(self) -> int:
return hash(self._obj_id)
@property
@ -1050,7 +1050,7 @@ class LvmConfiguration:
config_type: LvmLayoutType
vol_groups: List[LvmVolumeGroup]
def __post_init__(self):
def __post_init__(self) -> None:
# make sure all volume groups have unique PVs
pvs = []
for group in self.vol_groups:
@ -1121,7 +1121,7 @@ class DeviceModification:
def device_path(self) -> Path:
return self.device.device_info.path
def add_partition(self, partition: PartitionModification):
def add_partition(self, partition: PartitionModification) -> None:
self.partitions.append(partition)
def get_efi_partition(self) -> Optional[PartitionModification]:
@ -1196,7 +1196,7 @@ class DiskEncryption:
lvm_volumes: List[LvmVolume] = field(default_factory=list)
hsm_device: Optional[Fido2Device] = None
def __post_init__(self):
def __post_init__(self) -> None:
if self.encryption_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and not self.partitions:
raise ValueError('Luks or LvmOnLuks encryption require partitions to be defined')

View File

@ -29,7 +29,7 @@ class DiskLayoutConfigurationMenu(AbstractSubMenu):
super().__init__(data_store=data_store, preview_size=0.5)
def setup_selection_menu_options(self):
def setup_selection_menu_options(self) -> None:
self._menu_options['disk_config'] = \
Selector(
_('Partitioning'),

View File

@ -40,7 +40,7 @@ class DiskEncryptionMenu(AbstractSubMenu):
self._disk_config = disk_config
super().__init__(data_store=data_store)
def setup_selection_menu_options(self):
def setup_selection_menu_options(self) -> None:
self._menu_options['encryption_type'] = \
Selector(
_('Encryption type'),

View File

@ -74,7 +74,7 @@ class Fido2:
hsm_device: Fido2Device,
dev_path: Path,
password: str
):
) -> None:
worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {dev_path}", peek_output=True)
pw_inputted = False
pin_inputted = False

View File

@ -32,7 +32,7 @@ class FilesystemHandler:
self._disk_config = disk_config
self._enc_config = enc_conf
def perform_filesystem_operations(self, show_countdown: bool = True):
def perform_filesystem_operations(self, show_countdown: bool = True) -> None:
if self._disk_config.config_type == DiskLayoutType.Pre_mount:
debug('Disk layout configuration is set to pre-mount, not performing any operations')
return
@ -90,7 +90,7 @@ class FilesystemHandler:
self,
partitions: List[PartitionModification],
device_path: Path
):
) -> None:
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@ -122,7 +122,7 @@ class FilesystemHandler:
part_mod.partuuid = lsblk_info.partuuid
part_mod.uuid = lsblk_info.uuid
def _validate_partitions(self, partitions: List[PartitionModification]):
def _validate_partitions(self, partitions: List[PartitionModification]) -> None:
checks = {
# verify that all partitions have a path set (which implies that they have been created)
lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
@ -138,7 +138,7 @@ class FilesystemHandler:
if found is not None:
raise exc
def perform_lvm_operations(self):
def perform_lvm_operations(self) -> None:
info('Setting up LVM config...')
if not self._disk_config.lvm_config:
@ -153,7 +153,7 @@ class FilesystemHandler:
self._setup_lvm(self._disk_config.lvm_config)
self._format_lvm_vols(self._disk_config.lvm_config)
def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption):
def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption) -> None:
if enc_config.encryption_type == EncryptionType.LvmOnLuks:
enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False)
@ -175,7 +175,7 @@ class FilesystemHandler:
self._safely_close_lvm(lvm_config)
def _safely_close_lvm(self, lvm_config: LvmConfiguration):
def _safely_close_lvm(self, lvm_config: LvmConfiguration) -> None:
for vg in lvm_config.vol_groups:
for vol in vg.volumes:
device_handler.lvm_vol_change(vol, False)
@ -186,7 +186,7 @@ class FilesystemHandler:
self,
lvm_config: LvmConfiguration,
enc_mods: Dict[PartitionModification, Luks2] = {}
):
) -> None:
self._lvm_create_pvs(lvm_config, enc_mods)
for vg in lvm_config.vol_groups:
@ -233,7 +233,7 @@ class FilesystemHandler:
self,
lvm_config: LvmConfiguration,
enc_vols: Dict[LvmVolume, Luks2] = {}
):
) -> None:
for vol in lvm_config.get_all_volumes():
if enc_vol := enc_vols.get(vol, None):
if not enc_vol.mapper_dev:
@ -253,7 +253,7 @@ class FilesystemHandler:
self,
lvm_config: LvmConfiguration,
enc_mods: Dict[PartitionModification, Luks2] = {}
):
) -> None:
pv_paths: Set[Path] = set()
for vg in lvm_config.vol_groups:
@ -328,7 +328,7 @@ class FilesystemHandler:
return enc_mods
def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup):
def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup) -> None:
# from arch wiki:
# If a logical volume will be formatted with ext4, leave at least 256 MiB
# free space in the volume group to allow using e2scrub

View File

@ -148,7 +148,7 @@ class PartitioningList(ListManager):
self,
partition: PartitionModification,
option: BtrfsMountOption
):
) -> None:
if option.value not in partition.mount_options:
if option == BtrfsMountOption.compress:
partition.mount_options = [
@ -167,13 +167,13 @@ class PartitioningList(ListManager):
o for o in partition.mount_options if o != option.value
]
def _set_btrfs_subvolumes(self, partition: PartitionModification):
def _set_btrfs_subvolumes(self, partition: PartitionModification) -> None:
partition.btrfs_subvols = SubvolumeMenu(
_("Manage btrfs subvolumes for current partition"),
partition.btrfs_subvols
).run()
def _prompt_formatting(self, partition: PartitionModification):
def _prompt_formatting(self, partition: PartitionModification) -> None:
# an existing partition can toggle between Exist or Modify
if partition.is_modify():
partition.status = ModificationStatus.Exist