Update the subvolume menu - fix for #1278 (#1297)

* Update subvolume

* Add mypy compliance

Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
Co-authored-by: Anton Hvornum <anton@hvornum.se>
This commit is contained in:
Daniel Girtler 2022-06-07 01:28:46 +10:00 committed by GitHub
parent 2d4b262046
commit a7ca037a26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 269 additions and 428 deletions

View File

@ -15,4 +15,4 @@ jobs:
# one day this will be enabled
# run: mypy --strict --module archinstall || exit 0
- name: run mypy
run: mypy --follow-imports=silent archinstall/lib/menu/selection_menu.py archinstall/lib/menu/global_menu.py archinstall/lib/models/network_configuration.py archinstall/lib/menu/list_manager.py archinstall/lib/user_interaction/network_conf.py archinstall/lib/models/users.py archinstall/lib/translation.py
run: mypy --follow-imports=silent archinstall/lib/menu/selection_menu.py archinstall/lib/menu/global_menu.py archinstall/lib/models/network_configuration.py archinstall/lib/menu/list_manager.py archinstall/lib/user_interaction/network_conf.py archinstall/lib/models/users.py archinstall/lib/user_interaction/subvolume_config.py archinstall/lib/disk/btrfs/btrfs_helpers.py archinstall/lib/translation.py

View File

@ -226,8 +226,6 @@ def post_process_arguments(arguments):
load_plugin(arguments['plugin'])
if arguments.get('disk_layouts', None) is not None:
# if 'disk_layouts' not in storage:
# storage['disk_layouts'] = {}
layout_storage = {}
if not json_stream_to_structure('--disk_layouts',arguments['disk_layouts'],layout_storage):
exit(1)
@ -236,10 +234,12 @@ def post_process_arguments(arguments):
arguments['harddrives'] = [disk for disk in layout_storage]
# backward compatibility. Change partition.format for partition.wipe
for disk in layout_storage:
for i,partition in enumerate(layout_storage[disk].get('partitions',[])):
for i, partition in enumerate(layout_storage[disk].get('partitions',[])):
if 'format' in partition:
partition['wipe'] = partition['format']
del partition['format']
elif 'btrfs' in partition:
partition['btrfs']['subvolumes'] = Subvolume.parse_arguments(partition['btrfs']['subvolumes'])
arguments['disk_layouts'] = layout_storage
load_config()

View File

