Reworking SysCommand & Moving to localectl for locale related activities

* Moving to `localectl` rather than local file manipulation *(both for listing locales and setting them)*.
* Swapped `loadkeys` for localectl.
* Renamed `main` to `maim` in awesome profile.
* Created `archinstall.Boot(<installation>)` which spawns a `systemd-nspawn` container against the installation target.
* Exposing systemd.py's internals to archinstall global scope.
* Re-worked `SysCommand` completely, it's now a wrapper for `SysCommandWorker` which supports interacting with the process in a different way. `SysCommand` should behave just like the old one, for backwards compatibility reasons. This fixes #68 and #69.
* `SysCommand()` now has a `.decode()` function that defaults to `UTF-8`.
* Adding back peak_output=True to pacstrap.

Co-authored-by: Anton Hvornum <anton.feeds@gmail.com>
Co-authored-by: Dylan Taylor <dylan@dylanmtaylor.com>
This commit is contained in:
Anton Hvornum 2021-05-19 14:45:13 +00:00 committed by GitHub
parent 52960bd686
commit 49e6cbdc54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 418 additions and 246 deletions

3
.gitignore vendored
View File

@ -20,9 +20,10 @@ SAFETY_LOCK
**/**.network **/**.network
**/**.target **/**.target
**/**.qcow2 **/**.qcow2
**/test.py /test*.py
**/archiso **/archiso
/guided.py /guided.py
/install.log /install.log
venv venv
.idea/** .idea/**
**/install.log

View File

@ -13,6 +13,7 @@ from .lib.packages import *
from .lib.profiles import * from .lib.profiles import *
from .lib.services import * from .lib.services import *
from .lib.storage import * from .lib.storage import *
from .lib.systemd import *
from .lib.user_interaction import * from .lib.user_interaction import *
__version__ = "2.2.0.dev1" __version__ = "2.2.0.dev1"

View File

@ -1,6 +1,7 @@
import glob import glob
import pathlib import pathlib
import re import re
import time
from collections import OrderedDict from collections import OrderedDict
from typing import Optional from typing import Optional
@ -77,7 +78,7 @@ class BlockDevice:
raise DiskError(f'Could not locate backplane info for "{self.path}"') raise DiskError(f'Could not locate backplane info for "{self.path}"')
if self.info['type'] == 'loop': if self.info['type'] == 'loop':
for drive in json.loads(b''.join(SysCommand(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']: for drive in json.loads(b''.join(SysCommand(['losetup', '--json'])).decode('UTF_8'))['loopdevices']:
if not drive['name'] == self.path: if not drive['name'] == self.path:
continue continue
@ -264,7 +265,9 @@ class Partition:
raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
files = len(glob.glob(f"{temporary_mountpoint}/*")) files = len(glob.glob(f"{temporary_mountpoint}/*"))
SysCommand(f'/usr/bin/umount {temporary_mountpoint}') iterations = 0
while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations+1) < 10:
time.sleep(1)
temporary_path.rmdir() temporary_path.rmdir()
@ -425,7 +428,7 @@ class Partition:
""" """
try: try:
self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True) self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
except SysCallError: except (SysCallError, DiskError):
pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code
except UnknownFilesystemFormat as err: except UnknownFilesystemFormat as err:
raise err raise err
@ -572,7 +575,7 @@ def all_disks(*args, **kwargs):
kwargs.setdefault("partitions", False) kwargs.setdefault("partitions", False)
drives = OrderedDict() drives = OrderedDict()
# for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']: # for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']: for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model')).decode('UTF_8'))['blockdevices']:
if not kwargs['partitions'] and drive['type'] == 'part': if not kwargs['partitions'] and drive['type'] == 'part':
continue continue
@ -603,13 +606,17 @@ def harddrive(size=None, model=None, fuzzy=False):
return collection[drive] return collection[drive]
def get_mount_info(path): def get_mount_info(path) -> dict:
try: try:
output = b''.join(SysCommand(f'/usr/bin/findmnt --json {path}')) output = SysCommand(f'/usr/bin/findmnt --json {path}')
except SysCallError: except SysCallError:
return {} return {}
output = output.decode('UTF-8') output = output.decode('UTF-8')
if not output:
return {}
output = json.loads(output) output = json.loads(output)
if 'filesystems' in output: if 'filesystems' in output:
if len(output['filesystems']) > 1: if len(output['filesystems']) > 1:
@ -618,15 +625,19 @@ def get_mount_info(path):
return output['filesystems'][0] return output['filesystems'][0]
def get_partitions_in_use(mountpoint): def get_partitions_in_use(mountpoint) -> list:
try: try:
output = b''.join(SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}')) output = SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}')
except SysCallError: except SysCallError:
return {} return []
mounts = [] mounts = []
output = output.decode('UTF-8') output = output.decode('UTF-8')
if not output:
return []
output = json.loads(output) output = json.loads(output)
for target in output.get('filesystems', []): for target in output.get('filesystems', []):
mounts.append(Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target'])) mounts.append(Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target']))

