Cleanup partition (#1333)

* Cleanup partition

* Update

* Remove unused method

* Update partitioning

* Update

* Update

* Fix mypy

Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
This commit is contained in:
Daniel Girtler 2022-07-26 18:46:50 +10:00 committed by GitHub
parent 5c3c1312a4
commit 9194f6d859
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 229 additions and 285 deletions

View File

@ -88,9 +88,6 @@ class BlockDevice:
raise KeyError(f'{self.info} does not contain information: "{key}"')
def __len__(self) -> int:
return len(self.partitions)
def __lt__(self, left_comparitor :'BlockDevice') -> bool:
return self._path < left_comparitor.path
@ -121,6 +118,8 @@ class BlockDevice:
def _load_partitions(self):
from .partition import Partition
self._partitions.clear()
lsblk_info = self._call_lsblk(self._path)
device = lsblk_info['blockdevices'][0]
self._partitions.clear()
@ -233,8 +232,6 @@ class BlockDevice:
@property
def partitions(self) -> Dict[str, 'Partition']:
self._partprobe()
self._load_partitions()
return OrderedDict(sorted(self._partitions.items()))
@property
@ -282,7 +279,7 @@ class BlockDevice:
try:
if uuid and partition.uuid and partition.uuid.lower() == uuid.lower():
return partition
elif partuuid and partition.part_uuid.lower() == partuuid.lower():
elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower():
return partition
except DiskError as error:
# Most likely a blockdevice that doesn't support or use UUID's
@ -291,6 +288,7 @@ class BlockDevice:
pass
log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG)
self.flush_cache()
time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO)

View File

@ -17,22 +17,11 @@ if TYPE_CHECKING:
from ...installer import Installer
from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
class BTRFSPartition(Partition):
def __init__(self, *args, **kwargs):
Partition.__init__(self, *args, **kwargs)
def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
if self.mountpoint:
mount_repr = f", mounted={self.mountpoint}"
elif self.target_mountpoint:
mount_repr = f", rel_mountpoint={self.target_mountpoint}"
if self._encrypted:
return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
else:
return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
@property
def subvolumes(self):
for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
@ -40,11 +29,11 @@ class BTRFSPartition(Partition):
yield subvolume_info_from_path(filesystem['target'])
def iterate_children(struct):
for child in struct.get('children', []):
for c in struct.get('children', []):
if '[' in child.get('source', ''):
yield subvolume_info_from_path(child['target'])
yield subvolume_info_from_path(c['target'])
for sub_child in iterate_children(child):
for sub_child in iterate_children(c):
yield sub_child
for child in iterate_children(filesystem):

View File

@ -210,7 +210,14 @@ class Filesystem:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition:
def add_partition(
self,
partition_type :str,
start :str,
end :str,
partition_format :Optional[str] = None,
skip_mklabel :bool = False
) -> Partition:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
@ -232,6 +239,7 @@ class Filesystem:
except DiskError:
pass
# TODO this check should probably run in the setup process rather than during the installation
if self.mode == MBR:
if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions")
@ -246,14 +254,9 @@ class Filesystem:
if self.parted(parted_string):
for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
self.partprobe()
self.blockdevice.flush_cache()
new_partition_uuids = []
for partition in self.blockdevice.partitions.values():
try:
new_partition_uuids.append(partition.part_uuid)
except DiskError:
pass
new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()]
new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
@ -263,17 +266,20 @@ class Filesystem:
log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
raise err
else:
log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()])
total_partitions.update(previous_partuuids)
# TODO: This should never be able to happen
log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red")
log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red")
raise DiskError(f"Could not add partition using: {parted_string}")
def set_name(self, partition: int, name: str) -> bool:

View File

@ -370,7 +370,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict
return filters
def get_partitions_in_use(mountpoint :str) -> List[Partition]:
def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
from .partition import Partition
try:
@ -393,8 +393,12 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]:
if not type(blockdev) in (Partition, MapperDev):
continue
for blockdev_mountpoint in blockdev.mount_information:
block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
if isinstance(blockdev, Partition):
for blockdev_mountpoint in blockdev.mountpoints:
block_devices_mountpoints[blockdev_mountpoint] = blockdev
else:
for blockdev_mountpoint in blockdev.mount_information:
block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)

View File

