Move mount() to disk.utils (#4232)

This commit is contained in:
codefiles 2026-02-15 21:08:59 -05:00 committed by GitHub
parent 9bd2131792
commit 3b6f7db942
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 50 additions and 49 deletions

View File

@ -44,6 +44,7 @@ from .utils import (
find_lsblk_info,
get_all_lsblk_info,
get_lsblk_info,
mount,
umount,
)
@ -218,7 +219,7 @@ class DeviceHandler:
subvol_infos: list[_BtrfsSubvolumeInfo] = []
if not lsblk_info.mountpoint:
self.mount(dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
mount(dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
mountpoint = self._TMP_BTRFS_MOUNT
else:
# when multiple subvolumes are mounted then the lsblk output may look like
@ -622,7 +623,7 @@ class DeviceHandler:
) -> None:
info(f'Creating subvolumes: {path}')
self.mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
for sub_vol in sorted(btrfs_subvols, key=lambda x: x.name):
debug(f'Creating subvolume: {sub_vol.name}')
@ -671,7 +672,7 @@ class DeviceHandler:
luks_handler = None
dev_path = part_mod.safe_dev_path
self.mount(
mount(
dev_path,
self._TMP_BTRFS_MOUNT,
create_target_mountpoint=True,
@ -769,43 +770,6 @@ class DeviceHandler:
except SysCallError as err:
raise DiskError(f'Could not enable swap {path}:\n{err.message}')
def mount(
self,
dev_path: Path,
target_mountpoint: Path,
mount_fs: str | None = None,
create_target_mountpoint: bool = True,
options: list[str] = [],
) -> None:
if create_target_mountpoint and not target_mountpoint.exists():
target_mountpoint.mkdir(parents=True, exist_ok=True)
if not target_mountpoint.exists():
raise ValueError('Target mountpoint does not exist')
lsblk_info = get_lsblk_info(dev_path)
if target_mountpoint in lsblk_info.mountpoints:
info(f'Device already mounted at {target_mountpoint}')
return
cmd = ['mount']
if len(options):
cmd.extend(('-o', ','.join(options)))
if mount_fs:
cmd.extend(('-t', mount_fs))
cmd.extend((str(dev_path), str(target_mountpoint)))
command = ' '.join(cmd)
debug(f'Mounting {dev_path}: {command}')
try:
SysCommand(command)
except SysCallError as err:
raise DiskError(f'Could not mount {dev_path}: {command}\n{err.message}')
def detect_pre_mounted_mods(self, base_mountpoint: Path) -> list[DeviceModification]:
part_mods: dict[Path, list[PartitionModification]] = {}

View File

@ -5,7 +5,7 @@ from pydantic import BaseModel
from archinstall.lib.command import SysCommand
from archinstall.lib.exceptions import DiskError, SysCallError
from archinstall.lib.models.device import LsblkInfo
from archinstall.lib.output import debug, warn
from archinstall.lib.output import debug, info, warn
class LsblkOutput(BaseModel):
@ -67,12 +67,12 @@ def get_lsblk_output() -> LsblkOutput:
def find_lsblk_info(
dev_path: Path | str,
info: list[LsblkInfo],
info_list: list[LsblkInfo],
) -> LsblkInfo | None:
if isinstance(dev_path, str):
dev_path = Path(dev_path)
for lsblk_info in info:
for lsblk_info in info_list:
if lsblk_info.path == dev_path:
return lsblk_info
@ -110,6 +110,43 @@ def disk_layouts() -> str:
return lsblk_output.model_dump_json(indent=4)
def mount(
dev_path: Path,
target_mountpoint: Path,
mount_fs: str | None = None,
create_target_mountpoint: bool = True,
options: list[str] = [],
) -> None:
if create_target_mountpoint and not target_mountpoint.exists():
target_mountpoint.mkdir(parents=True, exist_ok=True)
if not target_mountpoint.exists():
raise ValueError('Target mountpoint does not exist')
lsblk_info = get_lsblk_info(dev_path)
if target_mountpoint in lsblk_info.mountpoints:
info(f'Device already mounted at {target_mountpoint}')
return
cmd = ['mount']
if len(options):
cmd.extend(('-o', ','.join(options)))
if mount_fs:
cmd.extend(('-t', mount_fs))
cmd.extend((str(dev_path), str(target_mountpoint)))
command = ' '.join(cmd)
debug(f'Mounting {dev_path}: {command}')
try:
SysCommand(command)
except SysCallError as err:
raise DiskError(f'Could not mount {dev_path}: {command}\n{err.message}')
def umount(mountpoint: Path, recursive: bool = False) -> None:
lsblk_info = get_lsblk_info(mountpoint)

View File

@ -16,7 +16,7 @@ from typing import Any, Self
from archinstall.lib.disk.device_handler import device_handler
from archinstall.lib.disk.fido import Fido2
from archinstall.lib.disk.utils import get_lsblk_by_mountpoint, get_lsblk_info
from archinstall.lib.disk.utils import get_lsblk_by_mountpoint, get_lsblk_info, mount
from archinstall.lib.models.application import ZramAlgorithm
from archinstall.lib.models.device import (
DiskEncryption,
@ -360,7 +360,7 @@ class Installer:
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined
if part_mod.mountpoint:
target = self.target / part_mod.relative_mountpoint
device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options)
mount(part_mod.dev_path, target, options=part_mod.mount_options)
elif part_mod.fs_type == FilesystemType.Btrfs:
# Only mount BTRFS subvolumes that have mountpoints specified
subvols_with_mountpoints = [sv for sv in part_mod.btrfs_subvols if sv.mountpoint is not None]
@ -377,7 +377,7 @@ class Installer:
if volume.fs_type != FilesystemType.Btrfs:
if volume.mountpoint and volume.dev_path:
target = self.target / volume.relative_mountpoint
device_handler.mount(volume.dev_path, target, options=volume.mount_options)
mount(volume.dev_path, target, options=volume.mount_options)
if volume.fs_type == FilesystemType.Btrfs and volume.dev_path:
# Only mount BTRFS subvolumes that have mountpoints specified
@ -396,13 +396,13 @@ class Installer:
self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols, part_mod.mount_options)
elif part_mod.mountpoint:
target = self.target / part_mod.relative_mountpoint
device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
def _mount_luks_volume(self, volume: LvmVolume, luks_handler: Luks2) -> None:
if volume.fs_type != FilesystemType.Btrfs:
if volume.mountpoint and luks_handler.mapper_dev:
target = self.target / volume.relative_mountpoint
device_handler.mount(luks_handler.mapper_dev, target, options=volume.mount_options)
mount(luks_handler.mapper_dev, target, options=volume.mount_options)
if volume.fs_type == FilesystemType.Btrfs and luks_handler.mapper_dev:
# Only mount BTRFS subvolumes that have mountpoints specified
@ -421,7 +421,7 @@ class Installer:
for subvol in sorted(subvols_with_mountpoints, key=lambda x: x.relative_mountpoint):
mountpoint = self.target / subvol.relative_mountpoint
options = mount_options + [f'subvol={subvol.name}']
device_handler.mount(dev_path, mountpoint, options=options)
mount(dev_path, mountpoint, options=options)
def generate_key_files(self) -> None:
match self._disk_encryption.encryption_type: