Tweaked get_mount_info() and mount_subvolume(). mount info now returns the path it found after traversal. mount_subvolume will no longer assume installation.target is of pathlib.Path and converts it if it isn't.

This commit is contained in:
Anton Hvornum 2021-10-30 17:15:33 +02:00
parent 4bc164cccc
commit 3ee1a5c18e
No known key found for this signature in database
GPG Key ID: F1234C5BA67C59DF
2 changed files with 34 additions and 16 deletions

View File

@ -6,29 +6,38 @@ from ..exceptions import DiskError
from ..general import SysCommand
from ..output import log
def mount_subvolume(installation, location :Union[pathlib.Path, str], force=False) -> bool:
def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
"""
This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name.
@installation: archinstall.Installer instance
@location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
@subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
@force: overrides the check for weither or not the subvolume mountpoint is empty or not
"""
installation_mountpoint = installation.target
if type(installation_mountpoint) == str:
installation_mountpoint = pathlib.Path(installation_mountpoint)
# Set up the required physical structure
if type(location) == str:
location = pathlib.Path(location)
if type(subvolume_location) == str:
subvolume_location = pathlib.Path(subvolume_location)
if not (installation.target/location).exists():
(installation.target/location).mkdir(parents=True)
target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor)
if glob.glob(str(installation.target/location/'*')) and force is False:
raise DiskError(f"Cannot mount subvolume to {installation.target/location} because it contains data (non-empty folder target)")
if not (target).exists():
(target).mkdir(parents=True)
if glob.glob(str(target/'*')) and force is False:
raise DiskError(f"Cannot mount subvolume to {target} because it contains data (non-empty folder target)")
log(f"Mounting {installation.target/location} as a subvolume", level=logging.INFO)
log(f"Mounting {target} as a subvolume", level=logging.INFO)
# Mount the logical volume to the physical structure
mount_location = get_mount_info(installation.target/location, traverse=True)['source']
SysCommand(f"umount {mount_location}")
return SysCommand(f"mount {mount_location} {installation.target}/{str(location)} -o subvol=@/{str(location)}").exit_code == 0
mountpoint_device, mountpoint_device_real_path = get_mount_info(target, traverse=True, return_real_path=True)['source']
if mountpoint_device_real_path == str(target):
log(f"Unmounting non-subvolume {mountpoint_device} previously mounted at {target}")
SysCommand(f"umount {mountpoint_device}")
return SysCommand(f"mount {mountpoint_device} {target} -o subvol=@{subvolume_location}").exit_code == 0
def create_subvolume(installation, location :Union[pathlib.Path, str]) -> bool:
"""

View File

@ -117,7 +117,7 @@ def harddrive(size=None, model=None, fuzzy=False):
return collection[drive]
def get_mount_info(path :Union[pathlib.Path, str], traverse=False) -> dict:
def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict:
for traversal in list(map(str, [str(path)] + list(pathlib.Path(str(path)).parents))):
try:
log(f"Getting mount information at location {traversal}", level=logging.INFO)
@ -131,16 +131,25 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False) -> dict:
break
if not output:
return {}
if return_real_path:
return {}, None
else:
return {}
output = json.loads(output)
if 'filesystems' in output:
if len(output['filesystems']) > 1:
raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
return output['filesystems'][0]
if return_real_path:
return output['filesystems'][0], traversal
else:
return output['filesystems'][0]
return {}
if return_real_path:
return {}, traversal
else:
return {}
def get_partitions_in_use(mountpoint) -> list: