670 lines
22 KiB
Python
670 lines
22 KiB
Python
import glob
|
|
import time
|
|
import logging
|
|
import json
|
|
import os
|
|
import hashlib
|
|
import typing
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List, Union, Iterator
|
|
|
|
from .blockdevice import BlockDevice
|
|
from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name
|
|
from ..storage import storage
|
|
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
|
|
from ..output import log
|
|
from ..general import SysCommand
|
|
from .btrfs.btrfs_helpers import subvolume_info_from_path
|
|
from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo
|
|
|
|
@dataclass
class PartitionInfo:
    """
    Snapshot of the data reported for a single partition.

    If either ``partuuid`` or ``uuid`` is missing when the instance is created,
    ``__post_init__`` re-queries ``lsblk`` (at most ``storage['DISK_RETRY_ATTEMPTS']``
    times) and fills both fields from its output.
    """
    partition_object: 'Partition'
    device_path: str  # This would be /dev/sda1 for instance
    bootable: bool
    size: float
    sector_size: int
    start: Optional[int]
    end: Optional[int]
    pttype: Optional[str]
    filesystem_type: Optional[str]
    partuuid: Optional[str]
    uuid: Optional[str]
    mountpoints: List[Path] = field(default_factory=list)

    def __post_init__(self):
        """
        Retry ``lsblk`` until the identifiers we care about are available.

        :raises DiskError: when ``lsblk`` output cannot be parsed as JSON or
            does not contain any block device.
        """
        if all([self.partuuid, self.uuid]):
            return

        for attempt in range(storage['DISK_RETRY_ATTEMPTS']):
            raw_output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')

            try:
                lsblk_info = json.loads(raw_output)
            except json.decoder.JSONDecodeError:
                log(f"Could not decode JSON: {raw_output}", fg="red", level=logging.ERROR)
                raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')

            # An empty 'blockdevices' list is treated like a missing one
            # instead of blowing up with an IndexError.
            devices = lsblk_info.get('blockdevices') or [None]
            if not (device := devices[0]):
                raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')

            self.partuuid = device.get('partuuid')
            self.uuid = device.get('uuid')

            if self._has_required_identifiers(device):
                break

            # Values were not ready yet: ask for a re-read of the partition
            # table and back off a little longer on every attempt.
            self.partition_object.partprobe()
            time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * attempt))

    @staticmethod
    def _has_required_identifiers(device: Dict[str, Any]) -> bool:
        """
        Tell whether one ``lsblk`` device entry carries every identifier we wait for.

        A PARTUUID is always required. A UUID is only required when the entry
        reports a filesystem type, since unformatted partitions won't have a UUID.
        The filesystem type is read from the device entry itself (the top-level
        ``lsblk`` document only holds the ``blockdevices`` list).
        """
        requirements = [device.get('partuuid')]

        if device.get('fstype') is not None:
            requirements.append(device.get('uuid'))

        return all(requirements)

    def get_first_mountpoint(self) -> Optional[Path]:
        """Return the first known mountpoint, or ``None`` when there is none."""
        if self.mountpoints:
            return self.mountpoints[0]
        return None
|
|
|
|
|
|
class Partition:
|
|
def __init__(
|
|
self,
|
|
path: str,
|
|
block_device: BlockDevice,
|
|
part_id :Optional[str] = None,
|
|
filesystem :Optional[str] = None,
|
|
mountpoint :Optional[str] = None,
|
|
encrypted :bool = False,
|
|
autodetect_filesystem :bool = True,
|
|
):
|
|
if not part_id:
|
|
part_id = os.path.basename(path)
|
|
|
|
if type(block_device) is str:
|
|
raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
|
|
|
|
self.block_device = block_device
|
|
self._path = path
|
|
self._part_id = part_id
|
|
self._target_mountpoint = mountpoint
|
|
self._encrypted = encrypted
|
|
self._wipe = False
|
|
self._type = 'primary'
|
|
|
|
if mountpoint:
|
|
self.mount(mountpoint)
|
|
|
|
try:
|
|
self._partition_info = self._fetch_information()
|
|
|
|
if not autodetect_filesystem and filesystem:
|
|
self._partition_info.filesystem_type = filesystem
|
|
|
|
if self._partition_info.filesystem_type == 'crypto_LUKS':
|
|
self._encrypted = True
|
|
except DiskError:
|
|
self._partition_info = None
|
|
|
|
@typing.no_type_check # I hate doint this but I'm currently unsure where this is used.
|
|
def __lt__(self, left_comparitor :BlockDevice) -> bool:
|
|
if type(left_comparitor) == Partition:
|
|
left_comparitor = left_comparitor.path
|
|
else:
|
|
left_comparitor = str(left_comparitor)
|
|
|
|
# The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
|
|
return self._path < left_comparitor
|
|
|
|
def __repr__(self, *args :str, **kwargs :str) -> str:
|
|
mount_repr = ''
|
|
if self._partition_info:
|
|
if mountpoint := self._partition_info.get_first_mountpoint():
|
|
mount_repr = f", mounted={mountpoint}"
|
|
elif self._target_mountpoint:
|
|
mount_repr = f", rel_mountpoint={self._target_mountpoint}"
|
|
|
|
classname = self.__class__.__name__
|
|
|
|
if not self._partition_info:
|
|
return f'{classname}(path={self._path})'
|
|
elif self._encrypted:
|
|
return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})'
|
|
else:
|
|
return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})'
|
|
|
|
def as_json(self) -> Dict[str, Any]:
|
|
"""
|
|
this is used for the table representation of the partition (see FormattedOutput)
|
|
"""
|
|
partition_info = {
|
|
'type': self._type,
|
|
'PARTUUID': self.part_uuid,
|
|
'wipe': self._wipe,
|
|
'boot': self.boot,
|
|
'ESP': self.boot,
|
|
'mountpoint': self._target_mountpoint,
|
|
'encrypted': self._encrypted,
|
|
'start': self.start,
|
|
'size': self.end,
|
|
'filesystem': self._partition_info.filesystem_type if self._partition_info else 'Unknown'
|
|
}
|
|
|
|
return partition_info
|
|
|
|
def __dump__(self) -> Dict[str, Any]:
|
|
# TODO remove this in favour of as_json
|
|
return {
|
|
'type': self._type,
|
|
'PARTUUID': self.part_uuid,
|
|
'wipe': self._wipe,
|
|
'boot': self.boot,
|
|
'ESP': self.boot,
|
|
'mountpoint': self._target_mountpoint,
|
|
'encrypted': self._encrypted,
|
|
'start': self.start,
|
|
'size': self.end,
|
|
'filesystem': {
|
|
'format': self._partition_info.filesystem_type if self._partition_info else 'None'
|
|
}
|
|
}
|
|
|
|
def _call_lsblk(self) -> Dict[str, Any]:
    """
    Run ``lsblk --json`` against this partition and return the parsed document.

    Every attempt is preceded by a partprobe and a growing sleep. The first
    output that parses as JSON is returned.

    :raises DiskError: when no attempt produced parsable output.
    """
    for attempt in range(storage['DISK_RETRY_ATTEMPTS']):
        self.partprobe()
        # TODO: Remove, we should be relying on blkid instead of lsblk.
        # lsblk is known to work against a chaotic cache that can change
        # during the call, causing no information to be returned.
        time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * attempt))

        try:
            raw = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
        except SysCallError as error:
            # It appears as if lsblk can return exit codes like 8192 to indicate something.
            # It still produces output, so keep everything from the first '{' onwards.
            raw = error.worker.decode('UTF-8')
            json_start = raw.find('{')
            if json_start != -1:
                raw = raw[json_start:]

        if not raw:
            continue

        try:
            return json.loads(raw)
        except json.decoder.JSONDecodeError:
            log(f"Could not decode JSON: {raw}", fg="red", level=logging.ERROR)

    raise DiskError(f'Failed to get partition information "{self.device_path}" with lsblk')
|
|
|
|
def _call_sfdisk(self) -> Dict[str, Any]:
    """
    Look this partition up in ``sfdisk --json`` of the parent block device.

    :return: The partition entry whose 'node' equals this partition's path,
        or an empty dict when the table has no such entry.
    :raises DiskError: when sfdisk produced no output.
    """
    output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')

    if not output:
        raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk')

    table = json.loads(output).get('partitiontable', {})
    matches = [entry for entry in table.get('partitions', []) if entry['node'] == self._path]

    return matches[0] if matches else {}
|
|
|
|
def _fetch_information(self) -> PartitionInfo:
    """
    Combine the output of lsblk and sfdisk into a ``PartitionInfo``.

    :raises DiskError: when lsblk reported no block device for this partition.
    """
    lsblk_info = self._call_lsblk()
    sfdisk_info = self._call_sfdisk()

    devices = lsblk_info.get('blockdevices', [])
    if not devices:
        raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')

    # Grab the first (and only) block device in the list as we're targeting a specific partition
    device = devices[0]

    mountpoints = [Path(entry) for entry in filter(None, device['mountpoints'])]

    # Either sfdisk flags the partition as bootable, or its type is this GUID.
    esp_type = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
    bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == esp_type

    return PartitionInfo(
        partition_object=self,
        device_path=self._path,
        pttype=device['pttype'],
        partuuid=device['partuuid'],
        uuid=device['uuid'],
        sector_size=device['log-sec'],
        size=convert_size_to_gb(device['size']),
        start=sfdisk_info.get('start', None),
        # sfdisk's 'size' value is what gets stored in the 'end' field
        end=sfdisk_info.get('size', None),
        bootable=bootable,
        filesystem_type=device['fstype'],
        mountpoints=mountpoints
    )
|
|
|
|
@property
|
|
def target_mountpoint(self) -> Optional[str]:
|
|
return self._target_mountpoint
|
|
|
|
@property
|
|
def path(self) -> str:
|
|
return self._path
|
|
|
|
@property
|
|
def filesystem(self) -> str:
|
|
if self._partition_info:
|
|
return self._partition_info.filesystem_type
|
|
|
|
@property
|
|
def mountpoint(self) -> Optional[Path]:
|
|
if len(self.mountpoints) > 0:
|
|
return self.mountpoints[0]
|
|
return None
|
|
|
|
@property
|
|
def mountpoints(self) -> List[Path]:
|
|
if self._partition_info:
|
|
return self._partition_info.mountpoints
|
|
|
|
@property
|
|
def sector_size(self) -> int:
|
|
if self._partition_info:
|
|
return self._partition_info.sector_size
|
|
|
|
@property
|
|
def start(self) -> Optional[int]:
|
|
if self._partition_info:
|
|
return self._partition_info.start
|
|
|
|
@property
|
|
def end(self) -> Optional[int]:
|
|
if self._partition_info:
|
|
return self._partition_info.end
|
|
|
|
@property
|
|
def end_sectors(self) -> Optional[int]:
|
|
if self._partition_info:
|
|
start = self._partition_info.start
|
|
end = self._partition_info.end
|
|
if start and end:
|
|
return start + end
|
|
|
|
@property
|
|
def size(self) -> Optional[float]:
|
|
if self._partition_info:
|
|
return self._partition_info.size
|
|
|
|
@property
|
|
def boot(self) -> bool:
|
|
if self._partition_info:
|
|
return self._partition_info.bootable
|
|
|
|
@property
|
|
def partition_type(self) -> Optional[str]:
|
|
if self._partition_info:
|
|
return self._partition_info.pttype
|
|
|
|
@property
|
|
def part_uuid(self) -> str:
|
|
if self._partition_info:
|
|
return self._partition_info.partuuid
|
|
|
|
@property
def uuid(self) -> Optional[str]:
    """
    Returns the UUID of the **partition**, read with blkid (see ``_safe_uuid``).
    This is more reliable than relying on /dev/disk/by-uuid as
    it doesn't seem to be able to detect md raid partitions.
    For bind mounts all the subvolumes share the same uuid

    The lookup is retried up to ``storage['DISK_RETRY_ATTEMPTS']`` times with
    a partprobe and a growing delay before each attempt.

    :raises DiskError: when partprobe fails or no UUID could be read.
    """
    for attempt in range(storage['DISK_RETRY_ATTEMPTS']):
        if not self.partprobe():
            raise DiskError(f"Could not perform partprobe on {self.device_path}")

        # No delay on the first attempt, progressively longer afterwards.
        time.sleep(storage.get('DISK_TIMEOUTS', 1) * attempt)

        if found_uuid := self._safe_uuid:
            return found_uuid

    raise DiskError(f"Could not get UUID for {self.device_path} using 'blkid -s UUID -o value {self.device_path}'")
