Fix whitespace issues detected by flake8 (#2652)

This commit also removes exclusions that are no longer needed.
This commit is contained in:
correctmost 2024-08-28 11:40:53 -04:00 committed by GitHub
parent 7b5f1f72f9
commit 62b4099c8d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 141 additions and 127 deletions

View File

@ -1,11 +1,11 @@
[flake8] [flake8]
count = True count = True
# Several of the following could be autofixed or improved by running the code through psf/black # Several of the following could be autofixed or improved by running the code through psf/black
ignore = E123,E126,E128,E203,E227,E231,E261,E302,E402,E722,F541,W191,W292,W293,W503,W504 ignore = E123,E128,E722,F541,W191,W503,W504
max-complexity = 40 max-complexity = 40
max-line-length = 236 max-line-length = 220
show-source = True show-source = True
statistics = True statistics = True
builtins = _ builtins = _
per-file-ignores = __init__.py:F401,F403,F405 simple_menu.py:C901,W503 guided.py:C901 network_configuration.py:F821 per-file-ignores = __init__.py:F401
exclude = .git,__pycache__,docs,actions-runner exclude = .git,__pycache__,build,docs,actions-runner

View File

@ -1,5 +1,5 @@
on: [ push, pull_request ] on: [ push, pull_request ]
name: flake8 linting (15 ignores) name: flake8 linting (7 ignores)
jobs: jobs:
flake8: flake8:
runs-on: ubuntu-latest runs-on: ubuntu-latest

View File

@ -27,7 +27,7 @@ The exceptions to PEP8 are:
* Archinstall uses [tabs instead of spaces](https://www.python.org/dev/peps/pep-0008/#tabs-or-spaces) simply to make it * Archinstall uses [tabs instead of spaces](https://www.python.org/dev/peps/pep-0008/#tabs-or-spaces) simply to make it
easier for non-IDE developers to navigate the code *(Tab display-width should be equal to 4 spaces)*. Exception to the easier for non-IDE developers to navigate the code *(Tab display-width should be equal to 4 spaces)*. Exception to the
rule are comments that need fine-tuned indentation for documentation purposes. rule are comments that need fine-tuned indentation for documentation purposes.
* [Line length](https://www.python.org/dev/peps/pep-0008/#maximum-line-length) a maximum line length is enforced via flake8 with 236 characters * [Line length](https://www.python.org/dev/peps/pep-0008/#maximum-line-length) a maximum line length is enforced via flake8 with 220 characters
* [Line breaks before/after binary operator](https://www.python.org/dev/peps/pep-0008/#should-a-line-break-before-or-after-a-binary-operator) * [Line breaks before/after binary operator](https://www.python.org/dev/peps/pep-0008/#should-a-line-break-before-or-after-a-binary-operator)
is not enforced, as long as the style of line breaks is consistent within the same code block. is not enforced, as long as the style of line breaks is consistent within the same code block.
* Archinstall should always be saved with **Unix-formatted line endings** and no other platform-specific formats. * Archinstall should always be saved with **Unix-formatted line endings** and no other platform-specific formats.

View File

@ -6,6 +6,7 @@ from archinstall.default_profiles.xorg import XorgProfile
if TYPE_CHECKING: if TYPE_CHECKING:
_: Any _: Any
class CosmicProfile(XorgProfile): class CosmicProfile(XorgProfile):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__('cosmic-epoch', ProfileType.DesktopEnv, description='', advanced=True) super().__init__('cosmic-epoch', ProfileType.DesktopEnv, description='', advanced=True)

View File

@ -6,6 +6,7 @@ from archinstall.default_profiles.xorg import XorgProfile
if TYPE_CHECKING: if TYPE_CHECKING:
_: Any _: Any
class PlasmaProfile(XorgProfile): class PlasmaProfile(XorgProfile):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__('KDE Plasma', ProfileType.DesktopEnv, description='') super().__init__('KDE Plasma', ProfileType.DesktopEnv, description='')

View File

@ -40,6 +40,7 @@ class GreeterType(Enum):
if '--advanced' in sys.argv: if '--advanced' in sys.argv:
CosmicSession = "cosmic-greeter" CosmicSession = "cosmic-greeter"
class SelectResult(Enum): class SelectResult(Enum):
NewSelection = auto() NewSelection = auto()
SameSelection = auto() SameSelection = auto()

View File

@ -42,7 +42,7 @@ class Boot:
storage['active_boot'] = self storage['active_boot'] = self
return self return self
def __exit__(self, *args :str, **kwargs :str) -> None: def __exit__(self, *args: str, **kwargs: str) -> None:
# b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager

View File

@ -167,9 +167,9 @@ class DeviceHandler(object):
return partition.partuuid if partition else None return partition.partuuid if partition else None
def get_btrfs_info( def get_btrfs_info(
self, self,
dev_path: Path, dev_path: Path,
lsblk_info: Optional[LsblkInfo] = None lsblk_info: Optional[LsblkInfo] = None
) -> List[_BtrfsSubvolumeInfo]: ) -> List[_BtrfsSubvolumeInfo]:
if not lsblk_info: if not lsblk_info:
lsblk_info = get_lsblk_info(dev_path) lsblk_info = get_lsblk_info(dev_path)

View File

@ -17,7 +17,7 @@ class UnknownFilesystemFormat(Exception):
class SysCallError(Exception): class SysCallError(Exception):
def __init__(self, message :str, exit_code :Optional[int] = None, worker :Optional['SysCommandWorker'] = None) -> None: def __init__(self, message: str, exit_code: Optional[int] = None, worker: Optional['SysCommandWorker'] = None) -> None:
super(SysCallError, self).__init__(message) super(SysCallError, self).__init__(message)
self.message = message self.message = message
self.exit_code = exit_code self.exit_code = exit_code
@ -43,4 +43,4 @@ class Deprecated(Exception):
class DownloadTimeout(Exception): class DownloadTimeout(Exception):
''' '''
Download timeout exception raised by DownloadTimer. Download timeout exception raised by DownloadTimer.
''' '''

View File

@ -29,18 +29,18 @@ if TYPE_CHECKING:
from .installer import Installer from .installer import Installer
def generate_password(length :int = 64) -> str: def generate_password(length: int = 64) -> str:
haystack = string.printable # digits, ascii_letters, punctuation (!"#$[] etc) and whitespace haystack = string.printable # digits, ascii_letters, punctuation (!"#$[] etc) and whitespace
return ''.join(secrets.choice(haystack) for i in range(length)) return ''.join(secrets.choice(haystack) for i in range(length))
def locate_binary(name :str) -> str: def locate_binary(name: str) -> str:
if path := which(name): if path := which(name):
return path return path
raise RequirementError(f"Binary {name} does not exist.") raise RequirementError(f"Binary {name} does not exist.")
def clear_vt100_escape_codes(data :Union[bytes, str]) -> Union[bytes, str]: def clear_vt100_escape_codes(data: Union[bytes, str]) -> Union[bytes, str]:
# https://stackoverflow.com/a/43627833/929999 # https://stackoverflow.com/a/43627833/929999
vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]' vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
if isinstance(data, bytes): if isinstance(data, bytes):
@ -80,6 +80,7 @@ def jsonify(obj: Any, safe: bool = True) -> Any:
return obj return obj
class JSON(json.JSONEncoder, json.JSONDecoder): class JSON(json.JSONEncoder, json.JSONDecoder):
""" """
A safe JSON encoder that will omit private information in dicts (starting with !) A safe JSON encoder that will omit private information in dicts (starting with !)
@ -101,13 +102,13 @@ class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder):
class SysCommandWorker: class SysCommandWorker:
def __init__( def __init__(
self, self,
cmd :Union[str, List[str]], cmd: Union[str, List[str]],
callbacks :Optional[Dict[str, Any]] = None, callbacks: Optional[Dict[str, Any]] = None,
peek_output :Optional[bool] = False, peek_output: Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None, environment_vars: Optional[Dict[str, Any]] = None,
logfile :Optional[None] = None, logfile: Optional[None] = None,
working_directory :Optional[str] = './', working_directory: Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True remove_vt100_escape_codes_from_lines: bool = True
): ):
callbacks = callbacks or {} callbacks = callbacks or {}
environment_vars = environment_vars or {} environment_vars = environment_vars or {}
@ -116,25 +117,25 @@ class SysCommandWorker:
cmd = shlex.split(cmd) cmd = shlex.split(cmd)
if cmd: if cmd:
if cmd[0][0] != '/' and cmd[0][:2] != './': # pathlib.Path does not work well if cmd[0][0] != '/' and cmd[0][:2] != './': # pathlib.Path does not work well
cmd[0] = locate_binary(cmd[0]) cmd[0] = locate_binary(cmd[0])
self.cmd = cmd self.cmd = cmd
self.callbacks = callbacks self.callbacks = callbacks
self.peek_output = peek_output self.peek_output = peek_output
# define the standard locale for command outputs. For now the C ascii one. Can be overridden # define the standard locale for command outputs. For now the C ascii one. Can be overridden
self.environment_vars = {**storage.get('CMD_LOCALE',{}),**environment_vars} self.environment_vars = {**storage.get('CMD_LOCALE', {}), **environment_vars}
self.logfile = logfile self.logfile = logfile
self.working_directory = working_directory self.working_directory = working_directory
self.exit_code :Optional[int] = None self.exit_code: Optional[int] = None
self._trace_log = b'' self._trace_log = b''
self._trace_log_pos = 0 self._trace_log_pos = 0
self.poll_object = epoll() self.poll_object = epoll()
self.child_fd :Optional[int] = None self.child_fd: Optional[int] = None
self.started :Optional[float] = None self.started: Optional[float] = None
self.ended :Optional[float] = None self.ended: Optional[float] = None
self.remove_vt100_escape_codes_from_lines :bool = remove_vt100_escape_codes_from_lines self.remove_vt100_escape_codes_from_lines: bool = remove_vt100_escape_codes_from_lines
def __contains__(self, key: bytes) -> bool: def __contains__(self, key: bytes) -> bool:
""" """
@ -150,7 +151,7 @@ class SysCommandWorker:
return False return False
def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]: def __iter__(self, *args: str, **kwargs: Dict[str, Any]) -> Iterator[bytes]:
last_line = self._trace_log.rfind(b'\n') last_line = self._trace_log.rfind(b'\n')
lines = filter(None, self._trace_log[self._trace_log_pos:last_line].splitlines()) lines = filter(None, self._trace_log[self._trace_log_pos:last_line].splitlines())
for line in lines: for line in lines:
@ -174,7 +175,7 @@ class SysCommandWorker:
def __enter__(self) -> 'SysCommandWorker': def __enter__(self) -> 'SysCommandWorker':
return self return self
def __exit__(self, *args :str) -> None: def __exit__(self, *args: str) -> None:
# b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
@ -208,7 +209,7 @@ class SysCommandWorker:
return False return False
def write(self, data: bytes, line_ending :bool = True) -> int: def write(self, data: bytes, line_ending: bool = True) -> int:
assert isinstance(data, bytes) # TODO: Maybe we can support str as well and encode it assert isinstance(data, bytes) # TODO: Maybe we can support str as well and encode it
self.make_sure_we_are_executing() self.make_sure_we_are_executing()
@ -228,7 +229,7 @@ class SysCommandWorker:
self.make_sure_we_are_executing() self.make_sure_we_are_executing()
return self._trace_log_pos return self._trace_log_pos
def seek(self, pos :int) -> None: def seek(self, pos: int) -> None:
self.make_sure_we_are_executing() self.make_sure_we_are_executing()
# Safety check to ensure 0 < pos < len(tracelog) # Safety check to ensure 0 < pos < len(tracelog)
self._trace_log_pos = min(max(0, pos), len(self._trace_log)) self._trace_log_pos = min(max(0, pos), len(self._trace_log))
@ -337,19 +338,19 @@ class SysCommandWorker:
return True return True
def decode(self, encoding :str = 'UTF-8') -> str: def decode(self, encoding: str = 'UTF-8') -> str:
return self._trace_log.decode(encoding) return self._trace_log.decode(encoding)
class SysCommand: class SysCommand:
def __init__(self, def __init__(self,
cmd :Union[str, List[str]], cmd: Union[str, List[str]],
callbacks :Dict[str, Callable[[Any], Any]] = {}, callbacks: Dict[str, Callable[[Any], Any]] = {},
start_callback :Optional[Callable[[Any], Any]] = None, start_callback: Optional[Callable[[Any], Any]] = None,
peek_output :Optional[bool] = False, peek_output: Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None, environment_vars: Optional[Dict[str, Any]] = None,
working_directory :Optional[str] = './', working_directory: Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True): remove_vt100_escape_codes_from_lines: bool = True):
self._callbacks = callbacks.copy() self._callbacks = callbacks.copy()
if start_callback: if start_callback:
@ -361,25 +362,25 @@ class SysCommand:
self.working_directory = working_directory self.working_directory = working_directory
self.remove_vt100_escape_codes_from_lines = remove_vt100_escape_codes_from_lines self.remove_vt100_escape_codes_from_lines = remove_vt100_escape_codes_from_lines
self.session :Optional[SysCommandWorker] = None self.session: Optional[SysCommandWorker] = None
self.create_session() self.create_session()
def __enter__(self) -> Optional[SysCommandWorker]: def __enter__(self) -> Optional[SysCommandWorker]:
return self.session return self.session
def __exit__(self, *args :str, **kwargs :Dict[str, Any]) -> None: def __exit__(self, *args: str, **kwargs: Dict[str, Any]) -> None:
# b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]: if len(args) >= 2 and args[1]:
error(args[1]) error(args[1])
def __iter__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> Iterator[bytes]: def __iter__(self, *args: List[Any], **kwargs: Dict[str, Any]) -> Iterator[bytes]:
if self.session: if self.session:
for line in self.session: for line in self.session:
yield line yield line
def __getitem__(self, key :slice) -> Optional[bytes]: def __getitem__(self, key: slice) -> Optional[bytes]:
if not self.session: if not self.session:
raise KeyError(f"SysCommand() does not have an active session.") raise KeyError(f"SysCommand() does not have an active session.")
elif type(key) is slice: elif type(key) is slice:
@ -390,7 +391,7 @@ class SysCommand:
else: else:
raise ValueError("SysCommand() doesn't have key & value pairs, only slices, SysCommand('ls')[:10] as an example.") raise ValueError("SysCommand() doesn't have key & value pairs, only slices, SysCommand('ls')[:10] as an example.")
def __repr__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> str: def __repr__(self, *args: List[Any], **kwargs: Dict[str, Any]) -> str:
return self.decode('UTF-8', errors='backslashreplace') or '' return self.decode('UTF-8', errors='backslashreplace') or ''
def __json__(self) -> Dict[str, Union[str, bool, List[str], Dict[str, Any], Optional[bool], Optional[Dict[str, Any]]]]: def __json__(self) -> Dict[str, Union[str, bool, List[str], Dict[str, Any], Optional[bool], Optional[Dict[str, Any]]]]:
@ -467,7 +468,7 @@ def _pid_exists(pid: int) -> bool:
return False return False
def run_custom_user_commands(commands :List[str], installation :Installer) -> None: def run_custom_user_commands(commands: List[str], installation: Installer) -> None:
for index, command in enumerate(commands): for index, command in enumerate(commands):
script_path = f"/var/tmp/user-command.{index}.sh" script_path = f"/var/tmp/user-command.{index}.sh"
chroot_path = f"{installation.target}/{script_path}" chroot_path = f"{installation.target}/{script_path}"
@ -481,7 +482,7 @@ def run_custom_user_commands(commands :List[str], installation :Installer) -> No
os.unlink(chroot_path) os.unlink(chroot_path)
def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool : def json_stream_to_structure(configuration_identifier: str, stream: str, target: dict) -> bool:
""" """
Load a JSON encoded dictionary from a stream and merge it into an existing dictionary. Load a JSON encoded dictionary from a stream and merge it into an existing dictionary.
A stream can be a filepath, a URL or a raw JSON string. A stream can be a filepath, a URL or a raw JSON string.
@ -520,6 +521,6 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target
return True return True
def secret(x :str) -> str: def secret(x: str) -> str:
""" return * with len equal to to the input string """ """ return * with len equal to to the input string """
return '*' * len(x) return '*' * len(x)

View File

@ -99,7 +99,7 @@ class GlobalMenu(AbstractMenu):
self._menu_options['!root-password'] = \ self._menu_options['!root-password'] = \
Selector( Selector(
_('Root password'), _('Root password'),
lambda preset:self._set_root_password(), lambda preset: self._set_root_password(),
display_func=lambda x: secret(x) if x else '') display_func=lambda x: secret(x) if x else '')
self._menu_options['!users'] = \ self._menu_options['!users'] = \
Selector( Selector(
@ -178,7 +178,7 @@ class GlobalMenu(AbstractMenu):
preview_func=self._prev_install_invalid_config, preview_func=self._prev_install_invalid_config,
no_store=True) no_store=True)
self._menu_options['abort'] = Selector(_('Abort'), exec_func=lambda n,v:exit(1)) self._menu_options['abort'] = Selector(_('Abort'), exec_func=lambda n, v: exit(1))
def _missing_configs(self) -> List[str]: def _missing_configs(self) -> List[str]:
def check(s: str) -> bool: def check(s: str) -> bool:

View File

@ -140,6 +140,7 @@ class GfxDriver(Enum):
return packages return packages
class _SysInfo: class _SysInfo:
def __init__(self) -> None: def __init__(self) -> None:
pass pass

View File

@ -152,7 +152,7 @@ def ask_additional_packages_to_install(preset: List[str] = []) -> List[str]:
return packages return packages
def add_number_of_parallel_downloads(input_number :Optional[int] = None) -> Optional[int]: def add_number_of_parallel_downloads(input_number: Optional[int] = None) -> Optional[int]:
max_recommended = 5 max_recommended = 5
print(_(f"This option enables the number of parallel downloads that can occur during package downloads")) print(_(f"This option enables the number of parallel downloads that can occur during package downloads"))
print(_("Enter the number of parallel downloads to be enabled.\n\nNote:\n")) print(_("Enter the number of parallel downloads to be enabled.\n\nNote:\n"))

View File

@ -30,14 +30,14 @@ def list_x11_keyboard_languages() -> List[str]:
).decode().splitlines() ).decode().splitlines()
def verify_keyboard_layout(layout :str) -> bool: def verify_keyboard_layout(layout: str) -> bool:
for language in list_keyboard_languages(): for language in list_keyboard_languages():
if layout.lower() == language.lower(): if layout.lower() == language.lower():
return True return True
return False return False
def verify_x11_keyboard_layout(layout :str) -> bool: def verify_x11_keyboard_layout(layout: str) -> bool:
for language in list_x11_keyboard_languages(): for language in list_x11_keyboard_languages():
if layout.lower() == language.lower(): if layout.lower() == language.lower():
return True return True
@ -57,7 +57,7 @@ def get_kb_layout() -> str:
for line in lines: for line in lines:
if "VC Keymap: " in line: if "VC Keymap: " in line:
vcline = line vcline = line
if vcline == "": if vcline == "":
return "" return ""
@ -68,7 +68,7 @@ def get_kb_layout() -> str:
return layout return layout
def set_kb_layout(locale :str) -> bool: def set_kb_layout(locale: str) -> bool:
if len(locale.strip()): if len(locale.strip()):
if not verify_keyboard_layout(locale): if not verify_keyboard_layout(locale):
error(f"Invalid keyboard locale specified: {locale}") error(f"Invalid keyboard locale specified: {locale}")