@ -2,8 +2,7 @@ from __future__ import annotations
import pathlib
import glob
import logging
import re
from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
from typing import Union, Dict, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@ -15,30 +14,15 @@ from .btrfs_helpers import (
setup_subvolumes as setup_subvolumes,
mount_subvolume as mount_subvolume
)
from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume
from .btrfspartition import BTRFSPartition as BTRFSPartition
from ..helpers import get_mount_info
from ...exceptions import DiskError, Deprecated
from ...general import SysCommand
from ...output import log
from ...exceptions import SysCallError
def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
try:
output = SysCommand(f"btrfs subvol show {path}").decode()
except SysCallError as error:
print('Error:', error)
result = {}
for line in output.replace('\r\n', '\n').split('\n'):
if ':' in line:
key, val = line.replace('\t', '').split(':', 1)
result[key.strip().lower().replace(' ', '_')] = val.strip()
return result
def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@ -71,112 +55,6 @@ def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
def _has_option(option :str,options :list) -> bool:
""" auxiliary routine to check if an option is present in a list.
we check if the string appears in one of the options, 'cause it can appear in several forms (option, option=val,...)
"""
if not options:
return False
for item in options:
if option in item:
return True
return False
def manage_btrfs_subvolumes(installation :Installer,
partition :Dict[str, str],) -> list:
def manage_btrfs_subvolumes(installation :Installer, partition :Dict[str, str]) -> list:
raise Deprecated("Use setup_subvolumes() instead.")
from copy import deepcopy
""" we do the magic with subvolumes in a centralized place
parameters:
* the installation object
* the partition dictionary entry which represents the physical partition
returns
* mountpoinst, the list which contains all the "new" partititon to be mounted
We expect the partition has been mounted as / , and it to be unmounted after the processing
Then we create all the subvolumes inside btrfs as demand
We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs
Then we return a list of "new" partitions to be processed as "normal" partitions
# TODO For encrypted devices we need some special processing prior to it
"""
# We process each of the pairs <subvolume name: mount point | None | mount info dict>
# th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list
# of mount options (or similar used by brtfs)
mountpoints = []
subvolumes = partition['btrfs']['subvolumes']
for name, right_hand in subvolumes.items():
try:
# we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use
if name.startswith('/'):
name = name[1:]
# renormalize the right hand.
location = None
subvol_options = []
# no contents, so it is not to be mounted
if not right_hand:
location = None
# just a string. per backward compatibility the mount point
elif isinstance(right_hand,str):
location = right_hand
# a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿?
elif isinstance(right_hand,dict):
location = right_hand.get('mountpoint',None)
subvol_options = right_hand.get('options',[])
# we create the subvolume
create_subvolume(installation,name)
# Make the nodatacow processing now
# It will be the main cause of creation of subvolumes which are not to be mounted
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
if 'nodatacow' in subvol_options:
if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
# entry is deleted so nodatacow doesn't propagate to the mount options
del subvol_options[subvol_options.index('nodatacow')]
# Make the compress processing now
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
# in this way only zstd compression is activaded
# TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
if 'compress' in subvol_options:
if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])):
if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
# entry is deleted so compress doesn't propagate to the mount options
del subvol_options[subvol_options.index('compress')]
# END compress processing.
# we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume
if not partition['mountpoint'] and location is not None:
# we begin to create a fake partition entry. First we copy the original -the one that corresponds to
# the primary partition. We make a deepcopy to avoid altering the original content in any case
fake_partition = deepcopy(partition)
# we start to modify entries in the "fake partition" to match the needs of the subvolumes
# to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy
del fake_partition['btrfs']
fake_partition['encrypted'] = False
fake_partition['generate-encryption-key-file'] = False
# Mount destination. As of now the right hand part
fake_partition['mountpoint'] = location
# we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path.
fake_partition['subvolume'] = name
# here we add the special mount options for the subvolume, if any.
# if the original partition['options'] is not a list might give trouble
if fake_partition.get('filesystem',{}).get('mount_options',[]):
fake_partition['filesystem']['mount_options'].extend(subvol_options)
else:
fake_partition['filesystem']['mount_options'] = subvol_options
# Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer.
# As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
# As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
# Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
# as "normal" ones
mountpoints.append(fake_partition)
except Exception as e:
raise e
return mountpoints

View File

