Refactor installer and general design patterns (#1895)

* fix: refactor clear_vt100_escape_codes

* fix: check for structure being a dict after handling potential parsing errors

* refactor: use short circuit logic than if-elif-else chains

* fix: use or for nullish moutpoint attribute

* fix: better error handling for JSON from urls and paths

* chore: json_stream_to_structure documentation

* refactor: dry up relative and chroot path for custom command scripts

* refactor: use write_text for pathlib.Path object

* refactor: use sets to find intersection instead of filter and list

* refactor: replace loop with dictionary comprehension in preparing luks partition

* refactor: use walrus operator to check if luks_handler exists

* refactor: use read_text and splitlines for potential Path object

* fix: use keepends in splitlines for compatibility

* fix: use keepends in splitlines for compatibility

* feat: set pacman_conf Path as an attribute of installer

* fix: empty string is a part of any string, avoid tuples

* refactor: use iterator patterns to uncomment multilib and testing blocks

* fix: don't json.loads an already loaded structure

* fix: use fstab_path uniformly in genfstab

* fix: remove unused variable matched

* refactor: create separate class to modify pacman.conf in a single pass

* fix: remove unused attribute pacman_conf from installer

* fix: remove unused attribute pacman_conf from installer

* feat: add persist method for pacman.conf, rewrite only when needed

* fix: use path.write_text for locale.conf

* use `or` operator for nullish new_conf

* refactor: Installer.target is always a pathlib.Path object, do not check for string type

* fix: use Optional[str] in function type definition instead of sumtype of str and None

* fix: mypy type annotation

* fix: make flake8 happy

* chore: move pacman config and repo into pacman module

* refactor: use Pacman object instead of Installer's pacstrap method

* fix: break after first sync

* fix: keep old build script for now

* use nullish operator for base_packages and disk_encryption of Installer

* feat: use shutil.which instead of rolling our own implementation

* fix: check for binary only if list is not empty

* fix: import Enum and fix mypy errors

* refactor: use nullish operator for default values

* refactor: linear search for key in Installer._trace_log only once

* fix: use logs instead of the entirety of self._trace_log when searching for key

* refactor: do not copy slice of bytes for search

* refactor: use rfind only once to iterate over logs, do not raise ValueError in clear_vt100_escape_codes since TYPE_CHECKING will take care of it.

* refactor: try decoding trace log before falling back to strigification

* refactor: use an empty dict as default for callbacks in SysCommand.__init__

* refactor: use nullish or operator for slice start and end when not specified

* refactor: use nullish or operator for SysCommand session

* refactor: use pre-existing decode method in __repr__ for SysCommand

* fix: overindentation

* fix: use shallow copy of callbacks to prevent mutating the key-value relationships of the argument dict

* refactor: use truthy value of self.session is not None for json encoding SysCommand

* refactor: directly assign to SysCommand.session in create_session since it short circuits to True if already present

* refactor: use dict.items() instead of manually retrieving the value using the key

* refactor: user_config_to_json method sounds pretty self explanatory

* refactor: store path validity as boolean for return

* refactor: use pathlib.Path.write_text to write configs to destinations

* fix: cannot use assignment expressions with expression

* fix: use config_output.save for saving both config and creds

* refactor: switch dictionary keys and values for options to avoid redundancy

* refactor: use itertools.takewhile to collect locale.gen entries until the empty line

* refactor: use iterative approach for nvidia driver fix

* refactor: install packages if not nvidia

* refactor: return early if no profile is selected

* refactor: use strip to remove commented lines

* fix: install additional packages only when we have a driver

* fix: path with one command is matched as relative to '.'

* fix: remove translation for debug log

---------

Co-authored-by: Anton Hvornum <anton@hvornum.se>
This commit is contained in:
Himadri Bhattacharjee 2023-06-28 11:42:53 +00:00 committed by GitHub
parent 57ebc42ffd
commit 1ae1f2ff11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 347 additions and 398 deletions

View File

@ -26,7 +26,7 @@ class ConfigurationOutput:
self._config = config self._config = config
self._user_credentials: Dict[str, Any] = {} self._user_credentials: Dict[str, Any] = {}
self._user_config: Dict[str, Any] = {} self._user_config: Dict[str, Any] = {}
self._default_save_path = Path(storage.get('LOG_PATH', '.')) self._default_save_path = storage.get('LOG_PATH', Path('.'))
self._user_config_file = 'user_configuration.json' self._user_config_file = 'user_configuration.json'
self._user_creds_file = "user_credentials.json" self._user_creds_file = "user_credentials.json"
@ -44,17 +44,17 @@ class ConfigurationOutput:
return self._user_config_file return self._user_config_file
def _process_config(self): def _process_config(self):
for key in self._config: for key, value in self._config.items():
if key in self._sensitive: if key in self._sensitive:
self._user_credentials[key] = self._config[key] self._user_credentials[key] = value
elif key in self._ignore: elif key in self._ignore:
pass pass
else: else:
self._user_config[key] = self._config[key] self._user_config[key] = value
# special handling for encryption password # special handling for encryption password
if key == 'disk_encryption' and self._config[key] is not None: if key == 'disk_encryption' and value:
self._user_credentials['encryption_password'] = self._config[key].encryption_password self._user_credentials['encryption_password'] = value.encryption_password
def user_config_to_json(self) -> str: def user_config_to_json(self) -> str:
return json.dumps({ return json.dumps({
@ -72,42 +72,33 @@ class ConfigurationOutput:
print(_('\nThis is your chosen configuration:')) print(_('\nThis is your chosen configuration:'))
debug(" -- Chosen configuration --") debug(" -- Chosen configuration --")
user_conig = self.user_config_to_json() info(self.user_config_to_json())
info(user_conig)
print() print()
def _is_valid_path(self, dest_path: Path) -> bool: def _is_valid_path(self, dest_path: Path) -> bool:
if (not dest_path.exists()) or not (dest_path.is_dir()): dest_path_ok = dest_path.exists() and dest_path.is_dir()
if not dest_path_ok:
warn( warn(
f'Destination directory {dest_path.resolve()} does not exist or is not a directory\n.', f'Destination directory {dest_path.resolve()} does not exist or is not a directory\n.',
'Configuration files can not be saved' 'Configuration files can not be saved'
) )
return False return dest_path_ok
return True
def save_user_config(self, dest_path: Path): def save_user_config(self, dest_path: Path):
if self._is_valid_path(dest_path): if self._is_valid_path(dest_path):
target = dest_path / self._user_config_file target = dest_path / self._user_config_file
target.write_text(self.user_config_to_json())
with open(target, 'w') as config_file: os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
config_file.write(self.user_config_to_json())
os.chmod(str(dest_path / self._user_config_file), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
def save_user_creds(self, dest_path: Path): def save_user_creds(self, dest_path: Path):
if self._is_valid_path(dest_path): if self._is_valid_path(dest_path):
if user_creds := self.user_credentials_to_json(): if user_creds := self.user_credentials_to_json():
target = dest_path / self._user_creds_file target = dest_path / self._user_creds_file
target.write_text(user_creds)
with open(target, 'w') as config_file: os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
config_file.write(user_creds)
os.chmod(str(target), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
def save(self, dest_path: Optional[Path] = None): def save(self, dest_path: Optional[Path] = None):
if not dest_path: dest_path = dest_path or self._default_save_path
dest_path = self._default_save_path
if self._is_valid_path(dest_path): if self._is_valid_path(dest_path):
self.save_user_config(dest_path) self.save_user_config(dest_path)
@ -116,33 +107,33 @@ class ConfigurationOutput:
def save_config(config: Dict): def save_config(config: Dict):
def preview(selection: str): def preview(selection: str):
if options["user_config"] == selection: match options[selection]:
serialized = config_output.user_config_to_json() case "user_config":
return f"{config_output.user_configuration_file}\n{serialized}" serialized = config_output.user_config_to_json()
elif options["user_creds"] == selection: return f"{config_output.user_configuration_file}\n{serialized}"
if maybe_serial := config_output.user_credentials_to_json(): case "user_creds":
return f"{config_output.user_credentials_file}\n{maybe_serial}" if maybe_serial := config_output.user_credentials_to_json():
else: return f"{config_output.user_credentials_file}\n{maybe_serial}"
return str(_("No configuration")) return str(_("No configuration"))
elif options["all"] == selection: case "all":
output = f"{config_output.user_configuration_file}\n" output = [config_output.user_configuration_file]
if config_output.user_credentials_to_json(): if config_output.user_credentials_to_json():
output += f"{config_output.user_credentials_file}\n" output.append(config_output.user_credentials_file)
return output[:-1] return '\n'.join(output)
return None return None
try: try:
config_output = ConfigurationOutput(config) config_output = ConfigurationOutput(config)
options = { options = {
"user_config": str(_("Save user configuration (including disk layout)")), str(_("Save user configuration (including disk layout)")): "user_config",
"user_creds": str(_("Save user credentials")), str(_("Save user credentials")): "user_creds",
"all": str(_("Save all")), str(_("Save all")): "all",
} }
save_choice = Menu( save_choice = Menu(
_("Choose which configuration to save"), _("Choose which configuration to save"),
list(options.values()), list(options),
sort=False, sort=False,
skip=True, skip=True,
preview_size=0.75, preview_size=0.75,
@ -170,27 +161,21 @@ def save_config(config: Dict):
prompt = _( prompt = _(
"Do you want to save {} configuration file(s) in the following location?\n\n{}" "Do you want to save {} configuration file(s) in the following location?\n\n{}"
).format( ).format(options[str(save_choice.value)], dest_path.absolute())
list(options.keys())[list(options.values()).index(str(save_choice.value))],
dest_path.absolute(),
)
save_confirmation = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run() save_confirmation = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run()
if save_confirmation == Menu.no(): if save_confirmation == Menu.no():
return return
debug( debug("Saving {} configuration files to {}".format(options[str(save_choice.value)], dest_path.absolute()))
_("Saving {} configuration files to {}").format(
list(options.keys())[list(options.values()).index(str(save_choice.value))], match options[str(save_choice.value)]:
dest_path.absolute(), case "user_config":
) config_output.save_user_config(dest_path)
) case "user_creds":
config_output.save_user_creds(dest_path)
case "all":
config_output.save(dest_path)
if options["user_config"] == save_choice.value:
config_output.save_user_config(dest_path)
elif options["user_creds"] == save_choice.value:
config_output.save_user_creds(dest_path)
elif options["all"] == save_choice.value:
config_output.save_user_config(dest_path)
config_output.save_user_creds(dest_path)
except KeyboardInterrupt: except KeyboardInterrupt:
return return

View File

@ -11,13 +11,14 @@ import sys
import time import time
import re import re
import urllib.parse import urllib.parse
import urllib.request from urllib.request import Request, urlopen
import urllib.error import urllib.error
import pathlib import pathlib
from datetime import datetime, date from datetime import datetime, date
from enum import Enum from enum import Enum
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
from select import epoll, EPOLLIN, EPOLLHUP from select import epoll, EPOLLIN, EPOLLHUP
from shutil import which
from .exceptions import RequirementError, SysCallError from .exceptions import RequirementError, SysCallError
from .output import debug, error, info from .output import debug, error, info
@ -34,28 +35,17 @@ def generate_password(length :int = 64) -> str:
def locate_binary(name :str) -> str: def locate_binary(name :str) -> str:
for PATH in os.environ['PATH'].split(':'): if path := which(name):
for root, folders, files in os.walk(PATH): return path
for file in files:
if file == name:
return os.path.join(root, file)
break # Don't recurse
raise RequirementError(f"Binary {name} does not exist.") raise RequirementError(f"Binary {name} does not exist.")
def clear_vt100_escape_codes(data :Union[bytes, str]) -> Union[bytes, str]: def clear_vt100_escape_codes(data :Union[bytes, str]) -> Union[bytes, str]:
# https://stackoverflow.com/a/43627833/929999 # https://stackoverflow.com/a/43627833/929999
if type(data) == bytes: vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
byte_vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8') if isinstance(data, bytes):
data = re.sub(byte_vt100_escape_regex, b'', data) return re.sub(vt100_escape_regex.encode(), b'', data)
elif type(data) == str: return re.sub(vt100_escape_regex, '', data)
vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
data = re.sub(vt100_escape_regex, '', data)
else:
raise ValueError(f'Unsupported data type: {type(data)}')
return data
def jsonify(obj: Any, safe: bool = True) -> Any: def jsonify(obj: Any, safe: bool = True) -> Any:
@ -120,21 +110,15 @@ class SysCommandWorker:
working_directory :Optional[str] = './', working_directory :Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True remove_vt100_escape_codes_from_lines :bool = True
): ):
if not callbacks: callbacks = callbacks or {}
callbacks = {} environment_vars = environment_vars or {}
if not environment_vars: if isinstance(cmd, str):
environment_vars = {}
if type(cmd) is str:
cmd = shlex.split(cmd) cmd = shlex.split(cmd)
cmd = list(cmd) # This is to please mypy if cmd:
if cmd[0][0] != '/' and cmd[0][:2] != './': if cmd[0][0] != '/' and cmd[0][:2] != './': # pathlib.Path does not work well
# "which" doesn't work as it's a builtin to bash. cmd[0] = locate_binary(cmd[0])
# It used to work, but for whatever reason it doesn't anymore.
# We there for fall back on manual lookup in os.PATH
cmd[0] = locate_binary(cmd[0])
self.cmd = cmd self.cmd = cmd
self.callbacks = callbacks self.callbacks = callbacks
@ -158,29 +142,36 @@ class SysCommandWorker:
Contains will also move the current buffert position forward. Contains will also move the current buffert position forward.
This is to avoid re-checking the same data when looking for output. This is to avoid re-checking the same data when looking for output.
""" """
assert type(key) == bytes assert isinstance(key, bytes)
if (contains := key in self._trace_log[self._trace_log_pos:]): index = self._trace_log.find(key, self._trace_log_pos)
self._trace_log_pos += self._trace_log[self._trace_log_pos:].find(key) + len(key) if index >= 0:
self._trace_log_pos += index + len(key)
return True
return contains return False
def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]: def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]:
for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'): last_line = self._trace_log.rfind(b'\n')
if line: lines = filter(None, self._trace_log[self._trace_log_pos:last_line].splitlines())
escaped_line: bytes = line for line in lines:
if self.remove_vt100_escape_codes_from_lines:
line = clear_vt100_escape_codes(line) # type: ignore
if self.remove_vt100_escape_codes_from_lines: yield line + b'\n'
escaped_line = clear_vt100_escape_codes(line) # type: ignore
yield escaped_line + b'\n' self._trace_log_pos = last_line
self._trace_log_pos = self._trace_log.rfind(b'\n')
def __repr__(self) -> str: def __repr__(self) -> str:
self.make_sure_we_are_executing() self.make_sure_we_are_executing()
return str(self._trace_log) return str(self._trace_log)
def __str__(self) -> str:
try:
return self._trace_log.decode('utf-8')
except UnicodeDecodeError:
return str(self._trace_log)
def __enter__(self) -> 'SysCommandWorker': def __enter__(self) -> 'SysCommandWorker':
return self return self
@ -205,7 +196,7 @@ class SysCommandWorker:
if self.exit_code != 0: if self.exit_code != 0:
raise SysCallError( raise SysCallError(
f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self._trace_log[-500:])}", f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self)[-500:]}",
self.exit_code, self.exit_code,
worker=self worker=self
) )
@ -244,7 +235,7 @@ class SysCommandWorker:
def peak(self, output: Union[str, bytes]) -> bool: def peak(self, output: Union[str, bytes]) -> bool:
if self.peek_output: if self.peek_output:
if type(output) == bytes: if isinstance(output, bytes):
try: try:
output = output.decode('UTF-8') output = output.decode('UTF-8')
except UnicodeDecodeError: except UnicodeDecodeError:
@ -282,7 +273,7 @@ class SysCommandWorker:
self.ended = time.time() self.ended = time.time()
break break
if self.ended or (got_output is False and _pid_exists(self.pid) is False): if self.ended or (not got_output and not _pid_exists(self.pid)):
self.ended = time.time() self.ended = time.time()
try: try:
wait_status = os.waitpid(self.pid, 0)[1] wait_status = os.waitpid(self.pid, 0)[1]
@ -321,10 +312,8 @@ class SysCommandWorker:
if change_perm: if change_perm:
os.chmod(str(history_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) os.chmod(str(history_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
except PermissionError: except (PermissionError, FileNotFoundError):
pass
# If history_logfile does not exist, ignore the error # If history_logfile does not exist, ignore the error
except FileNotFoundError:
pass pass
except Exception as e: except Exception as e:
exception_type = type(e).__name__ exception_type = type(e).__name__
@ -355,22 +344,18 @@ class SysCommandWorker:
class SysCommand: class SysCommand:
def __init__(self, def __init__(self,
cmd :Union[str, List[str]], cmd :Union[str, List[str]],
callbacks :Optional[Dict[str, Callable[[Any], Any]]] = None, callbacks :Dict[str, Callable[[Any], Any]] = {},
start_callback :Optional[Callable[[Any], Any]] = None, start_callback :Optional[Callable[[Any], Any]] = None,
peek_output :Optional[bool] = False, peek_output :Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None, environment_vars :Optional[Dict[str, Any]] = None,
working_directory :Optional[str] = './', working_directory :Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True): remove_vt100_escape_codes_from_lines :bool = True):
_callbacks = {} self._callbacks = callbacks.copy()
if callbacks:
for hook, func in callbacks.items():
_callbacks[hook] = func
if start_callback: if start_callback:
_callbacks['on_start'] = start_callback self._callbacks['on_start'] = start_callback
self.cmd = cmd self.cmd = cmd
self._callbacks = _callbacks
self.peek_output = peek_output self.peek_output = peek_output
self.environment_vars = environment_vars self.environment_vars = environment_vars
self.working_directory = working_directory self.working_directory = working_directory
@ -398,17 +383,15 @@ class SysCommand:
if not self.session: if not self.session:
raise KeyError(f"SysCommand() does not have an active session.") raise KeyError(f"SysCommand() does not have an active session.")
elif type(key) is slice: elif type(key) is slice:
start = key.start if key.start else 0 start = key.start or 0
end = key.stop if key.stop else len(self.session._trace_log) end = key.stop or len(self.session._trace_log)
return self.session._trace_log[start:end] return self.session._trace_log[start:end]
else: else:
raise ValueError("SysCommand() doesn't have key & value pairs, only slices, SysCommand('ls')[:10] as an example.") raise ValueError("SysCommand() doesn't have key & value pairs, only slices, SysCommand('ls')[:10] as an example.")
def __repr__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> str: def __repr__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> str:
if self.session: return self.decode('UTF-8', errors='backslashreplace') or ''
return self.session._trace_log.decode('UTF-8', errors='backslashreplace')
return ''
def __json__(self) -> Dict[str, Union[str, bool, List[str], Dict[str, Any], Optional[bool], Optional[Dict[str, Any]]]]: def __json__(self) -> Dict[str, Union[str, bool, List[str], Dict[str, Any], Optional[bool], Optional[Dict[str, Any]]]]:
return { return {
@ -416,7 +399,7 @@ class SysCommand:
'callbacks': self._callbacks, 'callbacks': self._callbacks,
'peak': self.peek_output, 'peak': self.peek_output,
'environment_vars': self.environment_vars, 'environment_vars': self.environment_vars,
'session': True if self.session else False 'session': self.session is not None
} }
def create_session(self) -> bool: def create_session(self) -> bool:
@ -436,10 +419,9 @@ class SysCommand:
remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines, remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines,
working_directory=self.working_directory) as session: working_directory=self.working_directory) as session:
if not self.session: self.session = session
self.session = session
while self.session.ended is None: while not self.session.ended:
self.session.poll() self.session.poll()
if self.peek_output: if self.peek_output:
@ -448,9 +430,9 @@ class SysCommand:
return True return True
def decode(self, fmt :str = 'UTF-8') -> Optional[str]: def decode(self, *args, **kwargs) -> Optional[str]:
if self.session: if self.session:
return self.session._trace_log.decode(fmt) return self.session._trace_log.decode(*args, **kwargs)
return None return None
@property @property
@ -476,54 +458,52 @@ def _pid_exists(pid: int) -> bool:
def run_custom_user_commands(commands :List[str], installation :Installer) -> None: def run_custom_user_commands(commands :List[str], installation :Installer) -> None:
for index, command in enumerate(commands): for index, command in enumerate(commands):
script_path = f"/var/tmp/user-command.{index}.sh"
chroot_path = installation.target / script_path
info(f'Executing custom command "{command}" ...') info(f'Executing custom command "{command}" ...')
chroot_path.write_text(command)
SysCommand(f"arch-chroot {installation.target} bash {script_path}")
with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script: os.unlink(chroot_path)
temp_script.write(command)
SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh")
os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh")
def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool : def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool :
""" """
Function to load a stream (file (as name) or valid JSON string into an existing dictionary Load a JSON encoded dictionary from a stream and merge it into an existing dictionary.
Returns true if it could be done A stream can be a filepath, a URL or a raw JSON string.
Return false if operation could not be executed Returns True if the operation succeeded, False otherwise.
+configuration_identifier is just a parameter to get meaningful, but not so long messages +configuration_identifier is just a parameter to get meaningful, but not so long messages
""" """
parsed_url = urllib.parse.urlparse(stream) raw: Optional[str] = None
# Try using the stream as a URL that should be grabbed
if parsed_url.scheme: # The stream is in fact a URL that should be grabbed if urllib.parse.urlparse(stream).scheme:
try: try:
with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response: with urlopen(Request(stream, headers={'User-Agent': 'ArchInstall'})) as response:
target.update(json.loads(response.read())) raw = response.read()
except urllib.error.HTTPError as err: except urllib.error.HTTPError as err:
error(f"Could not load {configuration_identifier} via {parsed_url} due to: {err}") error(f"Could not fetch JSON from {stream} as {configuration_identifier}: {err}")
return False return False
else:
if pathlib.Path(stream).exists():
try:
with pathlib.Path(stream).open() as fh:
target.update(json.load(fh))
except Exception as err:
error(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {err}")
return False
else:
# NOTE: This is a rudimentary check if what we're trying parse is a dict structure.
# Which is the only structure we tolerate anyway.
if stream.strip().startswith('{') and stream.strip().endswith('}'):
try:
target.update(json.loads(stream))
except Exception as e:
error(f"{configuration_identifier} Contains an invalid JSON format: {e}")
return False
else:
error(f"{configuration_identifier} is neither a file nor is a JSON string")
return False
# Try using the stream as a filepath that should be read
if raw is None and (path := pathlib.Path(stream)).exists():
try:
raw = path.read_text()
except Exception as err:
error(f"Could not read file {stream} as {configuration_identifier}: {err}")
return False
try:
# We use `or` to try the stream as raw JSON to be parsed
structure = json.loads(raw or stream)
except Exception as err:
error(f"{configuration_identifier} contains an invalid JSON format: {err}")
return False
if not isinstance(structure, dict):
error(f"{stream} passed as {configuration_identifier} is not a JSON encoded dictionary")
return False
target.update(structure)
return True return True

View File

@ -20,7 +20,8 @@ from .models.bootloader import Bootloader
from .models.network_configuration import NetworkConfiguration from .models.network_configuration import NetworkConfiguration
from .models.users import User from .models.users import User
from .output import log, error, info, warn, debug from .output import log, error, info, warn, debug
from .pacman import run_pacman from . import pacman
from .pacman import Pacman
from .plugins import plugins from .plugins import plugins
from .storage import storage from .storage import storage
@ -52,27 +53,16 @@ class Installer:
`Installer()` is the wrapper for most basic installation steps. `Installer()` is the wrapper for most basic installation steps.
It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
""" """
if not base_packages: self.base_packages = base_packages or __packages__[:3]
base_packages = __packages__[:3] self.kernels = kernels or ['linux']
if kernels is None:
self.kernels = ['linux']
else:
self.kernels = kernels
self._disk_config = disk_config self._disk_config = disk_config
if disk_encryption is None: self._disk_encryption = disk_encryption or disk.DiskEncryption(disk.EncryptionType.NoEncryption)
self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption)
else:
self._disk_encryption = disk_encryption
self.target: Path = target self.target: Path = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1]) self.milliseconds = int(str(time.time()).split('.')[1])
self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None} self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None}
self.base_packages = base_packages
for kernel in self.kernels: for kernel in self.kernels:
self.base_packages.append(kernel) self.base_packages.append(kernel)
@ -101,6 +91,7 @@ class Installer:
self._fstab_entries: List[str] = [] self._fstab_entries: List[str] = []
self._zram_enabled = False self._zram_enabled = False
self.pacman = Pacman(self.target, storage['arguments'].get('silent', False))
def __enter__(self) -> 'Installer': def __enter__(self) -> 'Installer':
return self return self
@ -189,35 +180,33 @@ class Installer:
# partitions have to mounted in the right order on btrfs the mountpoint will # partitions have to mounted in the right order on btrfs the mountpoint will
# be empty as the actual subvolumes are getting mounted instead so we'll use # be empty as the actual subvolumes are getting mounted instead so we'll use
# '/' just for sorting # '/' just for sorting
sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint if x.mountpoint else Path('/')) sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint or Path('/'))
enc_partitions = []
if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption: if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption:
enc_partitions = list(filter(lambda x: x in self._disk_encryption.partitions, sorted_part_mods)) enc_partitions = list(set(sorted_part_mods) & set(self._disk_encryption.partitions))
else:
enc_partitions = []
# attempt to decrypt all luks partitions # attempt to decrypt all luks partitions
luks_handlers = self._prepare_luks_partitions(enc_partitions) luks_handlers = self._prepare_luks_partitions(enc_partitions)
for part_mod in sorted_part_mods: for part_mod in sorted_part_mods:
if part_mod not in luks_handlers: # partition is not encrypted if luks_handler := luks_handlers.get(part_mod):
# mount encrypted partition
self._mount_luks_partiton(part_mod, luks_handler)
else:
# partition is not encrypted
self._mount_partition(part_mod) self._mount_partition(part_mod)
else: # mount encrypted partition
self._mount_luks_partiton(part_mod, luks_handlers[part_mod])
def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[disk.PartitionModification, Luks2]: def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[disk.PartitionModification, Luks2]:
luks_handlers = {} return {
part_mod: disk.device_handler.unlock_luks2_dev(
for part_mod in partitions: part_mod.dev_path,
if part_mod.mapper_name and part_mod.dev_path: part_mod.mapper_name,
luks_handler = disk.device_handler.unlock_luks2_dev( self._disk_encryption.encryption_password
part_mod.dev_path, )
part_mod.mapper_name, for part_mod in partitions
self._disk_encryption.encryption_password if part_mod.mapper_name and part_mod.dev_path
) }
luks_handlers[part_mod] = luks_handler
return luks_handlers
def _mount_partition(self, part_mod: disk.PartitionModification): def _mount_partition(self, part_mod: disk.PartitionModification):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined # it would be none if it's btrfs as the subvolumes will have the mountpoints defined
@ -302,93 +291,6 @@ class Installer:
def post_install_check(self, *args :str, **kwargs :str) -> List[str]: def post_install_check(self, *args :str, **kwargs :str) -> List[str]:
return [step for step, flag in self.helper_flags.items() if flag is False] return [step for step, flag in self.helper_flags.items() if flag is False]
def enable_multilib_repository(self):
# Set up a regular expression pattern of a commented line containing 'multilib' within []
pattern = re.compile(r"^#\s*\[multilib\]$")
# This is used to track if the previous line is a match, so we end up uncommenting the line after the block.
matched = False
# Read in the lines from the original file
with open("/etc/pacman.conf", "r") as pacman_conf:
lines = pacman_conf.readlines()
# Open the file again in write mode, to replace the contents
with open("/etc/pacman.conf", "w") as pacman_conf:
for line in lines:
if pattern.match(line):
# If this is the [] block containing 'multilib', uncomment it and set the matched tracking boolean.
pacman_conf.write(line.lstrip('#'))
matched = True
elif matched:
# The previous line was a match for [.*multilib.*].
# This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist'
pacman_conf.write(line.lstrip('#'))
matched = False # Reset the state of matched to False.
else:
pacman_conf.write(line)
def enable_testing_repositories(self, enable_multilib_testing=False):
# Set up a regular expression pattern of a commented line containing 'testing' within []
pattern = re.compile("^#\\[.*testing.*\\]$")
# This is used to track if the previous line is a match, so we end up uncommenting the line after the block.
matched = False
# Read in the lines from the original file
with open("/etc/pacman.conf", "r") as pacman_conf:
lines = pacman_conf.readlines()
# Open the file again in write mode, to replace the contents
with open("/etc/pacman.conf", "w") as pacman_conf:
for line in lines:
if pattern.match(line) and (enable_multilib_testing or 'multilib' not in line):
# If this is the [] block containing 'testing', uncomment it and set the matched tracking boolean.
pacman_conf.write(line.lstrip('#'))
matched = True
elif matched:
# The previous line was a match for [.*testing.*].
# This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist'
pacman_conf.write(line.lstrip('#'))
matched = False # Reset the state of matched to False.
else:
pacman_conf.write(line)
def _pacstrap(self, packages: Union[str, List[str]]) -> bool:
if isinstance(packages, str):
packages = [packages]
for plugin in plugins.values():
if hasattr(plugin, 'on_pacstrap'):
if (result := plugin.on_pacstrap(packages)):
packages = result
info(f'Installing packages: {packages}')
# TODO: We technically only need to run the -Syy once.
try:
run_pacman('-Syy', default_cmd='/usr/bin/pacman')
except SysCallError as err:
error(f'Could not sync a new package database: {err}')
if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
return self._pacstrap(packages)
raise RequirementError(f'Could not sync mirrors: {err}')
try:
SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True)
return True
except SysCallError as err:
error(f'Could not strap in packages: {err}')
if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
return self._pacstrap(packages)
raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
def set_mirrors(self, mirror_config: MirrorConfiguration): def set_mirrors(self, mirror_config: MirrorConfiguration):
for plugin in plugins.values(): for plugin in plugins.values():
if hasattr(plugin, 'on_mirrors'): if hasattr(plugin, 'on_mirrors'):
@ -402,7 +304,8 @@ class Installer:
add_custom_mirrors(mirror_config.custom_mirrors) add_custom_mirrors(mirror_config.custom_mirrors)
def genfstab(self, flags :str = '-pU'): def genfstab(self, flags :str = '-pU'):
info(f"Updating {self.target}/etc/fstab") fstab_path = self.target / "etc" / "fstab"
info(f"Updating {fstab_path}")
try: try:
gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode() gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode()
@ -412,10 +315,10 @@ class Installer:
if not gen_fstab: if not gen_fstab:
raise RequirementError(f'Genrating fstab returned empty value') raise RequirementError(f'Genrating fstab returned empty value')
with open(f"{self.target}/etc/fstab", 'a') as fp: with open(fstab_path, 'a') as fp:
fp.write(gen_fstab) fp.write(gen_fstab)
if not os.path.isfile(f'{self.target}/etc/fstab'): if not fstab_path.is_file():
raise RequirementError(f'Could not create fstab file') raise RequirementError(f'Could not create fstab file')
for plugin in plugins.values(): for plugin in plugins.values():
@ -423,7 +326,7 @@ class Installer:
if plugin.on_genfstab(self) is True: if plugin.on_genfstab(self) is True:
break break
with open(f"{self.target}/etc/fstab", 'a') as fp: with open(fstab_path, 'a') as fp:
for entry in self._fstab_entries: for entry in self._fstab_entries:
fp.write(f'{entry}\n') fp.write(f'{entry}\n')
@ -432,9 +335,7 @@ class Installer:
if part_mod.fs_type != disk.FilesystemType.Btrfs: if part_mod.fs_type != disk.FilesystemType.Btrfs:
continue continue
fstab_file = Path(f'{self.target}/etc/fstab') with fstab_path.open('r') as fp:
with fstab_file.open('r') as fp:
fstab = fp.readlines() fstab = fp.readlines()
# Replace the {installation}/etc/fstab with entries # Replace the {installation}/etc/fstab with entries
@ -456,7 +357,7 @@ class Installer:
fstab[index] = line.replace(subvoldef[0], f',compress=zstd{subvoldef[0]}') fstab[index] = line.replace(subvoldef[0], f',compress=zstd{subvoldef[0]}')
break break
with fstab_file.open('w') as fp: with fstab_path.open('w') as fp:
fp.writelines(fstab) fp.writelines(fstab)
def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
@ -486,8 +387,7 @@ class Installer:
with open(f'{self.target}/etc/locale.gen', 'a') as fh: with open(f'{self.target}/etc/locale.gen', 'a') as fh:
fh.write(f'{lang}.{encoding}{modifier} {encoding}\n') fh.write(f'{lang}.{encoding}{modifier} {encoding}\n')
with open(f'{self.target}/etc/locale.conf', 'w') as fh: (self.target / "etc" / "locale.conf").write_text(f'LANG={lang}.{encoding}{modifier}\n')
fh.write(f'LANG={lang}.{encoding}{modifier}\n')
try: try:
SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen') SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen')
@ -561,16 +461,13 @@ class Installer:
for plugin in plugins.values(): for plugin in plugins.values():
if hasattr(plugin, 'on_configure_nic'): if hasattr(plugin, 'on_configure_nic'):
new_conf = plugin.on_configure_nic( conf = plugin.on_configure_nic(
network_config.iface, network_config.iface,
network_config.dhcp, network_config.dhcp,
network_config.ip, network_config.ip,
network_config.gateway, network_config.gateway,
network_config.dns network_config.dns
) ) or conf
if new_conf:
conf = new_conf
with open(f"{self.target}/etc/systemd/network/10-{network_config.iface}.network", "a") as netconf: with open(f"{self.target}/etc/systemd/network/10-{network_config.iface}.network", "a") as netconf:
netconf.write(str(conf)) netconf.write(str(conf))
@ -597,7 +494,7 @@ class Installer:
# Otherwise, we can go ahead and add the required package # Otherwise, we can go ahead and add the required package
# and enable it's service: # and enable it's service:
else: else:
self._pacstrap('iwd') self.pacman.strap('iwd')
self.enable_service('iwd') self.enable_service('iwd')
for psk in psk_files: for psk in psk_files:
@ -683,7 +580,7 @@ class Installer:
if part in self._disk_encryption.partitions: if part in self._disk_encryption.partitions:
if self._disk_encryption.hsm_device: if self._disk_encryption.hsm_device:
# Required bby mkinitcpio to add support for fido2-device options # Required bby mkinitcpio to add support for fido2-device options
self._pacstrap('libfido2') self.pacman.strap('libfido2')
if 'sd-encrypt' not in self._hooks: if 'sd-encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
@ -709,24 +606,27 @@ class Installer:
# Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set. # Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set.
# This action takes place on the host system as pacstrap copies over package repository lists. # This action takes place on the host system as pacstrap copies over package repository lists.
pacman_conf = pacman.Config(self.target)
if multilib: if multilib:
info("The multilib flag is set. This system will be installed with the multilib repository enabled.") info("The multilib flag is set. This system will be installed with the multilib repository enabled.")
self.enable_multilib_repository() pacman_conf.enable(pacman.Repo.Multilib)
else: else:
info("The multilib flag is not set. This system will be installed without multilib repositories enabled.") info("The multilib flag is not set. This system will be installed without multilib repositories enabled.")
if testing: if testing:
info("The testing flag is set. This system will be installed with testing repositories enabled.") info("The testing flag is set. This system will be installed with testing repositories enabled.")
self.enable_testing_repositories(multilib) pacman_conf.enable(pacman.Repo.Testing)
if multilib:
pacman_conf.enable(pacman.Repo.MultilibTesting)
else: else:
info("The testing flag is not set. This system will be installed without testing repositories enabled.") info("The testing flag is not set. This system will be installed without testing repositories enabled.")
self._pacstrap(self.base_packages) pacman_conf.apply()
self.pacman.strap(self.base_packages)
self.helper_flags['base-strapped'] = True self.helper_flags['base-strapped'] = True
# This handles making sure that the repositories we enabled persist on the installed system pacman_conf.persist()
if multilib or testing:
shutil.copy2("/etc/pacman.conf", f"{self.target}/etc/pacman.conf")
# Periodic TRIM may improve the performance and longevity of SSDs whilst # Periodic TRIM may improve the performance and longevity of SSDs whilst
# having no adverse effect on other devices. Most distributions enable # having no adverse effect on other devices. Most distributions enable
@ -761,7 +661,7 @@ class Installer:
def setup_swap(self, kind :str = 'zram'): def setup_swap(self, kind :str = 'zram'):
if kind == 'zram': if kind == 'zram':
info(f"Setting up swap on zram") info(f"Setting up swap on zram")
self._pacstrap('zram-generator') self.pacman.strap('zram-generator')
# We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813
# zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example' # zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example'
@ -788,7 +688,7 @@ class Installer:
return None return None
def _add_systemd_bootloader(self, root_partition: disk.PartitionModification): def _add_systemd_bootloader(self, root_partition: disk.PartitionModification):
self._pacstrap('efibootmgr') self.pacman.strap('efibootmgr')
if not SysInfo.has_uefi(): if not SysInfo.has_uefi():
raise HardwareIncompatibilityError raise HardwareIncompatibilityError
@ -897,7 +797,7 @@ class Installer:
boot_partition: disk.PartitionModification, boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification root_partition: disk.PartitionModification
): ):
self._pacstrap('grub') # no need? self.pacman.strap('grub') # no need?
_file = "/etc/default/grub" _file = "/etc/default/grub"
@ -916,7 +816,7 @@ class Installer:
info(f"GRUB boot partition: {boot_partition.dev_path}") info(f"GRUB boot partition: {boot_partition.dev_path}")
if SysInfo.has_uefi(): if SysInfo.has_uefi():
self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
try: try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
@ -955,7 +855,7 @@ class Installer:
boot_partition: disk.PartitionModification, boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification root_partition: disk.PartitionModification
): ):
self._pacstrap('efibootmgr') self.pacman.strap('efibootmgr')
if not SysInfo.has_uefi(): if not SysInfo.has_uefi():
raise HardwareIncompatibilityError raise HardwareIncompatibilityError
@ -1030,9 +930,6 @@ class Installer:
if plugin.on_add_bootloader(self): if plugin.on_add_bootloader(self):
return True return True
if type(self.target) == str:
self.target = Path(self.target)
boot_partition = self._get_boot_partition() boot_partition = self._get_boot_partition()
root_partition = self._get_root_partition() root_partition = self._get_root_partition()
@ -1053,7 +950,7 @@ class Installer:
self._add_efistub_bootloader(boot_partition, root_partition) self._add_efistub_bootloader(boot_partition, root_partition)
def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: def add_additional_packages(self, packages: Union[str, List[str]]) -> bool:
return self._pacstrap(packages) return self.pacman.strap(packages)
def _enable_users(self, service: str, users: List[User]): def _enable_users(self, service: str, users: List[User]):
for user in users: for user in users:
@ -1214,7 +1111,7 @@ class Installer:
return True return True
def _service_started(self, service_name: str) -> str | None: def _service_started(self, service_name: str) -> Optional[str]:
if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'): if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'):
service_name += '.service' # Just to be safe service_name += '.service' # Just to be safe

View File

@ -1,3 +1,5 @@
from itertools import takewhile
from pathlib import Path
from typing import Iterator, List from typing import Iterator, List
from .exceptions import ServiceException, SysCallError from .exceptions import ServiceException, SysCallError
@ -11,21 +13,12 @@ def list_keyboard_languages() -> Iterator[str]:
def list_locales() -> List[str]: def list_locales() -> List[str]:
with open('/etc/locale.gen', 'r') as fp: entries = Path('/etc/locale.gen').read_text().splitlines()
locales = [] # Before the list of locales begins there's an empty line with a '#' in front
# before the list of locales begins there's an empty line with a '#' in front # so we'll collect the locales from bottom up and halt when we're done.
# so we'll collect the localels from bottom up and halt when we're donw locales = list(takewhile(bool, map(lambda entry: entry.strip('\n\t #'), reversed(entries))))
entries = fp.readlines() locales.reverse()
entries.reverse() return locales
for entry in entries:
text = entry.replace('#', '').strip()
if text == '':
break
locales.append(text)
locales.reverse()
return locales
def list_x11_keyboard_languages() -> Iterator[str]: def list_x11_keyboard_languages() -> Iterator[str]:

View File

@ -9,7 +9,7 @@ from urllib.request import urlopen
from .exceptions import SysCallError from .exceptions import SysCallError
from .output import error, info, debug from .output import error, info, debug
from .pacman import run_pacman from .pacman import Pacman
def get_hw_addr(ifname :str) -> str: def get_hw_addr(ifname :str) -> str:
@ -35,7 +35,7 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
def check_mirror_reachable() -> bool: def check_mirror_reachable() -> bool:
info("Testing connectivity to the Arch Linux mirrors...") info("Testing connectivity to the Arch Linux mirrors...")
try: try:
run_pacman("-Sy") Pacman.run("-Sy")
return True return True
except SysCallError as err: except SysCallError as err:
if os.geteuid() != 0: if os.geteuid() != 0:
@ -48,7 +48,7 @@ def check_mirror_reachable() -> bool:
def update_keyring() -> bool: def update_keyring() -> bool:
info("Updating archlinux-keyring ...") info("Updating archlinux-keyring ...")
try: try:
run_pacman("-Sy --noconfirm archlinux-keyring") Pacman.run("-Sy --noconfirm archlinux-keyring")
return True return True
except SysCallError: except SysCallError:
if os.geteuid() != 0: if os.geteuid() != 0:

