diff --git a/archinstall/__init__.py b/archinstall/__init__.py index f4793555..f31d8b3d 100644 --- a/archinstall/__init__.py +++ b/archinstall/__init__.py @@ -14,7 +14,21 @@ from .lib.luks import * from .lib.mirrors import * from .lib.networking import * from .lib.output import * -from .lib.packages import * +from .lib.models.dataclasses import ( + VersionDef, + PackageSearchResult, + PackageSearch, + LocalPackage +) +from .lib.packages.packages import ( + find_group, + package_search, + IsGroup, + find_package, + find_packages, + installed_package, + validate_package_list +) from .lib.profiles import * from .lib.services import * from .lib.storage import * @@ -26,7 +40,7 @@ from .lib.plugins import plugins, load_plugin # This initiates the plugin loadin parser = ArgumentParser() -__version__ = "2.3.1.dev0" +__version__ = "2.4.0-dev0" storage['__version__'] = __version__ diff --git a/archinstall/__main__.py b/archinstall/__main__.py index c8a4779b..e125930f 100644 --- a/archinstall/__main__.py +++ b/archinstall/__main__.py @@ -1,4 +1,15 @@ -import archinstall +import importlib +import sys +import pathlib + +# Load .git version before the builtin version +if pathlib.Path('./archinstall/__init__.py').absolute().exists(): + spec = importlib.util.spec_from_file_location("archinstall", "./archinstall/__init__.py") + archinstall = importlib.util.module_from_spec(spec) + sys.modules["archinstall"] = archinstall + spec.loader.exec_module(sys.modules["archinstall"]) +else: + import archinstall if __name__ == '__main__': archinstall.run_as_a_module() diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index 5ffa06a8..fac258ef 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -8,7 +8,7 @@ from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING if TYPE_CHECKING: from .partition import Partition -from ..exceptions import DiskError +from ..exceptions import DiskError, SysCallError from ..output import log from ..general import SysCommand from ..storage import storage @@ -189,10 +189,13 @@ class BlockDevice: # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, # so the free will ignore the ESP partition and just give the "free" space. # Doesn't harm us, but worth noting in case something weird happens. - for line in SysCommand(f"parted -s --machine {self.path} print free"): - if 'free' in (free_space := line.decode('UTF-8')): - _, start, end, size, *_ = free_space.strip('\r\n;').split(':') - yield (start, end, size) + try: + for line in SysCommand(f"parted -s --machine {self.path} print free"): + if 'free' in (free_space := line.decode('UTF-8')): + _, start, end, size, *_ = free_space.strip('\r\n;').split(':') + yield (start, end, size) + except SysCallError as error: + log(f"Could not get free space on {self.path}: {error}", level=logging.INFO) @property def largest_free_space(self) -> List[str]: diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 25db14ea..8acb8cd2 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging from typing import Optional, Dict, Any, List, TYPE_CHECKING + # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: from .blockdevice import BlockDevice @@ -8,6 +9,7 @@ if TYPE_CHECKING: from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to from ..hardware import has_uefi from ..output import log +from ..menu import Menu def suggest_single_disk_layout(block_device :BlockDevice, default_filesystem :Optional[str] = None, @@ -22,7 +24,9 @@ def suggest_single_disk_layout(block_device :BlockDevice, using_home_partition = False if default_filesystem == 'btrfs': - using_subvolumes = input('Would you like to use BTRFS subvolumes with a default structure? (Y/n): ').strip().lower() in ('', 'y', 'yes') + prompt = 'Would you like to use BTRFS subvolumes with a default structure?' + choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() + using_subvolumes = choice == 'yes' layout = { block_device.path : { @@ -76,7 +80,9 @@ def suggest_single_disk_layout(block_device :BlockDevice, layout[block_device.path]['partitions'][-1]['start'] = '513MiB' if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: - using_home_partition = input('Would you like to create a separate partition for /home? (Y/n): ').strip().lower() in ('', 'y', 'yes') + prompt = 'Would you like to create a separate partition for /home?' + choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() + using_home_partition = choice == 'yes' # Set a size for / (/root) if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition: diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py index 783bc9c5..b89d1fcb 100644 --- a/archinstall/lib/exceptions.py +++ b/archinstall/lib/exceptions.py @@ -41,3 +41,6 @@ class UserError(BaseException): class ServiceException(BaseException): pass + +class PackageError(BaseException): + pass \ No newline at end of file diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index a3976234..a5444801 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -9,6 +9,7 @@ import subprocess import string import sys import time +import re from datetime import datetime, date from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 @@ -81,6 +82,18 @@ def locate_binary(name :str) -> str: raise RequirementError(f"Binary {name} does not exist.") +def clear_vt100_escape_codes(data :Union[bytes, str]): + # https://stackoverflow.com/a/43627833/929999 + if type(data) == bytes: + vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8') + else: + vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]' + + for match in re.findall(vt100_escape_regex, data, re.IGNORECASE): + data = data.replace(match, '' if type(data) == str else b'') + + return data + def json_dumps(*args :str, **kwargs :str) -> str: return json.dumps(*args, **{**kwargs, 'cls': JSON}) @@ -168,7 +181,8 @@ class SysCommandWorker: peak_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, logfile :Optional[None] = None, - working_directory :Optional[str] = './'): + working_directory :Optional[str] = './', + remove_vt100_escape_codes_from_lines :bool = True): if not callbacks: callbacks = {} @@ -200,6 +214,7 @@ class SysCommandWorker: self.child_fd :Optional[int] = None self.started :Optional[float] = None self.ended :Optional[float] = None + self.remove_vt100_escape_codes_from_lines :bool = remove_vt100_escape_codes_from_lines def __contains__(self, key: bytes) -> bool: """ @@ -216,6 +231,9 @@ class SysCommandWorker: def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]: for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'): if line: + if self.remove_vt100_escape_codes_from_lines: + line = clear_vt100_escape_codes(line) + yield line + b'\n' self._trace_log_pos = self._trace_log.rfind(b'\n') @@ -368,7 +386,8 @@ class SysCommand: start_callback :Optional[Callable[[Any], Any]] = None, peak_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, - working_directory :Optional[str] = './'): + working_directory :Optional[str] = './', + remove_vt100_escape_codes_from_lines :bool = True): _callbacks = {} if callbacks: @@ -382,6 +401,7 @@ class SysCommand: self.peak_output = peak_output self.environment_vars = environment_vars self.working_directory = working_directory + self.remove_vt100_escape_codes_from_lines = remove_vt100_escape_codes_from_lines self.session :Optional[SysCommandWorker] = None self.create_session() @@ -435,7 +455,7 @@ class SysCommand: if self.session: return self.session - with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars) as session: + with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars, remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines) as session: if not self.session: self.session = session diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py index cf99a530..ea570707 100644 --- a/archinstall/lib/hardware.py +++ b/archinstall/lib/hardware.py @@ -1,10 +1,13 @@ import os +import logging from functools import partial from pathlib import Path from typing import Iterator, Optional, Union from .general import SysCommand from .networking import list_interfaces, enrich_iface_types +from .exceptions import SysCallError +from .output import log __packages__ = [ "mesa", @@ -168,10 +171,19 @@ def mem_total() -> Optional[int]: def virtualization() -> Optional[str]: - return str(SysCommand("systemd-detect-virt")).strip('\r\n') + try: + return str(SysCommand("systemd-detect-virt")).strip('\r\n') + except SysCallError as error: + log(f"Could not detect virtual system: {error}", level=logging.DEBUG) + + return None def is_vm() -> bool: - return b"none" not in b"".join(SysCommand("systemd-detect-virt")).lower() + try: + return b"none" not in b"".join(SysCommand("systemd-detect-virt")).lower() + except SysCallError as error: + log(f"System is not running in a VM: {error}", level=logging.DEBUG) + return None # TODO: Add more identifiers diff --git a/archinstall/lib/menu/selection_menu.py b/archinstall/lib/menu/selection_menu.py index 8128fefc..940941be 100644 --- a/archinstall/lib/menu/selection_menu.py +++ b/archinstall/lib/menu/selection_menu.py @@ -1,8 +1,32 @@ import sys -import archinstall -from archinstall import Menu - +from .menu import Menu +from ..general import SysCommand +from ..storage import storage +from ..output import log +from ..profiles import is_desktop_profile +from ..disk import encrypted_partitions +from ..locale_helpers import set_keyboard_language +from ..user_interaction import get_password +from ..user_interaction import ask_ntp +from ..user_interaction import ask_for_swap +from ..user_interaction import ask_for_bootloader +from ..user_interaction import ask_hostname +from ..user_interaction import ask_for_audio_selection +from ..user_interaction import ask_additional_packages_to_install +from ..user_interaction import ask_to_configure_network +from ..user_interaction import ask_for_a_timezone +from ..user_interaction import ask_for_superuser_account +from ..user_interaction import ask_for_additional_users +from ..user_interaction import select_language +from ..user_interaction import select_mirror_regions +from ..user_interaction import select_locale_lang +from ..user_interaction import select_locale_enc +from ..user_interaction import select_disk_layout +from ..user_interaction import select_kernel +from ..user_interaction import select_encrypted_partitions +from ..user_interaction import select_harddrives +from ..user_interaction import select_profile class Selector: def __init__( @@ -109,17 +133,17 @@ class GlobalMenu: def _setup_selection_menu_options(self): self._menu_options['keyboard-layout'] = \ - Selector('Select keyboard layout', lambda: archinstall.select_language('us'), default='us') + Selector('Select keyboard layout', lambda: select_language('us'), default='us') self._menu_options['mirror-region'] = \ Selector( 'Select mirror region', - lambda: archinstall.select_mirror_regions(), + lambda: select_mirror_regions(), display_func=lambda x: list(x.keys()) if x else '[]', default={}) self._menu_options['sys-language'] = \ - Selector('Select locale language', lambda: archinstall.select_locale_lang('en_US'), default='en_US') + Selector('Select locale language', lambda: select_locale_lang('en_US'), default='en_US') self._menu_options['sys-encoding'] = \ - Selector('Select locale encoding', lambda: archinstall.select_locale_enc('utf-8'), default='utf-8') + Selector('Select locale encoding', lambda: select_locale_enc('utf-8'), default='utf-8') self._menu_options['harddrives'] = \ Selector( 'Select harddrives', @@ -127,28 +151,28 @@ class GlobalMenu: self._menu_options['disk_layouts'] = \ Selector( 'Select disk layout', - lambda: archinstall.select_disk_layout( - archinstall.arguments['harddrives'], - archinstall.arguments.get('advanced', False) + lambda: select_disk_layout( + storage['arguments'].get('harddrives', []), + storage['arguments'].get('advanced', False) ), dependencies=['harddrives']) self._menu_options['!encryption-password'] = \ Selector( 'Set encryption password', - lambda: archinstall.get_password(prompt='Enter disk encryption password (leave blank for no encryption): '), + lambda: get_password(prompt='Enter disk encryption password (leave blank for no encryption): '), display_func=lambda x: self._secret(x) if x else 'None', dependencies=['harddrives']) self._menu_options['swap'] = \ Selector( 'Use swap', - lambda: archinstall.ask_for_swap(), + lambda: ask_for_swap(), default=True) self._menu_options['bootloader'] = \ Selector( 'Select bootloader', - lambda: archinstall.ask_for_bootloader(archinstall.arguments.get('advanced', False)),) + lambda: ask_for_bootloader(storage['arguments'].get('advanced', False)),) self._menu_options['hostname'] = \ - Selector('Specify hostname', lambda: archinstall.ask_hostname()) + Selector('Specify hostname', lambda: ask_hostname()) self._menu_options['!root-password'] = \ Selector( 'Set root password', @@ -174,29 +198,29 @@ class GlobalMenu: self._menu_options['audio'] = \ Selector( 'Select audio', - lambda: archinstall.ask_for_audio_selection(archinstall.is_desktop_profile(archinstall.arguments.get('profile', None)))) + lambda: ask_for_audio_selection(is_desktop_profile(storage['arguments'].get('profile', None)))) self._menu_options['kernels'] = \ Selector( 'Select kernels', - lambda: archinstall.select_kernel(), + lambda: select_kernel(), default=['linux']) self._menu_options['packages'] = \ Selector( 'Additional packages to install', - lambda: archinstall.ask_additional_packages_to_install(archinstall.arguments.get('packages', None)), + lambda: ask_additional_packages_to_install(storage['arguments'].get('packages', None)), default=[]) self._menu_options['nic'] = \ Selector( 'Configure network', - lambda: archinstall.ask_to_configure_network(), + lambda: ask_to_configure_network(), display_func=lambda x: x if x else 'Not configured, unavailable unless setup manually', default={}) self._menu_options['timezone'] = \ - Selector('Select timezone', lambda: archinstall.ask_for_a_timezone()) + Selector('Select timezone', lambda: ask_for_a_timezone()) self._menu_options['ntp'] = \ Selector( 'Set automatic time sync (NTP)', - lambda: archinstall.ask_ntp(), + lambda: self._select_ntp(), default=True) self._menu_options['install'] = \ Selector( @@ -205,7 +229,7 @@ class GlobalMenu: self._menu_options['abort'] = Selector('Abort', enabled=True) def enable(self, selector_name, omit_if_set=False): - arg = archinstall.arguments.get(selector_name, None) + arg = storage['arguments'].get(selector_name, None) # don't display the menu option if it was defined already if arg is not None and omit_if_set: @@ -239,8 +263,8 @@ class GlobalMenu: self._process_selection(selection) for key in self._menu_options: sel = self._menu_options[key] - if key not in archinstall.arguments: - archinstall.arguments[key] = sel._current_selection + if key not in storage['arguments']: + storage['arguments'][key] = sel._current_selection self._post_processing() def _process_selection(self, selection): @@ -254,7 +278,7 @@ class GlobalMenu: selector = option[0][1] result = selector.func() self._menu_options[selector_name].set_current_selection(result) - archinstall.arguments[selector_name] = result + storage['arguments'][selector_name] = result self._update_install() @@ -263,12 +287,12 @@ class GlobalMenu: self._menu_options.get('install').update_description(text) def _post_processing(self): - if archinstall.arguments.get('harddrives', None) and archinstall.arguments.get('!encryption-password', None): + if storage['arguments'].get('harddrives', None) and storage['arguments'].get('!encryption-password', None): # If no partitions was marked as encrypted, but a password was supplied and we have some disks to format.. # Then we need to identify which partitions to encrypt. This will default to / (root). - if len(list(archinstall.encrypted_partitions(archinstall.arguments['disk_layouts']))) == 0: - archinstall.arguments['disk_layouts'] = archinstall.select_encrypted_partitions( - archinstall.arguments['disk_layouts'], archinstall.arguments['!encryption-password']) + if len(list(encrypted_partitions(storage['arguments'].get('disk_layouts', [])))) == 0: + storage['arguments']['disk_layouts'] = select_encrypted_partitions( + storage['arguments']['disk_layouts'], storage['arguments']['!encryption-password']) def _install_text(self): missing = self._missing_configs() @@ -301,27 +325,37 @@ class GlobalMenu: def _set_root_password(self): prompt = 'Enter root password (leave blank to disable root & create superuser): ' - password = archinstall.get_password(prompt=prompt) + password = get_password(prompt=prompt) + # TODO: Do we really wanna wipe the !superusers and !users if root password is set? + # What if they set a superuser first, but then decides to set a root password? if password is not None: self._menu_options.get('!superusers').set_current_selection(None) - archinstall.arguments['!users'] = {} - archinstall.arguments['!superusers'] = {} + storage['arguments']['!users'] = {} + storage['arguments']['!superusers'] = {} return password + def _select_ntp(self) -> bool: + ntp = ask_ntp() + + value = str(ntp).lower() + SysCommand(f'timedatectl set-ntp {value}') + + return ntp + def _select_harddrives(self): - old_haddrives = archinstall.arguments.get('harddrives') - harddrives = archinstall.select_harddrives() + old_haddrives = storage['arguments'].get('harddrives', []) + harddrives = select_harddrives() # in case the harddrives got changed we have to reset the disk layout as well if old_haddrives != harddrives: self._menu_options.get('disk_layouts').set_current_selection(None) - archinstall.arguments['disk_layouts'] = {} + storage['arguments']['disk_layouts'] = {} if not harddrives: prompt = 'You decided to skip harddrive selection\n' - prompt += f"and will use whatever drive-setup is mounted at {archinstall.storage['MOUNT_POINT']} (experimental)\n" + prompt += f"and will use whatever drive-setup is mounted at {storage['MOUNT_POINT']} (experimental)\n" prompt += "WARNING: Archinstall won't check the suitability of this setup\n" prompt += 'Do you wish to continue?' @@ -336,36 +370,33 @@ class GlobalMenu: return '*' * len(x) def _select_profile(self): - profile = archinstall.select_profile() + profile = select_profile() # Check the potentially selected profiles preparations to get early checks if some additional questions are needed. if profile and profile.has_prep_function(): namespace = f'{profile.namespace}.py' with profile.load_instructions(namespace=namespace) as imported: if not imported._prep_function(): - archinstall.log(' * Profile\'s preparation requirements was not fulfilled.', fg='red') + log(' * Profile\'s preparation requirements was not fulfilled.', fg='red') exit(1) return profile def _create_superuser_account(self): - superuser = archinstall.ask_for_superuser_account('Create a required super-user with sudo privileges: ', forced=True) + superuser = ask_for_superuser_account('Create a required super-user with sudo privileges: ', forced=True) return superuser def _create_user_account(self): - users, superusers = archinstall.ask_for_additional_users('Enter a username to create an additional user: ') - if not archinstall.arguments.get('!superusers', None): - archinstall.arguments['!superusers'] = superusers - else: - archinstall.arguments['!superusers'] = {**archinstall.arguments['!superusers'], **superusers} + users, superusers = ask_for_additional_users('Enter a username to create an additional user: ') + storage['arguments']['!superusers'] = {**storage['arguments'].get('!superusers', {}), **superusers} return users def _set_kb_language(self): # Before continuing, set the preferred keyboard layout/language in the current terminal. # This will just help the user with the next following questions. - if archinstall.arguments.get('keyboard-layout', None) and len(archinstall.arguments['keyboard-layout']): - archinstall.set_keyboard_language(archinstall.arguments['keyboard-layout']) + if len(storage['arguments'].get('keyboard-layout', [])): + set_keyboard_language(storage['arguments']['keyboard-layout']) def _verify_selection_enabled(self, selection_name): if selection := self._menu_options.get(selection_name, None): diff --git a/archinstall/lib/models/__init__.py b/archinstall/lib/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/archinstall/lib/models/dataclasses.py b/archinstall/lib/models/dataclasses.py new file mode 100644 index 00000000..99221fe3 --- /dev/null +++ b/archinstall/lib/models/dataclasses.py @@ -0,0 +1,136 @@ +from dataclasses import dataclass +from typing import Optional, List + +@dataclass +class VersionDef: + version_string: str + + @classmethod + def parse_version(self) -> List[str]: + if '.' in self.version_string: + versions = self.version_string.split('.') + else: + versions = [self.version_string] + + return versions + + @classmethod + def major(self) -> str: + return self.parse_version()[0] + + @classmethod + def minor(self) -> str: + versions = self.parse_version() + if len(versions) >= 2: + return versions[1] + + @classmethod + def patch(self) -> str: + versions = self.parse_version() + if '-' in versions[-1]: + _, patch_version = versions[-1].split('-', 1) + return patch_version + + def __eq__(self, other :'VersionDef') -> bool: + if other.major == self.major and \ + other.minor == self.minor and \ + other.patch == self.patch: + + return True + return False + + def __lt__(self, other :'VersionDef') -> bool: + if self.major > other.major: + return False + elif self.minor and other.minor and self.minor > other.minor: + return False + elif self.patch and other.patch and self.patch > other.patch: + return False + + def __str__(self) -> str: + return self.version_string + +@dataclass +class PackageSearchResult: + pkgname: str + pkgbase: str + repo: str + arch: str + pkgver: str + pkgrel: str + epoch: int + pkgdesc: str + url: str + filename: str + compressed_size: int + installed_size: int + build_date: str + last_update: str + flag_date: Optional[str] + maintainers: List[str] + packager: str + groups: List[str] + licenses: List[str] + conflicts: List[str] + provides: List[str] + replaces: List[str] + depends: List[str] + optdepends: List[str] + makedepends: List[str] + checkdepends: List[str] + + @property + def pkg_version(self) -> str: + return self.pkgver + + def __eq__(self, other :'VersionDef') -> bool: + return self.pkg_version == other.pkg_version + + def __lt__(self, other :'VersionDef') -> bool: + return self.pkg_version < other.pkg_version + +@dataclass +class PackageSearch: + version: int + limit: int + valid: bool + num_pages: int + page: int + results: List[PackageSearchResult] + + def __post_init__(self): + self.results = [PackageSearchResult(**x) for x in self.results] + +@dataclass +class LocalPackage: + name: str + version: str + description:str + architecture: str + url: str + licenses: str + groups: str + depends_on: str + optional_deps: str + required_by: str + optional_for: str + conflicts_with: str + replaces: str + installed_size: str + packager: str + build_date: str + install_date: str + install_reason: str + install_script: str + validated_by: str + provides: str + + @property + def pkg_version(self) -> str: + return self.version + + def __eq__(self, other :'VersionDef') -> bool: + return self.pkg_version == other.pkg_version + + def __lt__(self, other :'VersionDef') -> bool: + return self.pkg_version < other.pkg_version \ No newline at end of file diff --git a/archinstall/lib/models/pydantic.py b/archinstall/lib/models/pydantic.py new file mode 100644 index 00000000..799e92af --- /dev/null +++ b/archinstall/lib/models/pydantic.py @@ -0,0 +1,134 @@ +from typing import Optional, List +from pydantic import BaseModel + +""" +This python file is not in use. +Pydantic is not a builtin, and we use the dataclasses.py instead! +""" + +class VersionDef(BaseModel): + version_string: str + + @classmethod + def parse_version(self) -> List[str]: + if '.' in self.version_string: + versions = self.version_string.split('.') + else: + versions = [self.version_string] + + return versions + + @classmethod + def major(self) -> str: + return self.parse_version()[0] + + @classmethod + def minor(self) -> str: + versions = self.parse_version() + if len(versions) >= 2: + return versions[1] + + @classmethod + def patch(self) -> str: + versions = self.parse_version() + if '-' in versions[-1]: + _, patch_version = versions[-1].split('-', 1) + return patch_version + + def __eq__(self, other :'VersionDef') -> bool: + if other.major == self.major and \ + other.minor == self.minor and \ + other.patch == self.patch: + + return True + return False + + def __lt__(self, other :'VersionDef') -> bool: + if self.major > other.major: + return False + elif self.minor and other.minor and self.minor > other.minor: + return False + elif self.patch and other.patch and self.patch > other.patch: + return False + + def __str__(self) -> str: + return self.version_string + + +class PackageSearchResult(BaseModel): + pkgname: str + pkgbase: str + repo: str + arch: str + pkgver: str + pkgrel: str + epoch: int + pkgdesc: str + url: str + filename: str + compressed_size: int + installed_size: int + build_date: str + last_update: str + flag_date: Optional[str] + maintainers: List[str] + packager: str + groups: List[str] + licenses: List[str] + conflicts: List[str] + provides: List[str] + replaces: List[str] + depends: List[str] + optdepends: List[str] + makedepends: List[str] + checkdepends: List[str] + + @property + def pkg_version(self) -> str: + return self.pkgver + + def __eq__(self, other :'VersionDef') -> bool: + return self.pkg_version == other.pkg_version + + def __lt__(self, other :'VersionDef') -> bool: + return self.pkg_version < other.pkg_version + + +class PackageSearch(BaseModel): + version: int + limit: int + valid: bool + results: List[PackageSearchResult] + + +class LocalPackage(BaseModel): + name: str + version: str + description:str + architecture: str + url: str + licenses: str + groups: str + depends_on: str + optional_deps: str + required_by: str + optional_for: str + conflicts_with: str + replaces: str + installed_size: str + packager: str + build_date: str + install_date: str + install_reason: str + install_script: str + validated_by: str + + @property + def pkg_version(self) -> str: + return self.version + + def __eq__(self, other :'VersionDef') -> bool: + return self.pkg_version == other.pkg_version + + def __lt__(self, other :'VersionDef') -> bool: + return self.pkg_version < other.pkg_version \ No newline at end of file diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py index 3ef1e234..ffe00370 100644 --- a/archinstall/lib/output.py +++ b/archinstall/lib/output.py @@ -106,4 +106,4 @@ def log(*args :str, **kwargs :Union[str, int, Dict[str, Union[str, int]]]) -> No # We use sys.stdout.write()+flush() instead of print() to try and # fix issue #94 sys.stdout.write(f"{string}\n") - sys.stdout.flush() \ No newline at end of file + sys.stdout.flush() diff --git a/archinstall/lib/packages.py b/archinstall/lib/packages.py deleted file mode 100644 index 1d46ef5e..00000000 --- a/archinstall/lib/packages.py +++ /dev/null @@ -1,66 +0,0 @@ -import json -import ssl -import urllib.error -import urllib.parse -import urllib.request -from typing import Dict, Any - -from .exceptions import RequirementError - -BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}' -BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/' - - -def find_group(name :str) -> bool: - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - try: - response = urllib.request.urlopen(BASE_GROUP_URL.format(group=name), context=ssl_context) - except urllib.error.HTTPError as err: - if err.code == 404: - return False - else: - raise err - - # Just to be sure some code didn't slip through the exception - if response.code == 200: - return True - - -def find_package(name :str) -> Any: - """ - Finds a specific package via the package database. - It makes a simple web-request, which might be a bit slow. - """ - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - response = urllib.request.urlopen(BASE_URL.format(package=name), context=ssl_context) - data = response.read().decode('UTF-8') - return json.loads(data) - - -def find_packages(*names :str) -> Dict[str, Any]: - """ - This function returns the search results for many packages. - The function itself is rather slow, so consider not sending to - many packages to the search query. - """ - return {package: find_package(package) for package in names} - - -def validate_package_list(packages: list) -> bool: - """ - Validates a list of given packages. - Raises `RequirementError` if one or more packages are not found. - """ - invalid_packages = [ - package - for package in packages - if not find_package(package)['results'] and not find_group(package) - ] - if invalid_packages: - raise RequirementError(f"Invalid package names: {invalid_packages}") - - return True diff --git a/archinstall/lib/packages/__init__.py b/archinstall/lib/packages/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/archinstall/lib/packages/packages.py b/archinstall/lib/packages/packages.py new file mode 100644 index 00000000..5b94aa59 --- /dev/null +++ b/archinstall/lib/packages/packages.py @@ -0,0 +1,109 @@ +import ssl +import urllib.request +import json +from typing import Dict, Any +from ..general import SysCommand +from ..models.dataclasses import PackageSearch, PackageSearchResult, LocalPackage +from ..exceptions import PackageError, SysCallError, RequirementError + +BASE_URL_PKG_SEARCH = 'https://archlinux.org/packages/search/json/?name={package}' +# BASE_URL_PKG_CONTENT = 'https://archlinux.org/packages/search/json/' +BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/' + + +def find_group(name :str) -> bool: + # TODO UPSTREAM: Implement /json/ for the groups search + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + try: + response = urllib.request.urlopen(BASE_GROUP_URL.format(group=name), context=ssl_context) + except urllib.error.HTTPError as err: + if err.code == 404: + return False + else: + raise err + + # Just to be sure some code didn't slip through the exception + if response.code == 200: + return True + + return False + +def package_search(package :str) -> PackageSearch: + """ + Finds a specific package via the package database. + It makes a simple web-request, which might be a bit slow. + """ + # TODO UPSTREAM: Implement bulk search, either support name=X&name=Y or split on space (%20 or ' ') + # TODO: utilize pacman cache first, upstream second. + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + response = urllib.request.urlopen(BASE_URL_PKG_SEARCH.format(package=package), context=ssl_context) + + if response.code != 200: + raise PackageError(f"Could not locate package: [{response.code}] {response}") + + data = response.read().decode('UTF-8') + + return PackageSearch(**json.loads(data)) + +class IsGroup(BaseException): + pass + +def find_package(package :str) -> PackageSearchResult: + data = package_search(package) + + if not data.results: + # Check if the package is actually a group + if find_group(package): + # TODO: Until upstream adds a JSON result for group searches + # there is no way we're going to parse HTML reliably. + raise IsGroup("Implement group search") + + raise PackageError(f"Could not locate {package} while looking for repository category") + + # If we didn't find the package in the search results, + # odds are it's a group package + for result in data.results: + if result.pkgname == package: + return result + + raise PackageError(f"Could not locate {package} in result while looking for repository category") + +def find_packages(*names :str) -> Dict[str, Any]: + """ + This function returns the search results for many packages. + The function itself is rather slow, so consider not sending to + many packages to the search query. + """ + return {package: find_package(package) for package in names} + + +def validate_package_list(packages: list) -> bool: + """ + Validates a list of given packages. + Raises `RequirementError` if one or more packages are not found. + """ + invalid_packages = [ + package + for package in packages + if not find_package(package)['results'] and not find_group(package) + ] + if invalid_packages: + raise RequirementError(f"Invalid package names: {invalid_packages}") + + return True + +def installed_package(package :str) -> LocalPackage: + package_info = {} + try: + for line in SysCommand(f"pacman -Q --info {package}"): + if b':' in line: + key, value = line.decode().split(':', 1) + package_info[key.strip().lower().replace(' ', '_')] = value.strip() + except SysCallError: + pass + + return LocalPackage(**package_info) \ No newline at end of file diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py index d5cd9257..8cc7de0a 100644 --- a/archinstall/lib/user_interaction.py +++ b/archinstall/lib/user_interaction.py @@ -28,7 +28,8 @@ from .mirrors import list_mirrors # TODO: Some inconsistencies between the selection processes. # Some return the keys from the options, some the values? -from .. import fs_types, validate_package_list +from .disk.validators import fs_types +from .packages.packages import validate_package_list # TODO: These can be removed after the move to simple_menu.py def get_terminal_height() -> int: @@ -78,8 +79,9 @@ def do_countdown() -> bool: print(".", end='') if SIG_TRIGGER: - abort = input('\nDo you really want to abort (y/n)? ') - if abort.strip() != 'n': + prompt = 'Do you really want to abort' + choice = Menu(prompt, ['yes', 'no'], skip=False).run() + if choice == 'yes': exit(0) if SIG_TRIGGER is False: @@ -270,7 +272,7 @@ def ask_for_swap(prompt='Would you like to use swap on zram?', forced=False): return False if choice == 'no' else True -def ask_ntp(): +def ask_ntp() -> bool: prompt = 'Would you like to use automatic time synchronization (NTP) with the default time servers?' prompt += 'Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki' choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() @@ -858,7 +860,7 @@ def select_harddrives() -> Optional[str]: return [] -def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str: +def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask :bool = False) -> str: """ Some what convoluted function, whose job is simple. Select a graphics driver from a pre-defined set of popular options. @@ -880,7 +882,7 @@ def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str: if has_nvidia_graphics(): title += 'For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n' - if not arguments.get('gfx_driver', None): + if not arguments.get('gfx_driver', None) or force_ask: title += '\n\nSelect a graphics driver or leave blank to install all open-source drivers' arguments['gfx_driver'] = Menu(title, drivers).run() diff --git a/examples/custom-command-sample.json b/examples/custom-command-sample.json index 980541de..9c39e15f 100644 --- a/examples/custom-command-sample.json +++ b/examples/custom-command-sample.json @@ -1,37 +1,33 @@ { - "audio": "pipewire", + "dry_run": true, + "audio": "none", "bootloader": "systemd-bootctl", - "custom-commands": [ - "cd /home/devel; git clone https://aur.archlinux.org/paru.git", - "chown -R devel:devel /home/devel/paru", - "usermod -aG docker devel" + "debug": false, + "harddrives": [ + "/dev/loop0" ], - "!encryption-password": "supersecret", - "filesystem": "btrfs", - "gfx_driver": "All open-source (default)", - "harddrive": { - "path": "/dev/nvme0n1" - }, "hostname": "development-box", "kernels": [ "linux" ], - "keyboard-language": "us", + "keyboard-layout": "us", "mirror-region": "Worldwide", "nic": { - "NetworkManager": true + "NetworkManager": true }, "ntp": true, "packages": ["docker", "git", "wget", "zsh"], - "profile": "gnome", "services": ["docker"], - "superusers": { - "devel": { - "!password": "devel" - } - }, + "profile": "gnome", + "gfx_driver": "All open-source (default)", + "swap": true, "sys-encoding": "utf-8", "sys-language": "en_US", - "timezone": "US/Eastern", - "users": {} + "timezone": "Europe/Stockholm", + "version": "2.3.1.dev0", + "custom-commands": [ + "cd /home/devel; git clone https://aur.archlinux.org/paru.git", + "chown -R devel:devel /home/devel/paru", + "usermod -aG docker devel" + ] } diff --git a/examples/guided.py b/examples/guided.py index 22e0f883..9cc39c86 100644 --- a/examples/guided.py +++ b/examples/guided.py @@ -58,15 +58,16 @@ def ask_user_questions(): will we continue with the actual installation steps. """ + # ref: https://github.com/archlinux/archinstall/pull/831 + # we'll set NTP to true by default since this is also + # the default value specified in the menu options; in + # case it will be changed by the user we'll also update + # the system immediately + archinstall.SysCommand('timedatectl set-ntp true') + global_menu = archinstall.GlobalMenu() global_menu.enable('keyboard-layout') - if not archinstall.arguments.get('ntp', False): - archinstall.arguments['ntp'] = input("Would you like to use automatic time synchronization (NTP) with the default time servers? [Y/n]: ").strip().lower() in ('y', 'yes', '') - if archinstall.arguments['ntp']: - archinstall.log("Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki.", fg="yellow") - archinstall.SysCommand('timedatectl set-ntp true') - # Set which region to download packages from during the installation global_menu.enable('mirror-region') @@ -293,8 +294,9 @@ def perform_installation(mountpoint): installation.log("For post-installation tips, see https://wiki.archlinux.org/index.php/Installation_guide#Post-installation", fg="yellow") if not archinstall.arguments.get('silent'): - choice = input("Would you like to chroot into the newly created installation and perform post-installation configuration? [Y/n] ") - if choice.lower() in ("y", ""): + prompt = 'Would you like to chroot into the newly created installation and perform post-installation configuration?' + choice = archinstall.Menu(prompt, ['yes', 'no'], default_option='yes').run() + if choice == 'yes': try: installation.drop_to_shell() except: @@ -309,10 +311,17 @@ if not (archinstall.check_mirror_reachable() or archinstall.arguments.get('skip- archinstall.log(f"Arch Linux mirrors are not reachable. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red") exit(1) -if not (archinstall.update_keyring() or archinstall.arguments.get('skip-keyring-update', False)): - log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None)) - archinstall.log(f"Failed to update the keyring. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red") - exit(1) +if not archinstall.arguments.get('offline', False): + # If we want to check for keyring updates + # and the installed package version is lower than the upstream version + if archinstall.arguments.get('skip-keyring-update', False) is False and \ + archinstall.installed_package('archlinux-keyring') < archinstall.find_package('archlinux-keyring'): + + # Then we update the keyring in the ISO environment + if not archinstall.update_keyring(): + log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None)) + archinstall.log(f"Failed to update the keyring. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red") + exit(1) load_config() if not archinstall.arguments.get('silent'): diff --git a/profiles/desktop.py b/profiles/desktop.py index 389544df..da7f97e5 100644 --- a/profiles/desktop.py +++ b/profiles/desktop.py @@ -33,6 +33,7 @@ __supported__ = [ 'mate', 'deepin', 'enlightenment', + 'qtile' ] diff --git a/profiles/kde.py b/profiles/kde.py index 06798593..9edbe325 100644 --- a/profiles/kde.py +++ b/profiles/kde.py @@ -7,7 +7,7 @@ is_top_level_profile = False __packages__ = [ "plasma-meta", "konsole", - "kate", + "kwrite", "dolphin", "ark", "sddm", diff --git a/profiles/qtile.py b/profiles/qtile.py new file mode 100644 index 00000000..ae1409a6 --- /dev/null +++ b/profiles/qtile.py @@ -0,0 +1,43 @@ +# A desktop environment using "qtile" window manager with common packages. + +import archinstall + +is_top_level_profile = False + +# New way of defining packages for a profile, which is iterable and can be used out side +# of the profile to get a list of "what packages will be installed". +__packages__ = [ + 'qtile', + 'alacritty', + 'lightdm-gtk-greeter', + 'lightdm', + 'dmenu' +] + +def _prep_function(*args, **kwargs): + """ + Magic function called by the importing installer + before continuing any further. It also avoids executing any + other code in this stage. So it's a safe way to ask the user + for more input before any other installer steps start. + """ + + # qtile optionally supports xorg, we'll install it since it also + # includes graphic driver setups (this might change in the future) + profile = archinstall.Profile(None, 'xorg') + with profile.load_instructions(namespace='xorg.py') as imported: + if hasattr(imported, '_prep_function'): + return imported._prep_function() + else: + print('Deprecated (??): xorg profile has no _prep_function() anymore') + + +if __name__ == 'qtile': + # Install dependency profiles + archinstall.storage['installation_session'].install_profile('xorg') + + # Install packages for qtile + archinstall.storage['installation_session'].add_additional_packages(__packages__) + + # Auto start lightdm for all users + archinstall.storage['installation_session'].enable_service('lightdm') # Light Display Manager diff --git a/profiles/sway.py b/profiles/sway.py index b0ff8c19..27905e33 100644 --- a/profiles/sway.py +++ b/profiles/sway.py @@ -1,5 +1,4 @@ # A desktop environment using "Sway" - import archinstall is_top_level_profile = False @@ -18,6 +17,16 @@ __packages__ = [ ] +def _check_driver() -> bool: + if "nvidia" in archinstall.storage.get("gfx_driver_packages", None): + prompt = 'The proprietary Nvidia driver is not supported by Sway. It is likely that you will run into issues, are you okay with that?' + choice = archinstall.Menu(prompt, ['yes', 'no'], default_option='no').run() + if choice == 'no': + return False + + return True + + def _prep_function(*args, **kwargs): """ Magic function called by the importing installer @@ -25,7 +34,9 @@ def _prep_function(*args, **kwargs): other code in this stage. So it's a safe way to ask the user for more input before any other installer steps start. """ - archinstall.storage["gfx_driver_packages"] = archinstall.select_driver() + archinstall.storage["gfx_driver_packages"] = archinstall.select_driver(force_ask=True) + if not _check_driver(): + return _prep_function(args, kwargs) return True @@ -34,10 +45,8 @@ def _prep_function(*args, **kwargs): # through importlib.util.spec_from_file_location("sway", "/somewhere/sway.py") # or through conventional import sway if __name__ == "sway": - if "nvidia" in archinstall.storage.get("gfx_driver_packages", None): - choice = input("The proprietary Nvidia driver is not supported by Sway. It is likely that you will run into issues. Continue anyways? [y/N] ") - if choice.lower() in ("n", ""): - raise archinstall.lib.exceptions.HardwareIncompatibilityError("Sway does not support the proprietary nvidia drivers.") + if not _check_driver(): + raise archinstall.lib.exceptions.HardwareIncompatibilityError("Sway does not support the proprietary nvidia drivers.") # Install the Sway packages archinstall.storage['installation_session'].add_additional_packages(__packages__)