@ -1,72 +1,42 @@
import pathlib
import logging
from typing import Optional
from pathlib import Path
from typing import Optional, Dict, Any, TYPE_CHECKING
from ...models.subvolume import Subvolume
from ...exceptions import SysCallError, DiskError
from ...general import SysCommand
from ...output import log
from ..helpers import get_mount_info
from .btrfssubvolume import BtrfsSubvolume
from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
if TYPE_CHECKING:
from .btrfspartition import BTRFSPartition
from ...installer import Installer
def mount_subvolume(installation, device, name, subvolume_information):
# we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume):
# we normalize the subvolume name (getting rid of slash at the start if exists.
# In our implementation has no semantic load.
# Every subvolume is created from the top of the hierarchy- and simplifies its further use
name = name.lstrip('/')
# renormalize the right hand.
mountpoint = subvolume_information.get('mountpoint', None)
if not mountpoint:
return None
if type(mountpoint) == str:
mountpoint = pathlib.Path(mountpoint)
installation_target = installation.target
if type(installation_target) == str:
installation_target = pathlib.Path(installation_target)
name = subvolume.name.lstrip('/')
mountpoint = Path(subvolume.mountpoint)
installation_target = Path(installation.target)
mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
mountpoint.mkdir(parents=True, exist_ok=True)
mount_options = subvolume_information.get('options', [])
if not any('subvol=' in x for x in mount_options):
mount_options += [f'subvol={name}']
mount_options = subvolume.options + [f'subvol={name}']
log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
def setup_subvolumes(installation, partition_dict):
"""
Taken from: ..user_guides.py
partition['btrfs'] = {
"subvolumes" : {
"@": "/",
"@home": "/home",
"@log": "/var/log",
"@pkg": "/var/cache/pacman/pkg",
"@.snapshots": "/.snapshots"
}
}
"""
def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]):
log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
for name, right_hand in partition_dict['btrfs']['subvolumes'].items():
for subvolume in partition_dict['btrfs']['subvolumes']:
# we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
# Every subvolume is created from the top of the hierarchy- and simplifies its further use
name = name.lstrip('/')
# renormalize the right hand.
# mountpoint = None
subvol_options = []
match right_hand:
# case str(): # backwards-compatability
# mountpoint = right_hand
case dict():
# mountpoint = right_hand.get('mountpoint', None)
subvol_options = right_hand.get('options', [])
name = subvolume.name.lstrip('/')
# We create the subvolume using the BTRFSPartition instance.
# That way we ensure not only easy access, but also accurate mount locations etc.
@ -76,27 +46,25 @@ def setup_subvolumes(installation, partition_dict):
# It will be the main cause of creation of subvolumes which are not to be mounted
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
if 'nodatacow' in subvol_options:
if subvolume.nodatacow:
if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
# entry is deleted so nodatacow doesn't propagate to the mount options
del subvol_options[subvol_options.index('nodatacow')]
# Make the compress processing now
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
# in this way only zstd compression is activaded
# TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
if 'compress' in subvol_options:
if subvolume.compress:
if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
# entry is deleted so compress doesn't propagate to the mount options
del subvol_options[subvol_options.index('compress')]
def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]:
try:
subvolume_name = None
subvolume_name = ''
result = {}
for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
if index == 0:
@ -110,14 +78,14 @@ def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
# allows for hooking in a pre-processor to do this we have to do it here:
result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result})
return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore
except SysCallError as error:
log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
return None
def find_parent_subvolume(path :pathlib.Path, filters=[]):
def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]:
# A root path cannot have a parent
if str(path) == '/':
return None
@ -127,6 +95,8 @@ def find_parent_subvolume(path :pathlib.Path, filters=[]):
if found_mount['target'] == '/':
return None
return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']])
return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']])
return subvolume
return subvolume
return None

View File