View File

@ -8,7 +8,7 @@ from urllib.request import urlopen
from ..exceptions import PackageError, SysCallError from ..exceptions import PackageError, SysCallError
from ..models.gen import PackageSearch, PackageSearchResult, LocalPackage from ..models.gen import PackageSearch, PackageSearchResult, LocalPackage
from ..pacman import run_pacman from ..pacman import Pacman
BASE_URL_PKG_SEARCH = 'https://archlinux.org/packages/search/json/' BASE_URL_PKG_SEARCH = 'https://archlinux.org/packages/search/json/'
# BASE_URL_PKG_CONTENT = 'https://archlinux.org/packages/search/json/' # BASE_URL_PKG_CONTENT = 'https://archlinux.org/packages/search/json/'
@ -106,7 +106,7 @@ def validate_package_list(packages :list) -> Tuple[list, list]:
def installed_package(package :str) -> LocalPackage: def installed_package(package :str) -> LocalPackage:
package_info = {} package_info = {}
try: try:
for line in run_pacman(f"-Q --info {package}"): for line in Pacman.run(f"-Q --info {package}"):
if b':' in line: if b':' in line:
key, value = line.decode().split(':', 1) key, value = line.decode().split(':', 1)
package_info[key.strip().lower().replace(' ', '_')] = value.strip() package_info[key.strip().lower().replace(' ', '_')] = value.strip()

