Fix various mypy errors in archinstall/lib/ (#2640)

* Fix various mypy errors in archinstall/lib/

* Additional mypy fixes in archinstall/lib/ to pass CI checks
This commit is contained in:
correctmost 2024-08-27 15:43:54 -04:00 committed by GitHub
parent 63b4184f70
commit c7aeead7d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 121 additions and 121 deletions

View File

@ -94,7 +94,7 @@ class Boot:
return self.session.is_alive() return self.session.is_alive()
def SysCommand(self, cmd: list, *args, **kwargs) -> SysCommand: def SysCommand(self, cmd: list[str], *args, **kwargs) -> SysCommand:
if cmd[0][0] != '/' and cmd[0][:2] != './': if cmd[0][0] != '/' and cmd[0][:2] != './':
# This check is also done in SysCommand & SysCommandWorker. # This check is also done in SysCommand & SysCommandWorker.
# However, that check is done for `machinectl` and not for our chroot command. # However, that check is done for `machinectl` and not for our chroot command.
@ -104,7 +104,7 @@ class Boot:
return SysCommand(["systemd-run", f"--machine={self.container_name}", "--pty", *cmd], *args, **kwargs) return SysCommand(["systemd-run", f"--machine={self.container_name}", "--pty", *cmd], *args, **kwargs)
def SysCommandWorker(self, cmd: list, *args, **kwargs) -> SysCommandWorker: def SysCommandWorker(self, cmd: list[str], *args, **kwargs) -> SysCommandWorker:
if cmd[0][0] != '/' and cmd[0][:2] != './': if cmd[0][0] != '/' and cmd[0][:2] != './':
cmd[0] = locate_binary(cmd[0]) cmd[0] = locate_binary(cmd[0])

View File

@ -36,14 +36,14 @@ class ConfigurationOutput:
self._process_config() self._process_config()
@property @property
def user_credentials_file(self): def user_credentials_file(self) -> str:
return self._user_creds_file return self._user_creds_file
@property @property
def user_configuration_file(self): def user_configuration_file(self) -> str:
return self._user_config_file return self._user_config_file
def _process_config(self): def _process_config(self) -> None:
for key, value in self._config.items(): for key, value in self._config.items():
if key in self._sensitive: if key in self._sensitive:
self._user_credentials[key] = value self._user_credentials[key] = value
@ -68,7 +68,7 @@ class ConfigurationOutput:
return json.dumps(self._user_credentials, indent=4, sort_keys=True, cls=UNSAFE_JSON) return json.dumps(self._user_credentials, indent=4, sort_keys=True, cls=UNSAFE_JSON)
return None return None
def show(self): def show(self) -> None:
print(_('\nThis is your chosen configuration:')) print(_('\nThis is your chosen configuration:'))
debug(" -- Chosen configuration --") debug(" -- Chosen configuration --")
@ -84,20 +84,20 @@ class ConfigurationOutput:
) )
return dest_path_ok return dest_path_ok
def save_user_config(self, dest_path: Path): def save_user_config(self, dest_path: Path) -> None:
if self._is_valid_path(dest_path): if self._is_valid_path(dest_path):
target = dest_path / self._user_config_file target = dest_path / self._user_config_file
target.write_text(self.user_config_to_json()) target.write_text(self.user_config_to_json())
os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
def save_user_creds(self, dest_path: Path): def save_user_creds(self, dest_path: Path) -> None:
if self._is_valid_path(dest_path): if self._is_valid_path(dest_path):
if user_creds := self.user_credentials_to_json(): if user_creds := self.user_credentials_to_json():
target = dest_path / self._user_creds_file target = dest_path / self._user_creds_file
target.write_text(user_creds) target.write_text(user_creds)
os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
def save(self, dest_path: Optional[Path] = None): def save(self, dest_path: Optional[Path] = None) -> None:
dest_path = dest_path or self._default_save_path dest_path = dest_path or self._default_save_path
if self._is_valid_path(dest_path): if self._is_valid_path(dest_path):
@ -105,8 +105,8 @@ class ConfigurationOutput:
self.save_user_creds(dest_path) self.save_user_creds(dest_path)
def save_config(config: Dict): def save_config(config: Dict) -> None:
def preview(selection: str): def preview(selection: str) -> Optional[str]:
match options[selection]: match options[selection]:
case "user_config": case "user_config":
serialized = config_output.user_config_to_json() serialized = config_output.user_config_to_json()

View File

@ -430,7 +430,7 @@ class SysCommand:
return True return True
def decode(self, encoding: str = 'utf-8', errors='backslashreplace', strip: bool = True) -> str: def decode(self, encoding: str = 'utf-8', errors: str = 'backslashreplace', strip: bool = True) -> str:
if not self.session: if not self.session:
raise ValueError('No session available to decode') raise ValueError('No session available to decode')
@ -520,6 +520,6 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target
return True return True
def secret(x :str): def secret(x :str) -> str:
""" return * with len equal to to the input string """ """ return * with len equal to to the input string """
return '*' * len(x) return '*' * len(x)

View File

