Full mypy compliance and small fixes (#1777)

* Fix mypy compliance

---------

Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
This commit is contained in:
Daniel Girtler 2023-05-04 00:36:46 +10:00 committed by GitHub
parent e78ddb03e1
commit ec4ecbcb7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 453 additions and 428 deletions

View File

@ -15,4 +15,4 @@ jobs:
# one day this will be enabled # one day this will be enabled
# run: mypy --strict --module archinstall || exit 0 # run: mypy --strict --module archinstall || exit 0
- name: run mypy - name: run mypy
run: mypy --config-file mypy.ini run: mypy --config-file pyproject.toml

View File

@ -233,7 +233,8 @@ def post_process_arguments(arguments):
log(f"Warning: --debug mode will write certain credentials to {storage['LOG_PATH']}/{storage['LOG_FILE']}!", fg="red", level=logging.WARNING) log(f"Warning: --debug mode will write certain credentials to {storage['LOG_PATH']}/{storage['LOG_FILE']}!", fg="red", level=logging.WARNING)
if arguments.get('plugin', None): if arguments.get('plugin', None):
load_plugin(arguments['plugin']) path = arguments['plugin']
load_plugin(path)
load_config() load_config()

View File

@ -269,13 +269,13 @@ class DeviceHandler(object):
# partition will be encrypted # partition will be encrypted
if enc_conf is not None and part_mod in enc_conf.partitions: if enc_conf is not None and part_mod in enc_conf.partitions:
self._perform_enc_formatting( self._perform_enc_formatting(
part_mod.real_dev_path, part_mod.safe_dev_path,
part_mod.mapper_name, part_mod.mapper_name,
part_mod.fs_type, part_mod.fs_type,
enc_conf enc_conf
) )
else: else:
self._perform_formatting(part_mod.fs_type, part_mod.real_dev_path) self._perform_formatting(part_mod.fs_type, part_mod.safe_dev_path)
def _perform_partitioning( def _perform_partitioning(
self, self,
@ -287,11 +287,11 @@ class DeviceHandler(object):
# when we require a delete and the partition to be (re)created # when we require a delete and the partition to be (re)created
# already exists then we have to delete it first # already exists then we have to delete it first
if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]: if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]:
log(f'Delete existing partition: {part_mod.real_dev_path}', level=logging.INFO) log(f'Delete existing partition: {part_mod.safe_dev_path}', level=logging.INFO)
part_info = self.find_partition(part_mod.real_dev_path) part_info = self.find_partition(part_mod.safe_dev_path)
if not part_info: if not part_info:
raise DiskError(f'No partition for dev path found: {part_mod.real_dev_path}') raise DiskError(f'No partition for dev path found: {part_mod.safe_dev_path}')
disk.deletePartition(part_info.partition) disk.deletePartition(part_info.partition)
disk.commit() disk.commit()
@ -375,7 +375,7 @@ class DeviceHandler(object):
part_mod: PartitionModification, part_mod: PartitionModification,
enc_conf: Optional['DiskEncryption'] = None enc_conf: Optional['DiskEncryption'] = None
): ):
log(f'Creating subvolumes: {part_mod.real_dev_path}', level=logging.INFO) log(f'Creating subvolumes: {part_mod.safe_dev_path}', level=logging.INFO)
luks_handler = None luks_handler = None
@ -385,7 +385,7 @@ class DeviceHandler(object):
raise ValueError('No device path specified for modification') raise ValueError('No device path specified for modification')
luks_handler = self.unlock_luks2_dev( luks_handler = self.unlock_luks2_dev(
part_mod.real_dev_path, part_mod.safe_dev_path,
part_mod.mapper_name, part_mod.mapper_name,
enc_conf.encryption_password enc_conf.encryption_password
) )
@ -395,7 +395,7 @@ class DeviceHandler(object):
self.mount(luks_handler.mapper_dev, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) self.mount(luks_handler.mapper_dev, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
else: else:
self.mount(part_mod.real_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) self.mount(part_mod.safe_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
for sub_vol in part_mod.btrfs_subvols: for sub_vol in part_mod.btrfs_subvols:
log(f'Creating subvolume: {sub_vol.name}', level=logging.DEBUG) log(f'Creating subvolume: {sub_vol.name}', level=logging.DEBUG)
@ -419,7 +419,7 @@ class DeviceHandler(object):
self.umount(luks_handler.mapper_dev) self.umount(luks_handler.mapper_dev)
luks_handler.lock() luks_handler.lock()
else: else:
self.umount(part_mod.real_dev_path) self.umount(part_mod.safe_dev_path)
def unlock_luks2_dev(self, dev_path: Path, mapper_name: str, enc_password: str) -> Luks2: def unlock_luks2_dev(self, dev_path: Path, mapper_name: str, enc_password: str) -> Luks2:
luks_handler = Luks2(dev_path, mapper_name=mapper_name, password=enc_password) luks_handler = Luks2(dev_path, mapper_name=mapper_name, password=enc_password)

View File

@ -603,7 +603,7 @@ class PartitionModification:
return '' return ''
@property @property
def real_dev_path(self) -> Path: def safe_dev_path(self) -> Path:
if self.dev_path is None: if self.dev_path is None:
raise ValueError('Device path was not set') raise ValueError('Device path was not set')
return self.dev_path return self.dev_path

View File

@ -2,7 +2,8 @@ from __future__ import annotations
import getpass import getpass
import logging import logging
from typing import List from pathlib import Path
from typing import List, Optional
from .device_model import PartitionModification, Fido2Device from .device_model import PartitionModification, Fido2Device
from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
@ -36,12 +37,12 @@ class Fido2:
# to prevent continous reloading which will slow # to prevent continous reloading which will slow
# down moving the cursor in the menu # down moving the cursor in the menu
if not cls._loaded or reload: if not cls._loaded or reload:
ret = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8') ret: Optional[str] = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8')
if not ret: if not ret:
log('Unable to retrieve fido2 devices', level=logging.ERROR) log('Unable to retrieve fido2 devices', level=logging.ERROR)
return [] return []
fido_devices = clear_vt100_escape_codes(ret) fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore
manufacturer_pos = 0 manufacturer_pos = 0
product_pos = 0 product_pos = 0
@ -58,7 +59,7 @@ class Fido2:
product = line[product_pos:] product = line[product_pos:]
devices.append( devices.append(
Fido2Device(path, manufacturer, product) Fido2Device(Path(path), manufacturer, product)
) )
cls._loaded = True cls._loaded = True

View File

@ -19,9 +19,15 @@ import pathlib
from datetime import datetime, date from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
from .exceptions import RequirementError, SysCallError
from .output import log
from .storage import storage
if TYPE_CHECKING: if TYPE_CHECKING:
from .installer import Installer from .installer import Installer
if sys.platform == 'linux': if sys.platform == 'linux':
from select import epoll, EPOLLIN, EPOLLHUP from select import epoll, EPOLLIN, EPOLLHUP
else: else:
@ -53,30 +59,15 @@ else:
except OSError: except OSError:
return [] return []
from .exceptions import RequirementError, SysCallError
from .output import log
from .storage import storage
def gen_uid(entropy_length :int = 256) -> str: def gen_uid(entropy_length :int = 256) -> str:
return hashlib.sha512(os.urandom(entropy_length)).hexdigest() return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
def generate_password(length :int = 64) -> str: def generate_password(length :int = 64) -> str:
haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace
return ''.join(secrets.choice(haystack) for i in range(length)) return ''.join(secrets.choice(haystack) for i in range(length))
def multisplit(s :str, splitters :List[str]) -> str:
s = [s, ]
for key in splitters:
ns = []
for obj in s:
x = obj.split(key)
for index, part in enumerate(x):
if len(part):
ns.append(part)
if index < len(x) - 1:
ns.append(key)
s = ns
return s
def locate_binary(name :str) -> str: def locate_binary(name :str) -> str:
for PATH in os.environ['PATH'].split(':'): for PATH in os.environ['PATH'].split(':'):
@ -88,20 +79,20 @@ def locate_binary(name :str) -> str:
raise RequirementError(f"Binary {name} does not exist.") raise RequirementError(f"Binary {name} does not exist.")
def clear_vt100_escape_codes(data :Union[bytes, str]):
def clear_vt100_escape_codes(data :Union[bytes, str]) -> Union[bytes, str]:
# https://stackoverflow.com/a/43627833/929999 # https://stackoverflow.com/a/43627833/929999
if type(data) == bytes: if type(data) == bytes:
vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8') byte_vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8')
else: data = re.sub(byte_vt100_escape_regex, b'', data)
elif type(data) == str:
vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]' vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
data = re.sub(vt100_escape_regex, '', data)
for match in re.findall(vt100_escape_regex, data, re.IGNORECASE): else:
data = data.replace(match, '' if type(data) == str else b'') raise ValueError(f'Unsupported data type: {type(data)}')
return data return data
def json_dumps(*args :str, **kwargs :str) -> str:
return json.dumps(*args, **{**kwargs, 'cls': JSON})
class JsonEncoder: class JsonEncoder:
@staticmethod @staticmethod
@ -245,10 +236,12 @@ class SysCommandWorker:
def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]: def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]:
for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'): for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'):
if line: if line:
if self.remove_vt100_escape_codes_from_lines: escaped_line: bytes = line
line = clear_vt100_escape_codes(line)
yield line + b'\n' if self.remove_vt100_escape_codes_from_lines:
escaped_line = clear_vt100_escape_codes(line) # type: ignore
yield escaped_line + b'\n'
self._trace_log_pos = self._trace_log.rfind(b'\n') self._trace_log_pos = self._trace_log.rfind(b'\n')
@ -279,7 +272,11 @@ class SysCommandWorker:
log(args[1], level=logging.DEBUG, fg='red') log(args[1], level=logging.DEBUG, fg='red')
if self.exit_code != 0: if self.exit_code != 0:
raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code, worker=self) raise SysCallError(
f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self._trace_log[-500:])}",
self.exit_code,
worker=self
)
def is_alive(self) -> bool: def is_alive(self) -> bool:
self.poll() self.poll()
@ -328,7 +325,7 @@ class SysCommandWorker:
change_perm = True change_perm = True
with peak_logfile.open("a") as peek_output_log: with peak_logfile.open("a") as peek_output_log:
peek_output_log.write(output) peek_output_log.write(str(output))
if change_perm: if change_perm:
os.chmod(str(peak_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) os.chmod(str(peak_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
@ -497,7 +494,7 @@ class SysCommand:
clears any printed output if ``.peek_output=True``. clears any printed output if ``.peek_output=True``.
""" """
if self.session: if self.session:
return self.session return True
with SysCommandWorker( with SysCommandWorker(
self.cmd, self.cmd,

View File

@ -2,7 +2,7 @@ import os
import logging import logging
from functools import partial from functools import partial
from pathlib import Path from pathlib import Path
from typing import Iterator, Optional, Union from typing import Iterator, Optional, Dict
from .general import SysCommand from .general import SysCommand
from .networking import list_interfaces, enrich_iface_types from .networking import list_interfaces, enrich_iface_types
@ -61,15 +61,15 @@ AVAILABLE_GFX_DRIVERS = {
"VMware / VirtualBox (open-source)": ["mesa", "xf86-video-vmware"], "VMware / VirtualBox (open-source)": ["mesa", "xf86-video-vmware"],
} }
CPUINFO = Path("/proc/cpuinfo")
MEMINFO = Path("/proc/meminfo")
def cpuinfo() -> Iterator[dict[str, str]]: def cpuinfo() -> Iterator[dict[str, str]]:
"""Yields information about the CPUs of the system.""" """
cpu = {} Yields information about the CPUs of the system
"""
cpu_info_path = Path("/proc/cpuinfo")
cpu: Dict[str, str] = {}
with CPUINFO.open() as file: with cpu_info_path.open() as file:
for line in file: for line in file:
if not (line := line.strip()): if not (line := line.strip()):
yield cpu yield cpu
@ -80,24 +80,31 @@ def cpuinfo() -> Iterator[dict[str, str]]:
cpu[key.strip()] = value.strip() cpu[key.strip()] = value.strip()
def meminfo(key: Optional[str] = None) -> Union[dict[str, int], Optional[int]]: def all_meminfo() -> Dict[str, int]:
"""Returns a dict with memory info if called with no args """
Returns a dict with memory info if called with no args
or the value of the given key of said dict. or the value of the given key of said dict.
""" """
with MEMINFO.open() as file: mem_info_path = Path("/proc/meminfo")
mem_info = { mem_info: Dict[str, int] = {}
(columns := line.strip().split())[0].rstrip(':'): int(columns[1])
for line in file
}
if key is None: with mem_info_path.open() as file:
return mem_info for line in file:
key, value = line.strip().split(':')
num = value.split()[0]
mem_info[key] = int(num)
return mem_info.get(key) return mem_info
def meminfo_for_key(key: str) -> int:
info = all_meminfo()
return info[key]
def has_wifi() -> bool: def has_wifi() -> bool:
return 'WIRELESS' in enrich_iface_types(list_interfaces().values()).values() ifaces = list(list_interfaces().values())
return 'WIRELESS' in enrich_iface_types(ifaces).values()
def has_cpu_vendor(vendor_id: str) -> bool: def has_cpu_vendor(vendor_id: str) -> bool:
@ -160,15 +167,15 @@ def product_name() -> Optional[str]:
def mem_available() -> Optional[int]: def mem_available() -> Optional[int]:
return meminfo('MemAvailable') return meminfo_for_key('MemAvailable')
def mem_free() -> Optional[int]: def mem_free() -> Optional[int]:
return meminfo('MemFree') return meminfo_for_key('MemFree')
def mem_total() -> Optional[int]: def mem_total() -> Optional[int]:
return meminfo('MemTotal') return meminfo_for_key('MemTotal')
def virtualization() -> Optional[str]: def virtualization() -> Optional[str]:
@ -182,9 +189,9 @@ def virtualization() -> Optional[str]:
def is_vm() -> bool: def is_vm() -> bool:
try: try:
return b"none" not in b"".join(SysCommand("systemd-detect-virt")).lower() result = SysCommand("systemd-detect-virt")
return b"none" not in b"".join(result).lower()
except SysCallError as error: except SysCallError as error:
log(f"System is not running in a VM: {error}", level=logging.DEBUG) log(f"System is not running in a VM: {error}", level=logging.DEBUG)
return None
# TODO: Add more identifiers return False

View File

@ -7,7 +7,7 @@ import shutil
import subprocess import subprocess
import time import time
from pathlib import Path from pathlib import Path
from typing import Any, Iterator, List, Mapping, Optional, TYPE_CHECKING, Union, Dict from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable, Iterable
from . import disk from . import disk
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
@ -36,32 +36,6 @@ __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "l
__accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"]
class InstallationFile:
def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"):
self.installation = installation
self.filename = filename
self.owner = owner
self.mode = mode
self.fh = None
def __enter__(self) -> 'InstallationFile':
self.fh = open(self.filename, self.mode)
return self
def __exit__(self, *args :str) -> None:
self.fh.close()
self.installation.chown(self.owner, self.filename)
def write(self, data: Union[str, bytes]) -> int:
return self.fh.write(data)
def read(self, *args) -> Union[str, bytes]:
return self.fh.read(*args)
# def poll(self, *args) -> bool:
# return self.fh.poll(*args)
def accessibility_tools_in_use() -> bool: def accessibility_tools_in_use() -> bool:
return os.system('systemctl is-active --quiet espeakup.service') == 0 return os.system('systemctl is-active --quiet espeakup.service') == 0
@ -106,15 +80,17 @@ class Installer:
self.kernels = kernels self.kernels = kernels
self._disk_config = disk_config self._disk_config = disk_config
self._disk_encryption = disk_encryption
if self._disk_encryption is None: if disk_encryption is None:
self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption) self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption)
else:
self._disk_encryption = disk_encryption
self.target: Path = target
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1]) self.milliseconds = int(str(time.time()).split('.')[1])
self.helper_flags = {'base': False, 'bootloader': False} self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None}
self.base_packages = base_packages self.base_packages = base_packages
for kernel in self.kernels: for kernel in self.kernels:
@ -124,31 +100,33 @@ class Installer:
if accessibility_tools_in_use(): if accessibility_tools_in_use():
self.base_packages.extend(__accessibility_packages__) self.base_packages.extend(__accessibility_packages__)
self.post_base_install = [] self.post_base_install: List[Callable] = []
# TODO: Figure out which one of these two we'll use.. But currently we're mixing them.. # TODO: Figure out which one of these two we'll use.. But currently we're mixing them..
storage['session'] = self storage['session'] = self
storage['installation_session'] = self storage['installation_session'] = self
self.MODULES = [] self.modules: List[str] = []
self.BINARIES = [] self._binaries: List[str] = []
self.FILES = [] self._files: List[str] = []
# systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt # systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt
# if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override. # if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override.
self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"] self._hooks: List[str] = [
self.KERNEL_PARAMS = [] "base", "systemd", "autodetect", "keyboard",
self.FSTAB_ENTRIES = [] "sd-vconsole", "modconf", "block", "filesystems", "fsck"
]
self._kernel_params: List[str] = []
self._fstab_entries: List[str] = []
self._zram_enabled = False self._zram_enabled = False
def __enter__(self, *args: str, **kwargs: str) -> 'Installer': def __enter__(self) -> 'Installer':
return self return self
def __exit__(self, *args :str, **kwargs :str) -> bool: def __exit__(self, exc_type, exc_val, exc_tb):
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if exc_type is not None:
log(exc_val, fg='red', level=logging.ERROR)
if len(args) >= 2 and args[1]:
self.log(args[1], level=logging.ERROR, fg='red')
self.sync_log_to_install_medium() self.sync_log_to_install_medium()
@ -156,7 +134,7 @@ class Installer:
# and then reboot, and a identical log file will be found in the ISO medium anyway. # and then reboot, and a identical log file will be found in the ISO medium anyway.
print(_("[!] A log file has been created here: {}").format(os.path.join(storage['LOG_PATH'], storage['LOG_FILE']))) print(_("[!] A log file has been created here: {}").format(os.path.join(storage['LOG_PATH'], storage['LOG_FILE'])))
print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")) print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues"))
raise args[1] raise exc_val
if not (missing_steps := self.post_install_check()): if not (missing_steps := self.post_install_check()):
self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO) self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO)
@ -164,6 +142,7 @@ class Installer:
return True return True
else: else:
self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING) self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
for step in missing_steps: for step in missing_steps:
self.log(f' - {step}', fg='red', level=logging.WARNING) self.log(f' - {step}', fg='red', level=logging.WARNING)
@ -247,31 +226,32 @@ class Installer:
luks_handlers = {} luks_handlers = {}
for part_mod in partitions: for part_mod in partitions:
luks_handler = disk.device_handler.unlock_luks2_dev( if part_mod.mapper_name and part_mod.dev_path:
part_mod.dev_path, luks_handler = disk.device_handler.unlock_luks2_dev(
part_mod.mapper_name, part_mod.dev_path,
self._disk_encryption.encryption_password part_mod.mapper_name,
) self._disk_encryption.encryption_password
luks_handlers[part_mod] = luks_handler )
luks_handlers[part_mod] = luks_handler
return luks_handlers return luks_handlers
def _mount_partition(self, part_mod: disk.PartitionModification): def _mount_partition(self, part_mod: disk.PartitionModification):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined # it would be none if it's btrfs as the subvolumes will have the mountpoints defined
if part_mod.mountpoint is not None: if part_mod.mountpoint and part_mod.dev_path:
target = self.target / part_mod.relative_mountpoint target = self.target / part_mod.relative_mountpoint
disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options) disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options)
if part_mod.fs_type == disk.FilesystemType.Btrfs: if part_mod.fs_type == disk.FilesystemType.Btrfs and part_mod.dev_path:
self._mount_btrfs_subvol(part_mod.dev_path, part_mod.btrfs_subvols) self._mount_btrfs_subvol(part_mod.dev_path, part_mod.btrfs_subvols)
def _mount_luks_partiton(self, part_mod: disk.PartitionModification, luks_handler: Luks2): def _mount_luks_partiton(self, part_mod: disk.PartitionModification, luks_handler: Luks2):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined # it would be none if it's btrfs as the subvolumes will have the mountpoints defined
if part_mod.mountpoint is not None: if part_mod.mountpoint and luks_handler.mapper_dev:
target = self.target / part_mod.relative_mountpoint target = self.target / part_mod.relative_mountpoint
disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options) disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
if part_mod.fs_type == disk.FilesystemType.Btrfs: if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols) self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols)
def _mount_btrfs_subvol(self, dev_path: Path, subvolumes: List[disk.SubvolumeModification]): def _mount_btrfs_subvol(self, dev_path: Path, subvolumes: List[disk.SubvolumeModification]):
@ -346,15 +326,15 @@ class Installer:
SysCommand(f'chmod 0600 {self.target}{file}') SysCommand(f'chmod 0600 {self.target}{file}')
SysCommand(f'mkswap {self.target}{file}') SysCommand(f'mkswap {self.target}{file}')
self.FSTAB_ENTRIES.append(f'{file} none swap defaults 0 0') self._fstab_entries.append(f'{file} none swap defaults 0 0')
if enable_resume: if enable_resume:
resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip() resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip()
resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip() resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip()
self.HOOKS.append('resume') self._hooks.append('resume')
self.KERNEL_PARAMS.append(f'resume=UUID={resume_uuid}') self._kernel_params.append(f'resume=UUID={resume_uuid}')
self.KERNEL_PARAMS.append(f'resume_offset={resume_offset}') self._kernel_params.append(f'resume_offset={resume_offset}')
def post_install_check(self, *args :str, **kwargs :str) -> List[str]: def post_install_check(self, *args :str, **kwargs :str) -> List[str]:
return [step for step, flag in self.helper_flags.items() if flag is False] return [step for step, flag in self.helper_flags.items() if flag is False]
@ -411,7 +391,7 @@ class Installer:
else: else:
pacman_conf.write(line) pacman_conf.write(line)
def pacstrap(self, *packages: Union[str, List[str]], **kwargs :str) -> bool: def _pacstrap(self, packages: Union[str, List[str]]) -> bool:
if type(packages[0]) in (list, tuple): if type(packages[0]) in (list, tuple):
packages = packages[0] packages = packages[0]
@ -430,9 +410,9 @@ class Installer:
if storage['arguments'].get('silent', False) is False: if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
return self.pacstrap(*packages, **kwargs) return self._pacstrap(packages)
raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red") raise RequirementError(f'Could not sync mirrors: {error}')
try: try:
SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True) SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True)
@ -442,40 +422,44 @@ class Installer:
if storage['arguments'].get('silent', False) is False: if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
return self.pacstrap(*packages, **kwargs) return self._pacstrap(packages)
raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: def set_mirrors(self, mirrors: Dict[str, Iterable[str]]):
for plugin in plugins.values(): for plugin in plugins.values():
if hasattr(plugin, 'on_mirrors'): if hasattr(plugin, 'on_mirrors'):
if result := plugin.on_mirrors(mirrors): if result := plugin.on_mirrors(mirrors):
mirrors = result mirrors = result
return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist') destination = f'{self.target}/etc/pacman.d/mirrorlist'
use_mirrors(mirrors, destination=destination)
def genfstab(self, flags :str = '-pU'): def genfstab(self, flags :str = '-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
try: try:
fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}') gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode()
except SysCallError as error: except SysCallError as error:
raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}') raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}')
with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: if not gen_fstab:
fstab_fh.write(fstab.decode()) raise RequirementError(f'Genrating fstab returned empty value')
with open(f"{self.target}/etc/fstab", 'a') as fp:
fp.write(gen_fstab)
if not os.path.isfile(f'{self.target}/etc/fstab'): if not os.path.isfile(f'{self.target}/etc/fstab'):
raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {fstab}') raise RequirementError(f'Could not create fstab file')
for plugin in plugins.values(): for plugin in plugins.values():
if hasattr(plugin, 'on_genfstab'): if hasattr(plugin, 'on_genfstab'):
if plugin.on_genfstab(self) is True: if plugin.on_genfstab(self) is True:
break break
with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: with open(f"{self.target}/etc/fstab", 'a') as fp:
for entry in self.FSTAB_ENTRIES: for entry in self._fstab_entries:
fstab_fh.write(f'{entry}\n') fp.write(f'{entry}\n')
for mod in self._disk_config.device_modifications: for mod in self._disk_config.device_modifications:
for part_mod in mod.partitions: for part_mod in mod.partitions:
@ -583,7 +567,7 @@ class Installer:
# fstrim is owned by util-linux, a dependency of both base and systemd. # fstrim is owned by util-linux, a dependency of both base and systemd.
self.enable_service("fstrim.timer") self.enable_service("fstrim.timer")
def enable_service(self, *services: Union[str, List[str]]) -> None: def enable_service(self, services: Union[str, List[str]]) -> None:
if type(services[0]) in (list, tuple): if type(services[0]) in (list, tuple):
services = services[0] services = services[0]
@ -611,19 +595,7 @@ class Installer:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
def configure_nic(self, network_config: NetworkConfiguration) -> None: def configure_nic(self, network_config: NetworkConfiguration) -> None:
from .systemd import Networkd conf = network_config.as_systemd_config()
if network_config.dhcp:
conf = Networkd(Match={"Name": network_config.iface}, Network={"DHCP": "yes"})
else:
network = {"Address": network_config.ip}
if network_config.gateway:
network["Gateway"] = network_config.gateway
if network_config.dns:
dns = network_config.dns
network["DNS"] = dns if isinstance(dns, list) else [dns]
conf = Networkd(Match={"Name": network_config.iface}, Network=network)
for plugin in plugins.values(): for plugin in plugins.values():
if hasattr(plugin, 'on_configure_nic'): if hasattr(plugin, 'on_configure_nic'):
@ -663,7 +635,7 @@ class Installer:
# Otherwise, we can go ahead and add the required package # Otherwise, we can go ahead and add the required package
# and enable it's service: # and enable it's service:
else: else:
self.pacstrap('iwd') self._pacstrap('iwd')
self.enable_service('iwd') self.enable_service('iwd')
for psk in psk_files: for psk in psk_files:
@ -682,12 +654,12 @@ class Installer:
if self.helper_flags.get('base', False) is False: if self.helper_flags.get('base', False) is False:
def post_install_enable_networkd_resolved(*args :str, **kwargs :str): def post_install_enable_networkd_resolved(*args :str, **kwargs :str):
self.enable_service('systemd-networkd', 'systemd-resolved') self.enable_service(['systemd-networkd', 'systemd-resolved'])
self.post_base_install.append(post_install_enable_networkd_resolved) self.post_base_install.append(post_install_enable_networkd_resolved)
# Otherwise, we can go ahead and enable the services # Otherwise, we can go ahead and enable the services
else: else:
self.enable_service('systemd-networkd', 'systemd-resolved') self.enable_service(['systemd-networkd', 'systemd-resolved'])
return True return True
@ -704,9 +676,9 @@ class Installer:
fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n") fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n")
with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit: with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit:
mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n") mkinit.write(f"MODULES=({' '.join(self.modules)})\n")
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"BINARIES=({' '.join(self._binaries)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n") mkinit.write(f"FILES=({' '.join(self._files)})\n")
if not self._disk_encryption.hsm_device: if not self._disk_encryption.hsm_device:
# For now, if we don't use HSM we revert to the old # For now, if we don't use HSM we revert to the old
@ -714,9 +686,9 @@ class Installer:
# This is purely for stability reasons, we're going away from this. # This is purely for stability reasons, we're going away from this.
# * systemd -> udev # * systemd -> udev
# * sd-vconsole -> keymap # * sd-vconsole -> keymap
self.HOOKS = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self.HOOKS] self._hooks = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self._hooks]
mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") mkinit.write(f"HOOKS=({' '.join(self._hooks)})\n")
try: try:
SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
@ -736,25 +708,25 @@ class Installer:
if (pkg := part.fs_type.installation_pkg) is not None: if (pkg := part.fs_type.installation_pkg) is not None:
self.base_packages.append(pkg) self.base_packages.append(pkg)
if (module := part.fs_type.installation_module) is not None: if (module := part.fs_type.installation_module) is not None:
self.MODULES.append(module) self.modules.append(module)
if (binary := part.fs_type.installation_binary) is not None: if (binary := part.fs_type.installation_binary) is not None:
self.BINARIES.append(binary) self._binaries.append(binary)
# There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target: if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target:
if 'fsck' in self.HOOKS: if 'fsck' in self._hooks:
self.HOOKS.remove('fsck') self._hooks.remove('fsck')
if part in self._disk_encryption.partitions: if part in self._disk_encryption.partitions:
if self._disk_encryption.hsm_device: if self._disk_encryption.hsm_device:
# Required bby mkinitcpio to add support for fido2-device options # Required bby mkinitcpio to add support for fido2-device options
self.pacstrap('libfido2') self._pacstrap('libfido2')
if 'sd-encrypt' not in self.HOOKS: if 'sd-encrypt' not in self._hooks:
self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt') self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
else: else:
if 'encrypt' not in self.HOOKS: if 'encrypt' not in self._hooks:
self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
if not has_uefi(): if not has_uefi():
self.base_packages.append('grub') self.base_packages.append('grub')
@ -786,7 +758,7 @@ class Installer:
else: else:
self.log("The testing flag is not set. This system will be installed without testing repositories enabled.") self.log("The testing flag is not set. This system will be installed without testing repositories enabled.")
self.pacstrap(self.base_packages) self._pacstrap(self.base_packages)
self.helper_flags['base-strapped'] = True self.helper_flags['base-strapped'] = True
# This handles making sure that the repositories we enabled persist on the installed system # This handles making sure that the repositories we enabled persist on the installed system
@ -826,7 +798,7 @@ class Installer:
def setup_swap(self, kind :str = 'zram'): def setup_swap(self, kind :str = 'zram'):
if kind == 'zram': if kind == 'zram':
self.log(f"Setting up swap on zram") self.log(f"Setting up swap on zram")
self.pacstrap('zram-generator') self._pacstrap('zram-generator')
# We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813
# zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example' # zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example'
@ -853,7 +825,7 @@ class Installer:
return None return None
def _add_systemd_bootloader(self, root_partition: disk.PartitionModification): def _add_systemd_bootloader(self, root_partition: disk.PartitionModification):
self.pacstrap('efibootmgr') self._pacstrap('efibootmgr')
if not has_uefi(): if not has_uefi():
raise HardwareIncompatibilityError raise HardwareIncompatibilityError
@ -919,7 +891,7 @@ class Installer:
# blkid doesn't trigger on loopback devices really well, # blkid doesn't trigger on loopback devices really well,
# so we'll use the old manual method until we get that sorted out. # so we'll use the old manual method until we get that sorted out.
options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self.KERNEL_PARAMS)}\n' options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self._kernel_params)}\n'
for sub_vol in root_partition.btrfs_subvols: for sub_vol in root_partition.btrfs_subvols:
if sub_vol.is_root(): if sub_vol.is_root():
@ -958,7 +930,7 @@ class Installer:
boot_partition: disk.PartitionModification, boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification root_partition: disk.PartitionModification
): ):
self.pacstrap('grub') # no need? self._pacstrap('grub') # no need?
_file = "/etc/default/grub" _file = "/etc/default/grub"
@ -977,7 +949,7 @@ class Installer:
log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO) log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO)
if has_uefi(): if has_uefi():
self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
try: try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
@ -987,8 +959,20 @@ class Installer:
except SysCallError as error: except SysCallError as error:
raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}") raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}")
else: else:
device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path)
if not device:
raise ValueError(f'Can not find block device: {boot_partition.safe_dev_path}')
try: try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True) cmd = f'/usr/bin/arch-chroot' \
f' {self.target}' \
f' grub-install' \
f' --debug' \
f' --target=i386-pc' \
f' --recheck {device.device_info.path}'
SysCommand(cmd, peek_output=True)
except SysCallError as error: except SysCallError as error:
raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}") raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}")
@ -1004,7 +988,7 @@ class Installer:
boot_partition: disk.PartitionModification, boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification root_partition: disk.PartitionModification
): ):
self.pacstrap('efibootmgr') self._pacstrap('efibootmgr')
if not has_uefi(): if not has_uefi():
raise HardwareIncompatibilityError raise HardwareIncompatibilityError
@ -1038,17 +1022,30 @@ class Installer:
# TODO: We need to detect if the encrypted device is a whole disk encryption, # TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have) # or simply a partition encryption. Right now we assume it's a partition (and we always have)
log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}') kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}')
else: else:
log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}') kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}')
device = disk.device_handler.get_device_by_partition_path(boot_partition.dev_path) device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path)
SysCommand(f'efibootmgr --disk {device.path} --part {device.path} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose')
if not device:
raise ValueError(f'Unable to find block device: {boot_partition.safe_dev_path}')
cmd = f'efibootmgr ' \
f'--disk {device.device_info.path} ' \
f'--part {boot_partition.safe_dev_path} ' \
f'--create ' \
f'--label "{label}" ' \
f'--loader {loader} ' \
f'--unicode \'{" ".join(kernel_parameters)}\' ' \
f'--verbose'
SysCommand(cmd)
self.helper_flags['bootloader'] = "efistub" self.helper_flags['bootloader'] = "efistub"
def add_bootloader(self, bootloader: Bootloader) -> bool: def add_bootloader(self, bootloader: Bootloader):
""" """
Adds a bootloader to the installation instance. Adds a bootloader to the installation instance.
Archinstall supports one of three types: Archinstall supports one of three types:
@ -1056,8 +1053,7 @@ class Installer:
* grub * grub
* efistub (beta) * efistub (beta)
:param bootloader: Can be one of the three strings :param bootloader: Type of bootloader to be added
'systemd-bootctl', 'grub' or 'efistub' (beta)
""" """
for plugin in plugins.values(): for plugin in plugins.values():
@ -1089,8 +1085,8 @@ class Installer:
case Bootloader.Efistub: case Bootloader.Efistub:
self._add_efistub_bootloader(boot_partition, root_partition) self._add_efistub_bootloader(boot_partition, root_partition)
def add_additional_packages(self, *packages: Union[str, List[str]]) -> bool: def add_additional_packages(self, packages: Union[str, List[str]]) -> bool:
return self.pacstrap(*packages) return self._pacstrap(packages)
def _enable_users(self, service: str, users: List[User]): def _enable_users(self, service: str, users: List[User]):
for user in users: for user in users:
@ -1201,9 +1197,6 @@ class Installer:
except SysCallError: except SysCallError:
return False return False
def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
return InstallationFile(self, filename, owner)
def set_keyboard_language(self, language: str) -> bool: def set_keyboard_language(self, language: str) -> bool:
log(f"Setting keyboard language to {language}", level=logging.INFO) log(f"Setting keyboard language to {language}", level=logging.INFO)
if len(language.strip()): if len(language.strip()):

