Merge remote-tracking branch 'origin'

This commit is contained in:
Werner Llácer 2022-02-02 18:21:36 +01:00
commit 5280e01e88
22 changed files with 646 additions and 173 deletions

View File

@ -14,7 +14,21 @@ from .lib.luks import *
from .lib.mirrors import *
from .lib.networking import *
from .lib.output import *
from .lib.packages import *
from .lib.models.dataclasses import (
VersionDef,
PackageSearchResult,
PackageSearch,
LocalPackage
)
from .lib.packages.packages import (
find_group,
package_search,
IsGroup,
find_package,
find_packages,
installed_package,
validate_package_list
)
from .lib.profiles import *
from .lib.services import *
from .lib.storage import *
@ -26,7 +40,7 @@ from .lib.plugins import plugins, load_plugin # This initiates the plugin loadin
parser = ArgumentParser()
__version__ = "2.3.1.dev0"
__version__ = "2.4.0-dev0"
storage['__version__'] = __version__

View File

@ -1,4 +1,15 @@
import archinstall
import importlib
import sys
import pathlib
# Load .git version before the builtin version
if pathlib.Path('./archinstall/__init__.py').absolute().exists():
spec = importlib.util.spec_from_file_location("archinstall", "./archinstall/__init__.py")
archinstall = importlib.util.module_from_spec(spec)
sys.modules["archinstall"] = archinstall
spec.loader.exec_module(sys.modules["archinstall"])
else:
import archinstall
if __name__ == '__main__':
archinstall.run_as_a_module()

View File