@ -15,7 +15,7 @@ from .btrfs_helpers import (
if TYPE_CHECKING:
from ...installer import Installer
from .btrfssubvolume import BtrfsSubvolume
from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
class BTRFSPartition(Partition):
def __init__(self, *args, **kwargs):
@ -50,7 +50,7 @@ class BTRFSPartition(Partition):
for child in iterate_children(filesystem):
yield child
def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume':
def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo':
"""
Subvolumes have to be created within a mountpoint.
This means we need to get the current installation target.
@ -117,4 +117,4 @@ class BTRFSPartition(Partition):
# And deal with it here:
SysCommand(f"btrfs subvolume create {subvolume}")
return subvolume_info_from_path(subvolume)
return subvolume_info_from_path(subvolume)

View File

@ -16,8 +16,9 @@ from ...general import SysCommand
from ...output import log
from ...storage import storage
@dataclass
class BtrfsSubvolume:
class BtrfsSubvolumeInfo:
full_path :pathlib.Path
name :str
uuid :str
@ -188,4 +189,4 @@ class BtrfsSubvolume:
def unmount(self, recurse :bool = True):
SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")

View File

@ -8,6 +8,8 @@ import time
import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
from ..models.subvolume import Subvolume
if TYPE_CHECKING:
from .partition import Partition
@ -469,6 +471,7 @@ def convert_device_to_uuid(path :str) -> str:
raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.")
def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool:
""" Determine if a certain partition is mounted (or has a mountpoint) as specific target (path)
Coded for clarity rather than performance
@ -485,10 +488,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri
"""
# we create the mountpoint list
if isinstance(partition,dict):
subvols = partition.get('btrfs',{}).get('subvolumes',{})
mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols]
subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', [])
mountpoints = [partition.get('mountpoint')]
mountpoints += [volume.mountpoint for volume in subvolumes]
else:
mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes]
# we check
if strict or target == '/':
if target in mountpoints:

View File

@ -10,7 +10,7 @@ from ..general import SysCommand
from ..output import log
if TYPE_CHECKING:
from .btrfs import BtrfsSubvolume
from .btrfs import BtrfsSubvolumeInfo
@dataclass
class MapperDev:
@ -37,12 +37,12 @@ class MapperDev:
for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"):
partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name
try:
uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode()
except SysCallError as error:
log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red")
information = uevent(uevent_data)
block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME'])))
@ -75,10 +75,10 @@ class MapperDev:
return get_filesystem_type(self.path)
@property
def subvolumes(self) -> Iterator['BtrfsSubvolume']:
def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']:
from .btrfs import subvolume_info_from_path
for mountpoint in self.mount_information:
if target := mountpoint.get('target'):
if subvolume := subvolume_info_from_path(pathlib.Path(target)):
yield subvolume
yield subvolume

View File

@ -14,7 +14,7 @@ from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
from .btrfs.btrfs_helpers import subvolume_info_from_path
from .btrfs.btrfssubvolume import BtrfsSubvolume
from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo
class Partition:
def __init__(self,
@ -185,7 +185,7 @@ class Partition:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
if not self.partprobe():
raise DiskError(f"Could not perform partprobe on {self.device_path}")
time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
partuuid = self._safe_part_uuid
@ -294,9 +294,9 @@ class Partition:
return bind_name
@property
def subvolumes(self) -> Iterator[BtrfsSubvolume]:
def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]:
from .helpers import findmnt
def iterate_children_recursively(information):
for child in information.get('children', []):
if target := child.get('target'):
@ -452,7 +452,7 @@ class Partition:
if retry is True:
log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange")
time.sleep(storage.get('DISK_TIMEOUTS', 1))
return self.format(filesystem, path, log_formatting, options, retry=False)
if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':

View File

@ -3,6 +3,8 @@ import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
from ..models.subvolume import Subvolume
if TYPE_CHECKING:
from .blockdevice import BlockDevice
_: Any
@ -107,17 +109,14 @@ def suggest_single_disk_layout(block_device :BlockDevice,
# https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash
# https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh
layout[block_device.path]['partitions'][1]['btrfs'] = {
"subvolumes" : {
"@":"/",
"@home": "/home",
"@log": "/var/log",
"@pkg": "/var/cache/pacman/pkg",
"@.snapshots": "/.snapshots"
}
'subvolumes': [
Subvolume('@', '/'),
Subvolume('@home', '/home'),
Subvolume('@log', '/var/log'),
Subvolume('@pkg', '/var/cache/pacman/pkg'),
Subvolume('@.snapshots', '/.snapshots')
]
}
# else:
# pass # ... implement a guided setup
elif using_home_partition:
# If we don't want to use subvolumes,
# But we want to be able to re-use data between re-installs..

View File

@ -24,6 +24,7 @@ from .disk.partition import get_mount_fs_type
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
from .hsm import fido2_enroll
from .models.users import User
from .models.subvolume import Subvolume
if TYPE_CHECKING:
_: Any
@ -263,47 +264,25 @@ class Installer:
hsm_device_path = storage['arguments']['HSM']
fido2_enroll(hsm_device_path, partition['device_instance'], password)
# we manage the btrfs partitions
if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]):
for partition in btrfs_subvolumes:
if mount_options := ','.join(partition.get('filesystem',{}).get('mount_options',[])):
self.mount(partition['device_instance'], "/", options=mount_options)
else:
self.mount(partition['device_instance'], "/")
btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])]
setup_subvolumes(
installation=self,
partition_dict=partition
)
partition['device_instance'].unmount()
for partition in btrfs_subvolumes:
device_instance = partition['device_instance']
mount_options = partition.get('filesystem', {}).get('mount_options', [])
self.mount(device_instance, "/", options=','.join(mount_options))
setup_subvolumes(installation=self, partition_dict=partition)
device_instance.unmount()
# We then handle any special cases, such as btrfs
if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]):
for partition_information in btrfs_subvolumes:
for name, mountpoint in sorted(partition_information['btrfs']['subvolumes'].items(), key=lambda item: item[1]):
btrfs_subvolume_information = {}
match mountpoint:
case str(): # backwards-compatability
btrfs_subvolume_information['mountpoint'] = mountpoint
btrfs_subvolume_information['options'] = []
case dict():
btrfs_subvolume_information['mountpoint'] = mountpoint.get('mountpoint', None)
btrfs_subvolume_information['options'] = mountpoint.get('options', [])
case _:
continue
if mountpoint_parsed := btrfs_subvolume_information.get('mountpoint'):
# We cache the mount call for later
mount_queue[mountpoint_parsed] = lambda device=partition_information['device_instance'], \
name=name, \
subvolume_information=btrfs_subvolume_information: mount_subvolume(
installation=self,
device=device,
name=name,
subvolume_information=subvolume_information
)
for partition in btrfs_subvolumes:
subvolumes: List[Subvolume] = partition['btrfs']['subvolumes']
for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint):
# We cache the mount call for later
mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume(
installation=self,
device=device,
subvolume=sub_vol
)
# We mount ordinary partitions, and we sort them by the mountpoint
for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']):

View File

@ -325,22 +325,23 @@ class GlobalMenu(GeneralMenu):
def _select_harddrives(self, old_harddrives : list) -> List:
harddrives = select_harddrives(old_harddrives)
if len(harddrives) == 0:
prompt = _(
"You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n"
"WARNING: Archinstall won't check the suitability of this setup\n"
"Do you wish to continue?"
).format(storage['MOUNT_POINT'])
if harddrives:
if len(harddrives) == 0:
prompt = _(
"You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n"
"WARNING: Archinstall won't check the suitability of this setup\n"
"Do you wish to continue?"
).format(storage['MOUNT_POINT'])
choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run()
choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run()
if choice.value == Menu.no():
return self._select_harddrives(old_harddrives)
if choice.value == Menu.no():
return self._select_harddrives(old_harddrives)
# in case the harddrives got changed we have to reset the disk layout as well
if old_harddrives != harddrives:
self._menu_options['disk_layouts'].set_current_selection(None)
storage['arguments']['disk_layouts'] = {}
# in case the harddrives got changed we have to reset the disk layout as well
if old_harddrives != harddrives:
self._menu_options['disk_layouts'].set_current_selection(None)
storage['arguments']['disk_layouts'] = {}
return harddrives

View File

@ -137,34 +137,35 @@ class ListManager:
else:
self._default_action = [str(default_action),]
self.header = header if header else None
self.cancel_action = str(_('Cancel'))
self.confirm_action = str(_('Confirm and exit'))
self.separator = ''
self.bottom_list = [self.confirm_action,self.cancel_action]
self.bottom_item = [self.cancel_action]
self.base_actions = base_actions if base_actions else [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))]
self._header = header if header else None
self._cancel_action = str(_('Cancel'))
self._confirm_action = str(_('Confirm and exit'))
self._separator = ''
self._bottom_list = [self._confirm_action, self._cancel_action]
self._bottom_item = [self._cancel_action]
self._base_actions = base_actions if base_actions else [str(_('Add')), str(_('Copy')), str(_('Edit')), str(_('Delete'))]
self._original_data = copy.deepcopy(base_list)
self._data = copy.deepcopy(base_list) # as refs, changes are immediate
# default values for the null case
self.target: Optional[Any] = None
self.action = self._null_action
if len(self._data) == 0 and self._null_action:
self._data = self.exec_action(self._data)
def run(self):
while True:
# this will return a dictionary with the key as the menu entry to be displayed
# and the value is the original value from the self._data container
data_formatted = self.reformat(self._data)
options = list(data_formatted.keys())
options.append(self.separator)
if len(options) > 0:
options.append(self._separator)
if self._default_action:
options += self._default_action
options += self.bottom_list
options += self._bottom_list
system('clear')
@ -174,12 +175,12 @@ class ListManager:
sort=False,
clear_screen=False,
clear_menu_on_exit=False,
header=self.header,
header=self._header,
skip_empty_entries=True,
skip=False
).run()
if not target.value or target.value in self.bottom_list:
if not target.value or target.value in self._bottom_list:
self.action = target
break
@ -201,13 +202,13 @@ class ListManager:
# Possible enhancement. If run_actions returns false a message line indicating the failure
self.run_actions(target.value)
if target.value == self.cancel_action: # TODO dubious
if target.value == self._cancel_action: # TODO dubious
return self._original_data # return the original list
else:
return self._data
def run_actions(self,prompt_data=None):
options = self.action_list() + self.bottom_item
options = self.action_list() + self._bottom_item
prompt = _("Select an action for < {} >").format(prompt_data if prompt_data else self.target)
choice = Menu(
prompt,
@ -215,13 +216,13 @@ class ListManager:
sort=False,
clear_screen=False,
clear_menu_on_exit=False,
preset_values=self.bottom_item,
preset_values=self._bottom_item,
show_search_hint=False
).run()
self.action = choice.value
if self.action and self.action != self.cancel_action:
if self.action and self.action != self._cancel_action:
self._data = self.exec_action(self._data)
"""
@ -243,7 +244,7 @@ class ListManager:
can define alternate action list or customize the list for each item.
Executed after any item is selected, contained in self.target
"""
return self.base_actions
return self._base_actions
def exec_action(self, data: Any):
"""

View File

@ -0,0 +1,68 @@
from dataclasses import dataclass
from typing import List, Any, Dict
@dataclass
class Subvolume:
name: str
mountpoint: str
compress: bool = False
nodatacow: bool = False
def display(self) -> str:
options_str = ','.join(self.options)
return f'{_("Subvolume")}: {self.name:15} {_("Mountpoint")}: {self.mountpoint:20} {_("Options")}: {options_str}'
@property
def options(self) -> List[str]:
options = [
'compress' if self.compress else '',
'nodatacow' if self.nodatacow else ''
]
return [o for o in options if len(o)]
def json(self) -> Dict[str, Any]:
return {
'name': self.name,
'mountpoint': self.mountpoint,
'compress': self.compress,
'nodatacow': self.nodatacow
}
@classmethod
def _parse(cls, config_subvolumes: List[Dict[str, Any]]) -> List['Subvolume']:
subvolumes = []
for entry in config_subvolumes:
if not entry.get('name', None) or not entry.get('mountpoint', None):
continue
subvolumes.append(
Subvolume(
entry['name'],
entry['mountpoint'],
entry.get('compress', False),
entry.get('nodatacow', False)
)
)
return subvolumes
@classmethod
def _parse_backwards_compatible(cls, config_subvolumes) -> List['Subvolume']:
subvolumes = []
for name, mountpoint in config_subvolumes.items():
if not name or not mountpoint:
continue
subvolumes.append(Subvolume(name, mountpoint))
return subvolumes
@classmethod
def parse_arguments(cls, config_subvolumes: Any) -> List['Subvolume']:
if isinstance(config_subvolumes, list):
return cls._parse(config_subvolumes)
elif isinstance(config_subvolumes, dict):
return cls._parse_backwards_compatible(config_subvolumes)
raise ValueError('Unknown disk layout btrfs subvolume format')

View File

@ -27,8 +27,10 @@ class User:
}
def display(self) -> str:
strength = PasswordStrength.strength(self.password)
password = '*' * len(self.password) + f' ({strength.value})'
password = '*' * (len(self.password) if self.password else 0)
if password:
strength = PasswordStrength.strength(self.password)
password += f' ({strength.value})'
return f'{_("Username")}: {self.username:16} {_("Password")}: {password:20} sudo: {str(self.sudo)}'
@classmethod