@ -38,7 +38,7 @@ class GlobalMenu(AbstractMenu):
def __init__(self, data_store: Dict[str, Any]): def __init__(self, data_store: Dict[str, Any]):
super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3) super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3)
def setup_selection_menu_options(self): def setup_selection_menu_options(self) -> None:
# archinstall.Language will not use preset values # archinstall.Language will not use preset values
self._menu_options['archinstall-language'] = \ self._menu_options['archinstall-language'] = \
Selector( Selector(
@ -181,7 +181,7 @@ class GlobalMenu(AbstractMenu):
self._menu_options['abort'] = Selector(_('Abort'), exec_func=lambda n,v:exit(1)) self._menu_options['abort'] = Selector(_('Abort'), exec_func=lambda n,v:exit(1))
def _missing_configs(self) -> List[str]: def _missing_configs(self) -> List[str]:
def check(s) -> bool: def check(s: str) -> bool:
obj = self._menu_options.get(s) obj = self._menu_options.get(s)
if obj and obj.has_selection(): if obj and obj.has_selection():
return True return True
@ -216,7 +216,7 @@ class GlobalMenu(AbstractMenu):
return False return False
return self._validate_bootloader() is None return self._validate_bootloader() is None
def _update_uki_display(self, name: Optional[str] = None): def _update_uki_display(self, name: Optional[str] = None) -> None:
if bootloader := self._menu_options['bootloader'].current_selection: if bootloader := self._menu_options['bootloader'].current_selection:
if not SysInfo.has_uefi() or not bootloader.has_uki_support(): if not SysInfo.has_uefi() or not bootloader.has_uki_support():
self._menu_options['uki'].set_current_selection(False) self._menu_options['uki'].set_current_selection(False)
@ -224,15 +224,15 @@ class GlobalMenu(AbstractMenu):
elif name and name == 'bootloader': elif name and name == 'bootloader':
self._menu_options['uki'].set_enabled(True) self._menu_options['uki'].set_enabled(True)
def _update_install_text(self, name: Optional[str] = None, value: Any = None): def _update_install_text(self, name: Optional[str] = None, value: Any = None) -> None:
text = self._install_text() text = self._install_text()
self._menu_options['install'].update_description(text) self._menu_options['install'].update_description(text)
def post_callback(self, name: Optional[str] = None, value: Any = None): def post_callback(self, name: Optional[str] = None, value: Any = None) -> None:
self._update_uki_display(name) self._update_uki_display(name)
self._update_install_text(name, value) self._update_install_text(name, value)
def _install_text(self): def _install_text(self) -> str:
missing = len(self._missing_configs()) missing = len(self._missing_configs())
if missing > 0: if missing > 0:
return _('Install ({} config(s) missing)').format(missing) return _('Install ({} config(s) missing)').format(missing)
@ -281,7 +281,7 @@ class GlobalMenu(AbstractMenu):
return output return output
return None return None
def _prev_additional_pkgs(self): def _prev_additional_pkgs(self) -> Optional[str]:
selector = self._menu_options['packages'] selector = self._menu_options['packages']
if selector.current_selection: if selector.current_selection:
packages: List[str] = selector.current_selection packages: List[str] = selector.current_selection

View File

@ -141,7 +141,7 @@ class GfxDriver(Enum):
return packages return packages
class _SysInfo: class _SysInfo:
def __init__(self): def __init__(self) -> None:
pass pass
@cached_property @cached_property

View File

@ -97,7 +97,7 @@ class Installer:
def __enter__(self) -> 'Installer': def __enter__(self) -> 'Installer':
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
if exc_type is not None: if exc_type is not None:
error(exc_val) error(exc_val)
@ -126,15 +126,15 @@ class Installer:
self.sync_log_to_install_medium() self.sync_log_to_install_medium()
return False return False
def remove_mod(self, mod: str): def remove_mod(self, mod: str) -> None:
if mod in self._modules: if mod in self._modules:
self._modules.remove(mod) self._modules.remove(mod)
def append_mod(self, mod: str): def append_mod(self, mod: str) -> None:
if mod not in self._modules: if mod not in self._modules:
self._modules.append(mod) self._modules.append(mod)
def _verify_service_stop(self): def _verify_service_stop(self) -> None:
""" """
Certain services might be running that affects the system during installation. Certain services might be running that affects the system during installation.
One such service is "reflector.service" which updates /etc/pacman.d/mirrorlist One such service is "reflector.service" which updates /etc/pacman.d/mirrorlist
@ -177,7 +177,7 @@ class Installer:
while self._service_state('archlinux-keyring-wkd-sync.service') not in ('dead', 'failed', 'exited'): while self._service_state('archlinux-keyring-wkd-sync.service') not in ('dead', 'failed', 'exited'):
time.sleep(1) time.sleep(1)
def _verify_boot_part(self): def _verify_boot_part(self) -> None:
""" """
Check that mounted /boot device has at minimum size for installation Check that mounted /boot device has at minimum size for installation
The reason this check is here is to catch pre-mounted device configuration and potentially The reason this check is here is to catch pre-mounted device configuration and potentially
@ -221,7 +221,7 @@ class Installer:
# mount all regular partitions # mount all regular partitions
self._mount_partition_layout(luks_handlers) self._mount_partition_layout(luks_handlers)
def _mount_partition_layout(self, luks_handlers: Dict[Any, Luks2]): def _mount_partition_layout(self, luks_handlers: Dict[Any, Luks2]) -> None:
debug('Mounting partition layout') debug('Mounting partition layout')
# do not mount any PVs part of the LVM configuration # do not mount any PVs part of the LVM configuration
@ -252,7 +252,7 @@ class Installer:
else: else:
self._mount_partition(part_mod) self._mount_partition(part_mod)
def _mount_lvm_layout(self, luks_handlers: Dict[Any, Luks2] = {}): def _mount_lvm_layout(self, luks_handlers: Dict[Any, Luks2] = {}) -> None:
lvm_config = self._disk_config.lvm_config lvm_config = self._disk_config.lvm_config
if not lvm_config: if not lvm_config:
@ -284,7 +284,7 @@ class Installer:
if part_mod.mapper_name and part_mod.dev_path if part_mod.mapper_name and part_mod.dev_path
} }
def _import_lvm(self): def _import_lvm(self) -> None:
lvm_config = self._disk_config.lvm_config lvm_config = self._disk_config.lvm_config
if not lvm_config: if not lvm_config:
@ -311,7 +311,7 @@ class Installer:
if vol.mapper_name and vol.dev_path if vol.mapper_name and vol.dev_path
} }
def _mount_partition(self, part_mod: disk.PartitionModification): def _mount_partition(self, part_mod: disk.PartitionModification) -> None:
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined # it would be none if it's btrfs as the subvolumes will have the mountpoints defined
if part_mod.mountpoint and part_mod.dev_path: if part_mod.mountpoint and part_mod.dev_path:
target = self.target / part_mod.relative_mountpoint target = self.target / part_mod.relative_mountpoint
@ -324,7 +324,7 @@ class Installer:
part_mod.mount_options part_mod.mount_options
) )
def _mount_lvm_vol(self, volume: disk.LvmVolume): def _mount_lvm_vol(self, volume: disk.LvmVolume) -> None:
if volume.fs_type != disk.FilesystemType.Btrfs: if volume.fs_type != disk.FilesystemType.Btrfs:
if volume.mountpoint and volume.dev_path: if volume.mountpoint and volume.dev_path:
target = self.target / volume.relative_mountpoint target = self.target / volume.relative_mountpoint
@ -333,7 +333,7 @@ class Installer:
if volume.fs_type == disk.FilesystemType.Btrfs and volume.dev_path: if volume.fs_type == disk.FilesystemType.Btrfs and volume.dev_path:
self._mount_btrfs_subvol(volume.dev_path, volume.btrfs_subvols, volume.mount_options) self._mount_btrfs_subvol(volume.dev_path, volume.btrfs_subvols, volume.mount_options)
def _mount_luks_partition(self, part_mod: disk.PartitionModification, luks_handler: Luks2): def _mount_luks_partition(self, part_mod: disk.PartitionModification, luks_handler: Luks2) -> None:
if part_mod.fs_type != disk.FilesystemType.Btrfs: if part_mod.fs_type != disk.FilesystemType.Btrfs:
if part_mod.mountpoint and luks_handler.mapper_dev: if part_mod.mountpoint and luks_handler.mapper_dev:
target = self.target / part_mod.relative_mountpoint target = self.target / part_mod.relative_mountpoint
@ -342,7 +342,7 @@ class Installer:
if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev: if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols, part_mod.mount_options) self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols, part_mod.mount_options)
def _mount_luks_volume(self, volume: disk.LvmVolume, luks_handler: Luks2): def _mount_luks_volume(self, volume: disk.LvmVolume, luks_handler: Luks2) -> None:
if volume.fs_type != disk.FilesystemType.Btrfs: if volume.fs_type != disk.FilesystemType.Btrfs:
if volume.mountpoint and luks_handler.mapper_dev: if volume.mountpoint and luks_handler.mapper_dev:
target = self.target / volume.relative_mountpoint target = self.target / volume.relative_mountpoint
@ -356,7 +356,7 @@ class Installer:
dev_path: Path, dev_path: Path,
subvolumes: List[disk.SubvolumeModification], subvolumes: List[disk.SubvolumeModification],
mount_options: List[str] = [] mount_options: List[str] = []
): ) -> None:
for subvol in subvolumes: for subvol in subvolumes:
mountpoint = self.target / subvol.relative_mountpoint mountpoint = self.target / subvol.relative_mountpoint
mount_options = mount_options + [f'subvol={subvol.name}'] mount_options = mount_options + [f'subvol={subvol.name}']
@ -374,7 +374,7 @@ class Installer:
# so we won't need any keyfile generation atm # so we won't need any keyfile generation atm
pass pass
def _generate_key_files_partitions(self): def _generate_key_files_partitions(self) -> None:
for part_mod in self._disk_encryption.partitions: for part_mod in self._disk_encryption.partitions:
gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod) gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod)
@ -396,7 +396,7 @@ class Installer:
self._disk_encryption.encryption_password self._disk_encryption.encryption_password
) )
def _generate_key_file_lvm_volumes(self): def _generate_key_file_lvm_volumes(self) -> None:
for vol in self._disk_encryption.lvm_volumes: for vol in self._disk_encryption.lvm_volumes:
gen_enc_file = self._disk_encryption.should_generate_encryption_file(vol) gen_enc_file = self._disk_encryption.should_generate_encryption_file(vol)
@ -432,7 +432,7 @@ class Installer:
return True return True
def add_swapfile(self, size='4G', enable_resume=True, file='/swapfile'): def add_swapfile(self, size: str = '4G', enable_resume: bool = True, file: str = '/swapfile') -> None:
if file[:1] != '/': if file[:1] != '/':
file = f"/{file}" file = f"/{file}"
if len(file.strip()) <= 0 or file == '/': if len(file.strip()) <= 0 or file == '/':
@ -457,7 +457,7 @@ class Installer:
def post_install_check(self, *args: str, **kwargs: str) -> List[str]: def post_install_check(self, *args: str, **kwargs: str) -> List[str]:
return [step for step, flag in self.helper_flags.items() if flag is False] return [step for step, flag in self.helper_flags.items() if flag is False]
def set_mirrors(self, mirror_config: MirrorConfiguration, on_target: bool = False): def set_mirrors(self, mirror_config: MirrorConfiguration, on_target: bool = False) -> None:
""" """
Set the mirror configuration for the installation. Set the mirror configuration for the installation.
@ -496,7 +496,7 @@ class Installer:
with local_mirrorlist_conf.open('w') as fp: with local_mirrorlist_conf.open('w') as fp:
fp.write(mirrorlist_config) fp.write(mirrorlist_config)
def genfstab(self, flags: str = '-pU'): def genfstab(self, flags: str = '-pU') -> None:
fstab_path = self.target / "etc" / "fstab" fstab_path = self.target / "etc" / "fstab"
info(f"Updating {fstab_path}") info(f"Updating {fstab_path}")
@ -521,7 +521,7 @@ class Installer:
for entry in self._fstab_entries: for entry in self._fstab_entries:
fp.write(f'{entry}\n') fp.write(f'{entry}\n')
def set_hostname(self, hostname: str): def set_hostname(self, hostname: str) -> None:
with open(f'{self.target}/etc/hostname', 'w') as fh: with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n') fh.write(hostname + '\n')
@ -634,7 +634,7 @@ class Installer:
def drop_to_shell(self) -> None: def drop_to_shell(self) -> None:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
def configure_nic(self, nic: Nic): def configure_nic(self, nic: Nic) -> None:
conf = nic.as_systemd_config() conf = nic.as_systemd_config()
for plugin in plugins.values(): for plugin in plugins.values():
@ -739,7 +739,7 @@ class Installer:
return vendor.get_ucode() return vendor.get_ucode()
return None return None
def _handle_partition_installation(self): def _handle_partition_installation(self) -> None:
pvs = [] pvs = []
if self._disk_config.lvm_config: if self._disk_config.lvm_config:
pvs = self._disk_config.lvm_config.get_all_pvs() pvs = self._disk_config.lvm_config.get_all_pvs()
@ -776,7 +776,7 @@ class Installer:
if 'encrypt' not in self._hooks: if 'encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('filesystems'), 'encrypt') self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
def _handle_lvm_installation(self): def _handle_lvm_installation(self) -> None:
if not self._disk_config.lvm_config: if not self._disk_config.lvm_config:
return return
@ -891,7 +891,7 @@ class Installer:
if hasattr(plugin, 'on_install'): if hasattr(plugin, 'on_install'):
plugin.on_install(self) plugin.on_install(self)
def setup_swap(self, kind: str = 'zram'): def setup_swap(self, kind: str = 'zram') -> None:
if kind == 'zram': if kind == 'zram':
info(f"Setting up swap on zram") info(f"Setting up swap on zram")
self.pacman.strap('zram-generator') self.pacman.strap('zram-generator')
@ -1053,7 +1053,7 @@ class Installer:
root: disk.PartitionModification | disk.LvmVolume, root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification], efi_partition: Optional[disk.PartitionModification],
uki_enabled: bool = False uki_enabled: bool = False
): ) -> None:
debug('Installing systemd bootloader') debug('Installing systemd bootloader')
self.pacman.strap('efibootmgr') self.pacman.strap('efibootmgr')
@ -1151,7 +1151,7 @@ class Installer:
boot_partition: disk.PartitionModification, boot_partition: disk.PartitionModification,
root: disk.PartitionModification | disk.LvmVolume, root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification] efi_partition: Optional[disk.PartitionModification]
): ) -> None:
debug('Installing grub bootloader') debug('Installing grub bootloader')
self.pacman.strap('grub') # no need? self.pacman.strap('grub') # no need?
@ -1236,7 +1236,7 @@ class Installer:
boot_partition: disk.PartitionModification, boot_partition: disk.PartitionModification,
efi_partition: Optional[disk.PartitionModification], efi_partition: Optional[disk.PartitionModification],
root: disk.PartitionModification | disk.LvmVolume root: disk.PartitionModification | disk.LvmVolume
): ) -> None:
debug('Installing limine bootloader') debug('Installing limine bootloader')
self.pacman.strap('limine') self.pacman.strap('limine')
@ -1326,7 +1326,7 @@ Exec = /bin/sh -c "{hook_command}"
boot_partition: disk.PartitionModification, boot_partition: disk.PartitionModification,
root: disk.PartitionModification | disk.LvmVolume, root: disk.PartitionModification | disk.LvmVolume,
uki_enabled: bool = False uki_enabled: bool = False
): ) -> None:
debug('Installing efistub bootloader') debug('Installing efistub bootloader')
self.pacman.strap('efibootmgr') self.pacman.strap('efibootmgr')
@ -1375,7 +1375,7 @@ Exec = /bin/sh -c "{hook_command}"
self, self,
root: disk.PartitionModification | disk.LvmVolume, root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification] efi_partition: Optional[disk.PartitionModification]
): ) -> None:
if not efi_partition or not efi_partition.mountpoint: if not efi_partition or not efi_partition.mountpoint:
raise ValueError(f'Could not detect ESP at mountpoint {self.target}') raise ValueError(f'Could not detect ESP at mountpoint {self.target}')
@ -1465,7 +1465,7 @@ Exec = /bin/sh -c "{hook_command}"
case Bootloader.Limine: case Bootloader.Limine:
self._add_limine_bootloader(boot_partition, efi_partition, root) self._add_limine_bootloader(boot_partition, efi_partition, root)
def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: def add_additional_packages(self, packages: Union[str, List[str]]) -> None:
return self.pacman.strap(packages) return self.pacman.strap(packages)
def enable_sudo(self, entity: str, group: bool = False): def enable_sudo(self, entity: str, group: bool = False):
@ -1498,7 +1498,7 @@ Exec = /bin/sh -c "{hook_command}"
# Guarantees sudoer conf file recommended perms # Guarantees sudoer conf file recommended perms
os.chmod(Path(rule_file_name), 0o440) os.chmod(Path(rule_file_name), 0o440)
def create_users(self, users: Union[User, List[User]]): def create_users(self, users: Union[User, List[User]]) -> None:
if not isinstance(users, list): if not isinstance(users, list):
users = [users] users = [users]