View File

@ -4,6 +4,7 @@ import logging
import os import os
import pty import pty
import shlex import shlex
import subprocess
import sys import sys
import time import time
from datetime import datetime, date from datetime import datetime, date
@ -41,6 +42,8 @@ def locate_binary(name):
return os.path.join(root, file) return os.path.join(root, file)
break # Don't recurse break # Don't recurse
raise RequirementError(f"Binary {name} does not exist.")
class JsonEncoder: class JsonEncoder:
def _encode(obj): def _encode(obj):
@ -84,108 +87,125 @@ class JSON(json.JSONEncoder, json.JSONDecoder):
return super(JSON, self).encode(self._encode(obj)) return super(JSON, self).encode(self._encode(obj))
class SysCommand: class SysCommandWorker:
""" def __init__(self, cmd, callbacks=None, peak_output=False, environment_vars=None, logfile=None, working_directory='./'):
Stolen from archinstall_gui if not callbacks:
""" callbacks = {}
if not environment_vars:
def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars=None, *args, **kwargs):
if environment_vars is None:
environment_vars = {} environment_vars = {}
kwargs.setdefault("worker_id", gen_uid())
kwargs.setdefault("emulate", False)
kwargs.setdefault("suppress_errors", False)
self.log = kwargs.get('log', log) if type(cmd) is str:
cmd = shlex.split(cmd)
if kwargs['emulate']: if cmd[0][0] != '/' and cmd[0][:2] != './':
self.log(f"Starting command '{cmd}' in emulation mode.", level=logging.DEBUG) # "which" doesn't work as it's a builtin to bash.
# It used to work, but for whatever reason it doesn't anymore.
# We there for fall back on manual lookup in os.PATH
cmd[0] = locate_binary(cmd[0])
if type(cmd) is list: self.cmd = cmd
# if we get a list of arguments self.callbacks = callbacks
self.raw_cmd = shlex.join(cmd)
self.cmd = cmd
else:
# else consider it a single shell string
# this should only be used if really necessary
self.raw_cmd = cmd
try:
self.cmd = shlex.split(cmd)
except Exception as e:
raise ValueError(f'Incorrect string to split: {cmd}\n{e}')
self.args = args
self.kwargs = kwargs
self.peak_output = peak_output self.peak_output = peak_output
self.environment_vars = environment_vars self.environment_vars = environment_vars
self.logfile = logfile
self.working_directory = working_directory
self.kwargs.setdefault("worker", None)
self.callback = callback
self.pid = None
self.exit_code = None self.exit_code = None
self.started = time.time() self._trace_log = b''
self._trace_log_pos = 0
self.poll_object = epoll()
self.child_fd = None
self.started = None
self.ended = None self.ended = None
self.worker_id = kwargs['worker_id']
self.trace_log = b''
self.status = 'starting'
user_catalogue = os.path.expanduser('~') def __contains__(self, key: bytes):
"""
Contains will also move the current buffert position forward.
This is to avoid re-checking the same data when looking for output.
"""
assert type(key) == bytes
if workdir := kwargs.get('workdir', None): if (contains := key in self._trace_log[self._trace_log_pos:]):
self.cwd = workdir self._trace_log_pos += self._trace_log[self._trace_log_pos:].find(key) + len(key)
self.exec_dir = workdir
else:
self.cwd = f"{user_catalogue}/.cache/archinstall/workers/{kwargs['worker_id']}/"
self.exec_dir = f'{self.cwd}/{os.path.basename(self.cmd[0])}_workingdir'
if not self.cmd[0][0] == '/': return contains
# "which" doesn't work as it's a builtin to bash.
# It used to work, but for whatever reason it doesn't anymore. So back to square one..
# self.log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5)
# self.log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5)
self.cmd[0] = locate_binary(self.cmd[0])
if not os.path.isdir(self.exec_dir):
os.makedirs(self.exec_dir)
if start_callback:
start_callback(self, *args, **kwargs)
self.run()
def __iter__(self, *args, **kwargs): def __iter__(self, *args, **kwargs):
for line in self.trace_log.split(b'\n'): for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'):
yield line if line:
yield line + b'\n'
def __repr__(self, *args, **kwargs): self._trace_log_pos = self._trace_log.rfind(b'\n')
return f"{self.cmd, self.trace_log}"
def decode(self, fmt='UTF-8'): def __repr__(self):
return self.trace_log.decode(fmt) self.make_sure_we_are_executing()
return str(self._trace_log)
def dump(self): def __enter__(self):
return { return self
'status': self.status,
'worker_id': self.worker_id,
'worker_result': self.trace_log.decode('UTF-8'),
'started': self.started,
'ended': self.ended,
'started_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
'ended_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
'exit_code': self.exit_code,
}
def peak(self, output: Union[str, bytes]) -> bool: def __exit__(self, *args):
if type(output) == bytes: # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if self.child_fd:
try: try:
output = output.decode('UTF-8') os.close(self.child_fd)
except UnicodeDecodeError: except:
return False pass
output = output.strip('\r\n ')
if len(output) <= 0:
return False
if self.peak_output: if self.peak_output:
# To make sure any peaked output didn't leave us hanging
# on the same line we were on.
sys.stdout.write("\n")
sys.stdout.flush()
if len(args) >= 2 and args[1]:
log(args[1], level=logging.ERROR, fg='red')
if self.exit_code != 0:
raise SysCallError(f"{self.cmd} exited with abnormal exit code: {self.exit_code}")
def is_alive(self):
self.poll()
if self.started and self.ended is None:
return True
return False
def write(self, data: bytes, line_ending=True):
assert type(data) == bytes # TODO: Maybe we can support str as well and encode it
self.make_sure_we_are_executing()
os.write(self.child_fd, data + (b'\n' if line_ending else b''))
def make_sure_we_are_executing(self):
if not self.started:
return self.execute()
def tell(self) -> int:
self.make_sure_we_are_executing()
return self._trace_log_pos
def seek(self, pos):
self.make_sure_we_are_executing()
# Safety check to ensure 0 < pos < len(tracelog)
self._trace_log_pos = min(max(0, pos), len(self._trace_log))
def peak(self, output: Union[str, bytes]) -> bool:
if self.peak_output:
if type(output) == bytes:
try:
output = output.decode('UTF-8')
except UnicodeDecodeError:
return False
output = output.strip('\r\n ')
if len(output) <= 0:
return False
from .user_interaction import get_terminal_width from .user_interaction import get_terminal_width
# Move back to the beginning of the terminal # Move back to the beginning of the terminal
@ -207,125 +227,127 @@ class SysCommand:
sys.stdout.flush() sys.stdout.flush()
return True return True
def run(self): def poll(self):
self.status = 'running' self.make_sure_we_are_executing()
old_dir = os.getcwd()
os.chdir(self.exec_dir)
self.pid, child_fd = pty.fork()
if not self.pid: # Child process
# Replace child process with our main process
if not self.kwargs['emulate']:
try:
os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars})
except FileNotFoundError:
self.status = 'done'
self.log(f"{self.cmd[0]} does not exist.", level=logging.DEBUG)
self.exit_code = 1
return False
os.chdir(old_dir) got_output = False
for fileno, event in self.poll_object.poll(0.1):
try:
output = os.read(self.child_fd, 8192)
got_output = True
self.peak(output)
self._trace_log += output
except OSError as err:
self.ended = time.time()
break
poller = epoll() if self.ended or (got_output is False and pid_exists(self.pid) is False):
poller.register(child_fd, EPOLLIN | EPOLLHUP) self.ended = time.time()
if 'events' in self.kwargs and 'debug' in self.kwargs:
self.log(f'[D] Using triggers for command: {self.cmd}', level=logging.DEBUG)
self.log(json.dumps(self.kwargs['events']), level=logging.DEBUG)
alive = True
last_trigger_pos = 0
while alive and not self.kwargs['emulate']:
for fileno, event in poller.poll(0.1):
try:
output = os.read(child_fd, 8192)
self.peak(output)
self.trace_log += output
except OSError:
alive = False
break
if 'debug' in self.kwargs and self.kwargs['debug'] and len(output):
self.log(self.cmd, 'gave:', output.decode('UTF-8'), level=logging.DEBUG)
if 'on_output' in self.kwargs:
self.kwargs['on_output'](self.kwargs['worker'], output)
lower = output.lower()
broke = False
if 'events' in self.kwargs:
for trigger in list(self.kwargs['events']):
if type(trigger) != bytes:
original = trigger
trigger = bytes(original, 'UTF-8')
self.kwargs['events'][trigger] = self.kwargs['events'][original]
del self.kwargs['events'][original]
if type(self.kwargs['events'][trigger]) != bytes:
self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8')
if trigger.lower() in self.trace_log[last_trigger_pos:].lower():
trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower())
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG)
self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG)
last_trigger_pos = trigger_pos
os.write(child_fd, self.kwargs['events'][trigger])
del self.kwargs['events'][trigger]
broke = True
break
if broke:
continue
# Adding a exit trigger:
if len(self.kwargs['events']) == 0:
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"Waiting for last command {self.cmd[0]} to finish.", level=logging.DEBUG)
if bytes(']$'.lower(), 'UTF-8') in self.trace_log[0 - len(']$') - 5:].lower():
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"{self.cmd[0]} has finished.", level=logging.DEBUG)
alive = False
break
self.status = 'done'
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"{self.cmd[0]} waiting for exit code.", level=logging.DEBUG)
if not self.kwargs['emulate']:
try: try:
self.exit_code = os.waitpid(self.pid, 0)[1] self.exit_code = os.waitpid(self.pid, 0)[1]
except ChildProcessError: except ChildProcessError:
try: try:
self.exit_code = os.waitpid(child_fd, 0)[1] self.exit_code = os.waitpid(self.child_fd, 0)[1]
except ChildProcessError: except ChildProcessError:
self.exit_code = 1 self.exit_code = 1
else:
self.exit_code = 0
if 'debug' in self.kwargs and self.kwargs['debug']: def execute(self) -> bool:
self.log(f"{self.cmd[0]} got exit code: {self.exit_code}", level=logging.DEBUG) if (old_dir := os.getcwd()) != self.working_directory:
os.chdir(self.working_directory)
if 'ignore_errors' in self.kwargs: # Note: If for any reason, we get a Python exception between here
self.exit_code = 0 # and until os.close(), the traceback will get locked inside
# stdout of the child_fd object. `os.read(self.child_fd, 8192)` is the
# only way to get the traceback without loosing it.
self.pid, self.child_fd = pty.fork()
os.chdir(old_dir)
if self.exit_code != 0 and not self.kwargs['suppress_errors']: if not self.pid:
# self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG) try:
# self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=logging.ERROR) os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars})
raise SysCallError( except FileNotFoundError:
message=f"{self.trace_log.decode('UTF-8')}\n'{self.raw_cmd}' did not exit gracefully (trace log above), exit code: {self.exit_code}", log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red")
exit_code=self.exit_code) self.exit_code = 1
return False
self.ended = time.time() self.started = time.time()
with open(f'{self.cwd}/trace.log', 'wb') as fh: self.poll_object.register(self.child_fd, EPOLLIN | EPOLLHUP)
fh.write(self.trace_log)
return True
def decode(self, encoding='UTF-8'):
return self._trace_log.decode(encoding)
class SysCommand:
def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars=None, working_directory='./'):
_callbacks = {}
if callback:
_callbacks['on_end'] = callback
if start_callback:
_callbacks['on_start'] = start_callback
self.cmd = cmd
self._callbacks = _callbacks
self.peak_output = peak_output
self.environment_vars = environment_vars
self.working_directory = working_directory
self.session = None
self.create_session()
def __enter__(self):
return self.session
def __exit__(self, *args, **kwargs):
# b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
log(args[1], level=logging.ERROR, fg='red')
def __iter__(self, *args, **kwargs):
for line in self.session:
yield line
def __repr__(self, *args, **kwargs):
return self.session._trace_log.decode('UTF-8')
def __json__(self):
return {
'cmd': self.cmd,
'callbacks': self._callbacks,
'peak': self.peak_output,
'environment_vars': self.environment_vars,
'session': True if self.session else False
}
def create_session(self):
if self.session:
return True
try: try:
os.close(child_fd) self.session = SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars)
except:
pass while self.session.ended is None:
self.session.poll()
except SysCallError:
return False
return True
def decode(self, fmt='UTF-8'):
return self.session._trace_log.decode(fmt)
@property
def exit_code(self):
return self.session.exit_code
@property
def trace_log(self):
return self.session._trace_log
def prerequisite_check(): def prerequisite_check():
@ -337,3 +359,9 @@ def prerequisite_check():
def reboot(): def reboot():
o = b''.join(SysCommand("/usr/bin/reboot")) o = b''.join(SysCommand("/usr/bin/reboot"))
def pid_exists(pid :int):
try:
return any(subprocess.check_output(['/usr/bin/ps', '--no-headers', '-o', 'pid', '-p', str(pid)]).strip())
except subprocess.CalledProcessError:
return False

View File

@ -1,8 +1,8 @@
from .disk import * from .disk import *
from .hardware import * from .hardware import *
from .locale_helpers import verify_x11_keyboard_layout
from .mirrors import * from .mirrors import *
from .storage import storage from .storage import storage
from .systemd import Networkd
from .user_interaction import * from .user_interaction import *
# Any package that the Installer() is responsible for (optional and the default ones) # Any package that the Installer() is responsible for (optional and the default ones)
@ -78,7 +78,6 @@ class Installer:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]: if len(args) >= 2 and args[1]:
# self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
self.log(args[1], level=logging.ERROR, fg='red') self.log(args[1], level=logging.ERROR, fg='red')
self.sync_log_to_install_medium() self.sync_log_to_install_medium()
@ -136,7 +135,7 @@ class Installer:
self.log(f'Installing packages: {packages}', level=logging.INFO) self.log(f'Installing packages: {packages}', level=logging.INFO)
if (sync_mirrors := SysCommand('/usr/bin/pacman -Syy')).exit_code == 0: if (sync_mirrors := SysCommand('/usr/bin/pacman -Syy')).exit_code == 0:
if (pacstrap := SysCommand(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0: if (pacstrap := SysCommand(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', peak_output=True)).exit_code == 0:
return True return True
else: else:
self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.INFO) self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.INFO)
@ -149,9 +148,8 @@ class Installer:
def genfstab(self, flags='-pU'): def genfstab(self, flags='-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').trace_log with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh: fstab_fh.write(SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode())
fstab_fh.write(fstab)
if not os.path.isfile(f'{self.target}/etc/fstab'): if not os.path.isfile(f'{self.target}/etc/fstab'):
raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n{fstab}') raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n{fstab}')
@ -215,6 +213,8 @@ class Installer:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs): def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs):
from .systemd import Networkd
if dhcp: if dhcp:
conf = Networkd(Match={"Name": nic}, Network={"DHCP": "yes"}) conf = Networkd(Match={"Name": nic}, Network={"DHCP": "yes"})
else: else:
@ -514,11 +514,44 @@ class Installer:
o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")) o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\""))
pass pass
def set_keyboard_language(self, language): def set_keyboard_language(self, language: str) -> bool:
if len(language.strip()): if len(language.strip()):
with open(f'{self.target}/etc/vconsole.conf', 'w') as vconsole: if not verify_keyboard_layout(language):
vconsole.write(f'KEYMAP={language}\n') self.log(f"Invalid keyboard language specified: {language}", fg="red", level=logging.ERROR)
vconsole.write('FONT=lat9w-16\n') return False
# In accordance with https://github.com/archlinux/archinstall/issues/107#issuecomment-841701968
# Setting an empty keymap first, allows the subsequent call to set layout for both console and x11.
from .systemd import Boot
with Boot(self) as session:
session.SysCommand(["localectl", "set-keymap", '""'])
if (output := session.SysCommand(["localectl", "set-keymap", language])).exit_code != 0:
raise ServiceException(f"Unable to set locale '{language}' for console: {output}")
self.log(f"Keyboard language for this installation is now set to: {language}")
else: else:
self.log('Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO) self.log('Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
return True
def set_x11_keyboard_language(self, language: str) -> bool:
"""
A fallback function to set x11 layout specifically and separately from console layout.
This isn't strictly necessary since .set_keyboard_language() does this as well.
"""
if len(language.strip()):
if not verify_x11_keyboard_layout(language):
self.log(f"Invalid x11-keyboard language specified: {language}", fg="red", level=logging.ERROR)
return False
with Boot(self) as session:
session.SysCommand(["localectl", "set-x11-keymap", '""'])
if (output := session.SysCommand(["localectl", "set-x11-keymap", language])).exit_code != 0:
raise ServiceException(f"Unable to set locale '{language}' for X11: {output}")
else:
self.log(f'X11-Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
return True return True

View File

@ -1,23 +1,18 @@
import os import logging
import subprocess
from .exceptions import * from .exceptions import ServiceException
from .general import SysCommand
from .output import log
# from .general import sys_command
def list_keyboard_languages(): def list_keyboard_languages():
locale_dir = '/usr/share/kbd/keymaps/' for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
if not os.path.isdir(locale_dir):
raise RequirementError(f'Directory containing locales does not exist: {locale_dir}')
for root, folders, files in os.walk(locale_dir): def list_x11_keyboard_languages():
for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}):
for file in files: yield line.decode('UTF-8').strip()
if os.path.splitext(file)[1] == '.gz':
yield file.strip('.gz').strip('.map')
def verify_keyboard_layout(layout): def verify_keyboard_layout(layout):
@ -27,11 +22,28 @@ def verify_keyboard_layout(layout):
return False return False
def search_keyboard_layout(layout_filter): def verify_x11_keyboard_layout(layout):
for language in list_x11_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
def search_keyboard_layout(layout):
for language in list_keyboard_languages(): for language in list_keyboard_languages():
if layout_filter.lower() in language.lower(): if layout.lower() in language.lower():
yield language yield language
def set_keyboard_language(locale): def set_keyboard_language(locale):
return subprocess.call(['loadkeys', locale]) == 0 if len(locale.strip()):
if not verify_keyboard_layout(locale):
log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR)
return False
if (output := SysCommand(f'localectl set-keymap {locale}')).exit_code != 0:
raise ServiceException(f"Unable to set locale '{locale}' for console: {output}")
return True
return False