View File

@ -482,9 +482,9 @@ class AbstractMenu:
if item in self._menus_to_enable(): if item in self._menus_to_enable():
yield item yield item
def _select_archinstall_language(self, preset_value: Language) -> Language: def _select_archinstall_language(self, preset: Language) -> Language:
from ..user_interaction.general_conf import select_archinstall_language from ..user_interaction.general_conf import select_archinstall_language
language = select_archinstall_language(self.translation_handler.translated_languages, preset_value) language = select_archinstall_language(self.translation_handler.translated_languages, preset)
self._translation_handler.activate(language) self._translation_handler.activate(language)
return language return language

View File

@ -3,7 +3,7 @@ from enum import Enum, auto
from os import system from os import system
from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional, Callable from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional, Callable
from simple_term_menu import TerminalMenu from simple_term_menu import TerminalMenu # type: ignore
from ..exceptions import RequirementError from ..exceptions import RequirementError
from ..output import log from ..output import log
@ -29,11 +29,11 @@ class MenuSelection:
@property @property
def single_value(self) -> Any: def single_value(self) -> Any:
return self.value return self.value # type: ignore
@property @property
def multi_value(self) -> List[Any]: def multi_value(self) -> List[Any]:
return self.value return self.value # type: ignore
class Menu(TerminalMenu): class Menu(TerminalMenu):
@ -67,7 +67,7 @@ class Menu(TerminalMenu):
preview_command: Optional[Callable] = None, preview_command: Optional[Callable] = None,
preview_size: float = 0.0, preview_size: float = 0.0,
preview_title: str = 'Info', preview_title: str = 'Info',
header: Union[List[str],str] = None, header: Union[List[str], str] = [],
allow_reset: bool = False, allow_reset: bool = False,
allow_reset_warning_msg: Optional[str] = None, allow_reset_warning_msg: Optional[str] = None,
clear_screen: bool = True, clear_screen: bool = True,
@ -141,8 +141,6 @@ class Menu(TerminalMenu):
log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING)
raise RequirementError("Menu() requires an iterable as option.") raise RequirementError("Menu() requires an iterable as option.")
self._default_str = str(_('(default)'))
if isinstance(p_options,dict): if isinstance(p_options,dict):
options = list(p_options.keys()) options = list(p_options.keys())
else: else:
@ -193,8 +191,7 @@ class Menu(TerminalMenu):
if default_option: if default_option:
# if a default value was specified we move that one # if a default value was specified we move that one
# to the top of the list and mark it as default as well # to the top of the list and mark it as default as well
default = f'{default_option} {self._default_str}' self._menu_options = [self._default_menu_value] + [o for o in self._menu_options if default_option != o]
self._menu_options = [default] + [o for o in self._menu_options if default_option != o]
if display_back_option and not multi and skip: if display_back_option and not multi and skip:
skip_empty_entries = True skip_empty_entries = True
@ -204,7 +201,18 @@ class Menu(TerminalMenu):
skip_empty_entries = True skip_empty_entries = True
self._menu_options += [''] self._menu_options += ['']
self._preselection(preset_values,cursor_index) preset_list: Optional[List[str]] = None
if preset_values and isinstance(preset_values, str):
preset_list = [preset_values]
calc_cursor_idx = self._determine_cursor_pos(preset_list, cursor_index)
# when we're not in multi selection mode we don't care about
# passing the pre-selection list to the menu as the position
# of the cursor is the one determining the pre-selection
if not self._multi:
preset_values = None
cursor = "> " cursor = "> "
main_menu_cursor_style = ("fg_cyan", "bold") main_menu_cursor_style = ("fg_cyan", "bold")
@ -217,8 +225,8 @@ class Menu(TerminalMenu):
menu_cursor_style=main_menu_cursor_style, menu_cursor_style=main_menu_cursor_style,
menu_highlight_style=main_menu_style, menu_highlight_style=main_menu_style,
multi_select=multi, multi_select=multi,
preselected_entries=self.preset_values, preselected_entries=preset_values,
cursor_index=self.cursor_index, cursor_index=calc_cursor_idx,
preview_command=lambda x: self._show_preview(preview_command, x), preview_command=lambda x: self._show_preview(preview_command, x),
preview_size=preview_size, preview_size=preview_size,
preview_title=preview_title, preview_title=preview_title,
@ -231,12 +239,17 @@ class Menu(TerminalMenu):
skip_empty_entries=skip_empty_entries skip_empty_entries=skip_empty_entries
) )
@property
def _default_menu_value(self) -> str:
default_str = str(_('(default)'))
return f'{self._default_option} {default_str}'
def _show_preview(self, preview_command: Optional[Callable], selection: str) -> Optional[str]: def _show_preview(self, preview_command: Optional[Callable], selection: str) -> Optional[str]:
if selection == self.back(): if selection == self.back():
return None return None
if preview_command: if preview_command:
if self._default_option is not None and f'{self._default_option} {self._default_str}' == selection: if self._default_option is not None and self._default_menu_value == selection:
selection = self._default_option selection = self._default_option
return preview_command(selection) return preview_command(selection)
@ -249,7 +262,7 @@ class Menu(TerminalMenu):
return MenuSelection(type_=MenuSelectionType.Reset) return MenuSelection(type_=MenuSelectionType.Reset)
def check_default(elem): def check_default(elem):
if self._default_option is not None and f'{self._default_option} {self._default_str}' in elem: if self._default_option is not None and self._default_menu_value in elem:
return self._default_option return self._default_option
else: else:
return elem return elem
@ -297,31 +310,44 @@ class Menu(TerminalMenu):
pos = self._menu_entries.index(value) pos = self._menu_entries.index(value)
self.set_cursor_pos(pos) self.set_cursor_pos(pos)
def _preselection(self,preset_values :Union[str, List[str]] = [], cursor_index : Optional[int] = None): def _determine_cursor_pos(
def from_preset_to_cursor(): self,
if preset_values: preset: Optional[List[str]] = None,
# if the value is not extant return 0 as cursor index cursor_index: Optional[int] = None
) -> Optional[int]:
"""
The priority order to determine the cursor position is:
1. A static cursor position was provided
2. Preset values have been provided so the cursor will be
positioned on those
3. A default value for a selection is given so the cursor
will be placed on such
"""
if cursor_index:
return cursor_index
if preset:
indexes = []
for p in preset:
try: try:
if isinstance(preset_values,str): # the options of the table selection menu
self.cursor_index = self._menu_options.index(self.preset_values) # are already escaped so we have to escape
else: # should return an error, but this is smoother # the preset values as well for the comparison
self.cursor_index = self._menu_options.index(self.preset_values[0]) if '|' in p:
except ValueError: p = p.replace('|', '\\|')
self.cursor_index = 0
self.cursor_index = cursor_index idx = self._menu_options.index(p)
if not preset_values: indexes.append(idx)
self.preset_values = None except (IndexError, ValueError):
return log(f'Error finding index of {p}: {self._menu_options}', level=logging.DEBUG)
if len(indexes) == 0:
indexes.append(0)
return indexes[0]
self.preset_values = preset_values
if self._default_option: if self._default_option:
if isinstance(preset_values,str) and self._default_option == preset_values: return self._menu_options.index(self._default_menu_value)
self.preset_values = f"{preset_values} {self._default_str}"
elif isinstance(preset_values,(list,tuple)) and self._default_option in preset_values: return None
idx = preset_values.index(self._default_option)
self.preset_values[idx] = f"{preset_values[idx]} {self._default_str}"
if cursor_index is None or not self._multi:
from_preset_to_cursor()
if not self._multi: # Not supported by the infraestructure
self.preset_values = None