View File

@ -1,31 +0,0 @@
import pathlib
import time
from typing import TYPE_CHECKING, Any
from .general import SysCommand
from .output import warn, error
if TYPE_CHECKING:
_: Any
def run_pacman(args :str, default_cmd :str = 'pacman') -> SysCommand:
"""
A centralized function to call `pacman` from.
It also protects us from colliding with other running pacman sessions (if used locally).
The grace period is set to 10 minutes before exiting hard if another pacman instance is running.
"""
pacman_db_lock = pathlib.Path('/var/lib/pacman/db.lck')
if pacman_db_lock.exists():
warn(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.'))
started = time.time()
while pacman_db_lock.exists():
time.sleep(0.25)
if time.time() - started > (60 * 10):
error(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.'))
exit(1)
return SysCommand(f'{default_cmd} {args}')

View File

@ -0,0 +1,88 @@
from pathlib import Path
import time
import re
from typing import TYPE_CHECKING, Any, List, Callable, Union
from shutil import copy2
from ..general import SysCommand
from ..output import warn, error, info
from .repo import Repo
from .config import Config
from ..exceptions import RequirementError
from ..plugins import plugins
if TYPE_CHECKING:
_: Any
class Pacman:
def __init__(self, target: Path, silent: bool = False):
self.synced = False
self.silent = silent
self.target = target
@staticmethod
def run(args :str, default_cmd :str = 'pacman') -> SysCommand:
"""
A centralized function to call `pacman` from.
It also protects us from colliding with other running pacman sessions (if used locally).
The grace period is set to 10 minutes before exiting hard if another pacman instance is running.
"""
pacman_db_lock = Path('/var/lib/pacman/db.lck')
if pacman_db_lock.exists():
warn(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.'))
started = time.time()
while pacman_db_lock.exists():
time.sleep(0.25)
if time.time() - started > (60 * 10):
error(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.'))
exit(1)
return SysCommand(f'{default_cmd} {args}')
def ask(self, error_message: str, bail_message: str, func: Callable, *args, **kwargs):
while True:
try:
func(*args, **kwargs)
break
except Exception as err:
error(f'{error_message}: {err}')
if not self.silent and input('Would you like to re-try this download? (Y/n): ').lower().strip() in 'y':
continue
raise RequirementError(f'{bail_message}: {err}')
def sync(self):
if self.synced:
return
self.ask(
'Could not sync a new package database',
'Could not sync mirrors',
self.run,
'-Syy',
default_cmd='/usr/bin/pacman'
)
self.synced = True
def strap(self, packages: Union[str, List[str]]):
self.sync()
if isinstance(packages, str):
packages = [packages]
for plugin in plugins.values():
if hasattr(plugin, 'on_pacstrap'):
if (result := plugin.on_pacstrap(packages)):
packages = result
info(f'Installing packages: {packages}')
self.ask(
'Could not strap in packages',
'Pacstrap failed. See /var/log/archinstall/install.log or above message for error details',
SysCommand,
f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm',
peek_output=True
)

View File

@ -0,0 +1,33 @@
import re
from pathlib import Path
from shutil import copy2
from typing import List
from .repo import Repo
class Config:
def __init__(self, target: Path):
self.path = Path("/etc") / "pacman.conf"
self.chroot_path = target / "etc" / "pacman.conf"
self.patterns: List[re.Pattern] = []
def enable(self, repo: Repo):
self.patterns.append(re.compile(r"^#\s*\[{}\]$".format(repo.value)))
def apply(self):
if not self.patterns:
return
lines = iter(self.path.read_text().splitlines(keepends=True))
with open(self.path, 'w') as f:
for line in lines:
if any(pattern.match(line) for pattern in self.patterns):
# Uncomment this line and the next.
f.write(line.lstrip('#'))
f.write(next(lines).lstrip('#'))
else:
f.write(line)
def persist(self):
if self.patterns:
copy2(self.path, self.chroot_path)

View File

@ -0,0 +1,6 @@
from enum import Enum
class Repo(Enum):
Multilib = "multilib"
Testing = "testing"
MultilibTesting = "multilib-testing"

View File

@ -98,14 +98,19 @@ class ProfileHandler:
profile = self.get_profile_by_name(main) if main else None profile = self.get_profile_by_name(main) if main else None
valid: List[Profile] = [] valid: List[Profile] = []
details: List[str] = profile_config.get('details', [])
if details:
valid = []
invalid = []
if details := profile_config.get('details', []): for detail in filter(None, details):
resolved = {detail: self.get_profile_by_name(detail) for detail in details if detail} if profile := self.get_profile_by_name(detail):
valid = [p for p in resolved.values() if p is not None] valid.append(profile)
invalid = ', '.join([k for k, v in resolved.items() if v is None]) else:
invalid.append(detail)
if invalid: if invalid:
info(f'No profile definition found: {invalid}') info('No profile definition found: {}'.format(', '.join(invalid)))
custom_settings = profile_config.get('custom_settings', {}) custom_settings = profile_config.get('custom_settings', {})
for profile in valid: for profile in valid:
@ -123,14 +128,12 @@ class ProfileHandler:
""" """
List of all available default_profiles List of all available default_profiles
""" """
if self._profiles is None: self._profiles = self._profiles or self._find_available_profiles()
self._profiles = self._find_available_profiles()
return self._profiles return self._profiles
@cached_property @cached_property
def _local_mac_addresses(self) -> List[str]: def _local_mac_addresses(self) -> List[str]:
ifaces = list_interfaces() return list(list_interfaces())
return list(ifaces.keys())
def add_custom_profiles(self, profiles: Union[TProfile, List[TProfile]]): def add_custom_profiles(self, profiles: Union[TProfile, List[TProfile]]):
if not isinstance(profiles, list): if not isinstance(profiles, list):
@ -190,25 +193,20 @@ class ProfileHandler:
def install_gfx_driver(self, install_session: 'Installer', driver: Optional[GfxDriver]): def install_gfx_driver(self, install_session: 'Installer', driver: Optional[GfxDriver]):
try: try:
driver_pkgs = driver.packages() if driver else []
pkg_names = [p.value for p in driver_pkgs]
additional_pkg = ' '.join(['xorg-server', 'xorg-xinit'] + pkg_names)
if driver is not None: if driver is not None:
# Find the intersection between the set of known nvidia drivers driver_pkgs = driver.packages()
# and the selected driver packages. Since valid intesections can pkg_names = [p.value for p in driver_pkgs]
# only have one element or none, we iterate and try to take the for driver_pkg in {GfxPackage.Nvidia, GfxPackage.NvidiaOpen} & set(driver_pkgs):
# first element. for kernel in {"linux-lts", "linux-zen"} & set(install_session.kernels):
if driver_pkg := next(iter({GfxPackage.Nvidia, GfxPackage.NvidiaOpen} & set(driver_pkgs)), None): # Fixes https://github.com/archlinux/archinstall/issues/585
if any(kernel in install_session.base_packages for kernel in ("linux-lts", "linux-zen")): install_session.add_additional_packages(f"{kernel}-headers")
for kernel in install_session.kernels:
# Fixes https://github.com/archlinux/archinstall/issues/585
install_session.add_additional_packages(f"{kernel}-headers")
# I've had kernel regen fail if it wasn't installed before nvidia-dkms # I've had kernel regen fail if it wasn't installed before nvidia-dkms
install_session.add_additional_packages(['dkms', 'xorg-server', 'xorg-xinit', f'{driver_pkg}-dkms']) install_session.add_additional_packages(['dkms', 'xorg-server', 'xorg-xinit', f'{driver_pkg.value}-dkms'])
return # Return after first driver match, since it is impossible to use both simultaneously.
elif 'amdgpu' in driver_pkgs: return
if 'amdgpu' in driver_pkgs:
# The order of these two are important if amdgpu is installed #808 # The order of these two are important if amdgpu is installed #808
if 'amdgpu' in install_session.modules: if 'amdgpu' in install_session.modules:
install_session.modules.remove('amdgpu') install_session.modules.remove('amdgpu')
@ -218,23 +216,24 @@ class ProfileHandler:
install_session.modules.remove('radeon') install_session.modules.remove('radeon')
install_session.modules.append('radeon') install_session.modules.append('radeon')
install_session.add_additional_packages(additional_pkg) install_session.add_additional_packages(pkg_names)
except Exception as err: except Exception as err:
warn(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}") warn(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}")
# Prep didn't run, so there's no driver to install # Prep didn't run, so there's no driver to install
install_session.add_additional_packages(['xorg-server', 'xorg-xinit']) install_session.add_additional_packages(['xorg-server', 'xorg-xinit'])
def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration): def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration):
profile = profile_config.profile profile = profile_config.profile
if profile: if not profile:
profile.install(install_session) return
if profile and profile_config.gfx_driver: profile.install(install_session)
if profile.is_xorg_type_profile() or profile.is_desktop_type_profile():
self.install_gfx_driver(install_session, profile_config.gfx_driver)
if profile and profile_config.greeter: if profile_config.gfx_driver and (profile.is_xorg_type_profile() or profile.is_desktop_type_profile()):
self.install_gfx_driver(install_session, profile_config.gfx_driver)
if profile_config.greeter:
self.install_greeter(install_session, profile_config.greeter) self.install_greeter(install_session, profile_config.greeter)
def _import_profile_from_url(self, url: str): def _import_profile_from_url(self, url: str):
@ -312,8 +311,7 @@ class ProfileHandler:
debug(f'Importing profile: {file}') debug(f'Importing profile: {file}')
try: try:
spec = importlib.util.spec_from_file_location(name, file) if spec := importlib.util.spec_from_file_location(name, file):
if spec is not None:
imported = importlib.util.module_from_spec(spec) imported = importlib.util.module_from_spec(spec)
if spec.loader is not None: if spec.loader is not None:
spec.loader.exec_module(imported) spec.loader.exec_module(imported)