Merge pull request #426 from archlinux/torxed-rework-partitioning

Re-work partitioning logic, introducing more granularity (and stability)
This commit is contained in:
Anton Hvornum 2021-09-06 16:42:31 +02:00 committed by GitHub
commit e39e49ecc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 856 additions and 316 deletions

View File

@ -149,7 +149,7 @@ After this, running archinstall with `python -m archinstall` will run against wh
To test this without a live ISO, the simplest approach is to use a local image and create a loop device.<br> To test this without a live ISO, the simplest approach is to use a local image and create a loop device.<br>
This can be done by installing `pacman -S arch-install-scripts util-linux` locally and doing the following: This can be done by installing `pacman -S arch-install-scripts util-linux` locally and doing the following:
# dd if=/dev/zero of=./testimage.img bs=1G count=5 # truncate -s 20G testimage.img
# losetup -fP ./testimage.img # losetup -fP ./testimage.img
# losetup -a | grep "testimage.img" | awk -F ":" '{print $1}' # losetup -a | grep "testimage.img" | awk -F ":" '{print $1}'
# pip install --upgrade archinstall # pip install --upgrade archinstall

View File

@ -2,7 +2,6 @@ import glob
import pathlib import pathlib
import re import re
import time import time
from collections import OrderedDict
from typing import Optional from typing import Optional
from .general import * from .general import *
@ -14,10 +13,218 @@ GPT = 0b00000001
MBR = 0b00000010 MBR = 0b00000010
# import ctypes def valid_parted_position(pos :str):
# import ctypes.util if not len(pos):
# libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) return False
# libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
if pos.isdigit():
return True
if pos[-1] == '%' and pos[:-1].isdigit():
return True
if pos[-3:].lower() in ['mib', 'kib', 'b', 'tib'] and pos[:-3].replace(".", "", 1).isdigit():
return True
if pos[-2:].lower() in ['kb', 'mb', 'gb', 'tb'] and pos[:-2].replace(".", "", 1).isdigit():
return True
return False
def valid_fs_type(fstype :str) -> bool:
# https://www.gnu.org/software/parted/manual/html_node/mkpart.html
# Above link doesn't agree with `man parted` /mkpart documentation:
"""
fs-type can
be one of "btrfs", "ext2",
"ext3", "ext4", "fat16",
"fat32", "hfs", "hfs+",
"linux-swap", "ntfs", "reis
erfs", "udf", or "xfs".
"""
return fstype.lower() in [
"btrfs",
"ext2",
"ext3", "ext4", # `man parted` allows these
"fat16", "fat32",
"hfs", "hfs+", # "hfsx", not included in `man parted`
"linux-swap",
"ntfs",
"reiserfs",
"udf", # "ufs", not included in `man parted`
"xfs", # `man parted` allows this
]
def sort_block_devices_based_on_performance(block_devices):
result = {device: 0 for device in block_devices}
for device, weight in result.items():
if device.spinning:
weight -= 10
else:
weight += 5
if device.bus_type == 'nvme':
weight += 20
elif device.bus_type == 'sata':
weight += 10
result[device] = weight
return result
def filter_disks_below_size_in_gb(devices, gigabytes):
for disk in devices:
if disk.size >= gigabytes:
yield disk
def select_largest_device(devices, gigabytes, filter_out=None):
if not filter_out:
filter_out = []
copy_devices = [*devices]
for filter_device in filter_out:
if filter_device in copy_devices:
copy_devices.pop(copy_devices.index(filter_device))
copy_devices = list(filter_disks_below_size_in_gb(copy_devices, gigabytes))
if not len(copy_devices):
return None
return max(copy_devices, key=(lambda device : device.size))
def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None):
if not filter_out:
filter_out = []
copy_devices = [*devices]
for filter_device in filter_out:
if filter_device in copy_devices:
copy_devices.pop(copy_devices.index(filter_device))
if not len(copy_devices):
return None
return min(copy_devices, key=(lambda device : abs(device.size - gigabytes)))
def suggest_single_disk_layout(block_device):
MIN_SIZE_TO_ALLOW_HOME_PART = 40 # Gb
layout = {
block_device : {
"wipe" : True,
"partitions" : []
}
}
layout[block_device]['partitions'].append({
# Boot
"type" : "primary",
"start" : "1MiB",
"size" : "513MiB",
"boot" : True,
"encrypted" : False,
"format" : True,
"mountpoint" : "/boot",
"filesystem" : {
"format" : "fat32"
}
})
layout[block_device]['partitions'].append({
# Root
"type" : "primary",
"start" : "513MiB",
"encrypted" : False,
"format" : True,
"size" : "100%" if block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART else f"{min(block_device.size, 20)*1024}MiB",
"mountpoint" : "/",
"filesystem" : {
"format" : "btrfs"
}
})
if block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART:
layout[block_device]['partitions'].append({
# Home
"type" : "primary",
"encrypted" : False,
"format" : True,
"start" : f"{min(block_device.size*0.2, 20)*1024}MiB",
"size" : "100%",
"mountpoint" : "/home",
"filesystem" : {
"format" : "btrfs"
}
})
return layout
def suggest_multi_disk_layout(block_devices):
MIN_SIZE_TO_ALLOW_HOME_PART = 40 # Gb
ARCH_LINUX_INSTALLED_SIZE = 20 # Gb, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size?
block_devices = sort_block_devices_based_on_performance(block_devices).keys()
home_device = select_largest_device(block_devices, gigabytes=MIN_SIZE_TO_ALLOW_HOME_PART)
root_device = select_disk_larger_than_or_close_to(block_devices, gigabytes=ARCH_LINUX_INSTALLED_SIZE, filter_out=[home_device])
log(f"Suggesting multi-disk-layout using {len(block_devices)} disks, where {root_device} will be /root and {home_device} will be /home", level=logging.DEBUG)
layout = {
root_device : {
"wipe" : True,
"partitions" : []
},
home_device : {
"wipe" : True,
"partitions" : []
},
}
layout[root_device]['partitions'].append({
# Boot
"type" : "primary",
"start" : "1MiB",
"size" : "513MiB",
"boot" : True,
"encrypted" : False,
"format" : True,
"mountpoint" : "/boot",
"filesystem" : {
"format" : "fat32"
}
})
layout[root_device]['partitions'].append({
# Root
"type" : "primary",
"start" : "513MiB",
"encrypted" : False,
"format" : True,
"size" : "100%",
"mountpoint" : "/",
"filesystem" : {
"format" : "btrfs"
}
})
layout[home_device]['partitions'].append({
# Home
"type" : "primary",
"encrypted" : False,
"format" : True,
"start" : "4MiB",
"size" : "100%",
"mountpoint" : "/home",
"filesystem" : {
"format" : "btrfs"
}
})
return layout
class BlockDevice: class BlockDevice:
@ -30,14 +237,14 @@ class BlockDevice:
self.path = path self.path = path
self.info = info self.info = info
self.keep_partitions = True self.keep_partitions = True
self.part_cache = OrderedDict() self.part_cache = {}
# TODO: Currently disk encryption is a BIT misleading. # TODO: Currently disk encryption is a BIT misleading.
# It's actually partition-encryption, but for future-proofing this # It's actually partition-encryption, but for future-proofing this
# I'm placing the encryption password on a BlockDevice level. # I'm placing the encryption password on a BlockDevice level.
self.encryption_password = None
def __repr__(self, *args, **kwargs): def __repr__(self, *args, **kwargs):
return f"BlockDevice({self.device})" return f"BlockDevice({self.device}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
def __iter__(self): def __iter__(self):
for partition in self.partitions: for partition in self.partitions:
@ -48,24 +255,35 @@ class BlockDevice:
raise KeyError(f'{self} does not contain information: "{key}"') raise KeyError(f'{self} does not contain information: "{key}"')
return self.info[key] return self.info[key]
def __len__(self):
return len(self.partitions)
def __lt__(self, left_comparitor):
return self.path < left_comparitor.path
def json(self): def json(self):
""" """
json() has precedence over __dump__, so this is a way json() has precedence over __dump__, so this is a way
to give less/partial information for user readability. to give less/partial information for user readability.
""" """
return { return self.path
'path': self.path,
'size': self.info['size'] if 'size' in self.info else '<unknown>',
'model': self.info['model'] if 'model' in self.info else '<unknown>'
}
def __dump__(self): def __dump__(self):
return { return {
'path': self.path, self.path : {
'info': self.info, 'partuuid' : self.uuid,
'partition_cache': self.part_cache 'wipe' : self.info.get('wipe', None),
'partitions' : [part.__dump__() for part in self.partitions.values()]
}
} }
@property
def partition_type(self):
output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['pttype']
@property @property
def device(self): def device(self):
""" """
@ -78,7 +296,7 @@ class BlockDevice:
raise DiskError(f'Could not locate backplane info for "{self.path}"') raise DiskError(f'Could not locate backplane info for "{self.path}"')
if self.info['type'] == 'loop': if self.info['type'] == 'loop':
for drive in json.loads(b''.join(SysCommand(['losetup', '--json'])).decode('UTF_8'))['loopdevices']: for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
if not drive['name'] == self.path: if not drive['name'] == self.path:
continue continue
@ -100,18 +318,17 @@ class BlockDevice:
@property @property
def partitions(self): def partitions(self):
o = b''.join(SysCommand(['partprobe', self.path])) SysCommand(['partprobe', self.path])
# o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev))) result = SysCommand(['/usr/bin/lsblk', '-J', self.path])
o = b''.join(SysCommand(['/usr/bin/lsblk', '-J', self.path]))
if b'not a block device' in o: if b'not a block device' in result:
raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}') raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}')
if not o[:1] == b'{': if not result[:1] == b'{':
raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}') raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}')
r = json.loads(o.decode('UTF-8')) r = json.loads(result.decode('UTF-8'))
if len(r['blockdevices']) and 'children' in r['blockdevices'][0]: if len(r['blockdevices']) and 'children' in r['blockdevices'][0]:
root_path = f"/dev/{r['blockdevices'][0]['name']}" root_path = f"/dev/{r['blockdevices'][0]['name']}"
for part in r['blockdevices'][0]['children']: for part in r['blockdevices'][0]['children']:
@ -140,10 +357,65 @@ class BlockDevice:
This is more reliable than relying on /dev/disk/by-partuuid as This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions. it doesn't seam to be able to detect md raid partitions.
""" """
lsblk = b''.join(SysCommand(f'lsblk -J -o+UUID {self.path}')) for partition in json.loads(SysCommand(f'lsblk -J -o+UUID {self.path}').decode('UTF-8'))['blockdevices']:
for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
return partition.get('uuid', None) return partition.get('uuid', None)
def convert_size_to_gb(self, size):
units = {
'P' : lambda s : float(s) * 2048,
'T' : lambda s : float(s) * 1024,
'G' : lambda s : float(s),
'M' : lambda s : float(s) / 1024,
'K' : lambda s : float(s) / 2048,
'B' : lambda s : float(s) / 3072,
}
unit = size[-1]
return float(units.get(unit, lambda s : None)(size[:-1]))
@property
def size(self):
output = json.loads(SysCommand(f"lsblk --json -o+SIZE {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return self.convert_size_to_gb(device['size'])
@property
def bus_type(self):
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['tran']
@property
def spinning(self):
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['rota'] is True
@property
def free_space(self):
# NOTE: parted -s will default to `cancel` on prompt, skipping any partition
# that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
# so the free will ignore the ESP partition and just give the "free" space.
# Doesn't harm us, but worth noting in case something weird happens.
for line in SysCommand(f"parted -s --machine {self.path} print free"):
if 'free' in (free_space := line.decode('UTF-8')):
_, start, end, size, *_ = free_space.strip('\r\n;').split(':')
yield (start, end, size)
@property
def largest_free_space(self):
info = None
for space_info in self.free_space:
if not info:
info = space_info
else:
# [-1] = size
if space_info[-1] > info[-1]:
info = space_info
return info
def has_partitions(self): def has_partitions(self):
return len(self.partitions) return len(self.partitions)
@ -154,7 +426,12 @@ class BlockDevice:
return False return False
def flush_cache(self): def flush_cache(self):
self.part_cache = OrderedDict() self.part_cache = {}
def get_partition(self, uuid):
for partition in self:
if partition.uuid == uuid:
return partition
class Partition: class Partition:
@ -171,7 +448,7 @@ class Partition:
self.size = size # TODO: Refresh? self.size = size # TODO: Refresh?
self._encrypted = None self._encrypted = None
self.encrypted = encrypted self.encrypted = encrypted
self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions. self.allow_formatting = False
if mountpoint: if mountpoint:
self.mount(mountpoint) self.mount(mountpoint)
@ -206,9 +483,80 @@ class Partition:
mount_repr = f", rel_mountpoint={self.target_mountpoint}" mount_repr = f", rel_mountpoint={self.target_mountpoint}"
if self._encrypted: if self._encrypted:
return f'Partition(path={self.path}, size={self.size}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})' return f'Partition(path={self.path}, size={self.size}, PARTUUID={self.uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
else: else:
return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})' return f'Partition(path={self.path}, size={self.size}, PARTUUID={self.uuid}, fs={self.filesystem}{mount_repr})'
def __dump__(self):
return {
'type' : 'primary',
'PARTUUID' : self.uuid,
'wipe' : self.allow_formatting,
'boot' : self.boot,
'ESP' : self.boot,
'mountpoint' : self.target_mountpoint,
'encrypted' : self._encrypted,
'start' : self.start,
'size' : self.end,
'filesystem' : {
'format' : get_filesystem_type(self.path)
}
}
@property
def sector_size(self):
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device.get('log-sec', None)
@property
def start(self):
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
if partition['node'] == self.path:
return partition['start']# * self.sector_size
@property
def end(self):
# TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
if partition['node'] == self.path:
return partition['size']# * self.sector_size
@property
def boot(self):
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
# Get the bootable flag from the sfdisk output:
# {
# "partitiontable": {
# "label":"dos",
# "id":"0xd202c10a",
# "device":"/dev/loop0",
# "unit":"sectors",
# "sectorsize":512,
# "partitions": [
# {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true}
# ]
# }
# }
for partition in output.get('partitiontable', {}).get('partitions', []):
if partition['node'] == self.path:
return partition.get('bootable', False)
return False
@property
def partition_type(self):
lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
for device in lsblk['blockdevices']:
return device['pttype']
@property @property
def uuid(self) -> Optional[str]: def uuid(self) -> Optional[str]:
@ -217,8 +565,9 @@ class Partition:
This is more reliable than relying on /dev/disk/by-partuuid as This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions. it doesn't seam to be able to detect md raid partitions.
""" """
lsblk = b''.join(SysCommand(f'lsblk -J -o+PARTUUID {self.path}'))
for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: lsblk = json.loads(SysCommand(f'lsblk -J -o+PARTUUID {self.path}').decode('UTF-8'))
for partition in lsblk['blockdevices']:
return partition.get('partuuid', None) return partition.get('partuuid', None)
return None return None
@ -237,7 +586,7 @@ class Partition:
@property @property
def real_device(self): def real_device(self):
for blockdevice in json.loads(b''.join(SysCommand('lsblk -J')).decode('UTF-8'))['blockdevices']: for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)): if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)):
return f"/dev/{parent}" return f"/dev/{parent}"
# raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
@ -248,7 +597,7 @@ class Partition:
from .luks import luks2 from .luks import luks2
try: try:
with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device: with luks2(self, storage.get('ENC_IDENTIFIER', 'ai')+'loop', password, auto_unmount=True) as unlocked_device:
return unlocked_device.filesystem return unlocked_device.filesystem
except SysCallError: except SysCallError:
return None return None
@ -274,39 +623,16 @@ class Partition:
return True if files > 0 else False return True if files > 0 else False
def safe_to_format(self):
if self.allow_formatting is False:
log(f"Partition {self} is not marked for formatting.", level=logging.DEBUG)
return False
elif self.target_mountpoint == '/boot':
try:
if self.has_content():
log(f"Partition {self} is a boot partition and has content inside.", level=logging.DEBUG)
return False
except SysCallError as err:
log(err.message, logging.DEBUG)
log(f"Partition {self} was identified as /boot but we could not mount to check for content, continuing!", level=logging.DEBUG)
pass
return True
def encrypt(self, *args, **kwargs): def encrypt(self, *args, **kwargs):
""" """
A wrapper function for luks2() instances and the .encrypt() method of that instance. A wrapper function for luks2() instances and the .encrypt() method of that instance.
""" """
from .luks import luks2 from .luks import luks2
if not self._encrypted:
raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}")
if not self.safe_to_format():
log(f"Partition {self} was marked as protected but encrypt() was called on it!", level=logging.ERROR, fg="red")
return False
handle = luks2(self, None, None) handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs) return handle.encrypt(self, *args, **kwargs)
def format(self, filesystem=None, path=None, allow_formatting=None, log_formatting=True): def format(self, filesystem=None, path=None, log_formatting=True):
""" """
Format can be given an overriding path, for instance /dev/null to test Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem. the formatting functionality and in essence the support for the given filesystem.
@ -316,36 +642,30 @@ class Partition:
if path is None: if path is None:
path = self.path path = self.path
if allow_formatting is None:
allow_formatting = self.allow_formatting
# To avoid "unable to open /dev/x: No such file or directory" # To avoid "unable to open /dev/x: No such file or directory"
start_wait = time.time() start_wait = time.time()
while pathlib.Path(path).exists() is False and time.time() - start_wait < 10: while pathlib.Path(path).exists() is False and time.time() - start_wait < 10:
time.sleep(0.025) time.sleep(0.025)
if not allow_formatting:
raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})")
if log_formatting: if log_formatting:
log(f'Formatting {path} -> {filesystem}', level=logging.INFO) log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
if filesystem == 'btrfs': if filesystem == 'btrfs':
o = b''.join(SysCommand(f'/usr/bin/mkfs.btrfs -f {path}')) if 'UUID:' not in (mkfs := SysCommand(f'/usr/bin/mkfs.btrfs -f {path}').decode('UTF-8')):
if b'UUID' not in o: raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
raise DiskError(f'Could not format {path} with {filesystem} because: {o}') self.filesystem = filesystem
self.filesystem = 'btrfs'
elif filesystem == 'vfat': elif filesystem == 'fat32':
o = b''.join(SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}')) mkfs = SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}').decode('UTF-8')
if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: if ('mkfs.fat' not in mkfs and 'mkfs.vfat' not in mkfs) or 'command not found' in mkfs:
raise DiskError(f'Could not format {path} with {filesystem} because: {o}') raise DiskError(f"Could not format {path} with {filesystem} because: {mkfs}")
self.filesystem = 'vfat' self.filesystem = filesystem
elif filesystem == 'ext4': elif filesystem == 'ext4':
if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0: if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = 'ext4' self.filesystem = filesystem
elif filesystem == 'ext2': elif filesystem == 'ext2':
if (handle := SysCommand(f'/usr/bin/mkfs.ext2 -F {path}')).exit_code != 0: if (handle := SysCommand(f'/usr/bin/mkfs.ext2 -F {path}')).exit_code != 0:
@ -354,19 +674,19 @@ class Partition:
elif filesystem == 'xfs': elif filesystem == 'xfs':
if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0: if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = 'xfs' self.filesystem = filesystem
elif filesystem == 'f2fs': elif filesystem == 'f2fs':
if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0: if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = 'f2fs' self.filesystem = filesystem
elif filesystem == 'crypto_LUKS': elif filesystem == 'crypto_LUKS':
# from .luks import luks2 # from .luks import luks2
# encrypted_partition = luks2(self, None, None) # encrypted_partition = luks2(self, None, None)
# encrypted_partition.format(path) # encrypted_partition.format(path)
self.filesystem = 'crypto_LUKS' self.filesystem = filesystem
else: else:
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
@ -398,9 +718,9 @@ class Partition:
try: try:
if options: if options:
SysCommand(f'/usr/bin/mount -o {options} {self.path} {target}') SysCommand(f"/usr/bin/mount -o {options} {self.path} {target}")
else: else:
SysCommand(f'/usr/bin/mount {self.path} {target}') SysCommand(f"/usr/bin/mount {self.path} {target}")
except SysCallError as err: except SysCallError as err:
raise err raise err
@ -409,7 +729,7 @@ class Partition:
def unmount(self): def unmount(self):
try: try:
exit_code = SysCommand(f'/usr/bin/umount {self.path}').exit_code SysCommand(f"/usr/bin/umount {self.path}")
except SysCallError as err: except SysCallError as err:
exit_code = err.exit_code exit_code = err.exit_code
@ -481,17 +801,89 @@ class Filesystem:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]: if len(args) >= 2 and args[1]:
raise args[1] raise args[1]
b''.join(SysCommand('sync')) SysCommand('sync')
return True return True
def partuuid_to_index(self, uuid):
output = json.loads(SysCommand(f"lsblk --json -o+PARTUUID {self.blockdevice.device}").decode('UTF-8'))
for device in output['blockdevices']:
for index, partition in enumerate(device['children']):
if partition['partuuid'].lower() == uuid:
return index
def load_layout(self, layout :dict):
from .luks import luks2
# If the layout tells us to wipe the drive, we do so
if layout.get('wipe', False):
if self.mode == GPT:
if not self.parted_mklabel(self.blockdevice.device, "gpt"):
raise KeyError(f"Could not create a GPT label on {self}")
elif self.mode == MBR:
if not self.parted_mklabel(self.blockdevice.device, "msdos"):
raise KeyError(f"Could not create a MSDOS label on {self}")
# We then iterate the partitions in order
for partition in layout.get('partitions', []):
# We don't want to re-add an existing partition (those containing a UUID already)
if partition.get('format', False) and not partition.get('PARTUUID', None):
print("Adding partition....")
partition['device_instance'] = self.add_partition(partition.get('type', 'primary'),
start=partition.get('start', '1MiB'), # TODO: Revisit sane block starts (4MB for memorycards for instance)
end=partition.get('size', '100%'),
partition_format=partition.get('filesystem', {}).get('format', 'btrfs'))
# TODO: device_instance some times become None
# print('Device instance:', partition['device_instance'])
elif (partition_uuid := partition.get('PARTUUID')) and (partition_instance := self.blockdevice.get_partition(uuid=partition_uuid)):
print("Re-using partition_instance:", partition_instance)
partition['device_instance'] = partition_instance
else:
raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition_uuid)}).")
if partition.get('filesystem', {}).get('format', False):
if partition.get('encrypted', False):
if not partition.get('password'):
if storage['arguments'] == 'silent':
raise ValueError(f"Missing encryption password for {partition['device_instance']}")
else:
from .user_interaction import get_password
partition['password'] = get_password(f"Enter a encryption password for {partition['device_instance']}")
partition['device_instance'].encrypt(password=partition['password'])
with luks2(partition['device_instance'], storage.get('ENC_IDENTIFIER', 'ai')+'loop', partition['password']) as unlocked_device:
if not partition.get('format'):
if storage['arguments'] == 'silent':
raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}")
else:
if not partition.get('filesystem'):
partition['filesystem'] = {}
if not partition['filesystem'].get('format', False):
while True:
partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
pint("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")
continue
break
unlocked_device.format(partition['filesystem']['format'])
elif partition.get('format', False):
partition['device_instance'].format(partition['filesystem']['format'])
if partition.get('boot', False):
self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
def find_partition(self, mountpoint): def find_partition(self, mountpoint):
for partition in self.blockdevice: for partition in self.blockdevice:
if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
return partition return partition
def raw_parted(self, string: str): def raw_parted(self, string: str):
x = SysCommand(f'/usr/bin/parted -s {string}') if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0:
return x log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red")
return cmd_handle
def parted(self, string: str): def parted(self, string: str):
""" """
@ -500,74 +892,47 @@ class Filesystem:
:param string: A raw string passed to /usr/bin/parted -s <string> :param string: A raw string passed to /usr/bin/parted -s <string>
:type string: str :type string: str
""" """
return self.raw_parted(string).exit_code return self.raw_parted(string).exit_code == 0
def use_entire_disk(self, root_filesystem_type='ext4'): def use_entire_disk(self, root_filesystem_type='ext4') -> Partition:
log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG) # TODO: Implement this with declarative profiles instead.
if has_uefi(): raise ValueError("Installation().use_entire_disk() has to be re-worked.")
self.add_partition('primary', start='1MiB', end='513MiB', partition_format='fat32')
self.set_name(0, 'EFI')
self.set(0, 'boot on')
# TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
# https://www.gnu.org/software/parted/manual/html_node/set.html
self.set(0, 'esp on')
self.add_partition('primary', start='513MiB', end='100%')
self.blockdevice.partition[0].filesystem = 'vfat'
self.blockdevice.partition[1].filesystem = root_filesystem_type
log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
self.blockdevice.partition[0].target_mountpoint = '/boot'
self.blockdevice.partition[1].target_mountpoint = '/'
self.blockdevice.partition[0].allow_formatting = True
self.blockdevice.partition[1].allow_formatting = True
else:
if not self.parted_mklabel(self.blockdevice.device, "msdos"):
raise KeyError(f"Could not create a MSDOS label on {self}")
self.add_partition('primary', start='1MiB', end='513MiB', partition_format='ext4')
self.set(0, 'boot on')
self.add_partition('primary', start='513MiB', end='100%')
self.blockdevice.partition[0].filesystem = 'ext4' # TODO: Up for debate weither or not this should be user-supplied: https://github.com/archlinux/archinstall/pull/595/files
self.blockdevice.partition[1].filesystem = root_filesystem_type
log(f"Set the boot partition {self.blockdevice.partition[0]} to use filesystem {'ext4'}.", level=logging.DEBUG)
log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
self.blockdevice.partition[0].target_mountpoint = '/boot'
self.blockdevice.partition[1].target_mountpoint = '/'
self.blockdevice.partition[0].allow_formatting = True
self.blockdevice.partition[1].allow_formatting = True
def add_partition(self, partition_type, start, end, partition_format=None): def add_partition(self, partition_type, start, end, partition_format=None):
log(f'Adding partition to {self.blockdevice}', level=logging.INFO) log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
previous_partitions = self.blockdevice.partitions
if self.mode == MBR: if self.mode == MBR:
if len(self.blockdevice.partitions) > 3: if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions") DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions")
if partition_format:
partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}') == 0
else:
partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {start} {end}') == 0
if partitioning: if partition_format:
parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}'
else:
parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}'
if self.parted(parted_string):
start_wait = time.time() start_wait = time.time()
time.sleep(0.025) # Let the new partition come up in the kernel
if time.time() - start_wait > 20: while previous_partition_uuids == {partition.uuid for partition in self.blockdevice.partitions.values()}:
raise DiskError(f"New partition never showed up after adding new partition on {self} (timeout 10 seconds).") if time.time() - start_wait > 10:
return True raise DiskError(f"New partition never showed up after adding new partition on {self} (timeout 10 seconds).")
time.sleep(0.025)
time.sleep(0.5) # Let the kernel catch up with quick block devices (nvme for instance)
return self.blockdevice.get_partition(uuid=(previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()}).pop())
def set_name(self, partition: int, name: str): def set_name(self, partition: int, name: str):
return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
def set(self, partition: int, string: str): def set(self, partition: int, string: str):
log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
def parted_mklabel(self, device: str, disk_label: str): def parted_mklabel(self, device: str, disk_label: str):
log(f"Creating a new partition labling on {device}", level=logging.INFO, fg="yellow")
# Try to unmount devices before attempting to run mklabel # Try to unmount devices before attempting to run mklabel
try: try:
SysCommand(f'bash -c "umount {device}?"') SysCommand(f'bash -c "umount {device}?"')
@ -597,13 +962,15 @@ def device_state(name, *args, **kwargs):
# lsblk --json -l -n -o path # lsblk --json -l -n -o path
def all_disks(*args, **kwargs): def all_disks(*args, **kwargs):
kwargs.setdefault("partitions", False) kwargs.setdefault("partitions", False)
drives = OrderedDict() drives = {}
# for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model')).decode('UTF_8'))['blockdevices']: lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8'))
for drive in lsblk['blockdevices']:
if not kwargs['partitions'] and drive['type'] == 'part': if not kwargs['partitions'] and drive['type'] == 'part':
continue continue
drives[drive['path']] = BlockDevice(drive['path'], drive) drives[drive['path']] = BlockDevice(drive['path'], drive)
return drives return drives
@ -632,12 +999,10 @@ def harddrive(size=None, model=None, fuzzy=False):
def get_mount_info(path) -> dict: def get_mount_info(path) -> dict:
try: try:
output = SysCommand(f'/usr/bin/findmnt --json {path}') output = SysCommand(f'/usr/bin/findmnt --json {path}').decode('UTF-8')
except SysCallError: except SysCallError:
return {} return {}
output = output.decode('UTF-8')
if not output: if not output:
return {} return {}
@ -651,14 +1016,12 @@ def get_mount_info(path) -> dict:
def get_partitions_in_use(mountpoint) -> list: def get_partitions_in_use(mountpoint) -> list:
try: try:
output = SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}') output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
except SysCallError: except SysCallError:
return [] return []
mounts = [] mounts = []
output = output.decode('UTF-8')
if not output: if not output:
return [] return []
@ -674,16 +1037,26 @@ def get_partitions_in_use(mountpoint) -> list:
def get_filesystem_type(path): def get_filesystem_type(path):
try: try:
handle = SysCommand(f"blkid -o value -s TYPE {path}") return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip()
return b''.join(handle).strip().decode('UTF-8')
except SysCallError: except SysCallError:
return None return None
def disk_layouts(): def disk_layouts():
try: try:
handle = SysCommand("lsblk -f -o+TYPE,SIZE -J") return json.loads(SysCommand("lsblk -f -o+TYPE,SIZE -J").decode('UTF-8'))
return json.loads(b''.join(handle).decode('UTF-8'))
except SysCallError as err: except SysCallError as err:
log(f"Could not return disk layouts: {err}") log(f"Could not return disk layouts: {err}")
return None return None
def encrypted_partitions(blockdevices :dict) -> bool:
for partition in blockdevices.values():
if partition.get('encrypted', False):
yield partition
def find_partition_by_mountpoint(block_devices, relative_mountpoint :str):
for device in block_devices:
for partition in block_devices[device]['partitions']:
if partition.get('mountpoint', None) == relative_mountpoint:
return partition