View File

@ -3,12 +3,22 @@ import pathlib
import urllib.error import urllib.error
import urllib.request import urllib.request
from typing import Union, Iterable, Dict, Any, List from typing import Union, Iterable, Dict, Any, List
from dataclasses import dataclass
from .general import SysCommand from .general import SysCommand
from .output import log from .output import log
from .storage import storage from .storage import storage
def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes:
@dataclass
class CustomMirror:
url: str
signcheck: str
signoptions: str
name: str
def sort_mirrorlist(raw_data :bytes, sort_order: List[str] = ['https', 'http']) -> bytes:
""" """
This function can sort /etc/pacman.d/mirrorlist according to the This function can sort /etc/pacman.d/mirrorlist according to the
mirror's URL prefix. By default places HTTPS before HTTP but it also mirror's URL prefix. By default places HTTPS before HTTP but it also
@ -28,8 +38,9 @@ def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes:
from server url definitions (commented or uncommented). from server url definitions (commented or uncommented).
""" """
comments_and_whitespaces = b"" comments_and_whitespaces = b""
sort_order += ['Unknown']
categories: Dict[str, List] = {key: [] for key in sort_order}
categories = {key: [] for key in sort_order + ["Unknown"]}
for line in raw_data.split(b"\n"): for line in raw_data.split(b"\n"):
if line[0:2] in (b'##', b''): if line[0:2] in (b'##', b''):
comments_and_whitespaces += line + b'\n' comments_and_whitespaces += line + b'\n'
@ -82,18 +93,18 @@ def filter_mirrors_by_region(regions :str,
return new_list.decode('UTF-8') return new_list.decode('UTF-8')
def add_custom_mirrors(mirrors: List[str], *args :str, **kwargs :str) -> bool: def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool:
""" """
This will append custom mirror definitions in pacman.conf This will append custom mirror definitions in pacman.conf
:param mirrors: A list of mirror data according to: `{'url': 'http://url.com', 'signcheck': 'Optional', 'signoptions': 'TrustAll', 'name': 'testmirror'}` :param mirrors: A list of custom mirrors
:type mirrors: dict :type mirrors: List[CustomMirror]
""" """
with open('/etc/pacman.conf', 'a') as pacman: with open('/etc/pacman.conf', 'a') as pacman:
for mirror in mirrors: for mirror in mirrors:
pacman.write(f"[{mirror['name']}]\n") pacman.write(f"[{mirror.name}]\n")
pacman.write(f"SigLevel = {mirror['signcheck']} {mirror['signoptions']}\n") pacman.write(f"SigLevel = {mirror.signcheck} {mirror.signoptions}\n")
pacman.write(f"Server = {mirror['url']}\n") pacman.write(f"Server = {mirror.url}\n")
return True return True
@ -123,7 +134,7 @@ def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool:
def use_mirrors( def use_mirrors(
regions: Dict[str, Iterable[str]], regions: Dict[str, Iterable[str]],
destination: str = '/etc/pacman.d/mirrorlist' destination: str = '/etc/pacman.d/mirrorlist'
) -> None: ):
log(f'A new package mirror-list has been created: {destination}', level=logging.INFO) log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)
with open(destination, 'w') as mirrorlist: with open(destination, 'w') as mirrorlist:
for region, mirrors in regions.items(): for region, mirrors in regions.items():
@ -146,7 +157,7 @@ def re_rank_mirrors(
def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
regions = {} regions: Dict[str, Dict[str, Any]] = {}
if storage['arguments']['offline']: if storage['arguments']['offline']:
with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh: with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh:
@ -170,18 +181,19 @@ def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
if len(line.strip()) == 0: if len(line.strip()) == 0:
continue continue
line = line.decode('UTF-8').strip('\n').strip('\r') clean_line = line.decode('UTF-8').strip('\n').strip('\r')
if line[:3] == '## ':
region = line[3:] if clean_line[:3] == '## ':
elif line[:10] == '#Server = ': region = clean_line[3:]
elif clean_line[:10] == '#Server = ':
regions.setdefault(region, {}) regions.setdefault(region, {})
url = line.lstrip('#Server = ') url = clean_line.lstrip('#Server = ')
regions[region][url] = True regions[region][url] = True
elif line.startswith('Server = '): elif clean_line.startswith('Server = '):
regions.setdefault(region, {}) regions.setdefault(region, {})
url = line.lstrip('Server = ') url = clean_line.lstrip('Server = ')
regions[region][url] = True regions[region][url] = True
return regions return regions

View File

@ -1,8 +1,9 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass import logging
from dataclasses import dataclass, field
from enum import Enum from enum import Enum
from typing import List, Optional, Dict, Union, Any, TYPE_CHECKING from typing import List, Optional, Dict, Union, Any, TYPE_CHECKING, Tuple
from ..output import log from ..output import log
from ..storage import storage from ..storage import storage
@ -24,7 +25,7 @@ class NetworkConfiguration:
ip: Optional[str] = None ip: Optional[str] = None
dhcp: bool = True dhcp: bool = True
gateway: Optional[str] = None gateway: Optional[str] = None
dns: Union[None, List[str]] = None dns: List[str] = field(default_factory=list)
def __str__(self): def __str__(self):
if self.is_iso(): if self.is_iso():
@ -53,6 +54,33 @@ class NetworkConfiguration:
return data return data
def as_systemd_config(self) -> str:
match: List[Tuple[str, str]] = []
network: List[Tuple[str, str]] = []
if self.iface:
match.append(('Name', self.iface))
if self.dhcp:
network.append(('DHCP', 'yes'))
else:
if self.ip:
network.append(('Address', self.ip))
if self.gateway:
network.append(('Gateway', self.gateway))
for dns in self.dns:
network.append(('DNS', dns))
config = {'Match': match, 'Network': network}
config_str = ''
for top, entries in config.items():
config_str += f'[{top}]\n'
config_str += '\n'.join([f'{k}={v}' for k, v in entries])
config_str += '\n\n'
return config_str
def json(self) -> Dict: def json(self) -> Dict:
# for json serialization when calling json.dumps(...) on this class # for json serialization when calling json.dumps(...) on this class
return self.__dict__ return self.__dict__
@ -90,41 +118,14 @@ class NetworkConfigurationHandler:
# Perform a copy of the config # Perform a copy of the config
if self._configuration.is_iso(): if self._configuration.is_iso():
installation.copy_iso_network_config( installation.copy_iso_network_config(
enable_services=True) # Sources the ISO network configuration to the install medium. enable_services=True # Sources the ISO network configuration to the install medium.
)
elif self._configuration.is_network_manager(): elif self._configuration.is_network_manager():
installation.add_additional_packages(["networkmanager"]) installation.add_additional_packages(["networkmanager"])
if (profile := storage['arguments'].get('profile_config')) and profile.is_desktop_type_profile: if (profile := storage['arguments'].get('profile_config')) and profile.is_desktop_type_profile:
installation.add_additional_packages(["network-manager-applet"]) installation.add_additional_packages(["network-manager-applet"])
installation.enable_service('NetworkManager.service') installation.enable_service('NetworkManager.service')
def _backwards_compability_config(self, config: Union[str,Dict[str, str]]) -> Union[List[NetworkConfiguration], NetworkConfiguration, None]:
def get(config: Dict[str, str], key: str) -> List[str]:
if (value := config.get(key, None)) is not None:
return [value]
return []
if isinstance(config, str): # is a ISO network
return NetworkConfiguration(NicType.ISO)
elif config.get('NetworkManager'): # is a network manager configuration
return NetworkConfiguration(NicType.NM)
elif 'ip' in config:
return [NetworkConfiguration(
NicType.MANUAL,
iface=config.get('nic', ''),
ip=config.get('ip'),
gateway=config.get('gateway', ''),
dns=get(config, 'dns'),
dhcp=False
)]
elif 'nic' in config:
return [NetworkConfiguration(
NicType.MANUAL,
iface=config.get('nic', ''),
dhcp=True
)]
else: # not recognized
return None
def _parse_manual_config(self, configs: List[Dict[str, Any]]) -> Optional[List[NetworkConfiguration]]: def _parse_manual_config(self, configs: List[Dict[str, Any]]) -> Optional[List[NetworkConfiguration]]:
configurations = [] configurations = []
@ -145,13 +146,17 @@ class NetworkConfigurationHandler:
log(_('Manual nic configuration with no auto DHCP requires an IP address'), fg='red') log(_('Manual nic configuration with no auto DHCP requires an IP address'), fg='red')
exit(1) exit(1)
dns = manual_config.get('dns', [])
if not isinstance(dns, list):
dns = [dns]
configurations.append( configurations.append(
NetworkConfiguration( NetworkConfiguration(
NicType.MANUAL, NicType.MANUAL,
iface=iface, iface=iface,
ip=ip, ip=ip,
gateway=manual_config.get('gateway', ''), gateway=manual_config.get('gateway', ''),
dns=manual_config.get('dns', []), dns=dns,
dhcp=False dhcp=False
) )
) )
@ -176,8 +181,5 @@ class NetworkConfigurationHandler:
self._configuration = NetworkConfiguration(type_) self._configuration = NetworkConfiguration(type_)
else: # manual configuration settings else: # manual configuration settings
self._configuration = self._parse_manual_config([config]) self._configuration = self._parse_manual_config([config])
else: # old style definitions else:
network_config = self._backwards_compability_config(config) log(f'Unable to parse network configuration: {config}', level=logging.DEBUG)
if network_config:
return network_config
return None

View File

@ -3,77 +3,86 @@ import importlib
import logging import logging
import os import os
import sys import sys
import pathlib
import urllib.parse import urllib.parse
import urllib.request import urllib.request
from importlib import metadata from importlib import metadata
from pathlib import Path
from typing import Optional, List from typing import Optional, List
from types import ModuleType
from .output import log from .output import log
from .storage import storage from .storage import storage
plugins = {} plugins = {}
# 1: List archinstall.plugin definitions # 1: List archinstall.plugin definitions
# 2: Load the plugin entrypoint # 2: Load the plugin entrypoint
# 3: Initiate the plugin and store it as .name in plugins # 3: Initiate the plugin and store it as .name in plugins
for plugin_definition in metadata.entry_points().select(group='archinstall.plugin'): for plugin_definition in metadata.entry_points().select(group='archinstall.plugin'):
plugin_entrypoint = plugin_definition.load() plugin_entrypoint = plugin_definition.load()
try: try:
plugins[plugin_definition.name] = plugin_entrypoint() plugins[plugin_definition.name] = plugin_entrypoint()
except Exception as err: except Exception as err:
log(err, level=logging.ERROR) log(f'Error: {err}', level=logging.ERROR)
log(f"The above error was detected when loading the plugin: {plugin_definition}", fg="red", level=logging.ERROR) log(f"The above error was detected when loading the plugin: {plugin_definition}", fg="red", level=logging.ERROR)
# The following functions and core are support structures for load_plugin() def localize_path(path: Path) -> Path:
def localize_path(profile_path :str) -> str: """
if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'): Support structures for load_plugin()
converted_path = f"/tmp/{os.path.basename(profile_path).replace('.py', '')}_{hashlib.md5(os.urandom(12)).hexdigest()}.py" """
url = urllib.parse.urlparse(str(path))
if url.scheme and url.scheme in ('https', 'http'):
converted_path = Path(f'/tmp/{path.stem}_{hashlib.md5(os.urandom(12)).hexdigest()}.py')
with open(converted_path, "w") as temp_file: with open(converted_path, "w") as temp_file:
temp_file.write(urllib.request.urlopen(url.geturl()).read().decode('utf-8')) temp_file.write(urllib.request.urlopen(url.geturl()).read().decode('utf-8'))
return converted_path return converted_path
else: else:
return profile_path return path
def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType: def import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str]:
if not namespace: if not namespace:
namespace = os.path.basename(path) namespace = os.path.basename(path)
if namespace == '__init__.py': if namespace == '__init__.py':
path = pathlib.PurePath(path)
namespace = path.parent.name namespace = path.parent.name
try: try:
spec = importlib.util.spec_from_file_location(namespace, path) spec = importlib.util.spec_from_file_location(namespace, path)
imported = importlib.util.module_from_spec(spec) if spec and spec.loader:
sys.modules[namespace] = imported imported = importlib.util.module_from_spec(spec)
spec.loader.exec_module(sys.modules[namespace]) sys.modules[namespace] = imported
spec.loader.exec_module(sys.modules[namespace])
return namespace return namespace
except Exception as err: except Exception as err:
log(err, level=logging.ERROR) log(f'Error: {err}', level=logging.ERROR)
log(f"The above error was detected when loading the plugin: {path}", fg="red", level=logging.ERROR) log(f"The above error was detected when loading the plugin: {path}", fg="red", level=logging.ERROR)
try: try:
del(sys.modules[namespace]) # noqa: E275 del sys.modules[namespace]
except: except Exception:
pass pass
def find_nth(haystack :List[str], needle :str, n :int) -> int: return namespace
start = haystack.find(needle)
while start >= 0 and n > 1:
start = haystack.find(needle, start + len(needle))
n -= 1
return start
def load_plugin(path :str) -> ModuleType:
parsed_url = urllib.parse.urlparse(path) def find_nth(haystack: List[str], needle: str, n: int) -> Optional[int]:
log(f"Loading plugin {parsed_url}.", fg="gray", level=logging.INFO) indices = [idx for idx, elem in enumerate(haystack) if elem == needle]
if n <= len(indices):
return indices[n - 1]
return None
def load_plugin(path: Path):
namespace: Optional[str] = None
parsed_url = urllib.parse.urlparse(str(path))
log(f"Loading plugin from url {parsed_url}.", level=logging.INFO)
# The Profile was not a direct match on a remote URL # The Profile was not a direct match on a remote URL
if not parsed_url.scheme: if not parsed_url.scheme:
@ -81,9 +90,10 @@ def load_plugin(path :str) -> ModuleType:
if os.path.isfile(path): if os.path.isfile(path):
namespace = import_via_path(path) namespace = import_via_path(path)
elif parsed_url.scheme in ('https', 'http'): elif parsed_url.scheme in ('https', 'http'):
namespace = import_via_path(localize_path(path)) localized = localize_path(path)
namespace = import_via_path(localized)
if namespace in sys.modules: if namespace and namespace in sys.modules:
# Version dependency via __archinstall__version__ variable (if present) in the plugin # Version dependency via __archinstall__version__ variable (if present) in the plugin
# Any errors in version inconsistency will be handled through normal error handling if not defined. # Any errors in version inconsistency will be handled through normal error handling if not defined.
if hasattr(sys.modules[namespace], '__archinstall__version__'): if hasattr(sys.modules[namespace], '__archinstall__version__'):
@ -99,7 +109,7 @@ def load_plugin(path :str) -> ModuleType:
plugins[namespace] = sys.modules[namespace].Plugin() plugins[namespace] = sys.modules[namespace].Plugin()
log(f"Plugin {plugins[namespace]} has been loaded.", fg="gray", level=logging.INFO) log(f"Plugin {plugins[namespace]} has been loaded.", fg="gray", level=logging.INFO)
except Exception as err: except Exception as err:
log(err, level=logging.ERROR) log(f'Error: {err}', level=logging.ERROR)
log(f"The above error was detected when initiating the plugin: {path}", fg="red", level=logging.ERROR) log(f"The above error was detected when initiating the plugin: {path}", fg="red", level=logging.ERROR)
else: else:
log(f"Plugin '{path}' is missing a valid entry-point or is corrupt.", fg="yellow", level=logging.WARNING) log(f"Plugin '{path}' is missing a valid entry-point or is corrupt.", fg="yellow", level=logging.WARNING)