View File

@ -168,7 +168,7 @@ class Menu(TerminalMenu): # type: ignore[misc]
menu_title = f'\n{action_info}{title}\n' menu_title = f'\n{action_info}{title}\n'
if header: if header:
if not isinstance(header,(list,tuple)): if not isinstance(header, (list, tuple)):
header = [header] header = [header]
menu_title += '\n' + '\n'.join(header) menu_title += '\n' + '\n'.join(header)
@ -294,13 +294,13 @@ class Menu(TerminalMenu): # type: ignore[misc]
return selection return selection
def set_cursor_pos(self,pos :int) -> None: def set_cursor_pos(self, pos: int) -> None:
if pos and 0 < pos < len(self._menu_entries): if pos and 0 < pos < len(self._menu_entries):
self._view.active_menu_index = pos self._view.active_menu_index = pos
else: else:
self._view.active_menu_index = 0 # we define a default self._view.active_menu_index = 0 # we define a default
def set_cursor_pos_entry(self,value :str) -> None: def set_cursor_pos_entry(self, value: str) -> None:
pos = self._menu_entries.index(value) pos = self._menu_entries.index(value)
self.set_cursor_pos(pos) self.set_cursor_pos(pos)

View File

@ -286,7 +286,7 @@ def select_custom_mirror(prompt: str = '', preset: List[CustomMirror] = []) -> l
return custom_mirrors return custom_mirrors
def sort_mirrors_by_performance(mirror_list :List[MirrorStatusEntryV3]) -> List[MirrorStatusEntryV3]: def sort_mirrors_by_performance(mirror_list: List[MirrorStatusEntryV3]) -> List[MirrorStatusEntryV3]:
return sorted(mirror_list, key=lambda mirror: (mirror.score, mirror.speed)) return sorted(mirror_list, key=lambda mirror: (mirror.score, mirror.speed))
@ -298,8 +298,8 @@ def _parse_mirror_list(mirrorlist: str) -> Dict[str, List[MirrorStatusEntryV3]]:
for mirror in mirror_status.urls: for mirror in mirror_status.urls:
# We filter out mirrors that have bad criteria values # We filter out mirrors that have bad criteria values
if any([ if any([
mirror.active is False, # Disabled by mirror-list admins mirror.active is False, # Disabled by mirror-list admins
mirror.last_sync is None, # Has not synced recently mirror.last_sync is None, # Has not synced recently
# mirror.score (error rate) over time reported from backend: https://github.com/archlinux/archweb/blob/31333d3516c91db9a2f2d12260bd61656c011fd1/mirrors/utils.py#L111C22-L111C66 # mirror.score (error rate) over time reported from backend: https://github.com/archlinux/archweb/blob/31333d3516c91db9a2f2d12260bd61656c011fd1/mirrors/utils.py#L111C22-L111C66
(mirror.score is None or mirror.score >= 100), (mirror.score is None or mirror.score >= 100),
]): ]):

