356 lines
12 KiB
Python
356 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple
|
|
|
|
from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \
|
|
ModificationStatus, DeviceGeometry
|
|
from ..menu import Menu, ListManager, MenuSelection, TextInput
|
|
from ..output import FormattedOutput, warn
|
|
from .subvolume_menu import SubvolumeMenu
|
|
|
|
if TYPE_CHECKING:
|
|
_: Any
|
|
|
|
|
|
class PartitioningList(ListManager):
|
|
"""
|
|
subclass of ListManager for the managing of user accounts
|
|
"""
|
|
def __init__(self, prompt: str, device: BDevice, device_partitions: List[PartitionModification]):
|
|
self._device = device
|
|
self._actions = {
|
|
'create_new_partition': str(_('Create a new partition')),
|
|
'suggest_partition_layout': str(_('Suggest partition layout')),
|
|
'remove_added_partitions': str(_('Remove all newly added partitions')),
|
|
'assign_mountpoint': str(_('Assign mountpoint')),
|
|
'mark_formatting': str(_('Mark/Unmark to be formatted (wipes data)')),
|
|
'mark_bootable': str(_('Mark/Unmark as bootable')),
|
|
'set_filesystem': str(_('Change filesystem')),
|
|
'btrfs_mark_compressed': str(_('Mark/Unmark as compressed')), # btrfs only
|
|
'btrfs_set_subvolumes': str(_('Set subvolumes')), # btrfs only
|
|
'delete_partition': str(_('Delete partition'))
|
|
}
|
|
|
|
display_actions = list(self._actions.values())
|
|
super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:])
|
|
|
|
def reformat(self, data: List[PartitionModification]) -> Dict[str, Optional[PartitionModification]]:
|
|
table = FormattedOutput.as_table(data)
|
|
rows = table.split('\n')
|
|
|
|
# these are the header rows of the table and do not map to any User obviously
|
|
# we're adding 2 spaces as prefix because the menu selector '> ' will be put before
|
|
# the selectable rows so the header has to be aligned
|
|
display_data: Dict[str, Optional[PartitionModification]] = {f' {rows[0]}': None, f' {rows[1]}': None}
|
|
|
|
for row, user in zip(rows[2:], data):
|
|
row = row.replace('|', '\\|')
|
|
display_data[row] = user
|
|
|
|
return display_data
|
|
|
|
def selected_action_display(self, partition: PartitionModification) -> str:
|
|
return str(_('Partition'))
|
|
|
|
def filter_options(self, selection: PartitionModification, options: List[str]) -> List[str]:
|
|
not_filter = []
|
|
|
|
# only display formatting if the partition exists already
|
|
if not selection.exists():
|
|
not_filter += [self._actions['mark_formatting']]
|
|
else:
|
|
# only allow these options if the existing partition
|
|
# was marked as formatting, otherwise we run into issues where
|
|
# 1. select a new fs -> potentially mark as wipe now
|
|
# 2. Switch back to old filesystem -> should unmark wipe now, but
|
|
# how do we know it was the original one?
|
|
not_filter += [
|
|
self._actions['set_filesystem'],
|
|
self._actions['assign_mountpoint'],
|
|
self._actions['mark_bootable'],
|
|
self._actions['btrfs_mark_compressed'],
|
|
self._actions['btrfs_set_subvolumes']
|
|
]
|
|
|
|
# non btrfs partitions shouldn't get btrfs options
|
|
if selection.fs_type != FilesystemType.Btrfs:
|
|
not_filter += [self._actions['btrfs_mark_compressed'], self._actions['btrfs_set_subvolumes']]
|
|
else:
|
|
not_filter += [self._actions['assign_mountpoint']]
|
|
|
|
return [o for o in options if o not in not_filter]
|
|
|
|
def handle_action(
|
|
self,
|
|
action: str,
|
|
entry: Optional[PartitionModification],
|
|
data: List[PartitionModification]
|
|
) -> List[PartitionModification]:
|
|
action_key = [k for k, v in self._actions.items() if v == action][0]
|
|
|
|
match action_key:
|
|
case 'create_new_partition':
|
|
new_partition = self._create_new_partition()
|
|
data += [new_partition]
|
|
case 'suggest_partition_layout':
|
|
new_partitions = self._suggest_partition_layout(data)
|
|
if len(new_partitions) > 0:
|
|
data = new_partitions
|
|
case 'remove_added_partitions':
|
|
choice = self._reset_confirmation()
|
|
if choice.value == Menu.yes():
|
|
data = [part for part in data if part.is_exists_or_modify()]
|
|
case 'assign_mountpoint' if entry:
|
|
entry.mountpoint = self._prompt_mountpoint()
|
|
if entry.mountpoint == Path('/boot'):
|
|
entry.set_flag(PartitionFlag.Boot)
|
|
case 'mark_formatting' if entry:
|
|
self._prompt_formatting(entry)
|
|
case 'mark_bootable' if entry:
|
|
entry.invert_flag(PartitionFlag.Boot)
|
|
case 'set_filesystem' if entry:
|
|
fs_type = self._prompt_partition_fs_type()
|
|
if fs_type:
|
|
entry.fs_type = fs_type
|
|
# btrfs subvolumes will define mountpoints
|
|
if fs_type == FilesystemType.Btrfs:
|
|
entry.mountpoint = None
|
|
case 'btrfs_mark_compressed' if entry:
|
|
self._set_compressed(entry)
|
|
case 'btrfs_set_subvolumes' if entry:
|
|
self._set_btrfs_subvolumes(entry)
|
|
case 'delete_partition' if entry:
|
|
data = self._delete_partition(entry, data)
|
|
|
|
return data
|
|
|
|
def _delete_partition(
|
|
self,
|
|
entry: PartitionModification,
|
|
data: List[PartitionModification]
|
|
) -> List[PartitionModification]:
|
|
if entry.is_exists_or_modify():
|
|
entry.status = ModificationStatus.Delete
|
|
return data
|
|
else:
|
|
return [d for d in data if d != entry]
|
|
|
|
def _set_compressed(self, partition: PartitionModification):
|
|
compression = 'compress=zstd'
|
|
|
|
if compression in partition.mount_options:
|
|
partition.mount_options = [o for o in partition.mount_options if o != compression]
|
|
else:
|
|
partition.mount_options.append(compression)
|
|
|
|
def _set_btrfs_subvolumes(self, partition: PartitionModification):
|
|
partition.btrfs_subvols = SubvolumeMenu(
|
|
_("Manage btrfs subvolumes for current partition"),
|
|
partition.btrfs_subvols
|
|
).run()
|
|
|
|
def _prompt_formatting(self, partition: PartitionModification):
|
|
# an existing partition can toggle between Exist or Modify
|
|
if partition.is_modify():
|
|
partition.status = ModificationStatus.Exist
|
|
return
|
|
elif partition.exists():
|
|
partition.status = ModificationStatus.Modify
|
|
|
|
# If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
|
|
# without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
|
|
# it's safe to change the filesystem for this partition.
|
|
if partition.fs_type == FilesystemType.Crypto_luks:
|
|
prompt = str(_('This partition is currently encrypted, to format it a filesystem has to be specified'))
|
|
fs_type = self._prompt_partition_fs_type(prompt)
|
|
partition.fs_type = fs_type
|
|
|
|
if fs_type == FilesystemType.Btrfs:
|
|
partition.mountpoint = None
|
|
|
|
def _prompt_mountpoint(self) -> Path:
|
|
header = str(_('Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + '\n'
|
|
header += str(_('If mountpoint /boot is set, then the partition will also be marked as bootable.')) + '\n'
|
|
prompt = str(_('Mountpoint: '))
|
|
|
|
print(header)
|
|
|
|
while True:
|
|
value = TextInput(prompt).run().strip()
|
|
|
|
if value:
|
|
mountpoint = Path(value)
|
|
break
|
|
|
|
return mountpoint
|
|
|
|
def _prompt_partition_fs_type(self, prompt: str = '') -> FilesystemType:
|
|
options = {fs.value: fs for fs in FilesystemType if fs != FilesystemType.Crypto_luks}
|
|
|
|
prompt = prompt + '\n' + str(_('Enter a desired filesystem type for the partition'))
|
|
choice = Menu(prompt, options, sort=False, skip=False).run()
|
|
return options[choice.single_value]
|
|
|
|
def _validate_value(
|
|
self,
|
|
sector_size: Size,
|
|
total_size: Size,
|
|
value: str
|
|
) -> Optional[Size]:
|
|
match = re.match(r'([0-9]+)([a-zA-Z|%]*)', value, re.I)
|
|
|
|
if match:
|
|
value, unit = match.groups()
|
|
|
|
if unit == '%':
|
|
unit = Unit.Percent.name
|
|
|
|
if unit and unit not in Unit.get_all_units():
|
|
return None
|
|
|
|
unit = Unit[unit] if unit else Unit.sectors
|
|
return Size(int(value), unit, sector_size, total_size)
|
|
|
|
return None
|
|
|
|
def _enter_size(
|
|
self,
|
|
sector_size: Size,
|
|
total_size: Size,
|
|
prompt: str,
|
|
default: Size
|
|
) -> Size:
|
|
while True:
|
|
value = TextInput(prompt).run().strip()
|
|
|
|
size: Optional[Size] = None
|
|
|
|
if not value:
|
|
size = default
|
|
else:
|
|
size = self._validate_value(sector_size, total_size, value)
|
|
|
|
if size:
|
|
return size
|
|
|
|
warn(f'Invalid value: {value}')
|
|
|
|
def _prompt_size(self) -> Tuple[Size, Size]:
|
|
device_info = self._device.device_info
|
|
|
|
text = str(_('Current free sectors on device {}:')).format(device_info.path) + '\n\n'
|
|
free_space_table = FormattedOutput.as_table(device_info.free_space_regions)
|
|
prompt = text + free_space_table + '\n'
|
|
|
|
total_sectors = device_info.total_size.format_size(Unit.sectors, device_info.sector_size)
|
|
total_bytes = device_info.total_size.format_size(Unit.B)
|
|
|
|
prompt += str(_('Total: {} / {}')).format(total_sectors, total_bytes) + '\n\n'
|
|
prompt += str(_('All entered values can be suffixed with a unit: B, KB, KiB, MB, MiB...')) + '\n'
|
|
prompt += str(_('If no unit is provided, the value is interpreted as sectors')) + '\n'
|
|
print(prompt)
|
|
|
|
largest_free_area: DeviceGeometry = max(device_info.free_space_regions, key=lambda r: r.get_length())
|
|
|
|
# prompt until a valid start sector was entered
|
|
default_start = Size(largest_free_area.start, Unit.sectors, device_info.sector_size)
|
|
start_prompt = str(_('Enter start (default: sector {}): ')).format(largest_free_area.start)
|
|
start_size = self._enter_size(
|
|
device_info.sector_size,
|
|
device_info.total_size,
|
|
start_prompt,
|
|
default_start
|
|
)
|
|
|
|
if start_size.value == largest_free_area.start:
|
|
end_size = Size(largest_free_area.end, Unit.sectors, device_info.sector_size)
|
|
else:
|
|
end_size = Size(100, Unit.Percent, total_size=device_info.total_size)
|
|
|
|
# prompt until valid end sector was entered
|
|
end_prompt = str(_('Enter end (default: {}): ')).format(end_size.as_text())
|
|
end_size = self._enter_size(
|
|
device_info.sector_size,
|
|
device_info.total_size,
|
|
end_prompt,
|
|
end_size
|
|
)
|
|
|
|
return start_size, end_size
|
|
|
|
def _create_new_partition(self) -> PartitionModification:
|
|
fs_type = self._prompt_partition_fs_type()
|
|
|
|
start_size, end_size = self._prompt_size()
|
|
length = end_size - start_size
|
|
|
|
# new line for the next prompt
|
|
print()
|
|
|
|
mountpoint = None
|
|
if fs_type != FilesystemType.Btrfs:
|
|
mountpoint = self._prompt_mountpoint()
|
|
|
|
partition = PartitionModification(
|
|
status=ModificationStatus.Create,
|
|
type=PartitionType.Primary,
|
|
start=start_size,
|
|
length=length,
|
|
fs_type=fs_type,
|
|
mountpoint=mountpoint
|
|
)
|
|
|
|
if partition.mountpoint == Path('/boot'):
|
|
partition.set_flag(PartitionFlag.Boot)
|
|
|
|
return partition
|
|
|
|
def _reset_confirmation(self) -> MenuSelection:
|
|
prompt = str(_('This will remove all newly added partitions, continue?'))
|
|
choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run()
|
|
return choice
|
|
|
|
def _suggest_partition_layout(self, data: List[PartitionModification]) -> List[PartitionModification]:
|
|
# if modifications have been done already, inform the user
|
|
# that this operation will erase those modifications
|
|
if any([not entry.exists() for entry in data]):
|
|
choice = self._reset_confirmation()
|
|
if choice.value == Menu.no():
|
|
return []
|
|
|
|
from ..interactions.disk_conf import suggest_single_disk_layout
|
|
|
|
device_modification = suggest_single_disk_layout(self._device)
|
|
return device_modification.partitions
|
|
|
|
|
|
def manual_partitioning(
|
|
device: BDevice,
|
|
prompt: str = '',
|
|
preset: List[PartitionModification] = []
|
|
) -> List[PartitionModification]:
|
|
if not prompt:
|
|
prompt = str(_('Partition management: {}')).format(device.device_info.path) + '\n'
|
|
prompt += str(_('Total length: {}')).format(device.device_info.total_size.format_size(Unit.MiB))
|
|
|
|
manual_preset = []
|
|
|
|
if not preset:
|
|
# we'll display the existing partitions of the device
|
|
for partition in device.partition_infos:
|
|
manual_preset.append(
|
|
PartitionModification.from_existing_partition(partition)
|
|
)
|
|
else:
|
|
manual_preset = preset
|
|
|
|
menu_list = PartitioningList(prompt, device, manual_preset)
|
|
partitions: List[PartitionModification] = menu_list.run()
|
|
|
|
if menu_list.is_last_choice_cancel():
|
|
return preset
|
|
|
|
return partitions
|