|
|
|
|
@property
def _safe_uuid(self) -> Optional[str]:
    """
    A near copy of self.uuid but without any delays.
    This function should only be used where uuid is not crucial.
    For instance when you want to get a __repr__ of the class.

    Returns ``None`` when the parent device reports the partition type
    'iso9660' and a command failed, or when blkid itself failed.
    """
    if not self.partprobe():
        if self.block_device.partition_type == 'iso9660':
            return None

        log(f"Could not reliably refresh UUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)

    try:
        return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip()
    except SysCallError as error:
        if self.block_device.partition_type == 'iso9660':
            # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
            return None

        log(f"Could not get UUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}")

    return None
|
|
|
|
@property
def _safe_part_uuid(self) -> Optional[str]:
    """
    A near copy of self.uuid but for the PARTUUID and without any delays.
    This function should only be used where the PARTUUID is not crucial.
    For instance when you want to get a __repr__ of the class.

    When blkid fails, the PARTUUID stored in the partition information (if
    any) is returned instead.
    """
    if not self.partprobe():
        if self.block_device.partition_type == 'iso9660':
            return None

        log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)

    try:
        return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
    except SysCallError as error:
        if self.block_device.partition_type == 'iso9660':
            # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
            return None

        log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}")

    # Fall back to the cached PARTUUID (not the filesystem UUID) so the
    # caller always gets the kind of identifier it asked for.
    if self._partition_info:
        return self._partition_info.partuuid

    return None
|
|
|
|
@property
|
|
def encrypted(self) -> Union[bool, None]:
|
|
return self._encrypted
|
|
|
|
@property
def parent(self) -> str:
    """Alias for ``real_device``."""
    return self.real_device
|
|
|
|
@property
def real_device(self) -> str:
    """
    Resolve the device this partition sits on by walking ``lsblk -J``.

    :return: ``/dev/<parent>`` for the first tree in which a parent of this
        partition is found, otherwise the partition's own path.
    :raises DiskError: when ``lsblk -J`` produced no output.
    """
    output = SysCommand('lsblk -J').decode('UTF-8')

    if not output:
        raise DiskError('Unable to get disk information for command "lsblk -J"')

    for tree in json.loads(output)['blockdevices']:
        parent_name = self.find_parent_of(tree, os.path.basename(self.device_path))
        if parent_name:
            return f"/dev/{parent_name}"

    return self._path
|
|
|
|
@property
def device_path(self) -> str:
    """ for bind mounts returns the physical path of the partition

    This is the first element returned by ``split_bind_name`` for the path.
    """
    physical_path, _ = split_bind_name(self._path)
    return physical_path
|
|
|
|
@property
def bind_name(self) -> str:
    """ for bind mounts returns the bind name (subvolume path).
    Returns none if this property does not exist

    This is the second element returned by ``split_bind_name`` for the path.
    """
    _, subvolume_name = split_bind_name(self._path)
    return subvolume_name
|
|
|
|
@property
def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]:
    """
    Yield subvolume information for every mountpoint of a btrfs partition.

    For each mountpoint ``findmnt`` is consulted; for every reported
    filesystem the subvolume at the mountpoint itself is yielded first,
    followed by the subvolumes found among its btrfs children.
    Nothing is yielded when the filesystem type is not 'btrfs'.
    """
    from .helpers import findmnt

    def walk_children(node):
        # Depth-first walk over findmnt's 'children' lists.
        for child in node.get('children', []):
            target = child.get('target')
            if not target:
                continue

            if child.get('fstype') == 'btrfs':
                if subvolume := subvolume_info_from_path(Path(target)):
                    yield subvolume

            if child.get('children'):
                yield from walk_children(child)

    if self._partition_info.filesystem_type != 'btrfs':
        return

    for mountpoint in self._partition_info.mountpoints:
        result = findmnt(mountpoint)
        if not result:
            continue

        for filesystem in result.get('filesystems', []):
            if subvolume := subvolume_info_from_path(mountpoint):
                yield subvolume

            yield from walk_children(filesystem)