View File

@ -129,7 +129,7 @@ class PackageSearch:
class LocalPackage: class LocalPackage:
name: str name: str
version: str version: str
description:str description: str
architecture: str architecture: str
url: str url: str
licenses: str licenses: str

View File

@ -13,28 +13,28 @@ from ..output import info, debug
class MirrorStatusEntryV3(pydantic.BaseModel): class MirrorStatusEntryV3(pydantic.BaseModel):
url :str url: str
protocol :str protocol: str
active :bool active: bool
country :str country: str
country_code :str country_code: str
isos :bool isos: bool
ipv4 :bool ipv4: bool
ipv6 :bool ipv6: bool
details :str details: str
delay :int|None = None delay: int | None = None
last_sync :datetime.datetime|None = None last_sync: datetime.datetime | None = None
duration_avg :float|None = None duration_avg: float | None = None
duration_stddev :float|None = None duration_stddev: float | None = None
completion_pct :float|None = None completion_pct: float | None = None
score :int|None = None score: int | None = None
_latency :float|None = None _latency: float | None = None
_speed :float|None = None _speed: float | None = None
_hostname :str|None = None _hostname: str | None = None
_port :int|None = None _port: int | None = None
@property @property
def speed(self) -> float|None: def speed(self) -> float | None:
if self._speed is None: if self._speed is None:
info(f"Checking download speed of {self._hostname}[{self.score}] by fetching: {self.url}core/os/x86_64/core.db") info(f"Checking download speed of {self._hostname}[{self.score}] by fetching: {self.url}core/os/x86_64/core.db")
req = urllib.request.Request(url=f"{self.url}core/os/x86_64/core.db") req = urllib.request.Request(url=f"{self.url}core/os/x86_64/core.db")
@ -58,7 +58,7 @@ class MirrorStatusEntryV3(pydantic.BaseModel):
return self._speed return self._speed
@property @property
def latency(self) -> float|None: def latency(self) -> float | None:
""" """
Latency measures the miliseconds between one ICMP request & response. Latency measures the miliseconds between one ICMP request & response.
It only does so once because we check if self._latency is None, and a ICMP timeout result in -1 It only does so once because we check if self._latency is None, and a ICMP timeout result in -1
@ -72,7 +72,7 @@ class MirrorStatusEntryV3(pydantic.BaseModel):
return self._latency return self._latency
@pydantic.field_validator('score', mode='before') @pydantic.field_validator('score', mode='before')
def validate_score(cls, value) -> int|None: def validate_score(cls, value) -> int | None:
if value is not None: if value is not None:
value = round(value) value = round(value)
debug(f" score: {value}") debug(f" score: {value}")
@ -87,16 +87,17 @@ class MirrorStatusEntryV3(pydantic.BaseModel):
debug(f"Loaded mirror {self._hostname}" + (f" with current score of {round(self.score)}" if self.score else '')) debug(f"Loaded mirror {self._hostname}" + (f" with current score of {round(self.score)}" if self.score else ''))
return self return self
class MirrorStatusListV3(pydantic.BaseModel): class MirrorStatusListV3(pydantic.BaseModel):
cutoff :int cutoff: int
last_check :datetime.datetime last_check: datetime.datetime
num_checks :int num_checks: int
urls :List[MirrorStatusEntryV3] urls: List[MirrorStatusEntryV3]
version :int version: int
@pydantic.model_validator(mode='before') @pydantic.model_validator(mode='before')
@classmethod @classmethod
def check_model(cls, data: Dict[str, int|datetime.datetime|List[MirrorStatusEntryV3]]) -> Dict[str, int|datetime.datetime|List[MirrorStatusEntryV3]]: def check_model(cls, data: Dict[str, int | datetime.datetime | List[MirrorStatusEntryV3]]) -> Dict[str, int | datetime.datetime | List[MirrorStatusEntryV3]]:
if data.get('version') == 3: if data.get('version') == 3:
return data return data

View File

@ -15,6 +15,7 @@ from .exceptions import SysCallError, DownloadTimeout
from .output import error, info from .output import error, info
from .pacman import Pacman from .pacman import Pacman
class DownloadTimer(): class DownloadTimer():
''' '''
Context manager for timing downloads with timeouts. Context manager for timing downloads with timeouts.
@ -65,14 +66,14 @@ class DownloadTimer():
self.start_time = None self.start_time = None
def get_hw_addr(ifname :str) -> str: def get_hw_addr(ifname: str) -> str:
import fcntl import fcntl
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ret = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15])) ret = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
return ':'.join('%02x' % b for b in ret[18:24]) return ':'.join('%02x' % b for b in ret[18:24])
def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: def list_interfaces(skip_loopback: bool = True) -> Dict[str, str]:
interfaces = {} interfaces = {}
for index, iface in socket.if_nameindex(): for index, iface in socket.if_nameindex():
@ -147,9 +148,10 @@ def calc_checksum(icmp_packet) -> int:
checksum = (checksum >> 16) + (checksum & 0xFFFF) checksum = (checksum >> 16) + (checksum & 0xFFFF)
checksum = ~checksum & 0xFFFF checksum = ~checksum & 0xFFFF
return checksum return checksum
def build_icmp(payload: bytes) -> bytes: def build_icmp(payload: bytes) -> bytes:
# Define the ICMP Echo Request packet # Define the ICMP Echo Request packet
icmp_packet = struct.pack('!BBHHH', 8, 0, 0, 0, 1) + payload icmp_packet = struct.pack('!BBHHH', 8, 0, 0, 0, 1) + payload
@ -158,11 +160,12 @@ def build_icmp(payload: bytes) -> bytes:
return struct.pack('!BBHHH', 8, 0, checksum, 0, 1) + payload return struct.pack('!BBHHH', 8, 0, checksum, 0, 1) + payload
def ping(hostname, timeout=5) -> int: def ping(hostname, timeout=5) -> int:
watchdog = select.epoll() watchdog = select.epoll()
started = time.time() started = time.time()
random_identifier = f'archinstall-{random.randint(1000, 9999)}'.encode() random_identifier = f'archinstall-{random.randint(1000, 9999)}'.encode()
# Create a raw socket (requires root, which should be fine on archiso) # Create a raw socket (requires root, which should be fine on archiso)
icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
watchdog.register(icmp_socket, select.EPOLLIN | select.EPOLLHUP) watchdog.register(icmp_socket, select.EPOLLIN | select.EPOLLHUP)

