Menu option save config (#1009)

* Add new save config menu option

* Update

* Fixed issue with merging

* Fixed merge issue (I think)

Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
Co-authored-by: Anton Hvornum <anton.feeds@gmail.com>
This commit is contained in:
Daniel 2022-03-01 03:03:37 +11:00 committed by GitHub
parent 04e3880d8f
commit 35a19a616a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 299 additions and 99 deletions

View File

@ -1,64 +1,122 @@
import json
import pathlib
import logging
from pathlib import Path
from typing import Optional, Dict
from .storage import storage
from .general import JSON, UNSAFE_JSON
from .output import log
def output_configs(area :dict, show :bool = True, save :bool = True):
""" Show on the screen the configuration data (except credentials) and/or save them on a json file
:param area: a dictionary to be shown/save (basically archinstall.arguments, but needed to be passed explictly to avoid circular references
:type area: dict
:param show:Determines if the config data will be displayed on screen in Json format
:type show: bool
:param save:Determines if the config data will we written as a Json file
:type save:bool
"""
user_credentials = {}
disk_layout = {}
user_config = {}
for key in area:
if key in ['!users','!superusers','!encryption-password']:
user_credentials[key] = area[key]
elif key == 'disk_layouts':
disk_layout = area[key]
elif key in ['abort','install','config','creds','dry_run']:
pass
else:
user_config[key] = area[key]
user_configuration_json = json.dumps({
'config_version': storage['__version__'], # Tells us what version was used to generate the config
**user_config, # __version__ will be overwritten by old version definition found in config
'version': storage['__version__']
} , indent=4, sort_keys=True, cls=JSON)
if disk_layout:
disk_layout_json = json.dumps(disk_layout, indent=4, sort_keys=True, cls=JSON)
if user_credentials:
user_credentials_json = json.dumps(user_credentials, indent=4, sort_keys=True, cls=UNSAFE_JSON)
class ConfigurationOutput:
def __init__(self, config: Dict):
"""
Configuration output handler to parse the existing configuration data structure and prepare for output on the
console and for saving it to configuration files
if save:
dest_path = pathlib.Path(storage.get('LOG_PATH','.'))
if (not dest_path.exists()) or not (dest_path.is_dir()):
log(f"Destination directory {dest_path.resolve()} does not exist or is not a directory,\n Configuration files can't be saved",fg="yellow",)
input("Press enter to continue")
else:
with (dest_path / "user_configuration.json").open('w') as config_file:
config_file.write(user_configuration_json)
if user_credentials:
target = dest_path / "user_credentials.json"
with target.open('w') as config_file:
config_file.write(user_credentials_json)
if disk_layout:
target = dest_path / "user_disk_layout.json"
with target.open('w') as config_file:
config_file.write(disk_layout_json)
:param config: A dictionary containing configurations (basically archinstall.arguments)
:type config: Dict
"""
self._config = config
self._user_credentials = {}
self._disk_layout = None
self._user_config = {}
self._default_save_path = Path(storage.get('LOG_PATH', '.'))
self._user_config_file = 'user_configuration.json'
self._user_creds_file = "user_credentials.json"
self._disk_layout_file = "user_disk_layout.json"
self._sensitive = ['!users', '!superusers', '!encryption-password']
self._ignore = ['abort', 'install', 'config', 'creds', 'dry_run']
self._process_config()
@property
def user_credentials_file(self):
return self._user_creds_file
@property
def user_configuration_file(self):
return self._user_config_file
@property
def disk_layout_file(self):
return self._disk_layout_file
def _process_config(self):
for key in self._config:
if key in self._sensitive:
self._user_credentials[key] = self._config[key]
elif key == 'disk_layouts':
self._disk_layout = self._config[key]
elif key in self._ignore:
pass
else:
self._user_config[key] = self._config[key]
def user_config_to_json(self) -> str:
return json.dumps({
'config_version': storage['__version__'], # Tells us what version was used to generate the config
**self._user_config, # __version__ will be overwritten by old version definition found in config
'version': storage['__version__']
}, indent=4, sort_keys=True, cls=JSON)
def disk_layout_to_json(self) -> Optional[str]:
if self._disk_layout:
return json.dumps(self._disk_layout, indent=4, sort_keys=True, cls=JSON)
return None
def user_credentials_to_json(self) -> Optional[str]:
if self._user_credentials:
return json.dumps(self._user_credentials, indent=4, sort_keys=True, cls=UNSAFE_JSON)
return None
def show(self):
print(_('\nThis is your chosen configuration:'))
log(" -- Chosen configuration --", level=logging.DEBUG)
user_conig = self.user_config_to_json()
disk_layout = self.disk_layout_to_json()
log(user_conig, level=logging.INFO)
if show:
print()
print('This is your chosen configuration:')
log("-- Guided template chosen (with below config) --", level=logging.DEBUG)
log(user_configuration_json, level=logging.INFO)
if disk_layout:
log(disk_layout_json, level=logging.INFO)
log(disk_layout, level=logging.INFO)
print()
def _is_valid_path(self, dest_path :Path) -> bool:
if (not dest_path.exists()) or not (dest_path.is_dir()):
log(
'Destination directory {} does not exist or is not a directory,\n Configuration files can not be saved'.format(dest_path.resolve()),
fg="yellow"
)
return False
return True
def save_user_config(self, dest_path :Path = None):
if self._is_valid_path(dest_path):
with open(dest_path / self._user_config_file, 'w') as config_file:
config_file.write(self.user_config_to_json())
def save_user_creds(self, dest_path :Path = None):
if self._is_valid_path(dest_path):
if user_creds := self.user_credentials_to_json():
target = dest_path / self._user_creds_file
with open(target, 'w') as config_file:
config_file.write(user_creds)
def save_disk_layout(self, dest_path :Path = None):
if self._is_valid_path(dest_path):
if disk_layout := self.disk_layout_to_json():
target = dest_path / self._disk_layout_file
with target.open('w') as config_file:
config_file.write(disk_layout)
def save(self, dest_path :Path = None):
if not dest_path:
dest_path = self._default_save_path
if self._is_valid_path(dest_path):
self.save_user_config(dest_path)
self.save_user_creds(dest_path)
self.save_disk_layout(dest_path)

View File

@ -12,7 +12,6 @@ import logging
if TYPE_CHECKING:
_: Any
class Menu(TerminalMenu):
def __init__(
self,
@ -24,6 +23,9 @@ class Menu(TerminalMenu):
sort :bool = True,
preset_values :Union[str, List[str]] = None,
cursor_index :int = None,
preview_command=None,
preview_size=0.75,
preview_title='Info',
**kwargs
):
"""
@ -54,6 +56,15 @@ class Menu(TerminalMenu):
:param cursor_index: The position where the cursor will be located. If it is not in range (number of elements of the menu) it goes to the first position
:type cursor_index: int
:param preview_command: A function that should return a string that will be displayed in a preview window when a menu selection item is in focus
:type preview_command: Callable
:param preview_size: Size of the preview window in ratio to the full window
:type preview_size: float
:param preview_title: Title of the preview window
:type preview_title: str
:param kwargs : any SimpleTerminal parameter
"""
# we guarantee the inmutability of the options outside the class.
@ -123,6 +134,9 @@ class Menu(TerminalMenu):
# show_search_hint=True,
preselected_entries=self.preset_values,
cursor_index=self.cursor_index,
preview_command=preview_command,
preview_size=preview_size,
preview_title=preview_title,
**kwargs,
)

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import sys
import logging
from typing import Callable, Any, List, Iterator
from typing import Callable, Any, List, Iterator, Tuple, Optional
from .menu import Menu
from ..general import SysCommand, secret
@ -12,7 +12,7 @@ from ..output import log
from ..profiles import is_desktop_profile
from ..disk import encrypted_partitions
from ..locale_helpers import set_keyboard_language
from ..user_interaction import get_password, ask_for_a_timezone
from ..user_interaction import get_password, ask_for_a_timezone, save_config
from ..user_interaction import ask_ntp
from ..user_interaction import ask_for_swap
from ..user_interaction import ask_for_bootloader
@ -47,7 +47,8 @@ class Selector:
dependencies_not :List = [],
exec_func :Callable = None,
preview_func :Callable = None,
mandatory :bool = False
mandatory :bool = False,
no_store :bool = False
):
"""
Create a new menu selection entry
@ -83,13 +84,15 @@ class Selector:
menu returns to the selection screen. If not specified it is assumed the return is False
:type exec_func: Callable
:param preview_func: A callable which invokws a preview screen (not implemented)
:param preview_func: A callable which invokws a preview screen
:type preview_func: Callable
:param mandatory: A boolean which determines that the field is mandatory, i.e. menu can not be exited if it is not set
:type mandatory: bool
"""
:param no_store: A boolean which determines that the field should or shouldn't be stored in the data storage
:type no_store: bool
"""
self._description = description
self.func = func
self._display_func = display_func
@ -98,21 +101,29 @@ class Selector:
self._dependencies = dependencies
self._dependencies_not = dependencies_not
self.exec_func = exec_func
self.preview_func = preview_func
self._preview_func = preview_func
self.mandatory = mandatory
self._no_store = no_store
@property
def dependencies(self) -> dict:
def dependencies(self) -> List:
return self._dependencies
@property
def dependencies_not(self) -> dict:
def dependencies_not(self) -> List:
return self._dependencies_not
@property
def current_selection(self):
return self._current_selection
@property
def preview_func(self):
return self._preview_func
def do_store(self) -> bool:
return self._no_store is False
def set_enabled(self, status :bool = True):
self.enabled = status
@ -247,6 +258,20 @@ class GeneralMenu:
print(f'No selector found: {selector_name}')
sys.exit(1)
def _preview_display(self, selection_name: str) -> Optional[str]:
config_name, selector = self._find_selection(selection_name)
if preview := selector.preview_func:
return preview()
return None
def _find_selection(self, selection_name: str) -> Tuple[str, Selector]:
option = [[k, v] for k, v in self._menu_options.items() if v.text.strip() == selection_name.strip()]
if len(option) != 1:
raise ValueError(f'Selection not found: {selection_name}')
config_name = option[0][0]
selector = option[0][1]
return config_name, selector
def run(self):
""" Calls the Menu framework"""
# we synch all the options just in case
@ -260,7 +285,16 @@ class GeneralMenu:
self._set_kb_language()
enabled_menus = self._menus_to_enable()
menu_text = [m.text for m in enabled_menus.values()]
selection = Menu('Set/Modify the below options', menu_text, sort=False, cursor_index=cursor_pos).run()
selection = Menu(
_('Set/Modify the below options'),
menu_text,
sort=False,
cursor_index=cursor_pos,
preview_command=self._preview_display,
preview_size=0.5
).run()
if selection and self.auto_cursor:
cursor_pos = menu_text.index(selection) + 1 # before the strip otherwise fails
if cursor_pos >= len(menu_text):
@ -273,21 +307,16 @@ class GeneralMenu:
if not self.is_context_mgr:
self.__exit__()
def _process_selection(self, selection :str) -> bool:
def _process_selection(self, selection_name :str) -> bool:
""" determines and executes the selection y
Can / Should be extended to handle specific selection issues
Returns true if the menu shall continue, False if it has ended
"""
# find the selected option in our option list
option = [[k, v] for k, v in self._menu_options.items() if v.text.strip() == selection.strip()]
if len(option) != 1:
raise ValueError(f'Selection not found: {selection}')
selector_name = option[0][0]
selector = option[0][1]
config_name, selector = self._find_selection(selection_name)
return self.exec_option(config_name, selector)
return self.exec_option(selector_name,selector)
def exec_option(self,selector_name :str, p_selector :Selector = None) -> bool:
def exec_option(self, config_name :str, p_selector :Selector = None) -> bool:
""" processes the exection of a given menu entry
- pre process callback
- selection function
@ -296,20 +325,22 @@ class GeneralMenu:
returns True if the loop has to continue, false if the loop can be closed
"""
if not p_selector:
selector = self.option(selector_name)
selector = self.option(config_name)
else:
selector = p_selector
self.pre_callback(selector_name)
self.pre_callback(config_name)
result = None
if selector.func:
presel_val = self.option(selector_name).get_selection()
presel_val = self.option(config_name).get_selection()
result = selector.func(presel_val)
self._menu_options[selector_name].set_current_selection(result)
self._data_store[selector_name] = result
exec_ret_val = selector.exec_func(selector_name,result) if selector.exec_func else False
self.post_callback(selector_name,result)
self._menu_options[config_name].set_current_selection(result)
if selector.do_store():
self._data_store[config_name] = result
exec_ret_val = selector.exec_func(config_name,result) if selector.exec_func else False
self.post_callback(config_name,result)
if exec_ret_val and self._check_mandatory_status():
return False
return True
@ -515,20 +546,28 @@ class GlobalMenu(GeneralMenu):
_('Set automatic time sync (NTP)'),
lambda preset: self._select_ntp(preset),
default=True)
self._menu_options['save_config'] = \
Selector(
_('Save configuration'),
lambda: save_config(self._data_store),
enabled=True,
no_store=True)
self._menu_options['install'] = \
Selector(
self._install_text(),
exec_func=lambda n,v: True if self._missing_configs() == 0 else False,
enabled=True)
exec_func=lambda n,v: True if len(self._missing_configs()) == 0 else False,
preview_func=self._prev_install_missing_config,
enabled=True,
no_store=True)
self._menu_options['abort'] = Selector(_('Abort'), exec_func=lambda n,v:exit(1), enabled=True)
def _update_install(self,name :str = None ,result :Any = None):
def _update_install_text(self, name :str = None, result :Any = None):
text = self._install_text()
self._menu_options.get('install').update_description(text)
def post_callback(self,name :str = None ,result :Any = None):
self._update_install(name,result)
self._update_install_text(name, result)
def exit_callback(self):
if self._data_store.get('harddrives', None) and self._data_store.get('!encryption-password', None):
@ -539,29 +578,37 @@ class GlobalMenu(GeneralMenu):
storage['arguments']['disk_layouts'], storage['arguments']['!encryption-password'])
def _install_text(self):
missing = self._missing_configs()
missing = len(self._missing_configs())
if missing > 0:
return _('Install ({} config(s) missing)').format(missing)
return 'Install'
def _missing_configs(self):
def _prev_install_missing_config(self) -> Optional[str]:
if missing := self._missing_configs():
text = str(_('Missing configurations:\n'))
for m in missing:
text += f'- {m}\n'
return text[:-1] # remove last new line
return None
def _missing_configs(self) -> List[str]:
def check(s):
return self._menu_options.get(s).has_selection()
missing = 0
missing = []
if not check('bootloader'):
missing += 1
missing += ['Bootloader']
if not check('hostname'):
missing += 1
missing += ['Hostname']
if not check('audio'):
missing += 1
missing += ['Audio']
if not check('!root-password') and not check('!superusers'):
missing += 1
missing += [str(_('Either root-password or at least 1 superuser must be specified'))]
if not check('harddrives'):
missing += 1
missing += ['Hard drives']
if check('harddrives'):
if not self._menu_options.get('harddrives').is_empty() and not check('disk_layouts'):
missing += 1
missing += ['Disk layout']
return missing

View File

@ -9,11 +9,13 @@ import signal
import sys
import time
from collections.abc import Iterable
from pathlib import Path
from copy import copy
from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
from .menu.text_input import TextInput
from .configuration import ConfigurationOutput
from .models.network_configuration import NetworkConfiguration, NicType
if TYPE_CHECKING:
@ -1165,3 +1167,67 @@ def generic_multi_select(p_options :Union[list,dict],
sort=sort,
multi=True,
default=default)
def save_config(config: Dict):
def preview(selection: str):
if options['user_config'] == selection:
json_config = config_output.user_config_to_json()
return f'{config_output.user_configuration_file}\n{json_config}'
elif options['user_creds'] == selection:
if json_config := config_output.user_credentials_to_json():
return f'{config_output.user_credentials_file}\n{json_config}'
else:
return str(_('No configuration'))
elif options['disk_layout'] == selection:
if json_config := config_output.disk_layout_to_json():
return f'{config_output.disk_layout_file}\n{json_config}'
else:
return str(_('No configuration'))
elif options['all'] == selection:
output = f'{config_output.user_configuration_file}\n'
if json_config := config_output.user_credentials_to_json():
output += f'{config_output.user_credentials_file}\n'
if json_config := config_output.disk_layout_to_json():
output += f'{config_output.disk_layout_file}\n'
return output[:-1]
return None
config_output = ConfigurationOutput(config)
options = {
'user_config': str(_('Save user configuration')),
'user_creds': str(_('Save user credentials')),
'disk_layout': str(_('Save disk layout')),
'all': str(_('Save all'))
}
selection = Menu(
_('Choose which configuration to save'),
list(options.values()),
sort=False,
skip=True,
preview_size=0.75,
preview_command=preview
).run()
if not selection:
return
while True:
path = input(_('Enter a directory for the configuration(s) to be saved: ')).strip(' ')
dest_path = Path(path)
if dest_path.exists() and dest_path.is_dir():
break
log(_('Not a valid directory: {}').format(dest_path), fg='red')
if options['user_config'] == selection:
config_output.save_user_config(dest_path)
elif options['user_creds'] == selection:
config_output.save_user_creds(dest_path)
elif options['disk_layout'] == selection:
config_output.save_disk_layout(dest_path)
elif options['all'] == selection:
config_output.save_user_config(dest_path)
config_output.save_user_creds(dest_path)
config_output.save_disk_layout(dest_path)

View File

@ -3,6 +3,7 @@ import os
import time
import archinstall
from archinstall import ConfigurationOutput
if archinstall.arguments.get('help'):
print("See `man archinstall` for help.")
@ -257,7 +258,10 @@ if not archinstall.arguments.get('offline', False):
if not archinstall.arguments.get('silent'):
ask_user_questions()
archinstall.output_configs(archinstall.arguments,show=False if archinstall.arguments.get('silent') else True)
config_output = ConfigurationOutput(archinstall.arguments)
if not archinstall.arguments.get('silent'):
config_output.show()
config_output.save()
if archinstall.arguments.get('dry_run'):
exit(0)

View File

@ -4,6 +4,8 @@ import os
import pathlib
import archinstall
from archinstall import ConfigurationOutput
class OnlyHDMenu(archinstall.GlobalMenu):
def _setup_selection_menu_options(self):
@ -23,7 +25,7 @@ class OnlyHDMenu(archinstall.GlobalMenu):
self.enable(entry)
else:
self.option(entry).set_enabled(False)
self._update_install()
self._update_install_text()
def _missing_configs(self):
""" overloaded method """
@ -122,7 +124,11 @@ if not archinstall.check_mirror_reachable():
if not archinstall.arguments.get('silent'):
ask_user_questions()
archinstall.output_configs(archinstall.arguments,show=False if archinstall.arguments.get('silent') else True)
config_output = ConfigurationOutput(archinstall.arguments)
if not archinstall.arguments.get('silent'):
config_output.show()
config_output.save()
if archinstall.arguments.get('dry_run'):
exit(0)

View File

@ -19,6 +19,7 @@ import time
import pathlib
import archinstall
from archinstall import ConfigurationOutput
if archinstall.arguments.get('help'):
print("See `man archinstall` for help.")
@ -248,10 +249,10 @@ class MyMenu(archinstall.GlobalMenu):
self.enable(entry)
else:
self.option(entry).set_enabled(False)
self._update_install()
self._update_install_text()
def post_callback(self,option,value=None):
self._update_install(self._execution_mode)
self._update_install_text(self._execution_mode)
def _missing_configs(self,mode='full'):
def check(s):
@ -271,7 +272,7 @@ class MyMenu(archinstall.GlobalMenu):
return f'Instalation ({missing} config(s) missing)'
return 'Install'
def _update_install(self,mode='full'):
def _update_install_text(self, mode='full'):
text = self._install_text(mode)
self.option('install').update_description(text)
@ -492,7 +493,11 @@ mode = archinstall.arguments.get('mode', 'full').lower()
if not archinstall.arguments.get('silent'):
ask_user_questions(mode)
archinstall.output_configs(archinstall.arguments,show=False if archinstall.arguments.get('silent') else True)
config_output = ConfigurationOutput(archinstall.arguments)
if not archinstall.arguments.get('silent'):
config_output.show()
config_output.save()
if archinstall.arguments.get('dry_run'):
exit(0)
if not archinstall.arguments.get('silent'):