|
|
|
|
def partprobe(self) -> bool:
|
|
try:
|
|
if self.block_device:
|
|
return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code
|
|
except SysCallError as error:
|
|
log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG)
|
|
|
|
return False
|
|
|
|
def detect_inner_filesystem(self, password :str) -> Optional[str]:
    """
    Open the partition through ``luks2`` with ``password`` and report the
    ``filesystem`` of the unlocked device.

    :return: The filesystem of the unlocked device, or ``None`` when a
        SysCallError occurred.
    """
    log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO)
    from ..luks import luks2

    mapper_name = storage.get('ENC_IDENTIFIER', 'ai') + 'loop'

    try:
        with luks2(self, mapper_name, password, auto_unmount=True) as unlocked_device:
            return unlocked_device.filesystem
    except SysCallError:
        return None

    return None
|
|
|
|
def has_content(self) -> bool:
|
|
fs_type = self._partition_info.filesystem_type
|
|
if not fs_type or "swap" in fs_type:
|
|
return False
|
|
|
|
temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest()
|
|
temporary_path = Path(temporary_mountpoint)
|
|
|
|
temporary_path.mkdir(parents=True, exist_ok=True)
|
|
if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0:
|
|
raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}')
|
|
|
|
files = len(glob.glob(f"{temporary_mountpoint}/*"))
|
|
iterations = 0
|
|
while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations + 1) < 10:
|
|
time.sleep(1)
|
|
|
|
temporary_path.rmdir()
|
|
|
|
return True if files > 0 else False
|
|
|
|
def encrypt(self, password: Optional[str] = None) -> str:
    """
    A wrapper function for luks2() instances and the .encrypt() method of that instance.

    A ``luks2`` handle is created for this partition and the result of its
    ``encrypt()`` call is returned.
    """
    from ..luks import luks2

    return luks2(self, None, None).encrypt(self, password=password)
|
|
|
|
def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
    """
    Format can be given an overriding path, for instance /dev/null to test
    the formatting functionality and in essence the support for the given filesystem.

    :param filesystem: Filesystem to create; defaults to the recorded type.
    :param path: Device to format; defaults to this partition's path.
    :param log_formatting: Whether to log the "Formatting ..." line.
    :param options: Extra arguments for the mkfs tool. The list is never
        modified; each tool's "force" flag is put in front of a copy.
    :param retry: When True a failing mkfs call is retried once after
        ``storage['DISK_TIMEOUTS']`` seconds (1 if unset).
    :return: True when formatting succeeded.
    :raises DiskError: when the mkfs tool reported a failure.
    :raises UnknownFilesystemFormat: when the filesystem is not implemented.
    :raises SysCallError: when the mkfs call still fails after the retry.
    """
    if filesystem is None:
        filesystem = self._partition_info.filesystem_type

    if path is None:
        path = self._path

    # This converts from fat32 -> vfat to unify filesystem names
    filesystem = get_mount_fs_type(filesystem)

    # To avoid "unable to open /dev/x: No such file or directory"
    start_wait = time.time()
    while Path(path).exists() is False and time.time() - start_wait < 10:
        time.sleep(0.025)

    if log_formatting:
        log(f'Formatting {path} -> {filesystem}', level=logging.INFO)

    # mkfs command and forced flags for every tool judged by its exit code.
    exit_code_tools = {
        'vfat': ('/usr/bin/mkfs.vfat', ['-F32']),
        'ext4': ('/usr/bin/mkfs.ext4', ['-F']),
        'ext2': ('/usr/bin/mkfs.ext2', ['-F']),
        'xfs': ('/usr/bin/mkfs.xfs', ['-f']),
        'f2fs': ('/usr/bin/mkfs.f2fs', ['-f']),
        'ntfs3': ('/usr/bin/mkfs.ntfs -Q', ['-f']),
    }

    try:
        if filesystem == 'btrfs':
            # mkfs.btrfs is judged by its output rather than its exit code.
            mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(['-f'] + options)} {path}").decode('UTF-8')
            if mkfs and 'UUID:' not in mkfs:
                raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')

        elif filesystem in exit_code_tools:
            binary, forced_flags = exit_code_tools[filesystem]
            command = f"{binary} {' '.join(forced_flags + options)} {path}"

            if filesystem == 'vfat':
                log(command)

            if (handle := SysCommand(command)).exit_code != 0:
                raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")

        elif filesystem == 'crypto_LUKS':
            # No mkfs tool is run here, only the recorded type is updated below.
            pass

        else:
            raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")

    except SysCallError as error:
        log(f"Formatting ran in to an error: {error}", level=logging.WARNING, fg="orange")

        if retry is not True:
            # The retry failed as well: report it instead of claiming success.
            raise

        log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange")
        time.sleep(storage.get('DISK_TIMEOUTS', 1))

        # 'options' is still the caller's list, so the forced flag is not duplicated.
        return self.format(filesystem, path, log_formatting, options, retry=False)

    if self._partition_info:
        self._partition_info.filesystem_type = filesystem

    if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
        self._encrypted = True
    else:
        self._encrypted = False

    return True