View File

@ -71,6 +71,8 @@ def locate_binary(name):
raise RequirementError(f"Binary {name} does not exist.") raise RequirementError(f"Binary {name} does not exist.")
def json_dumps(*args, **kwargs):
return json.dumps(*args, **{**kwargs, 'cls': JSON})
class JsonEncoder: class JsonEncoder:
def _encode(obj): def _encode(obj):
@ -110,7 +112,6 @@ class JSON(json.JSONEncoder, json.JSONDecoder):
def encode(self, obj): def encode(self, obj):
return super(JSON, self).encode(self._encode(obj)) return super(JSON, self).encode(self._encode(obj))
class SysCommandWorker: class SysCommandWorker:
def __init__(self, cmd, callbacks=None, peak_output=False, environment_vars=None, logfile=None, working_directory='./'): def __init__(self, cmd, callbacks=None, peak_output=False, environment_vars=None, logfile=None, working_directory='./'):
if not callbacks: if not callbacks:
@ -318,6 +319,15 @@ class SysCommand:
for line in self.session: for line in self.session:
yield line yield line
def __getitem__(self, key):
if type(key) is slice:
start = key.start if key.start else 0
end = key.stop if key.stop else len(self.session._trace_log)
return self.session._trace_log[start:end]
else:
raise ValueError("SysCommand() doesn't have key & value pairs, only slices, SysCommand('ls')[:10] as an example.")
def __repr__(self, *args, **kwargs): def __repr__(self, *args, **kwargs):
return self.session._trace_log.decode('UTF-8') return self.session._trace_log.decode('UTF-8')