@ -8,7 +8,7 @@ from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
if TYPE_CHECKING:
from .partition import Partition
from ..exceptions import DiskError
from ..exceptions import DiskError, SysCallError
from ..output import log
from ..general import SysCommand
from ..storage import storage
@ -189,10 +189,13 @@ class BlockDevice:
# that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
# so the free will ignore the ESP partition and just give the "free" space.
# Doesn't harm us, but worth noting in case something weird happens.
for line in SysCommand(f"parted -s --machine {self.path} print free"):
if 'free' in (free_space := line.decode('UTF-8')):
_, start, end, size, *_ = free_space.strip('\r\n;').split(':')
yield (start, end, size)
try:
for line in SysCommand(f"parted -s --machine {self.path} print free"):
if 'free' in (free_space := line.decode('UTF-8')):
_, start, end, size, *_ = free_space.strip('\r\n;').split(':')
yield (start, end, size)
except SysCallError as error:
log(f"Could not get free space on {self.path}: {error}", level=logging.INFO)
@property
def largest_free_space(self) -> List[str]:

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .blockdevice import BlockDevice
@ -8,6 +9,7 @@ if TYPE_CHECKING:
from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to
from ..hardware import has_uefi
from ..output import log
from ..menu import Menu
def suggest_single_disk_layout(block_device :BlockDevice,
default_filesystem :Optional[str] = None,
@ -22,7 +24,9 @@ def suggest_single_disk_layout(block_device :BlockDevice,
using_home_partition = False
if default_filesystem == 'btrfs':
using_subvolumes = input('Would you like to use BTRFS subvolumes with a default structure? (Y/n): ').strip().lower() in ('', 'y', 'yes')
prompt = 'Would you like to use BTRFS subvolumes with a default structure?'
choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
using_subvolumes = choice == 'yes'
layout = {
block_device.path : {
@ -76,7 +80,9 @@ def suggest_single_disk_layout(block_device :BlockDevice,
layout[block_device.path]['partitions'][-1]['start'] = '513MiB'
if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART:
using_home_partition = input('Would you like to create a separate partition for /home? (Y/n): ').strip().lower() in ('', 'y', 'yes')
prompt = 'Would you like to create a separate partition for /home?'
choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
using_home_partition = choice == 'yes'
# Set a size for / (/root)
if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition:

View File

@ -41,3 +41,6 @@ class UserError(BaseException):
class ServiceException(BaseException):
pass
class PackageError(BaseException):
pass

View File

@ -9,6 +9,7 @@ import subprocess
import string
import sys
import time
import re
from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
@ -81,6 +82,18 @@ def locate_binary(name :str) -> str:
raise RequirementError(f"Binary {name} does not exist.")
def clear_vt100_escape_codes(data :Union[bytes, str]):
# https://stackoverflow.com/a/43627833/929999
if type(data) == bytes:
vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8')
else:
vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
for match in re.findall(vt100_escape_regex, data, re.IGNORECASE):
data = data.replace(match, '' if type(data) == str else b'')
return data
def json_dumps(*args :str, **kwargs :str) -> str:
return json.dumps(*args, **{**kwargs, 'cls': JSON})
@ -168,7 +181,8 @@ class SysCommandWorker:
peak_output :Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None,
logfile :Optional[None] = None,
working_directory :Optional[str] = './'):
working_directory :Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True):
if not callbacks:
callbacks = {}
@ -200,6 +214,7 @@ class SysCommandWorker:
self.child_fd :Optional[int] = None
self.started :Optional[float] = None
self.ended :Optional[float] = None
self.remove_vt100_escape_codes_from_lines :bool = remove_vt100_escape_codes_from_lines
def __contains__(self, key: bytes) -> bool:
"""
@ -216,6 +231,9 @@ class SysCommandWorker:
def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]:
for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'):
if line:
if self.remove_vt100_escape_codes_from_lines:
line = clear_vt100_escape_codes(line)
yield line + b'\n'
self._trace_log_pos = self._trace_log.rfind(b'\n')
@ -368,7 +386,8 @@ class SysCommand:
start_callback :Optional[Callable[[Any], Any]] = None,
peak_output :Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None,
working_directory :Optional[str] = './'):
working_directory :Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True):
_callbacks = {}
if callbacks:
@ -382,6 +401,7 @@ class SysCommand:
self.peak_output = peak_output
self.environment_vars = environment_vars
self.working_directory = working_directory
self.remove_vt100_escape_codes_from_lines = remove_vt100_escape_codes_from_lines
self.session :Optional[SysCommandWorker] = None
self.create_session()
@ -435,7 +455,7 @@ class SysCommand:
if self.session:
return self.session
with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars) as session:
with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars, remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines) as session:
if not self.session:
self.session = session

View File

@ -1,10 +1,13 @@
import os
import logging
from functools import partial
from pathlib import Path
from typing import Iterator, Optional, Union
from .general import SysCommand
from .networking import list_interfaces, enrich_iface_types
from .exceptions import SysCallError
from .output import log
__packages__ = [
"mesa",
@ -168,10 +171,19 @@ def mem_total() -> Optional[int]:
def virtualization() -> Optional[str]:
return str(SysCommand("systemd-detect-virt")).strip('\r\n')
try:
return str(SysCommand("systemd-detect-virt")).strip('\r\n')
except SysCallError as error:
log(f"Could not detect virtual system: {error}", level=logging.DEBUG)
return None
def is_vm() -> bool:
return b"none" not in b"".join(SysCommand("systemd-detect-virt")).lower()
try:
return b"none" not in b"".join(SysCommand("systemd-detect-virt")).lower()
except SysCallError as error:
log(f"System is not running in a VM: {error}", level=logging.DEBUG)
return None
# TODO: Add more identifiers

View File

@ -1,8 +1,32 @@
import sys
import archinstall
from archinstall import Menu
from .menu import Menu
from ..general import SysCommand
from ..storage import storage
from ..output import log
from ..profiles import is_desktop_profile
from ..disk import encrypted_partitions
from ..locale_helpers import set_keyboard_language
from ..user_interaction import get_password
from ..user_interaction import ask_ntp
from ..user_interaction import ask_for_swap
from ..user_interaction import ask_for_bootloader
from ..user_interaction import ask_hostname
from ..user_interaction import ask_for_audio_selection
from ..user_interaction import ask_additional_packages_to_install
from ..user_interaction import ask_to_configure_network
from ..user_interaction import ask_for_a_timezone
from ..user_interaction import ask_for_superuser_account
from ..user_interaction import ask_for_additional_users
from ..user_interaction import select_language
from ..user_interaction import select_mirror_regions
from ..user_interaction import select_locale_lang
from ..user_interaction import select_locale_enc
from ..user_interaction import select_disk_layout
from ..user_interaction import select_kernel
from ..user_interaction import select_encrypted_partitions
from ..user_interaction import select_harddrives
from ..user_interaction import select_profile
class Selector:
def __init__(
@ -109,17 +133,17 @@ class GlobalMenu:
def _setup_selection_menu_options(self):
self._menu_options['keyboard-layout'] = \
Selector('Select keyboard layout', lambda: archinstall.select_language('us'), default='us')
Selector('Select keyboard layout', lambda: select_language('us'), default='us')
self._menu_options['mirror-region'] = \
Selector(
'Select mirror region',
lambda: archinstall.select_mirror_regions(),
lambda: select_mirror_regions(),
display_func=lambda x: list(x.keys()) if x else '[]',
default={})
self._menu_options['sys-language'] = \
Selector('Select locale language', lambda: archinstall.select_locale_lang('en_US'), default='en_US')
Selector('Select locale language', lambda: select_locale_lang('en_US'), default='en_US')
self._menu_options['sys-encoding'] = \
Selector('Select locale encoding', lambda: archinstall.select_locale_enc('utf-8'), default='utf-8')
Selector('Select locale encoding', lambda: select_locale_enc('utf-8'), default='utf-8')
self._menu_options['harddrives'] = \
Selector(
'Select harddrives',
@ -127,28 +151,28 @@ class GlobalMenu:
self._menu_options['disk_layouts'] = \
Selector(
'Select disk layout',
lambda: archinstall.select_disk_layout(
archinstall.arguments['harddrives'],
archinstall.arguments.get('advanced', False)
lambda: select_disk_layout(
storage['arguments'].get('harddrives', []),
storage['arguments'].get('advanced', False)
),
dependencies=['harddrives'])
self._menu_options['!encryption-password'] = \
Selector(
'Set encryption password',
lambda: archinstall.get_password(prompt='Enter disk encryption password (leave blank for no encryption): '),
lambda: get_password(prompt='Enter disk encryption password (leave blank for no encryption): '),
display_func=lambda x: self._secret(x) if x else 'None',
dependencies=['harddrives'])
self._menu_options['swap'] = \
Selector(
'Use swap',
lambda: archinstall.ask_for_swap(),
lambda: ask_for_swap(),
default=True)
self._menu_options['bootloader'] = \
Selector(
'Select bootloader',
lambda: archinstall.ask_for_bootloader(archinstall.arguments.get('advanced', False)),)
lambda: ask_for_bootloader(storage['arguments'].get('advanced', False)),)
self._menu_options['hostname'] = \
Selector('Specify hostname', lambda: archinstall.ask_hostname())
Selector('Specify hostname', lambda: ask_hostname())
self._menu_options['!root-password'] = \
Selector(
'Set root password',
@ -174,29 +198,29 @@ class GlobalMenu:
self._menu_options['audio'] = \
Selector(
'Select audio',
lambda: archinstall.ask_for_audio_selection(archinstall.is_desktop_profile(archinstall.arguments.get('profile', None))))
lambda: ask_for_audio_selection(is_desktop_profile(storage['arguments'].get('profile', None))))
self._menu_options['kernels'] = \
Selector(
'Select kernels',
lambda: archinstall.select_kernel(),
lambda: select_kernel(),
default=['linux'])
self._menu_options['packages'] = \
Selector(
'Additional packages to install',
lambda: archinstall.ask_additional_packages_to_install(archinstall.arguments.get('packages', None)),
lambda: ask_additional_packages_to_install(storage['arguments'].get('packages', None)),
default=[])
self._menu_options['nic'] = \
Selector(
'Configure network',
lambda: archinstall.ask_to_configure_network(),
lambda: ask_to_configure_network(),
display_func=lambda x: x if x else 'Not configured, unavailable unless setup manually',
default={})
self._menu_options['timezone'] = \
Selector('Select timezone', lambda: archinstall.ask_for_a_timezone())
Selector('Select timezone', lambda: ask_for_a_timezone())
self._menu_options['ntp'] = \
Selector(
'Set automatic time sync (NTP)',
lambda: archinstall.ask_ntp(),
lambda: self._select_ntp(),
default=True)
self._menu_options['install'] = \
Selector(
@ -205,7 +229,7 @@ class GlobalMenu:
self._menu_options['abort'] = Selector('Abort', enabled=True)
def enable(self, selector_name, omit_if_set=False):
arg = archinstall.arguments.get(selector_name, None)
arg = storage['arguments'].get(selector_name, None)
# don't display the menu option if it was defined already
if arg is not None and omit_if_set:
@ -239,8 +263,8 @@ class GlobalMenu:
self._process_selection(selection)
for key in self._menu_options:
sel = self._menu_options[key]
if key not in archinstall.arguments:
archinstall.arguments[key] = sel._current_selection
if key not in storage['arguments']:
storage['arguments'][key] = sel._current_selection
self._post_processing()
def _process_selection(self, selection):
@ -254,7 +278,7 @@ class GlobalMenu:
selector = option[0][1]
result = selector.func()
self._menu_options[selector_name].set_current_selection(result)
archinstall.arguments[selector_name] = result
storage['arguments'][selector_name] = result
self._update_install()
@ -263,12 +287,12 @@ class GlobalMenu:
self._menu_options.get('install').update_description(text)
def _post_processing(self):
if archinstall.arguments.get('harddrives', None) and archinstall.arguments.get('!encryption-password', None):
if storage['arguments'].get('harddrives', None) and storage['arguments'].get('!encryption-password', None):
# If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
# Then we need to identify which partitions to encrypt. This will default to / (root).
if len(list(archinstall.encrypted_partitions(archinstall.arguments['disk_layouts']))) == 0:
archinstall.arguments['disk_layouts'] = archinstall.select_encrypted_partitions(
archinstall.arguments['disk_layouts'], archinstall.arguments['!encryption-password'])
if len(list(encrypted_partitions(storage['arguments'].get('disk_layouts', [])))) == 0:
storage['arguments']['disk_layouts'] = select_encrypted_partitions(
storage['arguments']['disk_layouts'], storage['arguments']['!encryption-password'])
def _install_text(self):
missing = self._missing_configs()
@ -301,27 +325,37 @@ class GlobalMenu:
def _set_root_password(self):
prompt = 'Enter root password (leave blank to disable root & create superuser): '
password = archinstall.get_password(prompt=prompt)
password = get_password(prompt=prompt)
# TODO: Do we really wanna wipe the !superusers and !users if root password is set?
# What if they set a superuser first, but then decides to set a root password?
if password is not None:
self._menu_options.get('!superusers').set_current_selection(None)
archinstall.arguments['!users'] = {}
archinstall.arguments['!superusers'] = {}
storage['arguments']['!users'] = {}
storage['arguments']['!superusers'] = {}
return password
def _select_ntp(self) -> bool:
ntp = ask_ntp()
value = str(ntp).lower()
SysCommand(f'timedatectl set-ntp {value}')
return ntp
def _select_harddrives(self):
old_haddrives = archinstall.arguments.get('harddrives')
harddrives = archinstall.select_harddrives()
old_haddrives = storage['arguments'].get('harddrives', [])
harddrives = select_harddrives()
# in case the harddrives got changed we have to reset the disk layout as well
if old_haddrives != harddrives:
self._menu_options.get('disk_layouts').set_current_selection(None)
archinstall.arguments['disk_layouts'] = {}
storage['arguments']['disk_layouts'] = {}
if not harddrives:
prompt = 'You decided to skip harddrive selection\n'
prompt += f"and will use whatever drive-setup is mounted at {archinstall.storage['MOUNT_POINT']} (experimental)\n"
prompt += f"and will use whatever drive-setup is mounted at {storage['MOUNT_POINT']} (experimental)\n"
prompt += "WARNING: Archinstall won't check the suitability of this setup\n"
prompt += 'Do you wish to continue?'
@ -336,36 +370,33 @@ class GlobalMenu:
return '*' * len(x)
def _select_profile(self):
profile = archinstall.select_profile()
profile = select_profile()
# Check the potentially selected profiles preparations to get early checks if some additional questions are needed.
if profile and profile.has_prep_function():
namespace = f'{profile.namespace}.py'
with profile.load_instructions(namespace=namespace) as imported:
if not imported._prep_function():
archinstall.log(' * Profile\'s preparation requirements was not fulfilled.', fg='red')
log(' * Profile\'s preparation requirements was not fulfilled.', fg='red')
exit(1)
return profile
def _create_superuser_account(self):
superuser = archinstall.ask_for_superuser_account('Create a required super-user with sudo privileges: ', forced=True)
superuser = ask_for_superuser_account('Create a required super-user with sudo privileges: ', forced=True)
return superuser
def _create_user_account(self):
users, superusers = archinstall.ask_for_additional_users('Enter a username to create an additional user: ')
if not archinstall.arguments.get('!superusers', None):
archinstall.arguments['!superusers'] = superusers
else:
archinstall.arguments['!superusers'] = {**archinstall.arguments['!superusers'], **superusers}
users, superusers = ask_for_additional_users('Enter a username to create an additional user: ')
storage['arguments']['!superusers'] = {**storage['arguments'].get('!superusers', {}), **superusers}
return users
def _set_kb_language(self):
# Before continuing, set the preferred keyboard layout/language in the current terminal.
# This will just help the user with the next following questions.
if archinstall.arguments.get('keyboard-layout', None) and len(archinstall.arguments['keyboard-layout']):
archinstall.set_keyboard_language(archinstall.arguments['keyboard-layout'])
if len(storage['arguments'].get('keyboard-layout', [])):
set_keyboard_language(storage['arguments']['keyboard-layout'])
def _verify_selection_enabled(self, selection_name):
if selection := self._menu_options.get(selection_name, None):

View File

View File

@ -0,0 +1,136 @@
from dataclasses import dataclass
from typing import Optional, List
@dataclass
class VersionDef:
version_string: str
@classmethod
def parse_version(self) -> List[str]:
if '.' in self.version_string:
versions = self.version_string.split('.')
else:
versions = [self.version_string]
return versions
@classmethod
def major(self) -> str:
return self.parse_version()[0]
@classmethod
def minor(self) -> str:
versions = self.parse_version()
if len(versions) >= 2:
return versions[1]
@classmethod
def patch(self) -> str:
versions = self.parse_version()
if '-' in versions[-1]:
_, patch_version = versions[-1].split('-', 1)
return patch_version
def __eq__(self, other :'VersionDef') -> bool:
if other.major == self.major and \
other.minor == self.minor and \
other.patch == self.patch:
return True
return False
def __lt__(self, other :'VersionDef') -> bool:
if self.major > other.major:
return False
elif self.minor and other.minor and self.minor > other.minor:
return False
elif self.patch and other.patch and self.patch > other.patch:
return False
def __str__(self) -> str:
return self.version_string
@dataclass
class PackageSearchResult:
pkgname: str
pkgbase: str
repo: str
arch: str
pkgver: str
pkgrel: str
epoch: int
pkgdesc: str
url: str
filename: str
compressed_size: int
installed_size: int
build_date: str
last_update: str
flag_date: Optional[str]
maintainers: List[str]
packager: str
groups: List[str]
licenses: List[str]
conflicts: List[str]
provides: List[str]
replaces: List[str]
depends: List[str]
optdepends: List[str]
makedepends: List[str]
checkdepends: List[str]
@property
def pkg_version(self) -> str:
return self.pkgver
def __eq__(self, other :'VersionDef') -> bool:
return self.pkg_version == other.pkg_version
def __lt__(self, other :'VersionDef') -> bool:
return self.pkg_version < other.pkg_version
@dataclass
class PackageSearch:
version: int
limit: int
valid: bool
num_pages: int
page: int
results: List[PackageSearchResult]
def __post_init__(self):
self.results = [PackageSearchResult(**x) for x in self.results]
@dataclass
class LocalPackage:
name: str
version: str
description:str
architecture: str
url: str
licenses: str
groups: str
depends_on: str
optional_deps: str
required_by: str
optional_for: str
conflicts_with: str
replaces: str
installed_size: str
packager: str
build_date: str
install_date: str
install_reason: str
install_script: str
validated_by: str
provides: str
@property
def pkg_version(self) -> str:
return self.version
def __eq__(self, other :'VersionDef') -> bool:
return self.pkg_version == other.pkg_version
def __lt__(self, other :'VersionDef') -> bool:
return self.pkg_version < other.pkg_version

View File

@ -0,0 +1,134 @@
from typing import Optional, List
from pydantic import BaseModel
"""
This python file is not in use.
Pydantic is not a builtin, and we use the dataclasses.py instead!
"""
class VersionDef(BaseModel):
version_string: str
@classmethod
def parse_version(self) -> List[str]:
if '.' in self.version_string:
versions = self.version_string.split('.')
else:
versions = [self.version_string]
return versions
@classmethod
def major(self) -> str:
return self.parse_version()[0]
@classmethod
def minor(self) -> str:
versions = self.parse_version()
if len(versions) >= 2:
return versions[1]
@classmethod
def patch(self) -> str:
versions = self.parse_version()
if '-' in versions[-1]:
_, patch_version = versions[-1].split('-', 1)
return patch_version
def __eq__(self, other :'VersionDef') -> bool:
if other.major == self.major and \
other.minor == self.minor and \
other.patch == self.patch:
return True
return False
def __lt__(self, other :'VersionDef') -> bool:
if self.major > other.major:
return False
elif self.minor and other.minor and self.minor > other.minor:
return False
elif self.patch and other.patch and self.patch > other.patch:
return False
def __str__(self) -> str:
return self.version_string
class PackageSearchResult(BaseModel):
pkgname: str
pkgbase: str
repo: str
arch: str
pkgver: str
pkgrel: str
epoch: int
pkgdesc: str
url: str
filename: str
compressed_size: int
installed_size: int
build_date: str
last_update: str
flag_date: Optional[str]
maintainers: List[str]
packager: str
groups: List[str]
licenses: List[str]
conflicts: List[str]
provides: List[str]
replaces: List[str]
depends: List[str]
optdepends: List[str]
makedepends: List[str]
checkdepends: List[str]
@property
def pkg_version(self) -> str:
return self.pkgver
def __eq__(self, other :'VersionDef') -> bool:
return self.pkg_version == other.pkg_version
def __lt__(self, other :'VersionDef') -> bool:
return self.pkg_version < other.pkg_version
class PackageSearch(BaseModel):
version: int
limit: int
valid: bool
results: List[PackageSearchResult]
class LocalPackage(BaseModel):
name: str
version: str
description:str
architecture: str
url: str
licenses: str
groups: str
depends_on: str
optional_deps: str
required_by: str
optional_for: str
conflicts_with: str
replaces: str
installed_size: str
packager: str
build_date: str
install_date: str
install_reason: str
install_script: str
validated_by: str
@property
def pkg_version(self) -> str:
return self.version
def __eq__(self, other :'VersionDef') -> bool:
return self.pkg_version == other.pkg_version
def __lt__(self, other :'VersionDef') -> bool:
return self.pkg_version < other.pkg_version

View File

@ -106,4 +106,4 @@ def log(*args :str, **kwargs :Union[str, int, Dict[str, Union[str, int]]]) -> No
# We use sys.stdout.write()+flush() instead of print() to try and
# fix issue #94
sys.stdout.write(f"{string}\n")
sys.stdout.flush()
sys.stdout.flush()

View File

@ -1,66 +0,0 @@
import json
import ssl
import urllib.error
import urllib.parse
import urllib.request
from typing import Dict, Any
from .exceptions import RequirementError
BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}'
BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
def find_group(name :str) -> bool:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
try:
response = urllib.request.urlopen(BASE_GROUP_URL.format(group=name), context=ssl_context)
except urllib.error.HTTPError as err:
if err.code == 404:
return False
else:
raise err
# Just to be sure some code didn't slip through the exception
if response.code == 200:
return True
def find_package(name :str) -> Any:
"""
Finds a specific package via the package database.
It makes a simple web-request, which might be a bit slow.
"""
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
response = urllib.request.urlopen(BASE_URL.format(package=name), context=ssl_context)
data = response.read().decode('UTF-8')
return json.loads(data)
def find_packages(*names :str) -> Dict[str, Any]:
"""
This function returns the search results for many packages.
The function itself is rather slow, so consider not sending to
many packages to the search query.
"""
return {package: find_package(package) for package in names}
def validate_package_list(packages: list) -> bool:
"""
Validates a list of given packages.
Raises `RequirementError` if one or more packages are not found.
"""
invalid_packages = [
package
for package in packages
if not find_package(package)['results'] and not find_group(package)
]
if invalid_packages:
raise RequirementError(f"Invalid package names: {invalid_packages}")
return True

View File

View File

@ -0,0 +1,109 @@
import ssl
import urllib.request
import json
from typing import Dict, Any
from ..general import SysCommand
from ..models.dataclasses import PackageSearch, PackageSearchResult, LocalPackage
from ..exceptions import PackageError, SysCallError, RequirementError
BASE_URL_PKG_SEARCH = 'https://archlinux.org/packages/search/json/?name={package}'
# BASE_URL_PKG_CONTENT = 'https://archlinux.org/packages/search/json/'
BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
def find_group(name :str) -> bool:
# TODO UPSTREAM: Implement /json/ for the groups search
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
try:
response = urllib.request.urlopen(BASE_GROUP_URL.format(group=name), context=ssl_context)
except urllib.error.HTTPError as err:
if err.code == 404:
return False
else:
raise err
# Just to be sure some code didn't slip through the exception
if response.code == 200:
return True
return False
def package_search(package :str) -> PackageSearch:
"""
Finds a specific package via the package database.
It makes a simple web-request, which might be a bit slow.
"""
# TODO UPSTREAM: Implement bulk search, either support name=X&name=Y or split on space (%20 or ' ')
# TODO: utilize pacman cache first, upstream second.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
response = urllib.request.urlopen(BASE_URL_PKG_SEARCH.format(package=package), context=ssl_context)
if response.code != 200:
raise PackageError(f"Could not locate package: [{response.code}] {response}")
data = response.read().decode('UTF-8')
return PackageSearch(**json.loads(data))
class IsGroup(BaseException):
pass
def find_package(package :str) -> PackageSearchResult:
data = package_search(package)
if not data.results:
# Check if the package is actually a group
if find_group(package):
# TODO: Until upstream adds a JSON result for group searches
# there is no way we're going to parse HTML reliably.
raise IsGroup("Implement group search")
raise PackageError(f"Could not locate {package} while looking for repository category")
# If we didn't find the package in the search results,
# odds are it's a group package
for result in data.results:
if result.pkgname == package:
return result
raise PackageError(f"Could not locate {package} in result while looking for repository category")
def find_packages(*names :str) -> Dict[str, Any]:
"""
This function returns the search results for many packages.
The function itself is rather slow, so consider not sending to
many packages to the search query.
"""
return {package: find_package(package) for package in names}
def validate_package_list(packages: list) -> bool:
"""
Validates a list of given packages.
Raises `RequirementError` if one or more packages are not found.
"""
invalid_packages = [
package
for package in packages
if not find_package(package)['results'] and not find_group(package)
]
if invalid_packages:
raise RequirementError(f"Invalid package names: {invalid_packages}")
return True
def installed_package(package :str) -> LocalPackage:
package_info = {}
try:
for line in SysCommand(f"pacman -Q --info {package}"):
if b':' in line:
key, value = line.decode().split(':', 1)
package_info[key.strip().lower().replace(' ', '_')] = value.strip()
except SysCallError:
pass
return LocalPackage(**package_info)

View File

@ -28,7 +28,8 @@ from .mirrors import list_mirrors
# TODO: Some inconsistencies between the selection processes.
# Some return the keys from the options, some the values?
from .. import fs_types, validate_package_list
from .disk.validators import fs_types
from .packages.packages import validate_package_list
# TODO: These can be removed after the move to simple_menu.py
def get_terminal_height() -> int:
@ -78,8 +79,9 @@ def do_countdown() -> bool:
print(".", end='')
if SIG_TRIGGER:
abort = input('\nDo you really want to abort (y/n)? ')
if abort.strip() != 'n':
prompt = 'Do you really want to abort'
choice = Menu(prompt, ['yes', 'no'], skip=False).run()
if choice == 'yes':
exit(0)
if SIG_TRIGGER is False:
@ -270,7 +272,7 @@ def ask_for_swap(prompt='Would you like to use swap on zram?', forced=False):
return False if choice == 'no' else True
def ask_ntp():
def ask_ntp() -> bool:
prompt = 'Would you like to use automatic time synchronization (NTP) with the default time servers?'
prompt += 'Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki'
choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
@ -858,7 +860,7 @@ def select_harddrives() -> Optional[str]:
return []
def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str:
def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask :bool = False) -> str:
"""
Some what convoluted function, whose job is simple.
Select a graphics driver from a pre-defined set of popular options.
@ -880,7 +882,7 @@ def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str:
if has_nvidia_graphics():
title += 'For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n'
if not arguments.get('gfx_driver', None):
if not arguments.get('gfx_driver', None) or force_ask:
title += '\n\nSelect a graphics driver or leave blank to install all open-source drivers'
arguments['gfx_driver'] = Menu(title, drivers).run()

View File

@ -1,37 +1,33 @@
{
"audio": "pipewire",
"dry_run": true,
"audio": "none",
"bootloader": "systemd-bootctl",
"custom-commands": [
"cd /home/devel; git clone https://aur.archlinux.org/paru.git",
"chown -R devel:devel /home/devel/paru",
"usermod -aG docker devel"
"debug": false,
"harddrives": [
"/dev/loop0"
],
"!encryption-password": "supersecret",
"filesystem": "btrfs",
"gfx_driver": "All open-source (default)",
"harddrive": {
"path": "/dev/nvme0n1"
},
"hostname": "development-box",
"kernels": [
"linux"
],
"keyboard-language": "us",
"keyboard-layout": "us",
"mirror-region": "Worldwide",
"nic": {
"NetworkManager": true
"NetworkManager": true
},
"ntp": true,
"packages": ["docker", "git", "wget", "zsh"],
"profile": "gnome",
"services": ["docker"],
"superusers": {
"devel": {
"!password": "devel"
}
},
"profile": "gnome",
"gfx_driver": "All open-source (default)",
"swap": true,
"sys-encoding": "utf-8",
"sys-language": "en_US",
"timezone": "US/Eastern",
"users": {}
"timezone": "Europe/Stockholm",
"version": "2.3.1.dev0",
"custom-commands": [
"cd /home/devel; git clone https://aur.archlinux.org/paru.git",
"chown -R devel:devel /home/devel/paru",
"usermod -aG docker devel"
]
}

View File

@ -58,15 +58,16 @@ def ask_user_questions():
will we continue with the actual installation steps.
"""
# ref: https://github.com/archlinux/archinstall/pull/831
# we'll set NTP to true by default since this is also
# the default value specified in the menu options; in
# case it will be changed by the user we'll also update
# the system immediately
archinstall.SysCommand('timedatectl set-ntp true')
global_menu = archinstall.GlobalMenu()
global_menu.enable('keyboard-layout')
if not archinstall.arguments.get('ntp', False):
archinstall.arguments['ntp'] = input("Would you like to use automatic time synchronization (NTP) with the default time servers? [Y/n]: ").strip().lower() in ('y', 'yes', '')
if archinstall.arguments['ntp']:
archinstall.log("Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki.", fg="yellow")
archinstall.SysCommand('timedatectl set-ntp true')
# Set which region to download packages from during the installation
global_menu.enable('mirror-region')
@ -293,8 +294,9 @@ def perform_installation(mountpoint):
installation.log("For post-installation tips, see https://wiki.archlinux.org/index.php/Installation_guide#Post-installation", fg="yellow")
if not archinstall.arguments.get('silent'):
choice = input("Would you like to chroot into the newly created installation and perform post-installation configuration? [Y/n] ")
if choice.lower() in ("y", ""):
prompt = 'Would you like to chroot into the newly created installation and perform post-installation configuration?'
choice = archinstall.Menu(prompt, ['yes', 'no'], default_option='yes').run()
if choice == 'yes':
try:
installation.drop_to_shell()
except:
@ -309,10 +311,17 @@ if not (archinstall.check_mirror_reachable() or archinstall.arguments.get('skip-
archinstall.log(f"Arch Linux mirrors are not reachable. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
exit(1)
if not (archinstall.update_keyring() or archinstall.arguments.get('skip-keyring-update', False)):
log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None))
archinstall.log(f"Failed to update the keyring. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
exit(1)
if not archinstall.arguments.get('offline', False):
# If we want to check for keyring updates
# and the installed package version is lower than the upstream version
if archinstall.arguments.get('skip-keyring-update', False) is False and \
archinstall.installed_package('archlinux-keyring') < archinstall.find_package('archlinux-keyring'):
# Then we update the keyring in the ISO environment
if not archinstall.update_keyring():
log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None))
archinstall.log(f"Failed to update the keyring. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
exit(1)
load_config()
if not archinstall.arguments.get('silent'):

View File

@ -33,6 +33,7 @@ __supported__ = [
'mate',
'deepin',
'enlightenment',
'qtile'
]

View File

@ -7,7 +7,7 @@ is_top_level_profile = False
__packages__ = [
"plasma-meta",
"konsole",
"kate",
"kwrite",
"dolphin",
"ark",
"sddm",

43
profiles/qtile.py Normal file
View File

@ -0,0 +1,43 @@
# A desktop environment using "qtile" window manager with common packages.
import archinstall
is_top_level_profile = False
# New way of defining packages for a profile, which is iterable and can be used out side
# of the profile to get a list of "what packages will be installed".
__packages__ = [
'qtile',
'alacritty',
'lightdm-gtk-greeter',
'lightdm',
'dmenu'
]
def _prep_function(*args, **kwargs):
"""
Magic function called by the importing installer
before continuing any further. It also avoids executing any
other code in this stage. So it's a safe way to ask the user
for more input before any other installer steps start.
"""
# qtile optionally supports xorg, we'll install it since it also
# includes graphic driver setups (this might change in the future)
profile = archinstall.Profile(None, 'xorg')
with profile.load_instructions(namespace='xorg.py') as imported:
if hasattr(imported, '_prep_function'):
return imported._prep_function()
else:
print('Deprecated (??): xorg profile has no _prep_function() anymore')
if __name__ == 'qtile':
# Install dependency profiles
archinstall.storage['installation_session'].install_profile('xorg')
# Install packages for qtile
archinstall.storage['installation_session'].add_additional_packages(__packages__)
# Auto start lightdm for all users
archinstall.storage['installation_session'].enable_service('lightdm') # Light Display Manager

View File

@ -1,5 +1,4 @@
# A desktop environment using "Sway"
import archinstall
is_top_level_profile = False
@ -18,6 +17,16 @@ __packages__ = [
]
def _check_driver() -> bool:
if "nvidia" in archinstall.storage.get("gfx_driver_packages", None):
prompt = 'The proprietary Nvidia driver is not supported by Sway. It is likely that you will run into issues, are you okay with that?'
choice = archinstall.Menu(prompt, ['yes', 'no'], default_option='no').run()
if choice == 'no':
return False
return True
def _prep_function(*args, **kwargs):
"""
Magic function called by the importing installer
@ -25,7 +34,9 @@ def _prep_function(*args, **kwargs):
other code in this stage. So it's a safe way to ask the user
for more input before any other installer steps start.
"""
archinstall.storage["gfx_driver_packages"] = archinstall.select_driver()
archinstall.storage["gfx_driver_packages"] = archinstall.select_driver(force_ask=True)
if not _check_driver():
return _prep_function(args, kwargs)
return True
@ -34,10 +45,8 @@ def _prep_function(*args, **kwargs):
# through importlib.util.spec_from_file_location("sway", "/somewhere/sway.py")
# or through conventional import sway
if __name__ == "sway":
if "nvidia" in archinstall.storage.get("gfx_driver_packages", None):
choice = input("The proprietary Nvidia driver is not supported by Sway. It is likely that you will run into issues. Continue anyways? [y/N] ")
if choice.lower() in ("n", ""):
raise archinstall.lib.exceptions.HardwareIncompatibilityError("Sway does not support the proprietary nvidia drivers.")
if not _check_driver():
raise archinstall.lib.exceptions.HardwareIncompatibilityError("Sway does not support the proprietary nvidia drivers.")
# Install the Sway packages
archinstall.storage['installation_session'].add_additional_packages(__packages__)