View File

@ -13,6 +13,7 @@ from .storage import storage
if TYPE_CHECKING: if TYPE_CHECKING:
from _typeshed import DataclassInstance from _typeshed import DataclassInstance
class FormattedOutput: class FormattedOutput:
@classmethod @classmethod
@ -214,21 +215,21 @@ def _stylize_output(
Adds styling to a text given a set of color arguments. Adds styling to a text given a set of color arguments.
""" """
colors = { colors = {
'black' : '0', 'black': '0',
'red' : '1', 'red': '1',
'green' : '2', 'green': '2',
'yellow' : '3', 'yellow': '3',
'blue' : '4', 'blue': '4',
'magenta' : '5', 'magenta': '5',
'cyan' : '6', 'cyan': '6',
'white' : '7', 'white': '7',
'teal' : '8;5;109', # Extended 256-bit colors (not always supported) 'teal': '8;5;109', # Extended 256-bit colors (not always supported)
'orange' : '8;5;208', # https://www.lihaoyi.com/post/BuildyourownCommandLinewithANSIescapecodes.html#256-colors 'orange': '8;5;208', # https://www.lihaoyi.com/post/BuildyourownCommandLinewithANSIescapecodes.html#256-colors
'darkorange' : '8;5;202', 'darkorange': '8;5;202',
'gray' : '8;5;246', 'gray': '8;5;246',
'grey' : '8;5;246', 'grey': '8;5;246',
'darkgray' : '8;5;240', 'darkgray': '8;5;240',
'lightgray' : '8;5;256' 'lightgray': '8;5;256'
} }
foreground = {key: f'3{colors[key]}' for key in colors} foreground = {key: f'3{colors[key]}' for key in colors}
@ -330,10 +331,12 @@ def log(
sys.stdout.write(f"{text}\n") sys.stdout.write(f"{text}\n")
sys.stdout.flush() sys.stdout.flush()
def _count_wchars(string: str) -> int: def _count_wchars(string: str) -> int:
"Count the total number of wide characters contained in a string" "Count the total number of wide characters contained in a string"
return sum(unicodedata.east_asian_width(c) in 'FW' for c in string) return sum(unicodedata.east_asian_width(c) in 'FW' for c in string)
def unicode_ljust(string: str, width: int, fillbyte: str = ' ') -> str: def unicode_ljust(string: str, width: int, fillbyte: str = ' ') -> str:
"""Return a left-justified unicode string of length width. """Return a left-justified unicode string of length width.
>>> unicode_ljust('Hello', 15, '*') >>> unicode_ljust('Hello', 15, '*')
@ -347,6 +350,7 @@ def unicode_ljust(string: str, width: int, fillbyte: str = ' ') -> str:
""" """
return string.ljust(width - _count_wchars(string), fillbyte) return string.ljust(width - _count_wchars(string), fillbyte)
def unicode_rjust(string: str, width: int, fillbyte: str = ' ') -> str: def unicode_rjust(string: str, width: int, fillbyte: str = ' ') -> str:
"""Return a right-justified unicode string of length width. """Return a right-justified unicode string of length width.
>>> unicode_rjust('Hello', 15, '*') >>> unicode_rjust('Hello', 15, '*')

View File

@ -26,7 +26,7 @@ def _make_request(url: str, params: Dict) -> Any:
return urlopen(full_url, context=ssl_context) return urlopen(full_url, context=ssl_context)
def group_search(name :str) -> List[PackageSearchResult]: def group_search(name: str) -> List[PackageSearchResult]:
# TODO UPSTREAM: Implement /json/ for the groups search # TODO UPSTREAM: Implement /json/ for the groups search
try: try:
response = _make_request(BASE_GROUP_URL, {'name': name}) response = _make_request(BASE_GROUP_URL, {'name': name})
@ -42,7 +42,7 @@ def group_search(name :str) -> List[PackageSearchResult]:
return [PackageSearchResult(**package) for package in json.loads(data)['results']] return [PackageSearchResult(**package) for package in json.loads(data)['results']]
def package_search(package :str) -> PackageSearch: def package_search(package: str) -> PackageSearch:
""" """
Finds a specific package via the package database. Finds a specific package via the package database.
It makes a simple web-request, which might be a bit slow. It makes a simple web-request, which might be a bit slow.
@ -59,7 +59,7 @@ def package_search(package :str) -> PackageSearch:
return PackageSearch.from_json(json_data) return PackageSearch.from_json(json_data)
def find_package(package :str) -> List[PackageSearchResult]: def find_package(package: str) -> List[PackageSearchResult]:
data = package_search(package) data = package_search(package)
results = [] results = []
@ -77,7 +77,7 @@ def find_package(package :str) -> List[PackageSearchResult]:
return results return results
def find_packages(*names :str) -> Dict[str, Any]: def find_packages(*names: str) -> Dict[str, Any]:
""" """
This function returns the search results for many packages. This function returns the search results for many packages.
The function itself is rather slow, so consider not sending to The function itself is rather slow, so consider not sending to
@ -91,7 +91,7 @@ def find_packages(*names :str) -> Dict[str, Any]:
return result return result
def validate_package_list(packages :list) -> Tuple[list, list]: def validate_package_list(packages: list) -> Tuple[list, list]:
""" """
Validates a list of given packages. Validates a list of given packages.
return: Tuple of lists containing valid packavges in the first and invalid return: Tuple of lists containing valid packavges in the first and invalid
@ -103,7 +103,7 @@ def validate_package_list(packages :list) -> Tuple[list, list]:
return list(valid_packages), list(invalid_packages) return list(valid_packages), list(invalid_packages)
def installed_package(package :str) -> LocalPackage: def installed_package(package: str) -> LocalPackage:
package_info = {} package_info = {}
try: try:
for line in Pacman.run(f"-Q --info {package}"): for line in Pacman.run(f"-Q --info {package}"):

View File

@ -23,7 +23,7 @@ class Pacman:
self.target = target self.target = target
@staticmethod @staticmethod
def run(args :str, default_cmd :str = 'pacman') -> SysCommand: def run(args: str, default_cmd: str = 'pacman') -> SysCommand:
""" """
A centralized function to call `pacman` from. A centralized function to call `pacman` from.
It also protects us from colliding with other running pacman sessions (if used locally). It also protects us from colliding with other running pacman sessions (if used locally).

View File

@ -1,5 +1,6 @@
from enum import Enum from enum import Enum
class Repo(Enum): class Repo(Enum):
Multilib = "multilib" Multilib = "multilib"
Testing = "testing" Testing = "testing"

View File

@ -15,8 +15,8 @@ storage: Dict[str, Any] = {
'LOG_FILE': Path('install.log'), 'LOG_FILE': Path('install.log'),
'MOUNT_POINT': Path('/mnt/archinstall'), 'MOUNT_POINT': Path('/mnt/archinstall'),
'ENC_IDENTIFIER': 'ainst', 'ENC_IDENTIFIER': 'ainst',
'DISK_TIMEOUTS' : 1, # seconds 'DISK_TIMEOUTS': 1, # seconds
'DISK_RETRY_ATTEMPTS' : 5, # RETRY_ATTEMPTS * DISK_TIMEOUTS is used in disk operations 'DISK_RETRY_ATTEMPTS': 5, # RETRY_ATTEMPTS * DISK_TIMEOUTS is used in disk operations
'CMD_LOCALE':{'LC_ALL':'C'}, # default locale for execution commands. Can be overridden with set_cmd_locale() 'CMD_LOCALE': {'LC_ALL': 'C'}, # default locale for execution commands. Can be overridden with set_cmd_locale()
'CMD_LOCALE_DEFAULT':{'LC_ALL':'C'}, # should be the same as the former. Not be used except in reset_cmd_locale() 'CMD_LOCALE_DEFAULT': {'LC_ALL': 'C'}, # should be the same as the former. Not be used except in reset_cmd_locale()
} }

View File

@ -7,4 +7,4 @@ for script in [pathlib.Path(x) for x in glob.glob(f"{pathlib.Path(__file__).pare
if script.stem in ['__init__', 'list']: if script.stem in ['__init__', 'list']:
continue continue
print(f" {script.stem}") print(f" {script.stem}")

View File

@ -47,13 +47,13 @@ class SetupMenu(GlobalMenu):
self._menu_options['mode'] = menu.Selector( self._menu_options['mode'] = menu.Selector(
'Execution mode', 'Execution mode',
lambda x : select_mode(), lambda x: select_mode(),
display_func=lambda x: x.value if x else '', display_func=lambda x: x.value if x else '',
default=ExecutionMode.Full) default=ExecutionMode.Full)
self._menu_options['continue'] = menu.Selector( self._menu_options['continue'] = menu.Selector(
'Continue', 'Continue',
exec_func=lambda n,v: True) exec_func=lambda n, v: True)
self.enable('archinstall-language') self.enable('archinstall-language')
self.enable('ntp') self.enable('ntp')
@ -96,11 +96,11 @@ class SwissMainMenu(GlobalMenu):
mandatory_list = ['disk_config', 'bootloader', 'hostname'] mandatory_list = ['disk_config', 'bootloader', 'hostname']
case ExecutionMode.Only_HD: case ExecutionMode.Only_HD:
options_list = ['disk_config', 'disk_encryption','swap'] options_list = ['disk_config', 'disk_encryption', 'swap']
mandatory_list = ['disk_config'] mandatory_list = ['disk_config']
case ExecutionMode.Only_OS: case ExecutionMode.Only_OS:
options_list = [ options_list = [
'mirror_config','bootloader', 'hostname', 'mirror_config', 'bootloader', 'hostname',
'!root-password', '!users', 'profile_config', 'audio_config', 'kernels', '!root-password', '!users', 'profile_config', 'audio_config', 'kernels',
'packages', 'additional-repositories', 'network_config', 'timezone', 'ntp' 'packages', 'additional-repositories', 'network_config', 'timezone', 'ntp'
] ]