View File

@ -351,18 +351,16 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if partition is not None:
if not block_device_struct["partitions"][partition].get('btrfs', {}):
block_device_struct["partitions"][partition]['btrfs'] = {}
if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', {}):
block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = {}
if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', []):
block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = []
prev = block_device_struct["partitions"][partition]['btrfs']['subvolumes']
result = SubvolumeList(_("Manage btrfs subvolumes for current partition"),prev).run()
if result:
block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
else:
del block_device_struct["partitions"][partition]['btrfs']
result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run()
block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
return block_device_struct
def select_encrypted_partitions(
title :str,
partitions :List[Partition],

View File

@ -1,155 +1,94 @@
from typing import Dict, List
from typing import Dict, List, Optional, Any, TYPE_CHECKING
from ..menu.list_manager import ListManager
from ..menu.menu import MenuSelectionType
from ..menu.selection_menu import Selector, GeneralMenu
from ..menu.text_input import TextInput
from ..menu import Menu
from ..models.subvolume import Subvolume
if TYPE_CHECKING:
_: Any
"""
UI classes
"""
class SubvolumeList(ListManager):
def __init__(self,prompt,list):
self.ObjectNullAction = None # str(_('Add'))
self.ObjectDefaultAction = str(_('Add'))
super().__init__(prompt,list,None,self.ObjectNullAction,self.ObjectDefaultAction)
def __init__(self, prompt: str, current_volumes: List[Subvolume]):
self._actions = [
str(_('Add subvolume')),
str(_('Edit subvolume')),
str(_('Delete subvolume'))
]
super().__init__(prompt, current_volumes, self._actions, self._actions[0])
def reformat(self, data: Dict) -> Dict:
def presentation(key :str, value :Dict):
text = _(" Subvolume :{:16}").format(key)
if isinstance(value,str):
text += _(" mounted at {:16}").format(value)
else:
if value.get('mountpoint'):
text += _(" mounted at {:16}").format(value['mountpoint'])
else:
text += (' ' * 28)
if value.get('options',[]):
text += _(" with option {}").format(', '.join(value['options']))
return text
formatted = {presentation(k, v): k for k, v in data.items()}
return {k: v for k, v in sorted(formatted.items(), key=lambda e: e[0])}
def reformat(self, data: List[Subvolume]) -> Dict[str, Subvolume]:
return {e.display(): e for e in data}
def action_list(self):
return super().action_list()
active_user = self.target if self.target else None
def exec_action(self, data: Dict):
if self.target:
origkey, origval = list(self.target.items())[0]
if active_user is None:
return [self._actions[0]]
else:
origkey = None
return self._actions[1:]
if self.action == str(_('Delete')):
del data[origkey]
else:
if self.action == str(_('Add')):
self.target = {}
print(_('\n Fill the desired values for a new subvolume \n'))
with SubvolumeMenu(self.target,self.action) as add_menu:
for elem in ['name','mountpoint','options']:
add_menu.exec_option(elem)
else:
SubvolumeMenu(self.target,self.action).run()
def _prompt_options(self, editing: Optional[Subvolume] = None) -> List[str]:
preset_options = []
if editing:
preset_options = editing.options
data.update(self.target)
return data
class SubvolumeMenu(GeneralMenu):
def __init__(self,parameters,action=None):
self.data = parameters
self.action = action
self.ds = {}
self.ds['name'] = None
self.ds['mountpoint'] = None
self.ds['options'] = None
if self.data:
origkey,origval = list(self.data.items())[0]
self.ds['name'] = origkey
if isinstance(origval,str):
self.ds['mountpoint'] = origval
else:
self.ds['mountpoint'] = self.data[origkey].get('mountpoint')
self.ds['options'] = self.data[origkey].get('options')
super().__init__(data_store=self.ds)
def _setup_selection_menu_options(self):
self._menu_options['name'] = Selector(
str(_('Subvolume name ')),
self._select_subvolume_name if not self.action or self.action in (str(_('Add')), str(_('Copy'))) else None,
mandatory=True,
enabled=True)
self._menu_options['mountpoint'] = Selector(
str(_('Subvolume mountpoint')),
self._select_subvolume_mount_point if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None,
enabled=True)
self._menu_options['options'] = Selector(
str(_('Subvolume options')),
self._select_subvolume_options if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None,
enabled=True)
self._menu_options['save'] = Selector(
str(_('Save')),
exec_func=lambda n,v:True,
enabled=True)
self._menu_options['cancel'] = Selector(
str(_('Cancel')),
# func = lambda pre:True,
exec_func=lambda n,v:self.fast_exit(n),
enabled=True)
self.cancel_action = 'cancel'
self.save_action = 'save'
self.bottom_list = [self.save_action,self.cancel_action]
def fast_exit(self,accion):
if self.option(accion).get_selection():
for item in self.list_options():
if self.option(item).is_mandatory():
self.option(item).set_mandatory(False)
return True
def exit_callback(self):
# we exit without moving data
if self.option(self.cancel_action).get_selection():
return
if not self.ds['name']:
return
else:
key = self.ds['name']
value = {}
if self.ds['mountpoint']:
value['mountpoint'] = self.ds['mountpoint']
if self.ds['options']:
value['options'] = self.ds['options']
self.data.update({key : value})
def _select_subvolume_name(self,value):
return TextInput(str(_("Subvolume name :")),value).run()
def _select_subvolume_mount_point(self,value):
return TextInput(str(_("Select a mount point :")),value).run()
def _select_subvolume_options(self,value) -> List[str]:
# def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True):
choice = Menu(
str(_("Select the desired subvolume options ")),
['nodatacow','compress'],
skip=True,
preset_values=value,
preset_values=preset_options,
multi=True
).run()
if choice.type_ == MenuSelectionType.Selection:
return choice.value
return choice.value # type: ignore
return []
def _add_subvolume(self, editing: Optional[Subvolume] = None) -> Optional[Subvolume]:
name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run()
if not name:
return None
mountpoint = TextInput(f'\n{_("Subvolume mountpoint")}: ', editing.mountpoint if editing else '').run()
if not mountpoint:
return None
options = self._prompt_options(editing)
subvolume = Subvolume(name, mountpoint)
subvolume.compress = 'compress' in options
subvolume.nodatacow = 'nodatacow' in options
return subvolume
def exec_action(self, data: List[Subvolume]) -> List[Subvolume]:
if self.target:
active_subvolume = self.target
else:
active_subvolume = None
if self.action == self._actions[0]: # add
new_subvolume = self._add_subvolume()
if new_subvolume is not None:
# in case a user with the same username as an existing user
# was created we'll replace the existing one
data = [d for d in data if d.name != new_subvolume.name]
data += [new_subvolume]
elif self.action == self._actions[1]: # edit subvolume
new_subvolume = self._add_subvolume(active_subvolume)
if new_subvolume is not None:
# we'll remove the original subvolume and add the modified version
data = [d for d in data if d.name != active_subvolume.name and d.name != new_subvolume.name]
data += [new_subvolume]
elif self.action == self._actions[2]: # delete
data = [d for d in data if d != active_subvolume]
return data