View File

@ -1,5 +1,6 @@
import fcntl import fcntl
import os import os
import logging
import socket import socket
import struct import struct
from collections import OrderedDict from collections import OrderedDict
@ -7,7 +8,7 @@ from collections import OrderedDict
from .exceptions import * from .exceptions import *
from .general import SysCommand from .general import SysCommand
from .storage import storage from .storage import storage
from .output import log
def get_hw_addr(ifname): def get_hw_addr(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -27,12 +28,12 @@ def list_interfaces(skip_loopback=True):
def check_mirror_reachable(): def check_mirror_reachable():
try: if (exit_code := SysCommand("pacman -Sy").exit_code) == 0:
check = SysCommand("pacman -Sy") return True
return check.exit_code == 0 elif exit_code == 256:
except: log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
return False
return False
def enrich_iface_types(interfaces: dict): def enrich_iface_types(interfaces: dict):
result = {} result = {}

View File

@ -155,7 +155,7 @@ def log(*args, **kwargs):
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True) log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.DEBUG kwargs['level'] = logging.DEBUG
if kwargs['level'] > storage.get('LOG_LEVEL', logging.INFO) and 'force' not in kwargs: if kwargs['level'] < storage.get('LOG_LEVEL', logging.INFO) and 'force' not in kwargs:
# Level on log message was Debug, but output level is set to Info. # Level on log message was Debug, but output level is set to Info.
# In that case, we'll drop it. # In that case, we'll drop it.
return None return None