View File

@ -45,7 +45,7 @@ class ManualNetworkConfig(ListManager):
def selected_action_display(self, nic: Nic) -> str: def selected_action_display(self, nic: Nic) -> str:
return nic.iface if nic.iface else '' return nic.iface if nic.iface else ''
def handle_action(self, action: str, entry: Optional[Nic], data: List[Nic]): def handle_action(self, action: str, entry: Optional[Nic], data: List[Nic]) -> list[Nic]:
if action == self._actions[0]: # add if action == self._actions[0]: # add
iface = self._select_iface(data) iface = self._select_iface(data)
if iface: if iface:

View File

@ -60,7 +60,7 @@ class LocaleMenu(AbstractSubMenu):
self._preset = locale_conf self._preset = locale_conf
super().__init__(data_store=data_store) super().__init__(data_store=data_store)
def setup_selection_menu_options(self): def setup_selection_menu_options(self) -> None:
self._menu_options['keyboard-layout'] = \ self._menu_options['keyboard-layout'] = \
Selector( Selector(
_('Keyboard layout'), _('Keyboard layout'),

View File

@ -30,14 +30,14 @@ class Luks2:
return Path(f'/dev/mapper/{self.mapper_name}') return Path(f'/dev/mapper/{self.mapper_name}')
return None return None
def __post_init__(self): def __post_init__(self) -> None:
if self.luks_dev_path is None: if self.luks_dev_path is None:
raise ValueError('Partition must have a path set') raise ValueError('Partition must have a path set')
def __enter__(self): def __enter__(self) -> None:
self.unlock(self.key_file) self.unlock(self.key_file)
def __exit__(self, *args: str, **kwargs: str): def __exit__(self, *args: str, **kwargs: str) -> None:
if self.auto_unmount: if self.auto_unmount:
self.lock() self.lock()
@ -130,7 +130,7 @@ class Luks2:
def is_unlocked(self) -> bool: def is_unlocked(self) -> bool:
return self.mapper_name is not None and Path(f'/dev/mapper/{self.mapper_name}').exists() return self.mapper_name is not None and Path(f'/dev/mapper/{self.mapper_name}').exists()
def unlock(self, key_file: Optional[Path] = None): def unlock(self, key_file: Optional[Path] = None) -> None:
""" """
Unlocks the luks device, an optional key file location for unlocking can be specified, Unlocks the luks device, an optional key file location for unlocking can be specified,
otherwise a default location for the key file will be used. otherwise a default location for the key file will be used.
@ -171,7 +171,7 @@ class Luks2:
if not self.mapper_dev or not self.mapper_dev.is_symlink(): if not self.mapper_dev or not self.mapper_dev.is_symlink():
raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}') raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}')
def lock(self): def lock(self) -> None:
disk.device_handler.umount(self.luks_dev_path) disk.device_handler.umount(self.luks_dev_path)
# Get crypt-information about the device by doing a reverse lookup starting with the partition path # Get crypt-information about the device by doing a reverse lookup starting with the partition path
@ -191,7 +191,7 @@ class Luks2:
self._mapper_dev = None self._mapper_dev = None
def create_keyfile(self, target_path: Path, override: bool = False): def create_keyfile(self, target_path: Path, override: bool = False) -> None:
""" """
Routine to create keyfiles, so it can be moved elsewhere Routine to create keyfiles, so it can be moved elsewhere
""" """
@ -221,7 +221,7 @@ class Luks2:
self._add_key(key_file) self._add_key(key_file)
self._crypttab(crypttab_path, kf_path, options=["luks", "key-slot=1"]) self._crypttab(crypttab_path, kf_path, options=["luks", "key-slot=1"])
def _add_key(self, key_file: Path): def _add_key(self, key_file: Path) -> None:
debug(f'Adding additional key-file {key_file}') debug(f'Adding additional key-file {key_file}')
command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}' command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}'

View File

@ -86,10 +86,10 @@ class Selector:
def do_store(self) -> bool: def do_store(self) -> bool:
return self._no_store is False return self._no_store is False
def set_enabled(self, status: bool = True): def set_enabled(self, status: bool = True) -> None:
self.enabled = status self.enabled = status
def update_description(self, description: str): def update_description(self, description: str) -> None:
self.description = description self.description = description
def menu_text(self, padding: int = 0) -> str: def menu_text(self, padding: int = 0) -> str:
@ -114,7 +114,7 @@ class Selector:
return f'{description} {current}' return f'{description} {current}'
def set_current_selection(self, current: Optional[Any]): def set_current_selection(self, current: Optional[Any]) -> None:
self.current_selection = current self.current_selection = current
def has_selection(self) -> bool: def has_selection(self) -> bool:
@ -138,7 +138,7 @@ class Selector:
def is_mandatory(self) -> bool: def is_mandatory(self) -> bool:
return self.mandatory return self.mandatory
def set_mandatory(self, value: bool): def set_mandatory(self, value: bool) -> None:
self.mandatory = value self.mandatory = value
@ -204,16 +204,16 @@ class AbstractMenu:
def translation_handler(self) -> TranslationHandler: def translation_handler(self) -> TranslationHandler:
return self._translation_handler return self._translation_handler
def _populate_default_values(self): def _populate_default_values(self) -> None:
for config_key, selector in self._menu_options.items(): for config_key, selector in self._menu_options.items():
if selector.default is not None and config_key not in self._data_store: if selector.default is not None and config_key not in self._data_store:
self._data_store[config_key] = selector.default self._data_store[config_key] = selector.default
def _sync_all(self): def _sync_all(self) -> None:
for key in self._menu_options.keys(): for key in self._menu_options.keys():
self._sync(key) self._sync(key)
def _sync(self, selector_name: str): def _sync(self, selector_name: str) -> None:
value = self._data_store.get(selector_name, None) value = self._data_store.get(selector_name, None)
selector = self._menu_options.get(selector_name, None) selector = self._menu_options.get(selector_name, None)
@ -222,13 +222,13 @@ class AbstractMenu:
elif selector is not None and selector.has_selection(): elif selector is not None and selector.has_selection():
self._data_store[selector_name] = selector.current_selection self._data_store[selector_name] = selector.current_selection
def setup_selection_menu_options(self): def setup_selection_menu_options(self) -> None:
""" Define the menu options. """ Define the menu options.
Menu options can be defined here in a subclass or done per program calling self.set_option() Menu options can be defined here in a subclass or done per program calling self.set_option()
""" """
return return
def pre_callback(self, selector_name): def pre_callback(self, selector_name) -> None:
""" will be called before each action in the menu """ """ will be called before each action in the menu """
return return
@ -236,14 +236,14 @@ class AbstractMenu:
""" will be called after each action in the menu """ """ will be called after each action in the menu """
return True return True
def exit_callback(self): def exit_callback(self) -> None:
""" will be called at the end of the processing of the menu """ """ will be called at the end of the processing of the menu """
return return
def _update_enabled_order(self, selector_name: str): def _update_enabled_order(self, selector_name: str) -> None:
self._enabled_order.append(selector_name) self._enabled_order.append(selector_name)
def enable(self, selector_name: str, mandatory: bool = False): def enable(self, selector_name: str, mandatory: bool = False) -> None:
""" activates menu options """ """ activates menu options """
if self._menu_options.get(selector_name, None): if self._menu_options.get(selector_name, None):
self._menu_options[selector_name].set_enabled(True) self._menu_options[selector_name].set_enabled(True)
@ -259,7 +259,7 @@ class AbstractMenu:
return preview() return preview()
return None return None
def _get_menu_text_padding(self, entries: List[Selector]): def _get_menu_text_padding(self, entries: List[Selector]) -> int:
return max([len(str(selection.description)) for selection in entries]) return max([len(str(selection.description)) for selection in entries])
def _find_selection(self, selection_name: str) -> Tuple[str, Selector]: def _find_selection(self, selection_name: str) -> Tuple[str, Selector]:

View File

@ -26,7 +26,7 @@ class MenuSelection:
@property @property
def single_value(self) -> Any: def single_value(self) -> Any:
return self.value # type: ignore return self.value
@property @property
def multi_value(self) -> List[Any]: def multi_value(self) -> List[Any]:
@ -251,7 +251,7 @@ class Menu(TerminalMenu):
except KeyboardInterrupt: except KeyboardInterrupt:
return MenuSelection(type_=MenuSelectionType.Reset) return MenuSelection(type_=MenuSelectionType.Reset)
def check_default(elem): def check_default(elem) -> str:
if self._default_option is not None and self._default_menu_value in elem: if self._default_option is not None and self._default_menu_value in elem:
return self._default_option return self._default_option
else: else:
@ -294,13 +294,13 @@ class Menu(TerminalMenu):
return selection return selection
def set_cursor_pos(self,pos :int): def set_cursor_pos(self,pos :int) -> None:
if pos and 0 < pos < len(self._menu_entries): if pos and 0 < pos < len(self._menu_entries):
self._view.active_menu_index = pos self._view.active_menu_index = pos
else: else:
self._view.active_menu_index = 0 # we define a default self._view.active_menu_index = 0 # we define a default
def set_cursor_pos_entry(self,value :str): def set_cursor_pos_entry(self,value :str) -> None:
pos = self._menu_entries.index(value) pos = self._menu_entries.index(value)
self.set_cursor_pos(pos) self.set_cursor_pos(pos)