|
|
|
|
def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]:
|
|
if data['name'] == name:
|
|
return parent
|
|
elif 'children' in data:
|
|
for child in data['children']:
|
|
if parent := self.find_parent_of(child, name, parent=data['name']):
|
|
return parent
|
|
|
|
return None
|
|
|
|
def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
|
|
if not self._partition_info.get_first_mountpoint():
|
|
log(f'Mounting {self} to {target}', level=logging.INFO)
|
|
|
|
if not fs:
|
|
fs = self._partition_info.filesystem_type
|
|
|
|
fs_type = get_mount_fs_type(fs)
|
|
|
|
Path(target).mkdir(parents=True, exist_ok=True)
|
|
|
|
if self.bind_name:
|
|
device_path = self.device_path
|
|
# TODO options should be better be a list than a string
|
|
if options:
|
|
options = f"{options},subvol={self.bind_name}"
|
|
else:
|
|
options = f"subvol={self.bind_name}"
|
|
else:
|
|
device_path = self._path
|
|
try:
|
|
if options:
|
|
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}")
|
|
else:
|
|
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} {device_path} {target}")
|
|
|
|
# TODO: Should be redundant to check for exit_code
|
|
if mnt_handle.exit_code != 0:
|
|
raise DiskError(f"Could not mount {self._path} to {target} using options {options}")
|
|
except SysCallError as err:
|
|
raise err
|
|
|
|
# Update the partition info since the mount info has changed after this call.
|
|
self._partition_info = self._fetch_information()
|
|
return True
|
|
|
|
return False
|
|
|
|
def unmount(self) -> bool:
    """
    Unmount the partition and refresh the stored partition information.

    :return: True once the information has been refreshed.
    :raises SysCallError: when umount exited with a code between 1 and 7999.
    """
    worker = SysCommand(f"/usr/bin/umount {self._path}")
    exit_code = worker.exit_code

    # Without too much research, it seems that low error codes are errors.
    # And above 8k is indicators such as "/dev/x not mounted.".
    # So anything in between 0 and 8k are errors (?).
    is_real_error = bool(exit_code) and 0 < exit_code < 8000
    if is_real_error:
        raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)

    # Update the partition info since the mount info has changed after this call.
    self._partition_info = self._fetch_information()
    return True
|
|
|
|
def filesystem_supported(self) -> bool:
    """
    Probe whether the recorded filesystem type can be handled by format().

    format() is called with the path '/dev/null'. The two interesting outcomes are:
        1. SysCallError/DiskError: /dev/null could not be formatted, but the
           filesystem type itself was recognised.
        2. UnknownFilesystemFormat: the filesystem type is not implemented;
           this one is passed on to the caller.

    :return: True when the filesystem type was recognised.
    """
    recorded_type = self._partition_info.filesystem_type

    try:
        self.format(recorded_type, '/dev/null', log_formatting=False)
    except (SysCallError, DiskError):
        # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code
        pass
    except UnknownFilesystemFormat as err:
        raise err

    return True
|
|
|
|
|
|
def get_mount_fs_type(fs :str) -> str:
    """
    Translate a filesystem name into the type name used for mounting.

    'ntfs' becomes 'ntfs3' (needed to use the Paragon R/W NTFS driver) and
    'fat32' becomes 'vfat' (the actual type used for fat32 mounting).
    Every other value is returned unchanged.
    """
    mount_aliases = {
        'ntfs': 'ntfs3',
        'fat32': 'vfat',
    }
    return mount_aliases.get(fs, fs)
|