View File

@ -57,7 +57,6 @@ class Installer:
self.post_base_install = [] self.post_base_install = []
storage['session'] = self storage['session'] = self
self.partitions = get_partitions_in_use(self.target)
self.MODULES = [] self.MODULES = []
self.BINARIES = [] self.BINARIES = []
@ -108,6 +107,10 @@ class Installer:
self.sync_log_to_install_medium() self.sync_log_to_install_medium()
return False return False
@property
def partitions(self):
return get_partitions_in_use(self.target)
def sync_log_to_install_medium(self): def sync_log_to_install_medium(self):
# Copy over the install log (if there is one) to the install medium if # Copy over the install log (if there is one) to the install medium if
# at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
@ -122,6 +125,23 @@ class Installer:
return True return True
def mount_ordered_layout(self, layouts :dict):
from .luks import luks2
mountpoints = {}
for blockdevice in layouts:
for partition in layouts[blockdevice]['partitions']:
mountpoints[partition['mountpoint']] = partition
for mountpoint in sorted(mountpoints.keys()):
if mountpoints[mountpoint]['encrypted']:
loopdev = storage.get('ENC_IDENTIFIER', 'ai')+'loop'
password = mountpoints[mountpoint]['password']
with luks2(mountpoints[mountpoint]['device_instance'], loopdev, password, auto_unmount=False) as unlocked_device:
unlocked_device.mount(f"{self.target}{mountpoint}")
else:
mountpoints[mountpoint]['device_instance'].mount(f"{self.target}{mountpoint}")
def mount(self, partition, mountpoint, create_mountpoint=True): def mount(self, partition, mountpoint, create_mountpoint=True):
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'): if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}') os.makedirs(f'{self.target}{mountpoint}')
@ -425,6 +445,9 @@ class Installer:
elif partition.mountpoint == self.target: elif partition.mountpoint == self.target:
root_partition = partition root_partition = partition
if boot_partition is None and root_partition is None:
raise ValueError(f"Could not detect root (/) or boot (/boot) in {self.target} based on: {self.partitions}")
self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO) self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO)
if bootloader == 'systemd-bootctl': if bootloader == 'systemd-bootctl':