View File

@ -7,7 +7,7 @@ class TextInput:
self._prompt = prompt self._prompt = prompt
self._prefilled_text = prefilled_text self._prefilled_text = prefilled_text
def _hook(self): def _hook(self) -> None:
readline.insert_text(self._prefilled_text) readline.insert_text(self._prefilled_text)
readline.redisplay() readline.redisplay()

View File

@ -203,7 +203,7 @@ class MirrorMenu(AbstractSubMenu):
super().__init__(data_store=data_store) super().__init__(data_store=data_store)
def setup_selection_menu_options(self): def setup_selection_menu_options(self) -> None:
self._menu_options['mirror_regions'] = \ self._menu_options['mirror_regions'] = \
Selector( Selector(
_('Mirror region'), _('Mirror region'),
@ -281,7 +281,7 @@ def select_mirror_regions(preset_values: Dict[str, List[str]] = {}) -> Dict[str,
return {} return {}
def select_custom_mirror(prompt: str = '', preset: List[CustomMirror] = []): def select_custom_mirror(prompt: str = '', preset: List[CustomMirror] = []) -> list[CustomMirror]:
custom_mirrors = CustomMirrorList(prompt, preset).run() custom_mirrors = CustomMirrorList(prompt, preset).run()
return custom_mirrors return custom_mirrors

View File

@ -38,7 +38,7 @@ class AudioConfiguration:
def install_audio_config( def install_audio_config(
self, self,
installation: Any installation: Any
): ) -> None:
info(f'Installing audio server: {self.audio.name}') info(f'Installing audio server: {self.audio.name}')
match self.audio: match self.audio:

View File

@ -98,7 +98,7 @@ class PackageSearchResult:
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
return self.pkg_version == other.pkg_version return self.pkg_version == other.pkg_version
def __lt__(self, other) -> bool: def __lt__(self, other: 'PackageSearchResult') -> bool:
return self.pkg_version < other.pkg_version return self.pkg_version < other.pkg_version

View File

@ -124,7 +124,7 @@ class NetworkConfiguration:
self, self,
installation: Any, installation: Any,
profile_config: Optional[ProfileConfiguration] = None profile_config: Optional[ProfileConfiguration] = None
): ) -> None:
match self.type: match self.type:
case NicType.ISO: case NicType.ISO:
installation.copy_iso_network_config( installation.copy_iso_network_config(

View File

@ -13,14 +13,14 @@ class PasswordStrength(Enum):
STRONG = 'strong' STRONG = 'strong'
@property @property
def value(self): def value(self) -> str:
match self: match self:
case PasswordStrength.VERY_WEAK: return str(_('very weak')) case PasswordStrength.VERY_WEAK: return str(_('very weak'))
case PasswordStrength.WEAK: return str(_('weak')) case PasswordStrength.WEAK: return str(_('weak'))
case PasswordStrength.MODERATE: return str(_('moderate')) case PasswordStrength.MODERATE: return str(_('moderate'))
case PasswordStrength.STRONG: return str(_('strong')) case PasswordStrength.STRONG: return str(_('strong'))
def color(self): def color(self) -> str:
match self: match self:
case PasswordStrength.VERY_WEAK: return 'red' case PasswordStrength.VERY_WEAK: return 'red'
case PasswordStrength.WEAK: return 'red' case PasswordStrength.WEAK: return 'red'

View File

@ -19,20 +19,20 @@ class DownloadTimer():
''' '''
Context manager for timing downloads with timeouts. Context manager for timing downloads with timeouts.
''' '''
def __init__(self, timeout=5): def __init__(self, timeout: int = 5):
''' '''
Args: Args:
timeout: timeout:
The download timeout in seconds. The DownloadTimeout exception The download timeout in seconds. The DownloadTimeout exception
will be raised in the context after this many seconds. will be raised in the context after this many seconds.
''' '''
self.time = None self.time: Optional[float] = None
self.start_time = None self.start_time: Optional[float] = None
self.timeout = timeout self.timeout = timeout
self.previous_handler = None self.previous_handler = None
self.previous_timer = None self.previous_timer: Optional[int] = None
def raise_timeout(self, signl, frame): def raise_timeout(self, signl, frame) -> None:
''' '''
Raise the DownloadTimeout exception. Raise the DownloadTimeout exception.
''' '''
@ -40,13 +40,13 @@ class DownloadTimer():
def __enter__(self): def __enter__(self):
if self.timeout > 0: if self.timeout > 0:
self.previous_handler = signal.signal(signal.SIGALRM, self.raise_timeout) self.previous_handler = signal.signal(signal.SIGALRM, self.raise_timeout) # type: ignore[assignment]
self.previous_timer = signal.alarm(self.timeout) self.previous_timer = signal.alarm(self.timeout)
self.start_time = time.time() self.start_time = time.time()
return self return self
def __exit__(self, typ, value, traceback): def __exit__(self, typ, value, traceback) -> None:
if self.start_time: if self.start_time:
time_delta = time.time() - self.start_time time_delta = time.time() - self.start_time
signal.alarm(0) signal.alarm(0)
@ -136,7 +136,7 @@ def fetch_data_from_url(url: str, params: Optional[Dict] = None) -> str:
raise ValueError(f'Unable to fetch data from url: {url}') raise ValueError(f'Unable to fetch data from url: {url}')
def calc_checksum(icmp_packet): def calc_checksum(icmp_packet) -> int:
# Calculate the ICMP checksum # Calculate the ICMP checksum
checksum = 0 checksum = 0
for i in range(0, len(icmp_packet), 2): for i in range(0, len(icmp_packet), 2):
@ -158,7 +158,7 @@ def build_icmp(payload):
return struct.pack('!BBHHH', 8, 0, checksum, 0, 1) + payload return struct.pack('!BBHHH', 8, 0, checksum, 0, 1) + payload
def ping(hostname, timeout=5): def ping(hostname, timeout=5) -> int:
watchdog = select.epoll() watchdog = select.epoll()
started = time.time() started = time.time()
random_identifier = f'archinstall-{random.randint(1000, 9999)}'.encode() random_identifier = f'archinstall-{random.randint(1000, 9999)}'.encode()

View File

@ -146,7 +146,7 @@ class Journald:
log_adapter.log(level, message) log_adapter.log(level, message)
def _check_log_permissions(): def _check_log_permissions() -> None:
filename = storage.get('LOG_FILE', None) filename = storage.get('LOG_FILE', None)
log_dir = storage.get('LOG_PATH', Path('./')) log_dir = storage.get('LOG_PATH', Path('./'))
@ -258,7 +258,7 @@ def info(
bg: Optional[str] = None, bg: Optional[str] = None,
reset: bool = False, reset: bool = False,
font: List[Font] = [] font: List[Font] = []
): ) -> None:
log(*msgs, level=level, fg=fg, bg=bg, reset=reset, font=font) log(*msgs, level=level, fg=fg, bg=bg, reset=reset, font=font)
@ -269,7 +269,7 @@ def debug(
bg: Optional[str] = None, bg: Optional[str] = None,
reset: bool = False, reset: bool = False,
font: List[Font] = [] font: List[Font] = []
): ) -> None:
log(*msgs, level=level, fg=fg, bg=bg, reset=reset, font=font) log(*msgs, level=level, fg=fg, bg=bg, reset=reset, font=font)
@ -280,7 +280,7 @@ def error(
bg: Optional[str] = None, bg: Optional[str] = None,
reset: bool = False, reset: bool = False,
font: List[Font] = [] font: List[Font] = []
): ) -> None:
log(*msgs, level=level, fg=fg, bg=bg, reset=reset, font=font) log(*msgs, level=level, fg=fg, bg=bg, reset=reset, font=font)
@ -291,7 +291,7 @@ def warn(
bg: Optional[str] = None, bg: Optional[str] = None,
reset: bool = False, reset: bool = False,
font: List[Font] = [] font: List[Font] = []
): ) -> None:
log(*msgs, level=level, fg=fg, bg=bg, reset=reset, font=font) log(*msgs, level=level, fg=fg, bg=bg, reset=reset, font=font)
@ -302,7 +302,7 @@ def log(
bg: Optional[str] = None, bg: Optional[str] = None,
reset: bool = False, reset: bool = False,
font: List[Font] = [] font: List[Font] = []
): ) -> None:
# leave this check here as we need to setup the logging # leave this check here as we need to setup the logging
# right from the beginning when the modules are loaded # right from the beginning when the modules are loaded
_check_log_permissions() _check_log_permissions()

