183 lines
8.4 KiB
Python
183 lines
8.4 KiB
Python
from __future__ import annotations
|
|
import pathlib
|
|
import glob
|
|
import logging
|
|
import re
|
|
from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
|
|
|
|
# https://stackoverflow.com/a/39757388/929999
|
|
if TYPE_CHECKING:
|
|
from ...installer import Installer
|
|
|
|
from .btrfs_helpers import (
|
|
subvolume_info_from_path as subvolume_info_from_path,
|
|
find_parent_subvolume as find_parent_subvolume,
|
|
setup_subvolumes as setup_subvolumes,
|
|
mount_subvolume as mount_subvolume
|
|
)
|
|
from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
|
|
from .btrfspartition import BTRFSPartition as BTRFSPartition
|
|
|
|
from ..helpers import get_mount_info
|
|
from ...exceptions import DiskError, Deprecated
|
|
from ...general import SysCommand
|
|
from ...output import log
|
|
from ...exceptions import SysCallError
|
|
|
|
def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
    """
    Run `btrfs subvol show` on a path and parse its output into a dictionary.

    Every output line that contains a colon is split on the first colon.
    The left side becomes the key (tabs removed, stripped, lower-cased and
    with spaces replaced by underscores), the right side becomes the
    stripped value. Lines without a colon are ignored.

    @path: the location passed to `btrfs subvol show`

    Returns the parsed key/value pairs, or an empty dictionary when the
    btrfs command raised SysCallError (the failure is logged).
    """
    try:
        output = SysCommand(f"btrfs subvol show {path}").decode()
    except SysCallError as error:
        # Without this early return the parsing loop below would run with
        # `output` unbound and fail with an unrelated UnboundLocalError.
        log(f"Could not retrieve subvolume information for {path}: {error}", level=logging.ERROR)
        return {}

    result = {}
    for line in output.replace('\r\n', '\n').split('\n'):
        if ':' not in line:
            continue

        # Split on the first colon only, values may contain colons themselves.
        key, val = line.replace('\t', '').split(':', 1)
        result[key.strip().lower().replace(' ', '_')] = val.strip()

    return result
|
|
|
|
def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
    """
    This function uses btrfs to create a subvolume.

    @installation: archinstall.Installer instance
    @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot

    Returns True once `btrfs subvolume create` has exited with code 0.

    Raises DiskError when the target is a directory that already contains
    entries (hidden ones included), or when the btrfs command exits with a
    non-zero code.
    """
    installation_mountpoint = installation.target
    if isinstance(installation_mountpoint, str):
        installation_mountpoint = pathlib.Path(installation_mountpoint)

    # Set up the required physical structure
    if isinstance(subvolume_location, str):
        subvolume_location = pathlib.Path(subvolume_location)

    # Strip the anchor so an absolute location such as /boot is placed
    # underneath the installation mountpoint instead of replacing it.
    target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor)

    # Difference from mount_subvolume:
    # We only check if the parent exists, since we'll run in to "target path already exists" otherwise
    if not target.parent.exists():
        target.parent.mkdir(parents=True)

    # iterdir() also reports dotfiles and is immune to glob metacharacters
    # in the path; a '*' glob would miss hidden entries and let the rmdir()
    # below fail with a bare OSError instead of a DiskError.
    if target.is_dir() and any(target.iterdir()):
        raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)")

    # Remove the target if it exists
    if target.exists():
        target.rmdir()

    log(f"Creating a subvolume on {target}", level=logging.INFO)
    if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
        raise DiskError(f"Could not create a subvolume at {target}: {cmd}")

    # Honour the declared bool return type; failures are reported via DiskError.
    return True
|
|
|
|
def _has_option(option :str,options :list) -> bool:
    """ auxiliary routine to tell whether an option shows up in a list of options.
    The test is a substring match against every entry, because an option can be
    written in several forms (option, option=val,...).
    An empty or missing list never contains the option.
    """
    return any(option in candidate for candidate in options or ())
|
|
|
|
def manage_btrfs_subvolumes(installation :Installer,
    partition :Dict[str, str],) -> list:
    """
    Deprecated entry point, use setup_subvolumes() instead.

    The function raises Deprecated unconditionally as its very first
    action, so no subvolume is created and nothing is ever returned.
    The signature is kept unchanged so that existing callers fail with
    this explicit error.

    @installation: the installation object (unused)
    @partition: the partition dictionary entry which represents the physical partition (unused)

    Raises Deprecated on every call.
    """
    raise Deprecated("Use setup_subvolumes() instead.")
|