View File

@ -18,9 +18,6 @@ class luks2:
self.mapdev = None self.mapdev = None
def __enter__(self): def __enter__(self):
# if self.partition.allow_formatting:
# self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
# else:
if not self.key_file: if not self.key_file:
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique? self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
@ -42,9 +39,6 @@ class luks2:
return True return True
def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None): def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
if not self.partition.allow_formatting:
raise DiskError(f'Could not encrypt volume {partition} due to it having a formatting lock.')
log(f'Encrypting {partition} (This might take a while)', level=logging.INFO) log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
if not key_file: if not key_file:
@ -132,7 +126,6 @@ class luks2:
if os.path.islink(f'/dev/mapper/{mountpoint}'): if os.path.islink(f'/dev/mapper/{mountpoint}'):
self.mapdev = f'/dev/mapper/{mountpoint}' self.mapdev = f'/dev/mapper/{mountpoint}'
unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False) unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
unlocked_partition.allow_formatting = self.partition.allow_formatting
return unlocked_partition return unlocked_partition
def close(self, mountpoint=None): def close(self, mountpoint=None):

View File

@ -18,5 +18,6 @@ storage = {
'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing. 'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
'LOG_PATH': '/var/log/archinstall', 'LOG_PATH': '/var/log/archinstall',
'LOG_FILE': 'install.log', 'LOG_FILE': 'install.log',
'MOUNT_POINT': '/mnt', 'MOUNT_POINT': '/mnt/archinstall',
'ENC_IDENTIFIER': 'ainst'
} }