View File

@ -44,7 +44,7 @@ class Pacman:
return SysCommand(f'{default_cmd} {args}') return SysCommand(f'{default_cmd} {args}')
def ask(self, error_message: str, bail_message: str, func: Callable, *args, **kwargs): def ask(self, error_message: str, bail_message: str, func: Callable, *args, **kwargs) -> None:
while True: while True:
try: try:
func(*args, **kwargs) func(*args, **kwargs)
@ -55,7 +55,7 @@ class Pacman:
continue continue
raise RequirementError(f'{bail_message}: {err}') raise RequirementError(f'{bail_message}: {err}')
def sync(self): def sync(self) -> None:
if self.synced: if self.synced:
return return
self.ask( self.ask(
@ -67,7 +67,7 @@ class Pacman:
) )
self.synced = True self.synced = True
def strap(self, packages: Union[str, List[str]]): def strap(self, packages: Union[str, List[str]]) -> None:
self.sync() self.sync()
if isinstance(packages, str): if isinstance(packages, str):
packages = [packages] packages = [packages]

View File

@ -12,10 +12,10 @@ class Config:
self.chroot_path = target / "etc" / "pacman.conf" self.chroot_path = target / "etc" / "pacman.conf"
self.repos: List[Repo] = [] self.repos: List[Repo] = []
def enable(self, repo: Repo): def enable(self, repo: Repo) -> None:
self.repos.append(repo) self.repos.append(repo)
def apply(self): def apply(self) -> None:
if not self.repos: if not self.repos:
return return
@ -39,6 +39,6 @@ class Config:
else: else:
f.write(line) f.write(line)
def persist(self): def persist(self) -> None:
if self.repos: if self.repos:
copy2(self.path, self.chroot_path) copy2(self.path, self.chroot_path)