View File

@ -1,3 +1,10 @@
import logging
from .general import SysCommand, SysCommandWorker, locate_binary
from .installer import Installer
from .output import log
from .storage import storage
class Ini: class Ini:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
""" """
@ -36,3 +43,78 @@ class Networkd(Systemd):
""" """
Placeholder class to do systemd-network specific setups. Placeholder class to do systemd-network specific setups.
""" """
class Boot:
def __init__(self, installation: Installer):
self.instance = installation
self.container_name = 'archinstall'
self.session = None
self.ready = False
def __enter__(self):
if (existing_session := storage.get('active_boot', None)) and existing_session.instance != self.instance:
raise KeyError("Archinstall only supports booting up one instance, and a active session is already active and it is not this one.")
if existing_session:
self.session = existing_session.session
self.ready = existing_session.ready
else:
self.session = SysCommandWorker([
'/usr/bin/systemd-nspawn',
'-D', self.instance.target,
'-b',
'--machine', self.container_name
])
if not self.ready:
while self.session.is_alive():
if b' login:' in self.session:
self.ready = True
break
storage['active_boot'] = self
return self
def __exit__(self, *args, **kwargs):
# b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
log(args[1], level=logging.ERROR, fg='red')
log(f"The error above occured in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red")
SysCommand(f'machinectl shell {self.container_name} /bin/bash -c "shutdown now"')
def __iter__(self):
if self.session:
for value in self.session:
yield value
def __contains__(self, key: bytes):
if self.session is None:
return False
return key in self.session
def is_alive(self):
if self.session is None:
return False
return self.session.is_alive()
def SysCommand(self, cmd :list, *args, **kwargs):
if cmd[0][0] != '/' and cmd[0][:2] != './':
# This check is also done in SysCommand & SysCommandWorker.
# However, that check is done for `machinectl` and not for our chroot command.
# So this wrapper for SysCommand will do this additionally.
cmd[0] = locate_binary(cmd[0])
return SysCommand(["machinectl", "shell", self.container_name, *cmd], *args, **kwargs)
def SysCommandWorker(self, cmd :list, *args, **kwargs):
if cmd[0][0] != '/' and cmd[0][:2] != './':
cmd[0] = locate_binary(cmd[0])
return SysCommandWorker(["machinectl", "shell", self.container_name, *cmd], *args, **kwargs)

View File

@ -591,7 +591,7 @@ def select_profile(options):
raise RequirementError("Selecting profiles require a least one profile to be given as an option.") raise RequirementError("Selecting profiles require a least one profile to be given as an option.")
def select_language(options, show_only_country_codes=True): def select_language(options, show_only_country_codes=True, input_text='Select one of the above keyboard languages (by number or full name): '):
""" """
Asks the user to select a language from the `options` dictionary parameter. Asks the user to select a language from the `options` dictionary parameter.
Usually this is combined with :ref:`archinstall.list_keyboard_languages`. Usually this is combined with :ref:`archinstall.list_keyboard_languages`.
@ -613,14 +613,13 @@ def select_language(options, show_only_country_codes=True):
languages = sorted(list(options)) languages = sorted(list(options))
if len(languages) >= 1: if len(languages) >= 1:
for index, language in enumerate(languages): print_large_list(languages, margin_bottom=4)
print(f"{index}: {language}")
print(" -- You can choose a layout that isn't in this list, but whose name you know --") print(" -- You can choose a layout that isn't in this list, but whose name you know --")
print(" -- Also, you can enter '?' or 'help' to search for more languages, or skip to use US layout --") print(f" -- Also, you can enter '?' or 'help' to search for more languages, or skip to use {default_keyboard_language} layout --")
while True: while True:
selected_language = input('Select one of the above keyboard languages (by name or full name): ') selected_language = input(input_text)
if not selected_language: if not selected_language:
return default_keyboard_language return default_keyboard_language
elif selected_language.lower() in ('?', 'help'): elif selected_language.lower() in ('?', 'help'):
@ -705,8 +704,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
default_option = options["All open-source (default)"] default_option = options["All open-source (default)"]
if drivers: if drivers:
lspci = SysCommand('/usr/bin/lspci') for line in SysCommand('/usr/bin/lspci'):
for line in lspci.trace_log.split(b'\r\n'):
if b' vga ' in line.lower(): if b' vga ' in line.lower():
if b'nvidia' in line.lower(): if b'nvidia' in line.lower():
print(' ** nvidia card detected, suggested driver: nvidia **') print(' ** nvidia card detected, suggested driver: nvidia **')

View File

@ -1,6 +1,7 @@
import json import json
import logging import logging
import time import time
import os
import archinstall import archinstall
from archinstall.lib.hardware import has_uefi from archinstall.lib.hardware import has_uefi
@ -51,7 +52,7 @@ def ask_user_questions():
else: else:
archinstall.arguments['harddrive'] = archinstall.select_disk(archinstall.all_disks()) archinstall.arguments['harddrive'] = archinstall.select_disk(archinstall.all_disks())
if archinstall.arguments['harddrive'] is None: if archinstall.arguments['harddrive'] is None:
archinstall.arguments['target-mount'] = '/mnt' archinstall.arguments['target-mount'] = archinstall.storage.get('MOUNT_POINT', '/mnt')
# Perform a quick sanity check on the selected harddrive. # Perform a quick sanity check on the selected harddrive.
# 1. Check if it has partitions # 1. Check if it has partitions
@ -291,14 +292,14 @@ def perform_installation_steps():
# unlocks the drive so that it can be used as a normal block-device within archinstall. # unlocks the drive so that it can be used as a normal block-device within archinstall.
with archinstall.luks2(fs.find_partition('/'), 'luksloop', archinstall.arguments.get('!encryption-password', None)) as unlocked_device: with archinstall.luks2(fs.find_partition('/'), 'luksloop', archinstall.arguments.get('!encryption-password', None)) as unlocked_device:
unlocked_device.format(fs.find_partition('/').filesystem) unlocked_device.format(fs.find_partition('/').filesystem)
unlocked_device.mount('/mnt') unlocked_device.mount(archinstall.storage.get('MOUNT_POINT', '/mnt'))
else: else:
fs.find_partition('/').mount('/mnt') fs.find_partition('/').mount(archinstall.storage.get('MOUNT_POINT', '/mnt'))
if has_uefi(): if has_uefi():
fs.find_partition('/boot').mount('/mnt/boot') fs.find_partition('/boot').mount(archinstall.storage.get('MOUNT_POINT', '/mnt')+'/boot')
perform_installation('/mnt') perform_installation(archinstall.storage.get('MOUNT_POINT', '/mnt'))
def perform_installation(mountpoint): def perform_installation(mountpoint):
@ -324,7 +325,6 @@ def perform_installation(mountpoint):
installation.set_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors in the installation medium installation.set_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors in the installation medium
if archinstall.arguments["bootloader"] == "grub-install" and has_uefi(): if archinstall.arguments["bootloader"] == "grub-install" and has_uefi():
installation.add_additional_packages("grub") installation.add_additional_packages("grub")
installation.set_keyboard_language(archinstall.arguments['keyboard-language'])
installation.add_bootloader(archinstall.arguments["bootloader"]) installation.add_bootloader(archinstall.arguments["bootloader"])
# If user selected to copy the current ISO network configuration # If user selected to copy the current ISO network configuration
@ -370,6 +370,10 @@ def perform_installation(mountpoint):
if (root_pw := archinstall.arguments.get('!root-password', None)) and len(root_pw): if (root_pw := archinstall.arguments.get('!root-password', None)) and len(root_pw):
installation.user_set_pw('root', root_pw) installation.user_set_pw('root', root_pw)
# This step must be after profile installs to allow profiles to install language pre-requisits.
# After which, this step will set the language both for console and x11 if x11 was installed for instance.
installation.set_keyboard_language(archinstall.arguments['keyboard-language'])
if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_post_install(): if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_post_install():
with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported: with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported:
if not imported._post_install(): if not imported._post_install():
@ -389,7 +393,8 @@ def perform_installation(mountpoint):
if not check_mirror_reachable(): if not check_mirror_reachable():
archinstall.log("Arch Linux mirrors are not reachable. Please check your internet connection and try again.", level=logging.INFO, fg="red") log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None))
archinstall.log(f"Arch Linux mirrors are not reachable. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
exit(1) exit(1)
ask_user_questions() ask_user_questions()

View File

@ -9,7 +9,7 @@ is_top_level_profile = False
__packages__ = [ __packages__ = [
"nemo", "nemo",
"gpicview", "gpicview",
"main", "maim",
"alacritty", "alacritty",
] ]