View File

@ -9,6 +9,7 @@ import signal
import sys import sys
import time import time
from .disk import BlockDevice, valid_fs_type, find_partition_by_mountpoint, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position
from .exceptions import * from .exceptions import *
from .general import SysCommand from .general import SysCommand
from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics
@ -119,7 +120,7 @@ def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
def generic_multi_select(options, text="Select one or more of the options above (leave blank to continue): ", sort=True, default=None, allow_empty=False): def generic_multi_select(options, text="Select one or more of the options above (leave blank to continue): ", sort=True, default=None, allow_empty=False):
# Checking if the options are different from `list` or `dict` or if they are empty # Checking if the options are different from `list` or `dict` or if they are empty
if type(options) not in [list, dict]: if type(options) not in [list, dict, type({}.keys()), type({}.values())]:
log(f" * Generic multi-select doesn't support ({type(options)}) as type of options * ", fg='red') log(f" * Generic multi-select doesn't support ({type(options)}) as type of options * ", fg='red')
log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow') log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
raise RequirementError("generic_multi_select() requires list or dictionary as options.") raise RequirementError("generic_multi_select() requires list or dictionary as options.")
@ -130,6 +131,8 @@ def generic_multi_select(options, text="Select one or more of the options above
# After passing the checks, function continues to work # After passing the checks, function continues to work
if type(options) == dict: if type(options) == dict:
options = list(options.values()) options = list(options.values())
elif type(options) in (type({}.keys()), type({}.values())):
options = list(options)
if sort: if sort:
options = sorted(options) options = sorted(options)
@ -186,8 +189,23 @@ def generic_multi_select(options, text="Select one or more of the options above
except RequirementError as e: except RequirementError as e:
log(f" * {e} * ", fg='red') log(f" * {e} * ", fg='red')
sys.stdout.write('\n')
sys.stdout.flush()
return selected_options return selected_options
def select_encrypted_partitions(block_devices :dict, password :str) -> dict:
root = find_partition_by_mountpoint(block_devices, '/')
root['encrypted'] = True
root['password'] = password
return block_devices
# TODO: Next version perhaps we can support multiple encrypted partitions
#options = []
#for partition in block_devices.values():
# options.append({key: val for key, val in partition.items() if val})
#print(generic_multi_select(options, f"Choose which partitions to encrypt (leave blank when done): "))
class MiniCurses: class MiniCurses:
def __init__(self, width, height): def __init__(self, width, height):
@ -536,6 +554,221 @@ def generic_select(options, input_text="Select one of the above by index or abso
return selected_option return selected_option
def partition_overlap(partitions :list, start :str, end :str) -> bool:
# TODO: Implement sanity check
return False
def get_default_partition_layout(block_devices):
if len(block_devices) == 1:
return suggest_single_disk_layout(block_devices[0])
else:
return suggest_multi_disk_layout(block_devices)
# TODO: Implement sane generic layout for 2+ drives
def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict:
if has_uefi():
partition_type = 'gpt'
else:
partition_type = 'msdos'
# log(f"Selecting which partitions to re-use on {block_device}...", fg="yellow", level=logging.INFO)
# partitions = generic_multi_select(block_device.partitions.values(), "Select which partitions to re-use (the rest will be left alone): ", sort=True)
# partitions_to_wipe = generic_multi_select(partitions, "Which partitions do you wish to wipe (multiple can be selected): ", sort=True)
# mountpoints = {}
# struct = {
# "partitions" : []
# }
# for partition in partitions:
# mountpoint = input(f"Select a mountpoint (or skip) for {partition}: ").strip()
# part_struct = {}
# if mountpoint:
# part_struct['mountpoint'] = mountpoint
# if mountpoint == '/boot':
# part_struct['boot'] = True
# if has_uefi():
# part_struct['ESP'] = True
# elif mountpoint == '/' and
# if partition.uuid:
# part_struct['PARTUUID'] = partition.uuid
# if partition in partitions_to_wipe:
# part_struct['wipe'] = True
# struct['partitions'].append(part_struct)
# return struct
mountpoints = {}
block_device_struct = {
"partitions" : [partition.__dump__() for partition in block_device.partitions.values()]
}
# Test code: [part.__dump__() for part in block_device.partitions.values()]
# TODO: Squeeze in BTRFS subvolumes here
while True:
modes = [
"Create a new partition",
f"Suggest partition layout for {block_device}",
"Delete a partition" if len(block_device_struct) else "",
"Clear/Delete all partitions" if len(block_device_struct) else "",
"Assign mount-point for a partition" if len(block_device_struct) else "",
"Mark/Unmark a partition to be formatted (wipes data)" if len(block_device_struct) else "",
"Mark/Unmark a partition as encrypted" if len(block_device_struct) else "",
"Mark/Unmark a partition as bootable (automatic for /boot)" if len(block_device_struct) else "",
"Set desired filesystem for a partition" if len(block_device_struct) else "",
]
# Print current partition layout:
if len(block_device_struct["partitions"]):
print('Current partition layout:')
for partition in block_device_struct["partitions"]:
print(partition)
print()
task = generic_select(modes,
input_text=f"Select what to do with {block_device} (leave blank when done): ")
if not task:
break
if task == 'Create a new partition':
if partition_type == 'gpt':
# https://www.gnu.org/software/parted/manual/html_node/mkpart.html
# https://www.gnu.org/software/parted/manual/html_node/mklabel.html
name = input("Enter a desired name for the partition: ").strip()
fstype = input("Enter a desired filesystem type for the partition: ").strip()
start = input(f"Enter the start sector (percentage or block number, default: {block_device.largest_free_space[0]}): ").strip()
if not start.strip():
start = block_device.largest_free_space[0]
end_suggested = block_device.largest_free_space[1]
else:
end_suggested = '100%'
end = input(f"Enter the end sector of the partition (percentage or block number, ex: {end_suggested}): ").strip()
if not end.strip():
end = end_suggested
if valid_parted_position(start) and valid_parted_position(end) and valid_fs_type(fstype):
if partition_overlap(block_device_struct["partitions"], start, end):
log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.", fg="red")
continue
block_device_struct["partitions"].append({
"type" : "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject
"start" : start,
"size" : end,
"mountpoint" : None,
"wipe" : True,
"filesystem" : {
"format" : fstype
}
})
else:
log(f"Invalid start ({valid_parted_position(start)}), end ({valid_parted_position(end)}) or fstype ({valid_fs_type(fstype)}) for this partition. Ignoring this partition creation.", fg="red")
continue
elif task[:len("Suggest partition layout")] == "Suggest partition layout":
if len(block_device_struct["partitions"]):
if input(f"{block_device} contains queued partitions, this will remove those, are you sure? y/N: ").strip().lower() in ('', 'n'):
continue
block_device_struct["partitions"] = suggest_single_disk_layout(block_device)[block_device]
elif task is None:
return block_device_struct
else:
for index, partition in enumerate(block_device_struct["partitions"]):
print(f"{index}: Start: {partition['start']}, End: {partition['size']} ({partition['filesystem']['format']}{', mounting at: '+partition['mountpoint'] if partition['mountpoint'] else ''})")
if task == "Delete a partition":
if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to delete: ', options_output=False)):
del(block_device_struct["partitions"][block_device_struct["partitions"].index(partition)])
elif task == "Clear/Delete all partitions":
block_device_struct["partitions"] = []
elif task == "Assign mount-point for a partition":
if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to mount where: ', options_output=False)):
print(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')
mountpoint = input('Select where to mount partition (leave blank to remove mountpoint): ').strip()
if len(mountpoint):
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['mountpoint'] = mountpoint
if mountpoint == '/boot':
log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow")
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['boot'] = True
else:
del(block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['mountpoint'])
elif task == "Mark/Unmark a partition to be formatted (wipes data)":
if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to mask for formatting: ', options_output=False)):
# If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
# without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
# it's safe to change the filesystem for this partition.
if block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('filesystem', {}).get('format', 'crypto_LUKS') == 'crypto_LUKS':
if not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('filesystem', None):
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['filesystem'] = {}
while True:
fstype = input("Enter a desired filesystem type for the partition: ").strip()
if not valid_fs_type(fstype):
log(f"Desired filesystem {fstype} is not a valid filesystem.", level=logging.ERROR, fg="red")
continue
break
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['filesystem']['format'] = fstype
# Negate the current wipe marking
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['format'] = not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('format', False)
elif task == "Mark/Unmark a partition as encrypted":
if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to mark as encrypted: ', options_output=False)):
# Negate the current encryption marking
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['encrypted'] = not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('encrypted', False)
elif task == "Mark/Unmark a partition as bootable (automatic for /boot)":
if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to mark as bootable: ', options_output=False)):
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['boot'] = not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('boot', False)
elif task == "Set desired filesystem for a partition":
if (partition := generic_select(block_device_struct["partitions"], 'Select which partition to set a filesystem on: ', options_output=False)):
if not block_device_struct["partitions"][block_device_struct["partitions"].index(partition)].get('filesystem', None):
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['filesystem'] = {}
while True:
fstype = input("Enter a desired filesystem type for the partition: ").strip()
if not valid_fs_type(fstype):
log(f"Desired filesystem {fstype} is not a valid filesystem.", level=logging.ERROR, fg="red")
continue
break
block_device_struct["partitions"][block_device_struct["partitions"].index(partition)]['filesystem']['format'] = fstype
return block_device_struct
def select_individual_blockdevice_usage(block_devices :list):
result = {}
for device in block_devices:
layout = manage_new_and_existing_partitions(device)
result[device] = layout
return result
def select_disk_layout(block_devices :list):
modes = [
"Wipe all selected drives and use a best-effort default partition layout",
"Select what to do with each individual drive (followed by partition usage)"
]
mode = generic_select(modes, input_text=f"Select what you wish to do with the selected block devices: ")
if mode == 'Wipe all selected drives and use a best-effort default partition layout':
return get_default_partition_layout(block_devices)
else:
return select_individual_blockdevice_usage(block_devices)
def select_disk(dict_o_disks): def select_disk(dict_o_disks):
""" """

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 27 KiB

View File

@ -0,0 +1 @@
<mxfile host="Electron" modified="2021-05-02T19:57:46.193Z" agent="5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.82 Electron/12.0.1 Safari/537.36" etag="WWkzNgJUxTiFme1f07FW" version="14.5.1" type="device"><diagram id="C5RBs43oDa-KdzZeNtuy" name="Page-1">7VvZdqM4EP2anHlKDpsxPMZ20sl0kl7S05meNwVkYCIjt5C3+fqRjFgFNnbwksQv3a5CCKG6dWsROdP7o/knAsb+PXYhOtMUd36mD840TVUsi/3HNYtYY5tC4ZHAFYMyxWPwH0zuFNpJ4MKoMJBijGgwLiodHIbQoQUdIATPisOGGBWfOgYelBSPDkCy9ilwqR9rLa2b6W9g4PnJk1XTjq+MQDJYvEnkAxfPcir96kzvE4xp/Gs070PENy/Zl6fbxRO6ezE//fkt+g3+6n3+8fDzPJ7sepNb0lcgMKRbT/3y2bLtaXf4oFrf7ga/rYfB/dW5IV6NLpL9gi7bPiFiQn3s4RCgq0zbI3gSupDPqjIpG3OH8Vgo/4WULgQWwIRipvLpCImr7C3I4m8mKBedRPzFxUQYzAvSQkhDHNI+RpgsV6obpmF3BkwfUYJfYO5K17IuL3V+R4BQTt/T+vZlbzmve80upetxLznSmOggEEWBs5wUECoGKYmcDAtxyHcieoHU8cWAeCf59pWAt8ZqYlyEJ8SBK8bpwnkA8eCq+cwUmsynIR5BtoXsPgIRoMG0uDggnMtLx2UAYj8EhjaAqljkFKCJeNIjRNytNaWHsPMygNPAYXRQRt3MDyh8HIPlDswYExURwy0v4KSyl+t53EypYbj1E9/mo1NHVSS01gFiBYRqQLeZdaeQUDjP7b1soOSqKehG8K0lxFlGXmrCSH6OuAzl9SatpAhNMukDPrFGBUHskwzMhmSgKtVga50NKqEjs8GvCt8/IHbUA2HnbUScCpBp3mx4N50TtftTuRl3H7/9Y4fnmrGS0c6Vi65qCCw0Bp6Y7isOQpobgofDiC2mjMz0qduHLlMC69VoTPl7xCEswKEEXeLj0fOELa23JoAVQhJH1TUYBYjD7AaiKaSBAyrCHECBF3KMMHtCUh3r2COD0GOSmUk/lm7Bkru3H/7UbsP4Z7UQ/6rRXcNiCp9WU7YKhs6ETJdGkTOUgodvQXWbU1OdieE8oDnOZFJKmex3xphcSAhz2yBd4LpqSizxpiDEjFeVjVGZJ8Bq02sNw2y3koprQa5cqIZmv44QxVRGp+QvZskP4rWLu/Ll4f6YtSsza+iQxZg//Tsvn0/VQB1SOnrJvPahywHlSMqBd5nWJ02ztSlXZ13G9XqCaZUXKqGkSlA6VHmwx0j3YXBbPbCG7vbTnVJl8rqf8FY0g5Gm9AZ/yOg7Zfi7zvC1piFtdxm+Kpn9naTwbbt5coCzLjxZDfPhPbm9HGjSrjSb8HmZgvbTig6ELvsXj3nFD/jkI2ZdeupZN3dpQ99jlloJQLul1KJd723dS+tzw2N20srVqHKVuFVh8arucCGRy/K62qbFG7G0ah6VqTuSpftsv0AQ8q7aV5bGBpx6T7nY7olb7xxbLqbKzL1/GtjskOjgNGC1TQO5gxvdFj3Qo2gjVKfvbQX7nab0FWd5pYq98lCwXOQfvjJ4FQS140o6LAk5X5CbrwoUH/Cg5LBdhKHcqP64EQkQR6xU0/cYoAz14AHKOkWjlVSgNv02pemh2WaRh1EoWOQGjHkcinYRd1T5i4G0rRAfaynXtwhGi4jCkQSaU/eg7oxLb+rjO+seaHKFcmzpRFt5gAsiP11Wy0yQ7FrDD4iOJitI1p0/JuAtwNXV6RqHLgGgnA9sdiQkM0Cd3fO8sS8H3ucnqyvxlDPgbcjS7eWd/DMunuCNKvK57W0o2crSnnXTbGIrtwMt1yjZquCXzA70C0/tKN9hY5e21I0js6Xe1inxqbrbnsiblncJERwLkcv13VMwhrH/96q+PPq4Bd3OCKVUwXWMTjNG2VkFpx9dMve+2aN9Uqj+FNJUS3WEYRenqPkUsq16MHnP/LkGgoDwnIN3lUzEdrP3zGTT47/iwtA9F50mZqWKuHYipNazVaMBIRntEBITs78PjXGW/ZWtfvU/</diagram></mxfile>

View File

@ -27,10 +27,9 @@ archinstall.log(f"Graphics devices detected: {archinstall.graphics_devices().key
archinstall.log(f"Disk states before installing: {archinstall.disk_layouts()}", level=logging.DEBUG) archinstall.log(f"Disk states before installing: {archinstall.disk_layouts()}", level=logging.DEBUG)
def load_config(): def load_config():
if archinstall.arguments.get('harddrive', None) is not None: if archinstall.arguments.get('harddrives', None) is not None:
archinstall.arguments['harddrive'] = archinstall.BlockDevice(path=archinstall.arguments['harddrive']['path']) archinstall.arguments['harddrives'] = [archinstall.BlockDevice(BlockDev) for BlockDev in archinstall.arguments['harddrives'].split(',')]
# Temporarily disabling keep_partitions if config file is loaded # Temporarily disabling keep_partitions if config file is loaded
archinstall.arguments['harddrive'].keep_partitions = False
# Temporary workaround to make Desktop Environments work # Temporary workaround to make Desktop Environments work
if archinstall.arguments.get('profile', None) is not None: if archinstall.arguments.get('profile', None) is not None:
if type(archinstall.arguments.get('profile', None)) is dict: if type(archinstall.arguments.get('profile', None)) is dict:
@ -59,18 +58,20 @@ def ask_user_questions():
Not until we're satisfied with what we want to install Not until we're satisfied with what we want to install
will we continue with the actual installation steps. will we continue with the actual installation steps.
""" """
if not archinstall.arguments.get('keyboard-language', None): if not archinstall.arguments.get('keyboard-layout', None):
while True: while True:
try: try:
archinstall.arguments['keyboard-language'] = archinstall.select_language(archinstall.list_keyboard_languages()).strip() archinstall.arguments['keyboard-layout'] = archinstall.select_language(archinstall.list_keyboard_languages()).strip()
break break
except archinstall.RequirementError as err: except archinstall.RequirementError as err:
archinstall.log(err, fg="red") archinstall.log(err, fg="red")
# Before continuing, set the preferred keyboard layout/language in the current terminal. # Before continuing, set the preferred keyboard layout/language in the current terminal.
# This will just help the user with the next following questions. # This will just help the user with the next following questions.
if len(archinstall.arguments['keyboard-language']): if len(archinstall.arguments['keyboard-layout']):
archinstall.set_keyboard_language(archinstall.arguments['keyboard-language']) archinstall.set_keyboard_language(archinstall.arguments['keyboard-layout'])
# Set which region to download packages from during the installation # Set which region to download packages from during the installation
if not archinstall.arguments.get('mirror-region', None): if not archinstall.arguments.get('mirror-region', None):
@ -91,123 +92,44 @@ def ask_user_questions():
if not archinstall.arguments.get('sys-encoding', None): if not archinstall.arguments.get('sys-encoding', None):
archinstall.arguments['sys-encoding'] = 'utf-8' archinstall.arguments['sys-encoding'] = 'utf-8'
# Ask which harddrive/block-device we will install to # Ask which harddrives/block-devices we will install to
if not archinstall.arguments.get('harddrive', None): # and convert them into archinstall.BlockDevice() objects.
archinstall.arguments['harddrive'] = archinstall.select_disk(archinstall.all_disks()) if archinstall.arguments.get('harddrives', None):
if archinstall.arguments['harddrive'] is None: archinstall.arguments['harddrives'] = [archinstall.BlockDevice(BlockDev) for BlockDev in archinstall.arguments['harddrives'].split(',')]
archinstall.arguments['target-mount'] = archinstall.storage.get('MOUNT_POINT', '/mnt') else:
archinstall.arguments['harddrives'] = archinstall.generic_multi_select(archinstall.all_disks(),
text="Select one or more harddrives to use and configure (leave blank to skip this step): ",
allow_empty=True)
# Perform a quick sanity check on the selected harddrive. if archinstall.arguments.get('harddrives', None):
# 1. Check if it has partitions archinstall.storage['disk_layouts'] = archinstall.select_disk_layout(archinstall.arguments['harddrives'])
# 3. Check that we support the current partitions
# 2. If so, ask if we should keep them or wipe everything
if archinstall.arguments['harddrive'] and archinstall.arguments['harddrive'].has_partitions():
archinstall.log(f"{archinstall.arguments['harddrive']} contains the following partitions:", fg='yellow')
# We curate a list pf supported partitions
# and print those that we don't support.
partition_mountpoints = {}
for partition in archinstall.arguments['harddrive']:
try:
if partition.filesystem_supported():
archinstall.log(f" {partition}")
partition_mountpoints[partition] = None
except archinstall.UnknownFilesystemFormat as err:
archinstall.log(f" {partition} (Filesystem not supported)", fg='red')
# We then ask what to do with the partitions.
if (option := archinstall.ask_for_disk_layout()) == 'abort':
archinstall.log("Safely aborting the installation. No changes to the disk or system has been made.")
exit(1)
elif option == 'keep-existing':
archinstall.arguments['harddrive'].keep_partitions = True
archinstall.log(" ** You will now select which partitions to use by selecting mount points (inside the installation). **")
archinstall.log(" ** The root would be a simple / and the boot partition /boot (as all paths are relative inside the installation). **")
mountpoints_set = []
while True:
# Select a partition
# If we provide keys as options, it's better to convert them to list and sort before passing
mountpoints_list = sorted(list(partition_mountpoints.keys()))
partition = archinstall.generic_select(mountpoints_list, "Select a partition by number that you want to set a mount-point for (leave blank when done): ")
if not partition:
if set(mountpoints_set) & {'/', '/boot'} == {'/', '/boot'}:
break
continue
# Select a mount-point
mountpoint = input(f"Enter a mount-point for {partition}: ").strip(' ')
if len(mountpoint):
# Get a valid & supported filesystem for the partition:
while 1:
new_filesystem = input(f"Enter a valid filesystem for {partition} (leave blank for {partition.filesystem}): ").strip(' ')
if len(new_filesystem) <= 0:
if partition.encrypted and partition.filesystem == 'crypto_LUKS':
old_password = archinstall.arguments.get('!encryption-password', None)
if not old_password:
old_password = input(f'Enter the old encryption password for {partition}: ')
if autodetected_filesystem := partition.detect_inner_filesystem(old_password):
new_filesystem = autodetected_filesystem
else:
archinstall.log("Could not auto-detect the filesystem inside the encrypted volume.", fg='red')
archinstall.log("A filesystem must be defined for the unlocked encrypted partition.")
continue
break
# Since the potentially new filesystem is new
# we have to check if we support it. We can do this by formatting /dev/null with the partitions filesystem.
# There's a nice wrapper for this on the partition object itself that supports a path-override during .format()
try:
partition.format(new_filesystem, path='/dev/null', log_formatting=False, allow_formatting=True)
except archinstall.UnknownFilesystemFormat:
archinstall.log(f"Selected filesystem is not supported yet. If you want archinstall to support '{new_filesystem}',")
archinstall.log("please create a issue-ticket suggesting it on github at https://github.com/archlinux/archinstall/issues.")
archinstall.log("Until then, please enter another supported filesystem.")
continue
except archinstall.SysCallError:
pass # Expected exception since mkfs.<format> can not format /dev/null. But that means our .format() function supported it.
break
# When we've selected all three criteria,
# We can safely mark the partition for formatting and where to mount it.
# TODO: allow_formatting might be redundant since target_mountpoint should only be
# set if we actually want to format it anyway.
mountpoints_set.append(mountpoint)
partition.allow_formatting = True
partition.target_mountpoint = mountpoint
# Only overwrite the filesystem definition if we selected one:
if len(new_filesystem):
partition.filesystem = new_filesystem
archinstall.log('Using existing partition table reported above.')
elif option == 'format-all':
if not archinstall.arguments.get('filesystem', None):
archinstall.arguments['filesystem'] = archinstall.ask_for_main_filesystem_format()
archinstall.arguments['harddrive'].keep_partitions = False
elif archinstall.arguments['harddrive']:
# If the drive doesn't have any partitions, safely mark the disk with keep_partitions = False
# and ask the user for a root filesystem.
if not archinstall.arguments.get('filesystem', None):
archinstall.arguments['filesystem'] = archinstall.ask_for_main_filesystem_format()
archinstall.arguments['harddrive'].keep_partitions = False
# Get disk encryption password (or skip if blank) # Get disk encryption password (or skip if blank)
if archinstall.arguments['harddrive'] and archinstall.arguments.get('!encryption-password', None) is None: if archinstall.arguments['harddrives'] and archinstall.arguments.get('!encryption-password', None) is None:
if passwd := archinstall.get_password(prompt='Enter disk encryption password (leave blank for no encryption): '): if (passwd := archinstall.get_password(prompt='Enter disk encryption password (leave blank for no encryption): ')):
archinstall.arguments['!encryption-password'] = passwd archinstall.arguments['!encryption-password'] = passwd
archinstall.arguments['harddrive'].encryption_password = archinstall.arguments['!encryption-password']
archinstall.arguments["bootloader"] = archinstall.ask_for_bootloader() if archinstall.arguments['harddrives'] and archinstall.arguments.get('!encryption-password', None):
# If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
# Then we need to identify which partitions to encrypt. This will default to / (root).
if len(list(archinstall.encrypted_partitions(archinstall.storage['disk_layouts']))) == 0:
archinstall.storage['disk_layouts'] = archinstall.select_encrypted_partitions(archinstall.storage['disk_layouts'], archinstall.arguments['!encryption-password'])
# Ask which boot-loader to use (will only ask if we're in BIOS (non-efi) mode)
if not archinstall.arguments.get("bootloader", None):
archinstall.arguments["bootloader"] = archinstall.ask_for_bootloader()
# Get the hostname for the machine # Get the hostname for the machine
if not archinstall.arguments.get('hostname', None): if not archinstall.arguments.get('hostname', None):
archinstall.arguments['hostname'] = input('Desired hostname for the installation: ').strip(' ') archinstall.arguments['hostname'] = input('Desired hostname for the installation: ').strip(' ')
# Ask for a root password (optional, but triggers requirement for super-user if skipped) # Ask for a root password (optional, but triggers requirement for super-user if skipped)
if not archinstall.arguments.get('!root-password', None): if not archinstall.arguments.get('!root-password', None):
archinstall.arguments['!root-password'] = archinstall.get_password(prompt='Enter root password (Recommendation: leave blank to leave root disabled): ') archinstall.arguments['!root-password'] = archinstall.get_password(prompt='Enter root password (Recommendation: leave blank to leave root disabled): ')
# Ask for additional users (super-user if root pw was not set) # Ask for additional users (super-user if root pw was not set)
if not archinstall.arguments.get('!root-password', None) and not archinstall.arguments.get('superusers', None): if not archinstall.arguments.get('!root-password', None) and not archinstall.arguments.get('superusers', None):
archinstall.arguments['superusers'] = archinstall.ask_for_superuser_account('Create a required super-user with sudo privileges: ', forced=True) archinstall.arguments['superusers'] = archinstall.ask_for_superuser_account('Create a required super-user with sudo privileges: ', forced=True)
@ -215,10 +137,12 @@ def ask_user_questions():
archinstall.arguments['users'] = users archinstall.arguments['users'] = users
archinstall.arguments['superusers'] = {**archinstall.arguments['superusers'], **superusers} archinstall.arguments['superusers'] = {**archinstall.arguments['superusers'], **superusers}
# Ask for archinstall-specific profiles (such as desktop environments etc) # Ask for archinstall-specific profiles (such as desktop environments etc)
if not archinstall.arguments.get('profile', None): if not archinstall.arguments.get('profile', None):
archinstall.arguments['profile'] = archinstall.select_profile() archinstall.arguments['profile'] = archinstall.select_profile()
# Check the potentially selected profiles preparations to get early checks if some additional questions are needed. # Check the potentially selected profiles preparations to get early checks if some additional questions are needed.
if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_prep_function(): if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_prep_function():
with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported: with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported:
@ -226,16 +150,19 @@ def ask_user_questions():
archinstall.log(' * Profile\'s preparation requirements was not fulfilled.', fg='red') archinstall.log(' * Profile\'s preparation requirements was not fulfilled.', fg='red')
exit(1) exit(1)
# Ask about audio server selection if one is not already set # Ask about audio server selection if one is not already set
if not archinstall.arguments.get('audio', None): if not archinstall.arguments.get('audio', None):
# The argument to ask_for_audio_selection lets the library know if it's a desktop profile # The argument to ask_for_audio_selection lets the library know if it's a desktop profile
archinstall.arguments['audio'] = archinstall.ask_for_audio_selection(is_desktop_profile(archinstall.arguments['profile'])) archinstall.arguments['audio'] = archinstall.ask_for_audio_selection(is_desktop_profile(archinstall.arguments['profile']))
# Ask for preferred kernel: # Ask for preferred kernel:
if not archinstall.arguments.get("kernels", None): if not archinstall.arguments.get("kernels", None):
kernels = ["linux", "linux-lts", "linux-zen", "linux-hardened"] kernels = ["linux", "linux-lts", "linux-zen", "linux-hardened"]
archinstall.arguments['kernels'] = archinstall.select_kernel(kernels) archinstall.arguments['kernels'] = archinstall.select_kernel(kernels)
# Additional packages (with some light weight error handling for invalid package names) # Additional packages (with some light weight error handling for invalid package names)
print("Only packages such as base, base-devel, linux, linux-firmware, efibootmgr and optional profile packages are installed.") print("Only packages such as base, base-devel, linux, linux-firmware, efibootmgr and optional profile packages are installed.")
print("If you desire a web browser, such as firefox or chromium, you may specify it in the following prompt.") print("If you desire a web browser, such as firefox or chromium, you may specify it in the following prompt.")
@ -272,7 +199,7 @@ def ask_user_questions():
archinstall.log("Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki.", fg="yellow") archinstall.log("Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki.", fg="yellow")
def perform_installation_steps(): def perform_filesystem_operations():
print() print()
print('This is your chosen configuration:') print('This is your chosen configuration:')
archinstall.log("-- Guided template chosen (with below config) --", level=logging.DEBUG) archinstall.log("-- Guided template chosen (with below config) --", level=logging.DEBUG)
@ -280,6 +207,10 @@ def perform_installation_steps():
archinstall.log(user_configuration, level=logging.INFO) archinstall.log(user_configuration, level=logging.INFO)
with open("/var/log/archinstall/user_configuration.json", "w") as config_file: with open("/var/log/archinstall/user_configuration.json", "w") as config_file:
config_file.write(user_configuration) config_file.write(user_configuration)
user_disk_layout = json.dumps(archinstall.storage['disk_layouts'], indent=4, sort_keys=True, cls=archinstall.JSON)
archinstall.log(user_disk_layout, level=logging.INFO)
with open("/var/log/archinstall/user_disk_layout.json", "w") as disk_layout_file:
disk_layout_file.write(user_disk_layout)
print() print()
if archinstall.arguments.get('dry-run'): if archinstall.arguments.get('dry-run'):
@ -293,8 +224,8 @@ def perform_installation_steps():
We mention the drive one last time, and count from 5 to 0. We mention the drive one last time, and count from 5 to 0.
""" """
if archinstall.arguments.get('harddrive', None): if archinstall.arguments.get('harddrives', None):
print(f" ! Formatting {archinstall.arguments['harddrive']} in ", end='') print(f" ! Formatting {archinstall.arguments['harddrives']} in ", end='')
archinstall.do_countdown() archinstall.do_countdown()
""" """
@ -304,43 +235,10 @@ def perform_installation_steps():
mode = archinstall.GPT mode = archinstall.GPT
if has_uefi() is False: if has_uefi() is False:
mode = archinstall.MBR mode = archinstall.MBR
with archinstall.Filesystem(archinstall.arguments['harddrive'], mode) as fs:
# Wipe the entire drive if the disk flag `keep_partitions`is False.
if archinstall.arguments['harddrive'].keep_partitions is False:
fs.use_entire_disk(root_filesystem_type=archinstall.arguments.get('filesystem', 'btrfs'))
# Check if encryption is desired and mark the root partition as encrypted. for drive in archinstall.arguments['harddrives']:
if archinstall.arguments.get('!encryption-password', None): with archinstall.Filesystem(drive, mode) as fs:
root_partition = fs.find_partition('/') fs.load_layout(archinstall.storage['disk_layouts'][drive])
root_partition.encrypted = True
# After the disk is ready, iterate the partitions and check
# which ones are safe to format, and format those.
for partition in archinstall.arguments['harddrive']:
if partition.safe_to_format():
# Partition might be marked as encrypted due to the filesystem type crypt_LUKS
# But we might have omitted the encryption password question to skip encryption.
# In which case partition.encrypted will be true, but passwd will be false.
if partition.encrypted and (passwd := archinstall.arguments.get('!encryption-password', None)):
partition.encrypt(password=passwd)
else:
partition.format()
else:
archinstall.log(f"Did not format {partition} because .safe_to_format() returned False or .allow_formatting was False.", level=logging.DEBUG)
if archinstall.arguments.get('!encryption-password', None):
# First encrypt and unlock, then format the desired partition inside the encrypted part.
# archinstall.luks2() encrypts the partition when entering the with context manager, and
# unlocks the drive so that it can be used as a normal block-device within archinstall.
with archinstall.luks2(fs.find_partition('/'), 'luksloop', archinstall.arguments.get('!encryption-password', None)) as unlocked_device:
unlocked_device.format(fs.find_partition('/').filesystem)
unlocked_device.mount(archinstall.storage.get('MOUNT_POINT', '/mnt'))
else:
fs.find_partition('/').mount(archinstall.storage.get('MOUNT_POINT', '/mnt'))
fs.find_partition('/boot').mount(archinstall.storage.get('MOUNT_POINT', '/mnt') + '/boot')
perform_installation(archinstall.storage.get('MOUNT_POINT', '/mnt'))
def perform_installation(mountpoint): def perform_installation(mountpoint):
@ -349,7 +247,11 @@ def perform_installation(mountpoint):
Only requirement is that the block devices are Only requirement is that the block devices are
formatted and setup prior to entering this function. formatted and setup prior to entering this function.
""" """
with archinstall.Installer(mountpoint, kernels=archinstall.arguments.get('kernels', ['linux'])) as installation: with archinstall.Installer(mountpoint, kernels=archinstall.arguments.get('kernels', 'linux')) as installation:
# Mount all the drives to the desired mountpoint
# This *can* be done outside of the installation, but the installer can deal with it.
installation.mount_ordered_layout(archinstall.storage['disk_layouts'])
# if len(mirrors): # if len(mirrors):
# Certain services might be running that affects the system during installation. # Certain services might be running that affects the system during installation.
# Currently, only one such service is "reflector.service" which updates /etc/pacman.d/mirrorlist # Currently, only one such service is "reflector.service" which updates /etc/pacman.d/mirrorlist
@ -417,7 +319,7 @@ def perform_installation(mountpoint):
# This step must be after profile installs to allow profiles to install language pre-requisits. # This step must be after profile installs to allow profiles to install language pre-requisits.
# After which, this step will set the language both for console and x11 if x11 was installed for instance. # After which, this step will set the language both for console and x11 if x11 was installed for instance.
installation.set_keyboard_language(archinstall.arguments['keyboard-language']) installation.set_keyboard_language(archinstall.arguments['keyboard-layout'])
if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_post_install(): if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_post_install():
with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported: with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported:
@ -456,4 +358,5 @@ load_config()
if not archinstall.arguments.get('silent'): if not archinstall.arguments.get('silent'):
ask_user_questions() ask_user_questions()
perform_installation_steps() perform_filesystem_operations()
perform_installation(archinstall.storage.get('MOUNT_POINT', '/mnt'))

View File

@ -53,7 +53,7 @@ if archinstall.arguments['harddrive']:
boot = fs.find_partition('/boot') boot = fs.find_partition('/boot')
root = fs.find_partition('/') root = fs.find_partition('/')
boot.format('vfat') boot.format('fat32')
# We encrypt the root partition if we got a password to do so with, # We encrypt the root partition if we got a password to do so with,
# Otherwise we just skip straight to formatting and installation # Otherwise we just skip straight to formatting and installation