View File

@ -25,7 +25,7 @@ class ProfileMenu(AbstractSubMenu):
super().__init__(data_store=data_store) super().__init__(data_store=data_store)
def setup_selection_menu_options(self): def setup_selection_menu_options(self) -> None:
self._menu_options['profile'] = Selector( self._menu_options['profile'] = Selector(
_('Type'), _('Type'),
lambda x: self._select_profile(x), lambda x: self._select_profile(x),

View File

@ -24,9 +24,9 @@ if TYPE_CHECKING:
class ProfileHandler: class ProfileHandler:
def __init__(self): def __init__(self) -> None:
self._profiles_path: Path = storage['PROFILE'] self._profiles_path: Path = storage['PROFILE']
self._profiles = None self._profiles: Optional[list[Profile]] = None
# special variable to keep track of a profile url configuration # special variable to keep track of a profile url configuration
# it is merely used to be able to export the path again when a user # it is merely used to be able to export the path again when a user
@ -138,7 +138,7 @@ class ProfileHandler:
def _local_mac_addresses(self) -> List[str]: def _local_mac_addresses(self) -> List[str]:
return list(list_interfaces()) return list(list_interfaces())
def add_custom_profiles(self, profiles: Union[Profile, List[Profile]]): def add_custom_profiles(self, profiles: Union[Profile, List[Profile]]) -> None:
if not isinstance(profiles, list): if not isinstance(profiles, list):
profiles = [profiles] profiles = [profiles]
@ -147,7 +147,7 @@ class ProfileHandler:
self._verify_unique_profile_names(self.profiles) self._verify_unique_profile_names(self.profiles)
def remove_custom_profiles(self, profiles: Union[Profile, List[Profile]]): def remove_custom_profiles(self, profiles: Union[Profile, List[Profile]]) -> None:
if not isinstance(profiles, list): if not isinstance(profiles, list):
profiles = [profiles] profiles = [profiles]
@ -174,7 +174,7 @@ class ProfileHandler:
match_mac_addr_profiles = list(filter(lambda x: x.name in self._local_mac_addresses, tailored)) match_mac_addr_profiles = list(filter(lambda x: x.name in self._local_mac_addresses, tailored))
return match_mac_addr_profiles return match_mac_addr_profiles
def install_greeter(self, install_session: 'Installer', greeter: GreeterType): def install_greeter(self, install_session: 'Installer', greeter: GreeterType) -> None:
packages = [] packages = []
service = None service = None
@ -213,7 +213,7 @@ class ProfileHandler:
with open(path, 'w') as file: with open(path, 'w') as file:
file.write(filedata) file.write(filedata)
def install_gfx_driver(self, install_session: 'Installer', driver: GfxDriver): def install_gfx_driver(self, install_session: 'Installer', driver: GfxDriver) -> None:
debug(f'Installing GFX driver: {driver.value}') debug(f'Installing GFX driver: {driver.value}')
if driver in [GfxDriver.NvidiaOpenKernel, GfxDriver.NvidiaProprietary]: if driver in [GfxDriver.NvidiaOpenKernel, GfxDriver.NvidiaProprietary]:
@ -232,7 +232,7 @@ class ProfileHandler:
pkg_names = [p.value for p in driver_pkgs] pkg_names = [p.value for p in driver_pkgs]
install_session.add_additional_packages(pkg_names) install_session.add_additional_packages(pkg_names)
def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration): def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration) -> None:
profile = profile_config.profile profile = profile_config.profile
if not profile: if not profile:
@ -246,7 +246,7 @@ class ProfileHandler:
if profile_config.greeter: if profile_config.greeter:
self.install_greeter(install_session, profile_config.greeter) self.install_greeter(install_session, profile_config.greeter)
def _import_profile_from_url(self, url: str): def _import_profile_from_url(self, url: str) -> None:
""" """
Import default_profiles from a url path Import default_profiles from a url path
""" """

View File

@ -39,7 +39,7 @@ class Language:
class TranslationHandler: class TranslationHandler:
def __init__(self): def __init__(self) -> None:
self._base_pot = 'base.pot' self._base_pot = 'base.pot'
self._languages = 'languages.json' self._languages = 'languages.json'
@ -145,7 +145,7 @@ class TranslationHandler:
except Exception: except Exception:
raise ValueError(f'No language with abbreviation "{abbr}" found') raise ValueError(f'No language with abbreviation "{abbr}" found')
def activate(self, language: Language): def activate(self, language: Language) -> None:
""" """
Set the provided language as the current translation Set the provided language as the current translation
""" """
@ -204,6 +204,6 @@ class DeferredTranslation:
return self.message.format(*args) return self.message.format(*args)
@classmethod @classmethod
def install(cls): def install(cls) -> None:
import builtins import builtins
builtins._ = cls # type: ignore builtins._ = cls # type: ignore

View File

@ -20,7 +20,7 @@ def prompt_dir(text: str, header: Optional[str] = None) -> Path:
info(_('Not a valid directory: {}').format(dest_path)) info(_('Not a valid directory: {}').format(dest_path))
def is_subpath(first: Path, second: Path): def is_subpath(first: Path, second: Path) -> bool:
""" """
Check if _first_ a subpath of _second_ Check if _first_ a subpath of _second_
""" """