View File

@ -194,23 +194,23 @@ class ProfileHandler:
install_session.add_additional_packages(f"{kernel}-headers") install_session.add_additional_packages(f"{kernel}-headers")
# I've had kernel regen fail if it wasn't installed before nvidia-dkms # I've had kernel regen fail if it wasn't installed before nvidia-dkms
install_session.add_additional_packages("dkms xorg-server xorg-xinit nvidia-dkms") install_session.add_additional_packages(['dkms', 'xorg-server', 'xorg-xinit', 'nvidia-dkms'])
return return
elif 'amdgpu' in driver_pkgs: elif 'amdgpu' in driver_pkgs:
# The order of these two are important if amdgpu is installed #808 # The order of these two are important if amdgpu is installed #808
if 'amdgpu' in install_session.MODULES: if 'amdgpu' in install_session.modules:
install_session.MODULES.remove('amdgpu') install_session.modules.remove('amdgpu')
install_session.MODULES.append('amdgpu') install_session.modules.append('amdgpu')
if 'radeon' in install_session.MODULES: if 'radeon' in install_session.modules:
install_session.MODULES.remove('radeon') install_session.modules.remove('radeon')
install_session.MODULES.append('radeon') install_session.modules.append('radeon')
install_session.add_additional_packages(additional_pkg) install_session.add_additional_packages(additional_pkg)
except Exception as err: except Exception as err:
log(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}", level=logging.WARNING, fg="yellow") log(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}", level=logging.WARNING, fg="yellow")
# Prep didn't run, so there's no driver to install # Prep didn't run, so there's no driver to install
install_session.add_additional_packages("xorg-server xorg-xinit") install_session.add_additional_packages(['xorg-server', 'xorg-xinit'])
def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration): def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration):
profile = profile_config.profile profile = profile_config.profile