@ -1,14 +1,16 @@
import glob
import pathlib
import time
import logging
import json
import os
import hashlib
import typing
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Dict, Any, List, Union, Iterator
from .blockdevice import BlockDevice
from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name
from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
@ -16,6 +18,26 @@ from ..general import SysCommand
from .btrfs.btrfs_helpers import subvolume_info_from_path
from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo
@dataclass
class PartitionInfo:
pttype: str
partuuid: str
uuid: str
start: Optional[int]
end: Optional[int]
bootable: bool
size: float
sector_size: int
filesystem_type: str
mountpoints: List[Path]
def get_first_mountpoint(self) -> Optional[Path]:
if len(self.mountpoints) > 0:
return self.mountpoints[0]
return None
class Partition:
def __init__(
self,
@ -25,38 +47,37 @@ class Partition:
filesystem :Optional[str] = None,
mountpoint :Optional[str] = None,
encrypted :bool = False,
autodetect_filesystem :bool = True
autodetect_filesystem :bool = True,
):
if not part_id:
part_id = os.path.basename(path)
self.block_device = block_device
if type(self.block_device) is str:
if type(block_device) is str:
raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
self.path = path
self.part_id = part_id
self.target_mountpoint = mountpoint
self.filesystem = filesystem
self.block_device = block_device
self._path = path
self._part_id = part_id
self._target_mountpoint = mountpoint
self._encrypted = None
self.encrypted = encrypted
self.allow_formatting = False
self._encrypted = encrypted
self._wipe = False
self._type = 'primary'
if mountpoint:
self.mount(mountpoint)
try:
self.mount_information = list(find_mountpoint(self.path))
except DiskError:
self.mount_information = [{}]
self._partition_info = self._fetch_information()
if not self.filesystem and autodetect_filesystem:
self.filesystem = get_filesystem_type(path)
if not autodetect_filesystem and filesystem:
self._partition_info.filesystem_type = filesystem
if self.filesystem == 'crypto_LUKS':
self.encrypted = True
if self._partition_info.filesystem_type == 'crypto_LUKS':
self._encrypted = True
# I hate doint this but I'm currently unsure where this
# is acutally used to be able to fix the typing issues properly
@typing.no_type_check
def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
@ -64,254 +85,191 @@ class Partition:
left_comparitor = str(left_comparitor)
# The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
return self.path < left_comparitor
return self._path < left_comparitor
def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
if self.mountpoint:
mount_repr = f", mounted={self.mountpoint}"
elif self.target_mountpoint:
mount_repr = f", rel_mountpoint={self.target_mountpoint}"
if mountpoint := self._partition_info.get_first_mountpoint():
mount_repr = f", mounted={mountpoint}"
elif self._target_mountpoint:
mount_repr = f", rel_mountpoint={self._target_mountpoint}"
classname = self.__class__.__name__
if self._encrypted:
return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})'
else:
return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})'
def as_json(self) -> Dict[str, Any]:
"""
this is used for the table representation of the partition (see FormattedOutput)
"""
partition_info = {
'type': 'primary',
'PARTUUID': self._safe_uuid,
'wipe': self.allow_formatting,
'type': self._type,
'PARTUUID': self.part_uuid,
'wipe': self._wipe,
'boot': self.boot,
'ESP': self.boot,
'mountpoint': self.target_mountpoint,
'mountpoint': self._target_mountpoint,
'encrypted': self._encrypted,
'start': self.start,
'size': self.end,
'filesystem': self.filesystem_type
'filesystem': self._partition_info.filesystem_type
}
return partition_info
def __dump__(self) -> Dict[str, Any]:
# TODO remove this in favour of as_json
log(get_filesystem_type(self.path))
return {
'type': 'primary',
'PARTUUID': self._safe_uuid,
'wipe': self.allow_formatting,
'type': self._type,
'PARTUUID': self.part_uuid,
'wipe': self._wipe,
'boot': self.boot,
'ESP': self.boot,
'mountpoint': self.target_mountpoint,
'mountpoint': self._target_mountpoint,
'encrypted': self._encrypted,
'start': self.start,
'size': self.end,
'filesystem': {
'format': self.filesystem_type
'format': self._partition_info.filesystem_type
}
}
@property
def filesystem_type(self) -> Optional[str]:
return get_filesystem_type(self.path)
def _call_lsblk(self) -> Dict[str, Any]:
self.partprobe()
output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
if output:
lsblk_info = json.loads(output)
return lsblk_info
raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk')
def _call_sfdisk(self) -> Dict[str, Any]:
output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')
if output:
sfdisk_info = json.loads(output)
partitions = sfdisk_info.get('partitiontable', {}).get('partitions', [])
node = list(filter(lambda x: x['node'] == self._path, partitions))
if len(node) > 0:
return node[0]
return {}
raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk')
def _fetch_information(self) -> PartitionInfo:
lsblk_info = self._call_lsblk()
sfdisk_info = self._call_sfdisk()
device = lsblk_info['blockdevices'][0]
mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint]
bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
return PartitionInfo(
pttype=device['pttype'],
partuuid=device['partuuid'],
uuid=device['uuid'],
sector_size=device['log-sec'],
size=convert_size_to_gb(device['size']),
start=sfdisk_info.get('start', None),
end=sfdisk_info.get('size', None),
bootable=bootable,
filesystem_type=device['fstype'],
mountpoints=mountpoints
)
@property
def mountpoint(self) -> Optional[str]:
try:
data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
for filesystem in data['filesystems']:
return pathlib.Path(filesystem.get('target'))
def target_mountpoint(self) -> Optional[str]:
return self._target_mountpoint
except SysCallError as error:
# Not mounted anywhere most likely
log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey")
pass
@property
def path(self) -> str:
return self._path
@property
def filesystem(self) -> str:
return self._partition_info.filesystem_type
@property
def mountpoint(self) -> Optional[Path]:
if len(self.mountpoints) > 0:
return self.mountpoints[0]
return None
@property
def sector_size(self) -> Optional[int]:
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
for device in output['blockdevices']:
return device.get('log-sec', None)
def mountpoints(self) -> List[Path]:
return self._partition_info.mountpoints
@property
def start(self) -> Optional[str]:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
if partition['node'] == self.path:
return partition['start'] # * self.sector_size
def sector_size(self) -> int:
return self._partition_info.sector_size
@property
def end(self) -> Optional[str]:
# TODO: actually this is size in sectors unit
# TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
if partition['node'] == self.path:
return partition['size'] # * self.sector_size
def start(self) -> Optional[int]:
return self._partition_info.start
@property
def end_sectors(self) -> Optional[str]:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
def end(self) -> Optional[int]:
return self._partition_info.end
for partition in output.get('partitiontable', {}).get('partitions', []):
if partition['node'] == self.path:
return partition['start'] + partition['size']
@property
def end_sectors(self) -> Optional[int]:
start = self._partition_info.start
end = self._partition_info.end
if start and end:
return start + end
return None
@property
def size(self) -> Optional[float]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
try:
lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode())
for device in lsblk['blockdevices']:
return convert_size_to_gb(device['size'])
except SysCallError as error:
if error.exit_code == 8192:
return None
else:
raise error
return self._partition_info.size
@property
def boot(self) -> bool:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
if partition['node'] == self.path:
# first condition is for MBR disks, second for GPT disks
return partition.get('bootable', False) or partition.get('type','') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
return False
return self._partition_info.bootable
@property
def partition_type(self) -> Optional[str]:
lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
for device in lsblk['blockdevices']:
return device['pttype']
return self._partition_info.pttype
@property
def part_uuid(self) -> str:
"""
Returns the PARTUUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions.
For bind mounts all the subvolumes share the same uuid
"""
for i in range(storage['DISK_RETRY_ATTEMPTS']):
if not self.partprobe():
raise DiskError(f"Could not perform partprobe on {self.device_path}")
time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
partuuid = self._safe_part_uuid
if partuuid:
return partuuid
raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
return self._partition_info.partuuid
@property
def uuid(self) -> Optional[str]:
"""
Returns the UUID as returned by lsblk for the **partition**.
This is more reliable than relying on /dev/disk/by-uuid as
it doesn't seam to be able to detect md raid partitions.
For bind mounts all the subvolumes share the same uuid
"""
for i in range(storage['DISK_RETRY_ATTEMPTS']):
if not self.partprobe():
raise DiskError(f"Could not perform partprobe on {self.device_path}")
time.sleep(storage.get('DISK_TIMEOUTS', 1) * i)
partuuid = self._safe_uuid
if partuuid:
return partuuid
raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
@property
def _safe_uuid(self) -> Optional[str]:
"""
A near copy of self.uuid but without any delays.
This function should only be used where uuid is not crucial.
For instance when you want to get a __repr__ of the class.
"""
if not self.partprobe():
if self.block_device.partition_type == 'iso9660':
return None
log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
try:
return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip()
except SysCallError as error:
if self.block_device.partition_type == 'iso9660':
# Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
return None
log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}")
@property
def _safe_part_uuid(self) -> Optional[str]:
"""
A near copy of self.uuid but without any delays.
This function should only be used where uuid is not crucial.
For instance when you want to get a __repr__ of the class.
"""
if not self.partprobe():
if self.block_device.partition_type == 'iso9660':
return None
log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
try:
return self.block_device.uuid
except SysCallError as error:
if self.block_device.partition_type == 'iso9660':
# Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
return None
log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}")
return self._partition_info.uuid
@property
def encrypted(self) -> Union[bool, None]:
return self._encrypted
@encrypted.setter
def encrypted(self, value: bool) -> None:
self._encrypted = value
@property
def parent(self) -> str:
return self.real_device
@property
def real_device(self) -> str:
for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
return f"/dev/{parent}"
# raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
return self.path
output = SysCommand('lsblk -J').decode('UTF-8')
if output:
for blockdevice in json.loads(output)['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
return f"/dev/{parent}"
return self._path
raise DiskError('Unable to get disk information for command "lsblk -J"')
@property
def device_path(self) -> str:
""" for bind mounts returns the phisical path of the partition
""" for bind mounts returns the physical path of the partition
"""
device_path, bind_name = split_bind_name(self.path)
device_path, bind_name = split_bind_name(self._path)
return device_path
@property
@ -319,7 +277,7 @@ class Partition:
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
device_path, bind_name = split_bind_name(self.path)
device_path, bind_name = split_bind_name(self._path)
return bind_name
@property
@ -330,29 +288,29 @@ class Partition:
for child in information.get('children', []):
if target := child.get('target'):
if child.get('fstype') == 'btrfs':
if subvolume := subvolume_info_from_path(pathlib.Path(target)):
if subvolume := subvolume_info_from_path(Path(target)):
yield subvolume
if child.get('children'):
for subchild in iterate_children_recursively(child):
yield subchild
for mountpoint in self.mount_information:
if result := findmnt(pathlib.Path(mountpoint['target'])):
for filesystem in result.get('filesystems', []):
if mountpoint.get('fstype') == 'btrfs':
if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])):
if self._partition_info.filesystem_type == 'btrfs':
for mountpoint in self._partition_info.mountpoints:
if result := findmnt(mountpoint):
for filesystem in result.get('filesystems', []):
if subvolume := subvolume_info_from_path(mountpoint):
yield subvolume
for child in iterate_children_recursively(filesystem):
yield child
for child in iterate_children_recursively(filesystem):
yield child
def partprobe(self) -> bool:
try:
if self.block_device:
return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code
except SysCallError as error:
log(f"Unreliable results might be given for {self.path} due to partprobe error: {error}", level=logging.DEBUG)
log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG)
return False
@ -364,19 +322,20 @@ class Partition:
with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device:
return unlocked_device.filesystem
except SysCallError:
return None
pass
return None
def has_content(self) -> bool:
fs_type = get_filesystem_type(self.path)
fs_type = self._partition_info.filesystem_type
if not fs_type or "swap" in fs_type:
return False
temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest()
temporary_path = pathlib.Path(temporary_mountpoint)
temporary_path = Path(temporary_mountpoint)
temporary_path.mkdir(parents=True, exist_ok=True)
if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0:
raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}')
files = len(glob.glob(f"{temporary_mountpoint}/*"))
iterations = 0
@ -387,14 +346,14 @@ class Partition:
return True if files > 0 else False
def encrypt(self, *args :str, **kwargs :str) -> str:
def encrypt(self, password: Optional[str] = None) -> str:
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
from ..luks import luks2
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
return handle.encrypt(self, password=password)
def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
"""
@ -402,17 +361,17 @@ class Partition:
the formatting functionality and in essence the support for the given filesystem.
"""
if filesystem is None:
filesystem = self.filesystem
filesystem = self._partition_info.filesystem_type
if path is None:
path = self.path
path = self._path
# This converts from fat32 -> vfat to unify filesystem names
filesystem = get_mount_fs_type(filesystem)
# To avoid "unable to open /dev/x: No such file or directory"
start_wait = time.time()
while pathlib.Path(path).exists() is False and time.time() - start_wait < 10:
while Path(path).exists() is False and time.time() - start_wait < 10:
time.sleep(0.025)
if log_formatting:
@ -422,57 +381,57 @@ class Partition:
if filesystem == 'btrfs':
options = ['-f'] + options
if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')
if mkfs and 'UUID:' not in mkfs:
raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
self.filesystem = filesystem
self._partition_info.filesystem_type = filesystem
elif filesystem == 'vfat':
options = ['-F32'] + options
log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")
if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = filesystem
self._partition_info.filesystem_type = filesystem
elif filesystem == 'ext4':
options = ['-F'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = filesystem
self._partition_info.filesystem_type = filesystem
elif filesystem == 'ext2':
options = ['-F'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'ext2'
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self._partition_info.filesystem_type = 'ext2'
elif filesystem == 'xfs':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = filesystem
self._partition_info.filesystem_type = filesystem
elif filesystem == 'f2fs':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = filesystem
self._partition_info.filesystem_type = filesystem
elif filesystem == 'ntfs3':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = filesystem
self._partition_info.filesystem_type = filesystem
elif filesystem == 'crypto_LUKS':
# from ..luks import luks2
# encrypted_partition = luks2(self, None, None)
# encrypted_partition.format(path)
self.filesystem = filesystem
self._partition_info.filesystem_type = filesystem
else:
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
@ -485,9 +444,9 @@ class Partition:
return self.format(filesystem, path, log_formatting, options, retry=False)
if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
self.encrypted = True
self._encrypted = True
else:
self.encrypted = False
self._encrypted = False
return True
@ -499,18 +458,18 @@ class Partition:
if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
return None
def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
if not self.mountpoint:
if not self._partition_info.get_first_mountpoint():
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
if not self.filesystem:
raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
fs = self._partition_info.filesystem_type
fs_type = get_mount_fs_type(fs)
pathlib.Path(target).mkdir(parents=True, exist_ok=True)
Path(target).mkdir(parents=True, exist_ok=True)
if self.bind_name:
device_path = self.device_path
@ -520,7 +479,7 @@ class Partition:
else:
options = f"subvol={self.bind_name}"
else:
device_path = self.path
device_path = self._path
try:
if options:
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}")
@ -529,7 +488,7 @@ class Partition:
# TODO: Should be redundant to check for exit_code
if mnt_handle.exit_code != 0:
raise DiskError(f"Could not mount {self.path} to {target} using options {options}")
raise DiskError(f"Could not mount {self._path} to {target} using options {options}")
except SysCallError as err:
raise err
@ -538,19 +497,17 @@ class Partition:
return False
def unmount(self) -> bool:
worker = SysCommand(f"/usr/bin/umount {self.path}")
worker = SysCommand(f"/usr/bin/umount {self._path}")
exit_code = worker.exit_code
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
if 0 < worker.exit_code < 8000:
raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
if exit_code and 0 < exit_code < 8000:
raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)
return True
def umount(self) -> bool:
return self.unmount()
def filesystem_supported(self) -> bool:
"""
The support for a filesystem (this partition) is tested by calling
@ -559,7 +516,7 @@ class Partition:
2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
"""
try:
self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False)
except (SysCallError, DiskError):
pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code
except UnknownFilesystemFormat as err:

View File

@ -22,9 +22,9 @@ from .disk.btrfs import BTRFSPartition
class luks2:
def __init__(self,
partition :Partition,
mountpoint :str,
password :str,
partition: Partition,
mountpoint: Optional[str],
password: Optional[str],
key_file :Optional[str] = None,
auto_unmount :bool = False,
*args :str,

View File

@ -140,16 +140,6 @@ def get_default_partition_layout(
return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)
def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
result = {}
for device in block_devices:
layout = manage_new_and_existing_partitions(device)
result[device.path] = layout
return result
def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50
block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]}
original_layout = copy.deepcopy(block_device_struct)