Flexible menu 2 (#916)

* Correct definition of btrfs standard layout

* New version of the FlexibleMenu
* Added new functionality to Selector
* Created a GeneralMenu class
* GlobalMenu is made a child of GeneralMenu

* Some refining in GeneralMenu
secret is now a general function

* log is invoked in GeneralMenu directly

* Correction at GeneralMenu

* Materialize again _setup_selection_menu_options. Gives more room to play

* Callbacks converted as methods
Synch() (data area and menu) decoupled from enable()
and made general before any run

* script swiss added to the patch set

* Only_hd gets a new implementation of the menu
flake8 corrections

* swiss.py description added

* New version of the FlexibleMenu
* Added new functionality to Selector
* Created a GeneralMenu class
* GlobalMenu is made a child of GeneralMenu

* changes from the rebase left dangling

* Modify order of execution between exec_menu and post_processing.
Added selector_name as parameter for exec_menu

* minor corrections to the scripts

* Adapt to PR #874

* Solve issue #936

* make ask_for_a_timezone as synonym to ask_timezone

* Adapted to nationalization framework (PR 893).
String still NOT adapted

* flake8 complains

* Use of archinstall.output_config instead of local copy at swiss.py

* Problems with the last merge

* more flake8 complains. caused by reverted changes re. ask*timezone

* git complains

Co-authored-by: Anton Hvornum <anton@hvornum.se>
This commit is contained in:
Werner Llácer 2022-02-06 11:54:13 +01:00 committed by GitHub
parent 9fb8d3164c
commit 1ea6fea1d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 850 additions and 70 deletions

View File

@ -35,7 +35,11 @@ from .lib.storage import *
from .lib.systemd import *
from .lib.user_interaction import *
from .lib.menu import Menu
from .lib.menu.selection_menu import GlobalMenu
from .lib.menu.selection_menu import (
GlobalMenu,
Selector,
GeneralMenu
)
from .lib.translation import Translation, DeferredTranslation
from .lib.plugins import plugins, load_plugin # This initiates the plugin loading ceremony
from .lib.configuration import *

View File

@ -550,3 +550,7 @@ def json_stream_to_structure(id : str, stream :str, target :dict) -> bool :
log(f" {id} is neither a file nor is a JSON string:",level=logging.ERROR)
return False
return True
def secret(x :str):
""" return * with len equal to to the input string """
return '*' * len(x)

View File

@ -1,8 +1,10 @@
from __future__ import annotations
import sys
from typing import Dict
import logging
from typing import Callable, Any, List, Iterator, Dict
from .menu import Menu
from ..general import SysCommand
from ..general import SysCommand, secret
from ..storage import storage
from ..output import log
from ..profiles import is_desktop_profile
@ -34,13 +36,16 @@ from ..translation import Translation
class Selector:
def __init__(
self,
description,
func=None,
display_func=None,
default=None,
enabled=False,
dependencies=[],
dependencies_not=[]
description :str,
func :Callable = None,
display_func :Callable = None,
default :Any = None,
enabled :bool = False,
dependencies :List = [],
dependencies_not :List = [],
exec_func :Callable = None,
preview_func :Callable = None,
mandatory :bool = False
):
"""
Create a new menu selection entry
@ -70,6 +75,17 @@ class Selector:
:param dependencies_not: These are the exclusive options; the menu item will only be
displayed if non of the entries in the list have been specified
:type dependencies_not: list
:param exec_func: A function with the name and the result of the selection as input parameter and which returns boolean.
Can be used for any action deemed necessary after selection. If it returns True, exits the menu loop, if False,
menu returns to the selection screen. If not specified it is assumed the return is False
:type exec_func: Callable
:param preview_func: A callable which invokws a preview screen (not implemented)
:type preview_func: Callable
:param mandatory: A boolean which determines that the field is mandatory, i.e. menu can not be exited if it is not set
:type mandatory: bool
"""
self._description = description
@ -79,26 +95,29 @@ class Selector:
self.enabled = enabled
self._dependencies = dependencies
self._dependencies_not = dependencies_not
self.exec_func = exec_func
self.preview_func = preview_func
self.mandatory = mandatory
@property
def dependencies(self):
def dependencies(self) -> dict:
return self._dependencies
@property
def dependencies_not(self):
def dependencies_not(self) -> dict:
return self._dependencies_not
@property
def current_selection(self):
return self._current_selection
def set_enabled(self):
self.enabled = True
def set_enabled(self, status :bool = True):
self.enabled = status
def update_description(self, description):
def update_description(self, description :str):
self._description = description
def menu_text(self):
def menu_text(self) -> str:
current = ''
if self._display_func:
@ -113,29 +132,271 @@ class Selector:
return f'{self._description} {current}'
def set_current_selection(self, current):
def set_current_selection(self, current :str):
self._current_selection = current
def has_selection(self):
def has_selection(self) -> bool:
if self._current_selection is None:
return False
return True
def is_empty(self):
def get_selection(self) -> Any:
return self._current_selection
def is_empty(self) -> bool:
if self._current_selection is None:
return True
elif isinstance(self._current_selection, (str, list, dict)) and len(self._current_selection) == 0:
return True
return False
def is_enabled(self) -> bool:
return self.enabled
class GlobalMenu:
def __init__(self):
def is_mandatory(self) -> bool:
return self.mandatory
def set_mandatory(self, status :bool = True):
self.mandatory = status
if status and not self.is_enabled():
self.set_enabled(True)
class GeneralMenu:
def __init__(self, data_store :dict = None):
"""
Create a new selection menu.
:param data_store: Area (Dict) where the resulting data will be held. At least an entry for each option. Default area is self._data_store (not preset in the call, due to circular references
:type data_store: Dict
"""
self._translation = Translation.load_nationalization()
self.is_context_mgr = False
self._data_store = data_store if data_store is not None else {}
self._menu_options = {}
self._setup_selection_menu_options()
def __enter__(self, *args :Any, **kwargs :Any) -> GeneralMenu:
self.is_context_mgr = True
return self
def __exit__(self, *args :Any, **kwargs :Any) -> None:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
# TODO: skip processing when it comes from a planified exit
if len(args) >= 2 and args[1]:
log(args[1], level=logging.ERROR, fg='red')
print(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")
raise args[1]
for key in self._menu_options:
sel = self._menu_options[key]
if key and key not in self._data_store:
self._data_store[key] = sel._current_selection
self.exit_callback()
def _setup_selection_menu_options(self):
""" Define the menu options.
Menu options can be defined here in a subclass or done per progam calling self.set_option()
"""
return
def pre_callback(self, selector_name):
""" will be called before each action in the menu """
return
def post_callback(self, selector_name :str, value :Any):
""" will be called after each action in the menu """
return True
def exit_callback(self):
""" will be called at the end of the processing of the menu """
return
def synch(self, selector_name :str, omit_if_set :bool = False,omit_if_disabled :bool = False):
""" loads menu options with data_store value """
arg = self._data_store.get(selector_name, None)
# don't display the menu option if it was defined already
if arg is not None and omit_if_set:
return
if not self.option(selector_name).is_enabled() and omit_if_disabled:
return
if arg is not None:
self._menu_options[selector_name].set_current_selection(arg)
def enable(self, selector_name :str, omit_if_set :bool = False , mandatory :bool = False):
""" activates menu options """
if self._menu_options.get(selector_name, None):
self._menu_options[selector_name].set_enabled(True)
if mandatory:
self._menu_options[selector_name].set_mandatory(True)
self.synch(selector_name,omit_if_set)
else:
print(f'No selector found: {selector_name}')
sys.exit(1)
def run(self):
""" Calls the Menu framework"""
# we synch all the options just in case
for item in self.list_options():
self.synch(item)
while True:
# Before continuing, set the preferred keyboard layout/language in the current terminal.
# This will just help the user with the next following questions.
self._set_kb_language()
enabled_menus = self._menus_to_enable()
menu_text = [m.text for m in enabled_menus.values()]
selection = Menu('Set/Modify the below options', menu_text, sort=False).run()
if selection:
selection = selection.strip()
if selection:
# if this calls returns false, we exit the menu. We allow for an callback for special processing on realeasing control
if not self._process_selection(selection):
break
if not self.is_context_mgr:
self.__exit__()
def _process_selection(self, selection :str) -> bool:
""" determines and executes the selection y
Can / Should be extended to handle specific selection issues
Returns true if the menu shall continue, False if it has ended
"""
# find the selected option in our option list
option = [[k, v] for k, v in self._menu_options.items() if v.text.strip() == selection]
if len(option) != 1:
raise ValueError(f'Selection not found: {selection}')
selector_name = option[0][0]
selector = option[0][1]
return self.exec_option(selector_name,selector)
def exec_option(self,selector_name :str, p_selector :Selector = None) -> bool:
""" processes the exection of a given menu entry
- pre process callback
- selection function
- post process callback
- exec action
returns True if the loop has to continue, false if the loop can be closed
"""
if not p_selector:
selector = self.option(selector_name)
else:
selector = p_selector
self.pre_callback(selector_name)
result = None
if selector.func:
result = selector.func()
self._menu_options[selector_name].set_current_selection(result)
self._data_store[selector_name] = result
exec_ret_val = selector.exec_func(selector_name,result) if selector.exec_func else False
self.post_callback(selector_name,result)
if exec_ret_val and self._check_mandatory_status():
return False
return True
""" old behaviour
# we allow for a callback after we get the result
self.post_callback(selector_name,result)
# we have a callback, by option, to determine if we can exit the menu. Only if ALL mandatory fields are written
if selector.exec_func:
if selector.exec_func(result) and self._check_mandatory_status():
return False
"""
return True
def _set_kb_language(self):
""" general for ArchInstall"""
# Before continuing, set the preferred keyboard layout/language in the current terminal.
# This will just help the user with the next following questions.
if self._data_store.get('keyboard-layout', None) and len(self._data_store['keyboard-layout']):
set_keyboard_language(self._data_store['keyboard-layout'])
def _verify_selection_enabled(self, selection_name :str) -> bool:
""" general """
if selection := self._menu_options.get(selection_name, None):
if not selection.enabled:
return False
if len(selection.dependencies) > 0:
for d in selection.dependencies:
if not self._verify_selection_enabled(d) or self._menu_options.get(d).is_empty():
return False
if len(selection.dependencies_not) > 0:
for d in selection.dependencies_not:
if not self._menu_options.get(d).is_empty():
return False
return True
raise ValueError(f'No selection found: {selection_name}')
def _menus_to_enable(self) -> dict:
""" general """
enabled_menus = {}
for name, selection in self._menu_options.items():
if self._verify_selection_enabled(name):
enabled_menus[name] = selection
return enabled_menus
def option(self,name :str) -> Selector:
# TODO check inexistent name
return self._menu_options[name]
def list_options(self) -> Iterator:
""" Iterator to retrieve the enabled menu option names
"""
for item in self._menu_options:
yield item
def list_enabled_options(self) -> Iterator:
""" Iterator to retrieve the enabled menu options at a given time.
The results are dynamic (if between calls to the iterator some elements -still not retrieved- are (de)activated
"""
for item in self._menu_options:
if item in self._menus_to_enable():
yield item
def set_option(self, name :str, selector :Selector):
self._menu_options[name] = selector
self.synch(name)
def _check_mandatory_status(self) -> bool:
for field in self._menu_options:
option = self._menu_options[field]
if option.is_mandatory() and not option.has_selection():
return False
return True
def set_mandatory(self, field :str, status :bool):
self.option(field).set_mandatory(status)
def mandatory_overview(self) -> [int, int]:
mandatory_fields = 0
mandatory_waiting = 0
for field in self._menu_options:
option = self._menu_options[field]
if option.is_mandatory():
mandatory_fields += 1
if not option.has_selection():
mandatory_waiting += 1
return mandatory_fields, mandatory_waiting
def _select_archinstall_language(self, default_lang):
language = select_archinstall_language(default_lang)
self._translation.activate(language)
return language
class GlobalMenu(GeneralMenu):
def __init__(self,data_store):
super().__init__(data_store=data_store)
def _setup_selection_menu_options(self):
self._menu_options['archinstall-language'] = \
Selector(
@ -170,8 +431,7 @@ class GlobalMenu:
self._menu_options['!encryption-password'] = \
Selector(
_('Set encryption password'),
lambda: get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))),
display_func=lambda x: self._secret(x) if x else 'None',
display_func=lambda x: secret(x) if x else 'None',
dependencies=['harddrives'])
self._menu_options['swap'] = \
Selector(
@ -188,7 +448,7 @@ class GlobalMenu:
Selector(
_('Set root password'),
lambda: self._set_root_password(),
display_func=lambda x: self._secret(x) if x else 'None')
display_func=lambda x: secret(x) if x else 'None')
self._menu_options['!superusers'] = \
Selector(
_('Specify superuser account'),
@ -236,7 +496,9 @@ class GlobalMenu:
self._menu_options['install'] = \
Selector(
self._install_text(),
exec_func=lambda n,v: True if self._missing_configs() == 0 else False,
enabled=True)
self._menu_options['abort'] = Selector(_('Abort'), enabled=True)
def enable(self, selector_name, omit_if_set=False):
@ -300,8 +562,11 @@ class GlobalMenu:
text = self._install_text()
self._menu_options.get('install').update_description(text)
def _post_processing(self):
if storage['arguments'].get('harddrives', None) and storage['arguments'].get('!encryption-password', None):
def post_callback(self,name :str = None ,result :Any = None):
self._update_install(name,result)
def exit_callback(self):
if self._data_store.get('harddrives', None) and self._data_store.get('!encryption-password', None):
# If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
# Then we need to identify which partitions to encrypt. This will default to / (root).
if len(list(encrypted_partitions(storage['arguments'].get('disk_layouts', [])))) == 0:
@ -337,11 +602,6 @@ class GlobalMenu:
return missing
def _select_archinstall_language(self, default_lang):
language = select_archinstall_language(default_lang)
self._translation.activate(language)
return language
def _set_root_password(self):
prompt = str(_('Enter root password (leave blank to disable root): '))
password = get_password(prompt=prompt)
@ -386,9 +646,6 @@ class GlobalMenu:
return harddrives
def _secret(self, x):
return '*' * len(x)
def _select_profile(self):
profile = select_profile()
@ -444,4 +701,4 @@ class GlobalMenu:
if self._verify_selection_enabled(name):
enabled_menus[name] = selection
return enabled_menus
return enabled_menus

View File

@ -351,7 +351,6 @@ def ask_for_a_timezone() -> str:
return selected_tz
def ask_for_bootloader(advanced_options :bool = False) -> str:
bootloader = "systemd-bootctl" if has_uefi() else "grub-install"
if has_uefi():

View File

@ -36,7 +36,7 @@ def ask_user_questions():
# the system immediately
archinstall.SysCommand('timedatectl set-ntp true')
global_menu = archinstall.GlobalMenu()
global_menu = archinstall.GlobalMenu(data_store=archinstall.arguments)
global_menu.enable('keyboard-layout')
# Set which region to download packages from during the installation

View File

@ -4,41 +4,41 @@ import pathlib
import archinstall
def ask_harddrives():
# Ask which harddrives/block-devices we will install to
# and convert them into archinstall.BlockDevice() objects.
if archinstall.arguments.get('harddrives', None) is None:
archinstall.arguments['harddrives'] = archinstall.generic_multi_select(archinstall.all_disks(),
text="Select one or more harddrives to use and configure (leave blank to skip this step): ",
allow_empty=True)
class OnlyHDMenu(archinstall.GlobalMenu):
def _setup_selection_menu_options(self):
super()._setup_selection_menu_options()
options_list = []
mandatory_list = []
options_list = ['harddrives', 'disk_layouts', '!encryption-password','swap']
mandatory_list = ['harddrives']
options_list.extend(['install','abort'])
if not archinstall.arguments['harddrives']:
archinstall.log("You decided to skip harddrive selection",fg="red",level=logging.INFO)
archinstall.log(f"and will use whatever drive-setup is mounted at {archinstall.storage['MOUNT_POINT']} (experimental)",fg="red",level=logging.INFO)
archinstall.log("WARNING: Archinstall won't check the suitability of this setup",fg="red",level=logging.INFO)
if input("Do you wish to continue ? [Y/n]").strip().lower() == 'n':
exit(1)
else:
if archinstall.arguments.get('disk_layouts', None) is None:
archinstall.arguments['disk_layouts'] = archinstall.select_disk_layout(archinstall.arguments['harddrives'], archinstall.arguments.get('advanced', False))
for entry in self._menu_options:
if entry in options_list:
# for not lineal executions, only self.option(entry).set_enabled and set_mandatory are necessary
if entry in mandatory_list:
self.enable(entry,mandatory=True)
else:
self.enable(entry)
else:
self.option(entry).set_enabled(False)
self._update_install()
def _missing_configs(self):
""" overloaded method """
def check(s):
return self.option(s).has_selection()
# Get disk encryption password (or skip if blank)
if archinstall.arguments.get('!encryption-password', None) is None:
if passwd := archinstall.get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))):
archinstall.arguments['!encryption-password'] = passwd
if archinstall.arguments.get('!encryption-password', None):
# If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
# Then we need to identify which partitions to encrypt. This will default to / (root).
if len(list(archinstall.encrypted_partitions(archinstall.arguments['disk_layouts']))) == 0:
archinstall.arguments['disk_layouts'] = archinstall.select_encrypted_partitions(archinstall.arguments['disk_layouts'], archinstall.arguments['!encryption-password'])
# Ask which boot-loader to use (will only ask if we're in BIOS (non-efi) mode)
if not archinstall.arguments.get("bootloader", None):
archinstall.arguments["bootloader"] = archinstall.ask_for_bootloader(archinstall.arguments.get('advanced', False))
if not archinstall.arguments.get('swap', None):
archinstall.arguments['swap'] = archinstall.ask_for_swap()
_, missing = self.mandatory_overview()
if check('harddrives'):
if not self.option('harddrives').is_empty() and not check('disk_layouts'):
missing += 1
return missing
def ask_user_questions():
"""
@ -46,7 +46,11 @@ def ask_user_questions():
Not until we're satisfied with what we want to install
will we continue with the actual installation steps.
"""
ask_harddrives()
with OnlyHDMenu(data_store=archinstall.arguments) as menu:
# We select the execution language separated
menu.exec_option('archinstall-language')
menu.option('archinstall-language').set_enabled(False)
menu.run()
def perform_disk_operations():
"""
@ -56,7 +60,6 @@ def perform_disk_operations():
if archinstall.arguments.get('harddrives', None):
print(f" ! Formatting {archinstall.arguments['harddrives']} in ", end='')
archinstall.do_countdown()
"""
Setup the blockdevice, filesystem (and optionally encryption).
Once that's done, we'll hand over to perform_installation()
@ -66,9 +69,9 @@ def perform_disk_operations():
mode = archinstall.MBR
for drive in archinstall.arguments.get('harddrives', []):
if dl_disk := archinstall.arguments.get('disk_layouts', {}).get(drive.path):
if archinstall.arguments.get('disk_layouts', {}).get(drive.path):
with archinstall.Filesystem(drive, mode) as fs:
fs.load_layout(dl_disk)
fs.load_layout(archinstall.arguments['disk_layouts'][drive.path])
def perform_installation(mountpoint):
"""

512
examples/swiss.py Normal file
View File

@ -0,0 +1,512 @@
"""
Script swiss (army knife)
Designed to make different workflows for the installation process. Which is controled by the argument --mode
mode full guides the full process of installation
mode only_hd only proceeds to the creation of the disk infraestructure (partition, mount points, encryption)
mode only_os processes only the installation of Archlinux and software at --mountpoint (or /mnt/archinstall)
mode minimal (still not implemented)
mode lineal. Instead of a menu, shows a sequence of selection screens (eq. to the old mode for guided.py)
When using the argument --advanced. an aditional menu for several special parameters needed during installation appears
This script respects the --dry_run argument
"""
import logging
import os
import time
import pathlib
import archinstall
if archinstall.arguments.get('help'):
print("See `man archinstall` for help.")
exit(0)
if os.getuid() != 0:
print("Archinstall requires root privileges to run. See --help for more.")
exit(1)
"""
particular routines to SetupMenu
TODO exec con return parameter
"""
def select_activate_NTP():
prompt = "Would you like to use automatic time synchronization (NTP) with the default time servers? [Y/n]: "
choice = archinstall.Menu(prompt, ['yes', 'no'], default_option='yes').run()
if choice == 'yes':
return True
else:
return False
def select_mode():
return archinstall.generic_select(['full','only_hd','only_os','minimal','lineal'],
'Select one execution mode',
default=archinstall.arguments.get('mode','full'))
"""
following functions will be at locale_helpers, so they will have to be called prefixed by archinstall
"""
def get_locale_mode_text(mode):
if mode == 'LC_ALL':
mode_text = "general (LC_ALL)"
elif mode == "LC_CTYPE":
mode_text = "Character set"
elif mode == "LC_NUMERIC":
mode_text = "Numeric values"
elif mode == "LC_TIME":
mode_text = "Time Values"
elif mode == "LC_COLLATE":
mode_text = "sort order"
elif mode == "LC_MESSAGES":
mode_text = "text messages"
else:
mode_text = "Unassigned"
return mode_text
def reset_cmd_locale():
""" sets the cmd_locale to its saved default """
archinstall.storage['CMD_LOCALE'] = archinstall.storage.get('CMD_LOCALE_DEFAULT',{})
def unset_cmd_locale():
""" archinstall will use the execution environment default """
archinstall.storage['CMD_LOCALE'] = {}
def set_cmd_locale(general :str = None,
charset :str = 'C',
numbers :str = 'C',
time :str = 'C',
collate :str = 'C',
messages :str = 'C'):
"""
Set the cmd locale.
If the parameter general is specified, it takes precedence over the rest (might as well not exist)
The rest define some specific settings above the installed default language. If anyone of this parameters is none means the installation default
"""
installed_locales = list_installed_locales()
result = {}
if general:
if general in installed_locales:
archinstall.storage['CMD_LOCALE'] = {'LC_ALL':general}
else:
archinstall.log(f"{get_locale_mode_text('LC_ALL')} {general} is not installed. Defaulting to C",fg="yellow",level=logging.WARNING)
return
if numbers:
if numbers in installed_locales:
result["LC_NUMERIC"] = numbers
else:
archinstall.log(f"{get_locale_mode_text('LC_NUMERIC')} {numbers} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
if charset:
if charset in installed_locales:
result["LC_CTYPE"] = charset
else:
archinstall.log(f"{get_locale_mode_text('LC_CTYPE')} {charset} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
if time:
if time in installed_locales:
result["LC_TIME"] = time
else:
archinstall.log(f"{get_locale_mode_text('LC_TIME')} {time} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
if collate:
if collate in installed_locales:
result["LC_COLLATE"] = collate
else:
archinstall.log(f"{get_locale_mode_text('LC_COLLATE')} {collate} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
if messages:
if messages in installed_locales:
result["LC_MESSAGES"] = messages
else:
archinstall.log(f"{get_locale_mode_text('LC_MESSAGES')} {messages} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
archinstall.storage['CMD_LOCALE'] = result
def list_installed_locales() -> list[str]:
lista = []
for line in archinstall.SysCommand('locale -a'):
lista.append(line.decode('UTF-8').strip())
return lista
"""
end of locale helpers
"""
def select_installed_locale(mode):
mode_text = get_locale_mode_text(mode)
if mode == 'LC_ALL':
texto = "Select the default execution locale \nIf none, you will be prompted for specific settings"
else:
texto = f"Select the {mode_text} ({mode}) execution locale \nIf none, you will get the installation default"
return archinstall.generic_select([None] + list_installed_locales(),
texto,
allow_empty_input=True,
default=archinstall.storage.get('CMD_LOCALE',{}).get(mode,'C'))
"""
_menus
"""
class SetupMenu(archinstall.GeneralMenu):
def __init__(self,storage_area):
super().__init__(data_store=storage_area)
def _setup_selection_menu_options(self):
self.set_option('archinstall-language',
archinstall.Selector(
_('Select Archinstall language'),
lambda: self._select_archinstall_language('English'),
default='English',
enabled=True))
self.set_option('ntp',
archinstall.Selector(
'Activate NTP',
lambda: select_activate_NTP(),
default='Y',
enabled=True))
self.set_option('mode',
archinstall.Selector(
'Excution mode',
lambda: select_mode(),
default='full',
enabled=True))
for item in ['LC_ALL','LC_CTYPE','LC_NUMERIC','LC_TIME','LC_MESSAGES','LC_COLLATE']:
self.set_option(item,
archinstall.Selector(
f'{get_locale_mode_text(item)} locale',
lambda item=item: select_installed_locale(item), # the parmeter is needed for the lambda in the loop
enabled=True,
dependencies_not=['LC_ALL'] if item != 'LC_ALL' else []))
self.option('LC_ALL').set_enabled(True)
self.set_option('continue',
archinstall.Selector(
'Continue',
exec_func=lambda n,v: True,
enabled=True))
def exit_callback(self):
if self._data_store.get('ntp',False):
archinstall.log("Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki.", fg="yellow")
archinstall.SysCommand('timedatectl set-ntp true')
if self._data_store.get('mode',None):
archinstall.arguments['mode'] = self._data_store['mode']
archinstall.log(f"Archinstall will execute under {archinstall.arguments['mode']} mode")
if self._data_store.get('LC_ALL',None):
archinstall.storage['CMD_LOCALE'] = {'LC_ALL':self._data_store['LC_ALL']}
else:
exec_locale = {}
for item in ['LC_COLLATE','LC_CTYPE','LC_MESSAGES','LC_NUMERIC','LC_TIME']:
if self._data_store.get(item,None):
exec_locale[item] = self._data_store[item]
archinstall.storage['CMD_LOCALE'] = exec_locale
archinstall.log(f"Archinstall will execute with {archinstall.storage.get('CMD_LOCALE',None)} locale")
class MyMenu(archinstall.GlobalMenu):
def __init__(self,data_store=archinstall.arguments,mode='full'):
self._execution_mode = mode
super().__init__(data_store)
def _setup_selection_menu_options(self):
super()._setup_selection_menu_options()
options_list = []
mandatory_list = []
if self._execution_mode in ('full','lineal'):
options_list = ['keyboard-layout', 'mirror-region', 'harddrives', 'disk_layouts',
'!encryption-password','swap', 'bootloader', 'hostname', '!root-password',
'!superusers', '!users', 'profile', 'audio', 'kernels', 'packages','nic',
'timezone', 'ntp']
if archinstall.arguments.get('advanced',False):
options_list.extend(['sys-language','sys-encoding'])
mandatory_list = ['harddrives','bootloader','hostname']
elif self._execution_mode == 'only_hd':
options_list = ['harddrives', 'disk_layouts', '!encryption-password','swap']
mandatory_list = ['harddrives']
elif self._execution_mode == 'only_os':
options_list = ['keyboard-layout', 'mirror-region','bootloader', 'hostname',
'!root-password', '!superusers', '!users', 'profile', 'audio', 'kernels',
'packages', 'nic', 'timezone', 'ntp']
mandatory_list = ['hostname']
if archinstall.arguments.get('advanced',False):
options_list.expand(['sys-language','sys-encoding'])
elif self._execution_mode == 'minimal':
pass
else:
archinstall.log(f"self._execution_mode {self._execution_mode} not supported")
exit(1)
if self._execution_mode != 'lineal':
options_list.extend(['install','abort'])
if not archinstall.arguments.get('advanced'):
options_list.append('archinstall-language')
for entry in self._menu_options:
if entry in options_list:
# for not lineal executions, only self.option(entry).set_enabled and set_mandatory are necessary
if entry in mandatory_list:
self.enable(entry,mandatory=True)
else:
self.enable(entry)
else:
self.option(entry).set_enabled(False)
self._update_install()
def post_callback(self,option,value=None):
self._update_install(self._execution_mode)
def _missing_configs(self,mode='full'):
def check(s):
return self.option(s).has_selection()
_, missing = self.mandatory_overview()
if mode in ('full','only_os') and (not check('!root-password') and not check('!superusers')):
missing += 1
if mode in ('full', 'only_hd') and check('harddrives'):
if not self.option('harddrives').is_empty() and not check('disk_layouts'):
missing += 1
return missing
def _install_text(self,mode='full'):
missing = self._missing_configs(mode)
if missing > 0:
return f'Instalation ({missing} config(s) missing)'
return 'Install'
def _update_install(self,mode='full'):
text = self._install_text(mode)
self.option('install').update_description(text)
"""
Instalation general subroutines
"""
def get_current_status():
# Log various information about hardware before starting the installation. This might assist in troubleshooting
archinstall.log(f"Hardware model detected: {archinstall.sys_vendor()} {archinstall.product_name()}; UEFI mode: {archinstall.has_uefi()}", level=logging.DEBUG)
archinstall.log(f"Processor model detected: {archinstall.cpu_model()}", level=logging.DEBUG)
archinstall.log(f"Memory statistics: {archinstall.mem_available()} available out of {archinstall.mem_total()} total installed", level=logging.DEBUG)
archinstall.log(f"Virtualization detected: {archinstall.virtualization()}; is VM: {archinstall.is_vm()}", level=logging.DEBUG)
archinstall.log(f"Graphics devices detected: {archinstall.graphics_devices().keys()}", level=logging.DEBUG)
# For support reasons, we'll log the disk layout pre installation to match against post-installation layout
archinstall.log(f"Disk states before installing: {archinstall.disk_layouts()}", level=logging.DEBUG)
def ask_user_questions(mode):
"""
First, we'll ask the user for a bunch of user input.
Not until we're satisfied with what we want to install
will we continue with the actual installation steps.
"""
if archinstall.arguments.get('advanced',None):
# 3.9 syntax. former x = {**y,**z} or x.update(y)
set_cmd_locale(charset='es_ES.utf8',collate='es_ES.utf8')
setup_area = archinstall.storage.get('CMD_LOCALE',{}) | {}
with SetupMenu(setup_area) as setup:
if mode == 'lineal':
for entry in setup.list_enabled_options():
if entry in ('continue','abort'):
continue
if not setup.option(entry).enabled:
continue
setup.exec_option(entry)
else:
setup.run()
archinstall.arguments['archinstall-language'] = setup_area.get('archinstall-language')
else:
archinstall.log("Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki.", fg="yellow")
archinstall.SysCommand('timedatectl set-ntp true')
with MyMenu(data_store=archinstall.arguments,mode=mode) as global_menu:
if mode == 'lineal':
for entry in global_menu.list_enabled_options():
if entry in ('install','abort'):
continue
global_menu.exec_option(entry)
archinstall.arguments[entry] = global_menu.option(entry).get_selection()
else:
global_menu.set_option('install',
archinstall.Selector(
global_menu._install_text(mode),
exec_func=lambda n,v: True if global_menu._missing_configs(mode) == 0 else False,
enabled=True))
global_menu.run()
def perform_filesystem_operations():
"""
Issue a final warning before we continue with something un-revertable.
We mention the drive one last time, and count from 5 to 0.
"""
if archinstall.arguments.get('harddrives', None):
print(f" ! Formatting {archinstall.arguments['harddrives']} in ", end='')
archinstall.do_countdown()
"""
Setup the blockdevice, filesystem (and optionally encryption).
Once that's done, we'll hand over to perform_installation()
"""
mode = archinstall.GPT
if archinstall.has_uefi() is False:
mode = archinstall.MBR
for drive in archinstall.arguments.get('harddrives', []):
if archinstall.arguments.get('disk_layouts', {}).get(drive.path):
with archinstall.Filesystem(drive, mode) as fs:
fs.load_layout(archinstall.arguments['disk_layouts'][drive.path])
def disk_setup(installation):
# Mount all the drives to the desired mountpoint
# This *can* be done outside of the installation, but the installer can deal with it.
if archinstall.arguments.get('disk_layouts'):
installation.mount_ordered_layout(archinstall.arguments['disk_layouts'])
# Placing /boot check during installation because this will catch both re-use and wipe scenarios.
for partition in installation.partitions:
if partition.mountpoint == installation.target + '/boot':
if partition.size < 0.19: # ~200 MiB in GiB
raise archinstall.DiskError(
f"The selected /boot partition in use is not large enough to properly install a boot loader. Please resize it to at least 200MiB and re-run the installation.")
def os_setup(installation):
# if len(mirrors):
# Certain services might be running that affects the system during installation.
# Currently, only one such service is "reflector.service" which updates /etc/pacman.d/mirrorlist
# We need to wait for it before we continue since we opted in to use a custom mirror/region.
installation.log('Waiting for automatic mirror selection (reflector) to complete.', level=logging.INFO)
while archinstall.service_state('reflector') not in ('dead', 'failed'):
time.sleep(1)
# Set mirrors used by pacstrap (outside of installation)
if archinstall.arguments.get('mirror-region', None):
archinstall.use_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors for the live medium
if installation.minimal_installation():
installation.set_locale(archinstall.arguments['sys-language'], archinstall.arguments['sys-encoding'].upper())
installation.set_hostname(archinstall.arguments['hostname'])
if archinstall.arguments['mirror-region'].get("mirrors", None) is not None:
installation.set_mirrors(
archinstall.arguments['mirror-region']) # Set the mirrors in the installation medium
if archinstall.arguments["bootloader"] == "grub-install" and archinstall.has_uefi():
installation.add_additional_packages("grub")
installation.add_bootloader(archinstall.arguments["bootloader"])
if archinstall.arguments['swap']:
installation.setup_swap('zram')
# If user selected to copy the current ISO network configuration
# Perform a copy of the config
if archinstall.arguments.get('nic', {}) == 'Copy ISO network configuration to installation':
installation.copy_iso_network_config(
enable_services=True) # Sources the ISO network configuration to the install medium.
elif archinstall.arguments.get('nic', {}).get('NetworkManager', False):
installation.add_additional_packages("networkmanager")
installation.enable_service('NetworkManager.service')
# Otherwise, if a interface was selected, configure that interface
elif archinstall.arguments.get('nic', {}):
installation.configure_nic(**archinstall.arguments.get('nic', {}))
installation.enable_service('systemd-networkd')
installation.enable_service('systemd-resolved')
if archinstall.arguments.get('audio', None) is not None:
installation.log(f"This audio server will be used: {archinstall.arguments.get('audio', None)}",level=logging.INFO)
if archinstall.arguments.get('audio', None) == 'pipewire':
archinstall.Application(installation, 'pipewire').install()
elif archinstall.arguments.get('audio', None) == 'pulseaudio':
print('Installing pulseaudio ...')
installation.add_additional_packages("pulseaudio")
else:
installation.log("No audio server will be installed.", level=logging.INFO)
if archinstall.arguments.get('packages', None) and archinstall.arguments.get('packages', None)[0] != '':
installation.add_additional_packages(archinstall.arguments.get('packages', None))
if archinstall.arguments.get('profile', None):
installation.install_profile(archinstall.arguments.get('profile', None))
for user, user_info in archinstall.arguments.get('!users', {}).items():
installation.user_create(user, user_info["!password"], sudo=False)
for superuser, user_info in archinstall.arguments.get('!superusers', {}).items():
installation.user_create(superuser, user_info["!password"], sudo=True)
if timezone := archinstall.arguments.get('timezone', None):
installation.set_timezone(timezone)
if archinstall.arguments.get('ntp', False):
installation.activate_time_syncronization()
if archinstall.accessibility_tools_in_use():
installation.enable_espeakup()
if (root_pw := archinstall.arguments.get('!root-password', None)) and len(root_pw):
installation.user_set_pw('root', root_pw)
# This step must be after profile installs to allow profiles to install language pre-requisits.
# After which, this step will set the language both for console and x11 if x11 was installed for instance.
installation.set_keyboard_language(archinstall.arguments['keyboard-layout'])
if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_post_install():
with archinstall.arguments['profile'].load_instructions(
namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported:
if not imported._post_install():
archinstall.log(' * Profile\'s post configuration requirements was not fulfilled.', fg='red')
exit(1)
# If the user provided a list of services to be enabled, pass the list to the enable_service function.
# Note that while it's called enable_service, it can actually take a list of services and iterate it.
if archinstall.arguments.get('services', None):
installation.enable_service(*archinstall.arguments['services'])
# If the user provided custom commands to be run post-installation, execute them now.
if archinstall.arguments.get('custom-commands', None):
archinstall.run_custom_user_commands(archinstall.arguments['custom-commands'], installation)
def perform_installation(mountpoint, mode):
"""
Performs the installation steps on a block device.
Only requirement is that the block devices are
formatted and setup prior to entering this function.
"""
with archinstall.Installer(mountpoint, kernels=archinstall.arguments.get('kernels', ['linux'])) as installation:
if mode in ('full','only_hd'):
disk_setup(installation)
if mode == 'only_hd':
target = pathlib.Path(f"{mountpoint}/etc/fstab")
if not target.parent.exists():
target.parent.mkdir(parents=True)
if mode in ('full','only_os'):
os_setup(installation)
installation.log("For post-installation tips, see https://wiki.archlinux.org/index.php/Installation_guide#Post-installation", fg="yellow")
if not archinstall.arguments.get('silent'):
prompt = 'Would you like to chroot into the newly created installation and perform post-installation configuration?'
choice = archinstall.Menu(prompt, ['yes', 'no'], default_option='yes').run()
if choice == 'yes':
try:
installation.drop_to_shell()
except:
pass
# For support reasons, we'll log the disk layout post installation (crash or no crash)
archinstall.log(f"Disk states after installing: {archinstall.disk_layouts()}", level=logging.DEBUG)
if not archinstall.check_mirror_reachable():
log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None))
archinstall.log(f"Arch Linux mirrors are not reachable. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
exit(1)
mode = archinstall.arguments.get('mode', 'full').lower()
if not archinstall.arguments.get('silent'):
ask_user_questions(mode)
archinstall.output_configs(archinstall.arguments,show=False if archinstall.arguments.get('silent') else True)
if archinstall.arguments.get('dry_run'):
exit(0)
if not archinstall.arguments.get('silent'):
input('Press Enter to continue.')
if mode in ('full','only_hd'):
perform_filesystem_operations()
perform_installation(archinstall.storage.get('MOUNT_POINT', '/mnt'), mode)

View File

@ -3,6 +3,7 @@
import archinstall
import logging
from archinstall.lib.hardware import __packages__ as __hwd__packages__
is_top_level_profile = True
__description__ = 'Installs a minimal system as well as xorg and graphics drivers.'