View File

@ -1,6 +1,6 @@
import logging import logging
import time import time
from typing import Iterator from typing import Iterator, Optional
from .exceptions import SysCallError from .exceptions import SysCallError
from .general import SysCommand, SysCommandWorker, locate_binary from .general import SysCommand, SysCommandWorker, locate_binary
from .installer import Installer from .installer import Installer
@ -8,51 +8,11 @@ from .output import log
from .storage import storage from .storage import storage
class Ini:
def __init__(self, *args :str, **kwargs :str):
"""
Limited INI handler for now.
Supports multiple keywords through dictionary list items.
"""
self.kwargs = kwargs
def __str__(self) -> str:
result = ''
first_row_done = False
for top_level in self.kwargs:
if first_row_done:
result += f"\n[{top_level}]\n"
else:
result += f"[{top_level}]\n"
first_row_done = True
for key, val in self.kwargs[top_level].items():
if type(val) == list:
for item in val:
result += f"{key}={item}\n"
else:
result += f"{key}={val}\n"
return result
class Systemd(Ini):
"""
Placeholder class to do systemd specific setups.
"""
class Networkd(Systemd):
"""
Placeholder class to do systemd-network specific setups.
"""
class Boot: class Boot:
def __init__(self, installation: Installer): def __init__(self, installation: Installer):
self.instance = installation self.instance = installation
self.container_name = 'archinstall' self.container_name = 'archinstall'
self.session = None self.session: Optional[SysCommandWorker] = None
self.ready = False self.ready = False
def __enter__(self) -> 'Boot': def __enter__(self) -> 'Boot':
@ -63,17 +23,18 @@ class Boot:
self.session = existing_session.session self.session = existing_session.session
self.ready = existing_session.ready self.ready = existing_session.ready
else: else:
# '-P' or --console=pipe could help us not having to do a bunch
# of os.write() calls, but instead use pipes (stdin, stdout and stderr) as usual.
self.session = SysCommandWorker([ self.session = SysCommandWorker([
'/usr/bin/systemd-nspawn', '/usr/bin/systemd-nspawn',
'-D', self.instance.target, '-D', str(self.instance.target),
'--timezone=off', '--timezone=off',
'-b', '-b',
'--no-pager', '--no-pager',
'--machine', self.container_name '--machine', self.container_name
]) ])
# '-P' or --console=pipe could help us not having to do a bunch of os.write() calls, but instead use pipes (stdin, stdout and stderr) as usual.
if not self.ready: if not self.ready and self.session:
while self.session.is_alive(): while self.session.is_alive():
if b' login:' in self.session: if b' login:' in self.session:
self.ready = True self.ready = True
@ -91,25 +52,31 @@ class Boot:
log(f"The error above occurred in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red") log(f"The error above occurred in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red")
shutdown = None shutdown = None
shutdown_exit_code = -1 shutdown_exit_code: Optional[int] = -1
try: try:
shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now') shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now')
except SysCallError as error: except SysCallError as error:
shutdown_exit_code = error.exit_code shutdown_exit_code = error.exit_code
while self.session.is_alive(): if self.session:
time.sleep(0.25) while self.session.is_alive():
time.sleep(0.25)
if shutdown: if shutdown and shutdown.exit_code:
shutdown_exit_code = shutdown.exit_code shutdown_exit_code = shutdown.exit_code
if self.session.exit_code == 0 or shutdown_exit_code == 0: if self.session and (self.session.exit_code == 0 or shutdown_exit_code == 0):
storage['active_boot'] = None storage['active_boot'] = None
else: else:
raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {self.session.exit_code}/{shutdown_exit_code}", exit_code=next(filter(bool, [self.session.exit_code, shutdown_exit_code]))) session_exit_code = self.session.exit_code if self.session else -1
def __iter__(self) -> Iterator[str]: raise SysCallError(
f"Could not shut down temporary boot of {self.instance}: {session_exit_code}/{shutdown_exit_code}",
exit_code=next(filter(bool, [session_exit_code, shutdown_exit_code]))
)
def __iter__(self) -> Iterator[bytes]:
if self.session: if self.session:
for value in self.session: for value in self.session:
yield value yield value

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import logging import logging
import pathlib import pathlib
from typing import List, Any, Optional, Dict, TYPE_CHECKING from typing import List, Any, Optional, Dict, TYPE_CHECKING
from typing import Union
from ..locale_helpers import list_keyboard_languages, list_timezones from ..locale_helpers import list_keyboard_languages, list_timezones
from ..menu import MenuSelectionType, Menu, TextInput from ..menu import MenuSelectionType, Menu, TextInput
@ -29,13 +28,18 @@ def ask_ntp(preset: bool = True) -> bool:
return False if choice.value == Menu.no() else True return False if choice.value == Menu.no() else True
def ask_hostname(preset: str = None) -> str: def ask_hostname(preset: str = '') -> str:
while True: while True:
hostname = TextInput(_('Desired hostname for the installation: '), preset).run().strip() hostname = TextInput(
str(_('Desired hostname for the installation: ')),
preset
).run().strip()
if hostname: if hostname:
return hostname return hostname
def ask_for_a_timezone(preset: str = None) -> str:
def ask_for_a_timezone(preset: Optional[str] = None) -> Optional[str]:
timezones = list_timezones() timezones = list_timezones()
default = 'UTC' default = 'UTC'
@ -48,10 +52,12 @@ def ask_for_a_timezone(preset: str = None) -> str:
match choice.type_: match choice.type_:
case MenuSelectionType.Skip: return preset case MenuSelectionType.Skip: return preset
case MenuSelectionType.Selection: return choice.value case MenuSelectionType.Selection: return choice.single_value
return None
def ask_for_audio_selection(desktop: bool = True, preset: Union[str, None] = None) -> Union[str, None]: def ask_for_audio_selection(desktop: bool = True, preset: Optional[str] = None) -> Optional[str]:
no_audio = str(_('No audio server')) no_audio = str(_('No audio server'))
choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', no_audio] choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', no_audio]
default = 'pipewire' if desktop else no_audio default = 'pipewire' if desktop else no_audio
@ -60,10 +66,12 @@ def ask_for_audio_selection(desktop: bool = True, preset: Union[str, None] = Non
match choice.type_: match choice.type_:
case MenuSelectionType.Skip: return preset case MenuSelectionType.Skip: return preset
case MenuSelectionType.Selection: return choice.value case MenuSelectionType.Selection: return choice.single_value
return None
def select_language(preset_value: str = None) -> str: def select_language(preset: Optional[str] = None) -> Optional[str]:
""" """
Asks the user to select a language Asks the user to select a language
Usually this is combined with :ref:`archinstall.list_keyboard_languages`. Usually this is combined with :ref:`archinstall.list_keyboard_languages`.
@ -75,17 +83,18 @@ def select_language(preset_value: str = None) -> str:
# sort alphabetically and then by length # sort alphabetically and then by length
sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len) sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len)
selected_lang = Menu( choice = Menu(
_('Select keyboard layout'), _('Select keyboard layout'),
sorted_kb_lang, sorted_kb_lang,
preset_values=preset_value, preset_values=preset,
sort=False sort=False
).run() ).run()
if selected_lang.value is None: match choice.type_:
return preset_value case MenuSelectionType.Skip: return preset
case MenuSelectionType.Selection: return choice.single_value
return selected_lang.value return None
def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
@ -100,8 +109,10 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
preselected = None preselected = None
else: else:
preselected = list(preset_values.keys()) preselected = list(preset_values.keys())
mirrors = list_mirrors() mirrors = list_mirrors()
selected_mirror = Menu(
choice = Menu(
_('Select one of the regions to download packages from'), _('Select one of the regions to download packages from'),
list(mirrors.keys()), list(mirrors.keys()),
preset_values=preselected, preset_values=preselected,
@ -109,13 +120,18 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
allow_reset=True allow_reset=True
).run() ).run()
match selected_mirror.type_: match choice.type_:
case MenuSelectionType.Reset: return {} case MenuSelectionType.Reset:
case MenuSelectionType.Skip: return preset_values return {}
case _: return {selected: mirrors[selected] for selected in selected_mirror.value} case MenuSelectionType.Skip:
return preset_values
case MenuSelectionType.Selection:
return {selected: mirrors[selected] for selected in choice.multi_value}
return {}
def select_archinstall_language(languages: List[Language], preset_value: Language) -> Language: def select_archinstall_language(languages: List[Language], preset: Language) -> Language:
# these are the displayed language names which can either be # these are the displayed language names which can either be
# the english name of a language or, if present, the # the english name of a language or, if present, the
# name of the language in its own language # name of the language in its own language
@ -128,15 +144,15 @@ def select_archinstall_language(languages: List[Language], preset_value: Languag
choice = Menu( choice = Menu(
title, title,
list(options.keys()), list(options.keys()),
default_option=preset_value.display_name, default_option=preset.display_name,
preview_size=0.5 preview_size=0.5
).run() ).run()
match choice.type_: match choice.type_:
case MenuSelectionType.Skip: case MenuSelectionType.Skip: return preset
return preset_value case MenuSelectionType.Selection: return options[choice.single_value]
case MenuSelectionType.Selection:
return options[choice.value] raise ValueError('Language selection not handled')
def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List[str]: def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List[str]:
@ -223,4 +239,6 @@ def select_additional_repositories(preset: List[str]) -> List[str]:
match choice.type_: match choice.type_:
case MenuSelectionType.Skip: return preset case MenuSelectionType.Skip: return preset
case MenuSelectionType.Reset: return [] case MenuSelectionType.Reset: return []
case MenuSelectionType.Selection: return choice.value case MenuSelectionType.Selection: return choice.single_value
return []

View File

@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from typing import Any, TYPE_CHECKING from typing import Any, TYPE_CHECKING, Optional
from ..locale_helpers import list_locales from ..locale_helpers import list_locales
from ..menu import Menu, MenuSelectionType from ..menu import Menu, MenuSelectionType
@ -9,33 +9,37 @@ if TYPE_CHECKING:
_: Any _: Any
def select_locale_lang(preset: str = None) -> str: def select_locale_lang(preset: Optional[str] = None) -> Optional[str]:
locales = list_locales() locales = list_locales()
locale_lang = set([locale.split()[0] for locale in locales]) locale_lang = set([locale.split()[0] for locale in locales])
selected_locale = Menu( choice = Menu(
_('Choose which locale language to use'), _('Choose which locale language to use'),
list(locale_lang), list(locale_lang),
sort=True, sort=True,
preset_values=preset preset_values=preset
).run() ).run()
match selected_locale.type_: match choice.type_:
case MenuSelectionType.Selection: return selected_locale.value case MenuSelectionType.Selection: return choice.single_value
case MenuSelectionType.Skip: return preset case MenuSelectionType.Skip: return preset
return None
def select_locale_enc(preset: str = None) -> str:
def select_locale_enc(preset: Optional[str] = None) -> Optional[str]:
locales = list_locales() locales = list_locales()
locale_enc = set([locale.split()[1] for locale in locales]) locale_enc = set([locale.split()[1] for locale in locales])
selected_locale = Menu( choice = Menu(
_('Choose which locale encoding to use'), _('Choose which locale encoding to use'),
list(locale_enc), list(locale_enc),
sort=True, sort=True,
preset_values=preset preset_values=preset
).run() ).run()
match selected_locale.type_: match choice.type_:
case MenuSelectionType.Selection: return selected_locale.value case MenuSelectionType.Selection: return choice.single_value
case MenuSelectionType.Skip: return preset case MenuSelectionType.Skip: return preset
return None

View File

@ -223,7 +223,7 @@ def perform_installation(mountpoint: Path):
# If the user provided a list of services to be enabled, pass the list to the enable_service function. # If the user provided a list of services to be enabled, pass the list to the enable_service function.
# Note that while it's called enable_service, it can actually take a list of services and iterate it. # Note that while it's called enable_service, it can actually take a list of services and iterate it.
if archinstall.arguments.get('services', None): if archinstall.arguments.get('services', None):
installation.enable_service(*archinstall.arguments['services']) installation.enable_service(archinstall.arguments.get('services', []))
# If the user provided custom commands to be run post-installation, execute them now. # If the user provided custom commands to be run post-installation, execute them now.
if archinstall.arguments.get('custom-commands', None): if archinstall.arguments.get('custom-commands', None):

View File

@ -239,7 +239,7 @@ def perform_installation(mountpoint: Path, exec_mode: ExecutionMode):
handler.config_installer(installation) handler.config_installer(installation)
if archinstall.arguments.get('packages', None) and archinstall.arguments.get('packages', None)[0] != '': if archinstall.arguments.get('packages', None) and archinstall.arguments.get('packages', None)[0] != '':
installation.add_additional_packages(archinstall.arguments.get('packages', None)) installation.add_additional_packages(archinstall.arguments.get('packages', []))
if users := archinstall.arguments.get('!users', None): if users := archinstall.arguments.get('!users', None):
installation.create_users(users) installation.create_users(users)
@ -278,7 +278,7 @@ def perform_installation(mountpoint: Path, exec_mode: ExecutionMode):
# If the user provided a list of services to be enabled, pass the list to the enable_service function. # If the user provided a list of services to be enabled, pass the list to the enable_service function.
# Note that while it's called enable_service, it can actually take a list of services and iterate it. # Note that while it's called enable_service, it can actually take a list of services and iterate it.
if archinstall.arguments.get('services', None): if archinstall.arguments.get('services', None):
installation.enable_service(*archinstall.arguments['services']) installation.enable_service(archinstall.arguments.get('services', []))
# If the user provided custom commands to be run post-installation, execute them now. # If the user provided custom commands to be run post-installation, execute them now.
if archinstall.arguments.get('custom-commands', None): if archinstall.arguments.get('custom-commands', None):

View File

@ -147,7 +147,7 @@ def perform_installation(mountpoint: Path):
handler.config_installer(installation) handler.config_installer(installation)
if archinstall.arguments.get('packages', None) and archinstall.arguments.get('packages', None)[0] != '': if archinstall.arguments.get('packages', None) and archinstall.arguments.get('packages', None)[0] != '':
installation.add_additional_packages(archinstall.arguments.get('packages', None)) installation.add_additional_packages(archinstall.arguments.get('packages', []))
if users := archinstall.arguments.get('!users', None): if users := archinstall.arguments.get('!users', None):
installation.create_users(users) installation.create_users(users)
@ -186,7 +186,7 @@ def perform_installation(mountpoint: Path):
# If the user provided a list of services to be enabled, pass the list to the enable_service function. # If the user provided a list of services to be enabled, pass the list to the enable_service function.
# Note that while it's called enable_service, it can actually take a list of services and iterate it. # Note that while it's called enable_service, it can actually take a list of services and iterate it.
if archinstall.arguments.get('services', None): if archinstall.arguments.get('services', None):
installation.enable_service(*archinstall.arguments['services']) installation.enable_service(archinstall.arguments.get('services', []))
# If the user provided custom commands to be run post-installation, execute them now. # If the user provided custom commands to be run post-installation, execute them now.
if archinstall.arguments.get('custom-commands', None): if archinstall.arguments.get('custom-commands', None):

View File

@ -1,14 +0,0 @@
[mypy]
python_version = 3.10
follow_imports = silent
exclude = (?x)(^archinstall/lib/disk/btrfs/btrfssubvolumeinfo\.py$
| ^archinstall/lib/general\.py$
| ^archinstall/lib/hardware\.py$
| ^archinstall/lib/menu/menu\.py$
| ^archinstall/lib/mirrors\.py$
| ^archinstall/lib/plugins\.py$
| ^archinstall/lib/installer\.py$
| ^archinstall/lib/systemd\.py$
| ^archinstall/lib/user_interaction/general_conf\.py$
| ^archinstall/lib/user_interaction/locale_conf\.py$)
files = archinstall/

View File

@ -60,6 +60,7 @@ packages = ["archinstall"]
[tool.mypy] [tool.mypy]
python_version = "3.10" python_version = "3.10"
files = "archinstall/"
exclude = "tests" exclude = "tests"
[tool.bandit] [tool.bandit]