* All the changes needed to make btrfs subvolumes work. It boils down to two points;
the handling of the addressing of subvolumes re. physical partitions, and the small changes at the bootloader level
* We added a new script only_hd for testing purposes. It only handles hadrd drive management
* restoring an escape hatch during subvolume processing
* hipercommented manage_btrfs_subvolumes
* Ready to be able to select and process options in subvolume mounting
* Separte nodatacow processing
* Solving a flake8 complain
* Use of bind names @ get_filesystem_type
* compress mount option bypass
* Preparations for encryption handling
* Compatibility to master version re. encrypted btrfs volumes
* Now we can create subvolumes and mountpoints inside an encrypted btrfs partition
* changes for entries file generation with systemd-bootctl
* flake8 corrections plus some comments

Co-authored-by: Anton Hvornum <anton@hvornum.se>
This commit is contained in:
Werner Llácer 2021-12-31 13:47:41 +01:00 committed by GitHub
parent c3e2b99316
commit 7f9b7991e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 485 additions and 42 deletions

View File

@ -6,6 +6,8 @@ from .helpers import get_mount_info
from ..exceptions import DiskError
from ..general import SysCommand
from ..output import log
from .partition import Partition
def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
"""
@ -72,3 +74,105 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str])
log(f"Creating a subvolume on {target}", level=logging.INFO)
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, subvolumes :dict, unlocked_device :dict = None):
""" we do the magic with subvolumes in a centralized place
parameters:
* the installation object
* the partition dictionary entry which represents the physical partition
* mountpoinst, the dictionary which contains all the partititon to be mounted
* subvolumes is the dictionary with the names of the subvolumes and its location
We expect the partition has been mounted as / , and it to be unmounted after the processing
Then we create all the subvolumes inside btrfs as demand
We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs
Then we add it them to the mountpoints dictionary to be processed as "normal" partitions
# TODO For encrypted devices we need some special processing prior to it
"""
# We process each of the pairs <subvolume name: mount point | None | mount info dict>
# th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list
# of mount options (or similar used by brtfs)
for name, right_hand in subvolumes.items():
try:
# we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use
if name.startswith('/'):
name = name[1:]
# renormalize the right hand.
location = None
mount_options = []
# no contents, so it is not to be mounted
if not right_hand:
location = None
# just a string. per backward compatibility the mount point
elif isinstance(right_hand,str):
location = right_hand
# a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿?
elif isinstance(right_hand,dict):
location = right_hand.get('mountpoint',None)
mount_options = right_hand.get('options',[])
# we create the subvolume
create_subvolume(installation,name)
# Make the nodatacow processing now
# It will be the main cause of creation of subvolumes which are not to be mounted
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
if 'nodatacow' in mount_options:
if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
# entry is deleted so nodatacow doesn't propagate to the mount options
del mount_options[mount_options.index('nodatacow')]
# Make the compress processing now
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
# in this way only zstd compression is activaded
# TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
if 'compress' in mount_options:
if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
# entry is deleted so nodatacow doesn't propagate to the mount options
del mount_options[mount_options.index('compress')]
# END compress processing.
# we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume
if not partition['mountpoint'] and location is not None:
# we begin to create a fake partition entry. First we copy the original -the one that corresponds to
# the primary partition
fake_partition = partition.copy()
# we start to modify entries in the "fake partition" to match the needs of the subvolumes
#
# to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy
# and reset the encryption parameters
del fake_partition['btrfs']
fake_partition['encrypted'] = False
fake_partition['generate-encryption-key-file'] = False
# Mount destination. As of now the right hand part
fake_partition['mountpoint'] = location
# we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path.
fake_partition['subvolume'] = name
# here we add the mount options
fake_partition['options'] = mount_options
# Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer.
# We instanciate a new object with following attributes coming / adapted from the instance which was in the primary partition entry (the one we are coping - partition['device_instance']
# * path, which will be expanded with the subvolume name to use the bind mount syntax the system uses for naming mounted subvolumes
# * size. When the OS queries all the subvolumes share the same size as the full partititon
# * uuid. All the subvolumes on a partition share the same uuid
if not unlocked_device:
fake_partition['device_instance'] = Partition(f"{partition['device_instance'].path}[/{name}]",partition['device_instance'].size,partition['device_instance'].uuid)
else:
# for subvolumes IN an encrypted partition we make our device instance from unlocked device instead of the raw partition.
# This time we make a copy (we should to the same above TODO) and alter the path by hand
from copy import copy
# KIDS DONT'T DO THIS AT HOME
fake_partition['device_instance'] = copy(unlocked_device)
fake_partition['device_instance'].path = f"{unlocked_device.path}[/{name}]"
# we reset this attribute, which holds where the partition is actually mounted. Remember, the physical partition is mounted at this moment and therefore has the value '/'.
# If i don't reset it, process will abort as "already mounted' .
# TODO It works for this purpose, but the fact that this bevahiour can happed, should make think twice
fake_partition['device_instance'].mountpoint = None
#
# Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
# as "normal" ones
mountpoints[fake_partition['mountpoint']] = fake_partition
except Exception as e:
raise e
# if the physical partition has been selected to be mounted, we include it at the list. Remmeber, all the above treatement won't happen except the creation of the subvolume
if partition['mountpoint']:
mountpoints[partition['mountpoint']] = partition

View File

@ -37,10 +37,10 @@ class Filesystem:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
time.sleep(5)
# TODO: Convert to blkid (or something similar, but blkid doesn't support traversing to list sub-PARTUUIDs based on blockdevice path?)
output = json.loads(SysCommand(f"lsblk --json -o+PARTUUID {self.blockdevice.device}").decode('UTF-8'))
for device in output['blockdevices']:
for index, partition in enumerate(device['children']):
if (partuuid := partition.get('partuuid', None)) and partuuid.lower() == uuid:
@ -93,8 +93,12 @@ class Filesystem:
storage['arguments']['!encryption-password'] = get_password(f"Enter a encryption password for {partition['device_instance']}")
partition['!password'] = storage['arguments']['!encryption-password']
loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
# to be able to generate an unique name in case the partition will not be mounted
if partition.get('mountpoint',None):
ppath = partition['mountpoint']
else:
ppath = partition['device_instance'].path
loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop"
partition['device_instance'].encrypt(password=partition['!password'])
@ -207,9 +211,9 @@ class Filesystem:
SysCommand(f'bash -c "umount {device}?"')
except:
pass
self.partprobe()
worked = self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0
self.partprobe()
return worked

View File

@ -123,9 +123,19 @@ def harddrive(size=None, model=None, fuzzy=False):
return collection[drive]
def split_bind_name(path :Union[pathlib.Path, str]) -> list:
# we check for the bind notation. if exist we'll only use the "true" device path
if '[' in str(path) : # is a bind path (btrfs subvolume path)
device_path, bind_path = str(path).split('[')
bind_path = bind_path[:-1].strip() # remove the ]
else:
device_path = path
bind_path = None
return device_path,bind_path
def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict:
for traversal in list(map(str, [str(path)] + list(pathlib.Path(str(path)).parents))):
device_path,bind_path = split_bind_name(path)
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
log(f"Getting mount information for device path {traversal}", level=logging.INFO)
output = SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')
@ -141,6 +151,10 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_p
raise DiskError(f"Could not get mount information for device path {path}")
output = json.loads(output)
# for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter
# i.e. the subvolume filesystem we're searching for
if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None:
output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)]
if 'filesystems' in output:
if len(output['filesystems']) > 1:
raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
@ -180,8 +194,9 @@ def get_partitions_in_use(mountpoint) -> list:
def get_filesystem_type(path):
device_name, bind_name = split_bind_name(path)
try:
return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip()
return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip()
except SysCallError:
return None
@ -217,12 +232,13 @@ def partprobe():
time.sleep(5)
def convert_device_to_uuid(path :str) -> str:
device_name, bind_name = split_bind_name(path)
for i in range(storage['DISK_RETRY_ATTEMPTS']):
partprobe()
# TODO: Convert lsblk to blkid
# (lsblk supports BlockDev and Partition UUID grabbing, blkid requires you to pick PTUUID and PARTUUID)
output = json.loads(SysCommand(f"lsblk --json -o+UUID {path}").decode('UTF-8'))
output = json.loads(SysCommand(f"lsblk --json -o+UUID {device_name}").decode('UTF-8'))
for device in output['blockdevices']:
if (dev_uuid := device.get('uuid', None)):

View File

@ -7,7 +7,7 @@ import os
import hashlib
from typing import Optional
from .blockdevice import BlockDevice
from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb
from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
@ -87,7 +87,7 @@ class Partition:
@property
def sector_size(self):
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.path}").decode('UTF-8'))
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
for device in output['blockdevices']:
return device.get('log-sec', None)
@ -114,7 +114,7 @@ class Partition:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
if (handle := SysCommand(f"lsblk --json -b -o+SIZE {self.path}")).exit_code == 0:
if (handle := SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}")).exit_code == 0:
lsblk = json.loads(handle.decode('UTF-8'))
for device in lsblk['blockdevices']:
@ -144,7 +144,7 @@ class Partition:
@property
def partition_type(self):
lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
for device in lsblk['blockdevices']:
return device['pttype']
@ -155,6 +155,7 @@ class Partition:
Returns the PARTUUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions.
For bind mounts all the subvolumes share the same uuid
"""
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
@ -175,8 +176,7 @@ class Partition:
For instance when you want to get a __repr__ of the class.
"""
self.partprobe()
return SysCommand(f'blkid -s PARTUUID -o value {self.path}').decode('UTF-8').strip()
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
@property
def encrypted(self):
@ -184,7 +184,6 @@ class Partition:
@encrypted.setter
def encrypted(self, value: bool):
self._encrypted = value
@property
@ -194,11 +193,26 @@ class Partition:
@property
def real_device(self):
for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)):
if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
return f"/dev/{parent}"
# raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
return self.path
@property
def device_path(self):
""" for bind mounts returns the phisical path of the partition
"""
device_path, bind_name = split_bind_name(self.path)
return device_path
@property
def bind_name(self):
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
device_path, bind_name = split_bind_name(self.path)
return bind_name
def partprobe(self):
SysCommand(f'bash -c "partprobe"')
time.sleep(1)
@ -348,11 +362,20 @@ class Partition:
pathlib.Path(target).mkdir(parents=True, exist_ok=True)
if self.bind_name:
device_path = self.device_path
# TODO options should be better be a list than a string
if options:
options = f"{options},subvol={self.bind_name}"
else:
options = f"subvol={self.bind_name}"
else:
device_path = self.path
try:
if options:
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {self.path} {target}")
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}")
else:
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} {self.path} {target}")
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} {device_path} {target}")
# TODO: Should be redundant to check for exit_code
if mnt_handle.exit_code != 0:
@ -401,5 +424,5 @@ def get_mount_fs_type(fs):
if fs == 'ntfs':
return 'ntfs3' # Needed to use the Paragon R/W NTFS driver
elif fs == 'fat32':
return 'vfat' # This is the actual type used for fat32 mounting.
return 'vfat' # This is the actual type used for fat32 mounting
return fs

View File

@ -11,14 +11,14 @@ from .disk import get_partitions_in_use, Partition
from .general import SysCommand, generate_password
from .hardware import has_uefi, is_vm, cpu_vendor
from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout
from .disk.helpers import get_mount_info
from .disk.helpers import get_mount_info, split_bind_name
from .mirrors import use_mirrors
from .plugins import plugins
from .storage import storage
# from .user_interaction import *
from .output import log
from .profiles import Profile
from .disk.btrfs import create_subvolume, mount_subvolume
from .disk.btrfs import manage_btrfs_subvolumes
from .disk.partition import get_mount_fs_type
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError
@ -184,12 +184,38 @@ class Installer:
mountpoints = {}
for blockdevice in layouts:
for partition in layouts[blockdevice]['partitions']:
mountpoints[partition['mountpoint']] = partition
if (subvolumes := partition.get('btrfs', {}).get('subvolumes', {})):
if partition.get('encrypted',False):
if partition.get('mountpoint',None):
ppath = partition['mountpoint']
else:
ppath = partition['device_instance'].path
loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop"
# Immediately unlock the encrypted device to format the inner volume
with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=False) as unlocked_device:
unlocked_device.mount(f"{self.target}/")
try:
manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes,unlocked_device)
except Exception as e:
# every exception unmounts the physical volume. Otherwise we let the system in an unstable state
unlocked_device.unmount()
raise e
unlocked_device.unmount()
# TODO generate key
else:
self.mount(partition['device_instance'],"/")
try:
manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes)
except Exception as e:
# every exception unmounts the physical volume. Otherwise we let the system in an unstable state
partition['device_instance'].unmount()
raise e
partition['device_instance'].unmount()
else:
mountpoints[partition['mountpoint']] = partition
for mountpoint in sorted([mnt_dest for mnt_dest in mountpoints.keys() if mnt_dest != None]):
partition = mountpoints[mountpoint]
if partition.get('encrypted', False):
if partition.get('encrypted', False) and not partition.get('subvolume',None):
loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
if not (password := partition.get('!password', None)):
raise RequirementError(f"Missing mountpoint {mountpoint} encryption password in layout: {partition}")
@ -215,19 +241,17 @@ class Installer:
else:
log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
partition['device_instance'].mount(f"{self.target}{mountpoint}")
if partition.get('options',[]):
mount_options = ','.join(partition['options'])
partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options)
else:
partition['device_instance'].mount(f"{self.target}{mountpoint}")
time.sleep(1)
try:
get_mount_info(f"{self.target}{mountpoint}", traverse=False)
except DiskError:
raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
if (subvolumes := partition.get('btrfs', {}).get('subvolumes', {})):
for name, location in subvolumes.items():
create_subvolume(self, location)
mount_subvolume(self, location)
def mount(self, partition, mountpoint, create_mountpoint=True):
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
@ -468,11 +492,14 @@ class Installer:
for partition in self.partitions:
if partition.filesystem == 'btrfs':
# if partition.encrypted:
self.base_packages.append('btrfs-progs')
if 'btrfs-progs' not in self.base_packages:
self.base_packages.append('btrfs-progs')
if partition.filesystem == 'xfs':
self.base_packages.append('xfsprogs')
if 'xfs' not in self.base_packages:
self.base_packages.append('xfsprogs')
if partition.filesystem == 'f2fs':
self.base_packages.append('f2fs-tools')
if 'f2fs' not in self.base_packages:
self.base_packages.append('f2fs-tools')
# Configure mkinitcpio to handle some specific use cases.
if partition.filesystem == 'btrfs':
@ -480,7 +507,6 @@ class Installer:
self.MODULES.append('btrfs')
if '/usr/bin/btrfs' not in self.BINARIES:
self.BINARIES.append('/usr/bin/btrfs')
# There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
if partition.filesystem == 'ntfs3' and partition.mountpoint == self.target:
if 'fsck' in self.HOOKS:
@ -634,15 +660,21 @@ class Installer:
entry.write(f"initrd /initramfs-{kernel}.img\n")
# blkid doesn't trigger on loopback devices really well,
# so we'll use the old manual method until we get that sorted out.
if root_fs_type is not None:
options_entry = f'rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n'
else:
options_entry = f'rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n'
base_path,bind_path = split_bind_name(str(root_partition.path))
if bind_path is not None: # and root_fs_type == 'btrfs':
options_entry = f"rootflags=subvol={bind_path} " + options_entry
if real_device := self.detect_encryption(root_partition):
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n')
entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev {options_entry}')
else:
log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
entry.write(f'options root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n')
entry.write(f'options root=PARTUUID={root_partition.uuid} {options_entry}')
self.helper_flags['bootloader'] = bootloader

View File

@ -172,4 +172,4 @@ class luks2:
def crypttab(self, installation, key_path :str, options=["luks", "key-slot=1"]):
log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO)
with open(f"{installation.target}/etc/crypttab", "a") as crypttab:
crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n")
crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n")

264
examples/only_hd.py Normal file
View File

@ -0,0 +1,264 @@
import json
import logging
import os
import pathlib
import archinstall
import glob
def load_mirror():
if archinstall.arguments.get('mirror-region', None) is not None:
if type(archinstall.arguments.get('mirror-region', None)) is dict:
archinstall.arguments['mirror-region'] = archinstall.arguments.get('mirror-region', None)
else:
selected_region = archinstall.arguments.get('mirror-region', None)
archinstall.arguments['mirror-region'] = {selected_region: archinstall.list_mirrors()[selected_region]}
def load_localization():
if archinstall.arguments.get('sys-language', None) is not None:
archinstall.arguments['sys-language'] = archinstall.arguments.get('sys-language', 'en_US')
if archinstall.arguments.get('sys-encoding', None) is not None:
archinstall.arguments['sys-encoding'] = archinstall.arguments.get('sys-encoding', 'utf-8')
def load_harddrives():
if archinstall.arguments.get('harddrives', None) is not None:
if type(archinstall.arguments['harddrives']) is str:
archinstall.arguments['harddrives'] = archinstall.arguments['harddrives'].split(',')
archinstall.arguments['harddrives'] = [archinstall.BlockDevice(BlockDev) for BlockDev in archinstall.arguments['harddrives']]
# Temporarily disabling keep_partitions if config file is loaded
def load_disk_layouts():
if archinstall.arguments.get('disk_layouts', None) is not None:
dl_path = pathlib.Path(archinstall.arguments['disk_layouts'])
if dl_path.exists(): # and str(dl_path).endswith('.json'):
try:
with open(dl_path) as fh:
archinstall.storage['disk_layouts'] = json.load(fh)
except Exception as e:
raise ValueError(f"--disk_layouts does not contain a valid JSON format: {e}")
else:
try:
archinstall.storage['disk_layouts'] = json.loads(archinstall.arguments['disk_layouts'])
except:
raise ValueError("--disk_layouts=<json> needs either a JSON file or a JSON string given with a valid disk layout.")
def ask_harddrives():
# Ask which harddrives/block-devices we will install to
# and convert them into archinstall.BlockDevice() objects.
if archinstall.arguments.get('harddrives', None) is None:
archinstall.arguments['harddrives'] = archinstall.generic_multi_select(archinstall.all_disks(),
text="Select one or more harddrives to use and configure (leave blank to skip this step): ",
allow_empty=True)
if not archinstall.arguments['harddrives']:
archinstall.log("You decided to skip harddrive selection",fg="red",level=logging.INFO)
archinstall.log(f"and will use whatever drive-setup is mounted at {archinstall.storage['MOUNT_POINT']} (experimental)",fg="red",level=logging.INFO)
archinstall.log("WARNING: Archinstall won't check the suitability of this setup",fg="red",level=logging.INFO)
if input("Do you wish to continue ? [Y/n]").strip().lower() == 'n':
exit(1)
else:
if archinstall.storage.get('disk_layouts', None) is None:
archinstall.storage['disk_layouts'] = archinstall.select_disk_layout(archinstall.arguments['harddrives'], archinstall.arguments.get('advanced', False))
# Get disk encryption password (or skip if blank)
if archinstall.arguments.get('!encryption-password', None) is None:
if passwd := archinstall.get_password(prompt='Enter disk encryption password (leave blank for no encryption): '):
archinstall.arguments['!encryption-password'] = passwd
if archinstall.arguments.get('!encryption-password', None):
# If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
# Then we need to identify which partitions to encrypt. This will default to / (root).
if len(list(archinstall.encrypted_partitions(archinstall.storage['disk_layouts']))) == 0:
archinstall.storage['disk_layouts'] = archinstall.select_encrypted_partitions(archinstall.storage['disk_layouts'], archinstall.arguments['!encryption-password'])
# Ask which boot-loader to use (will only ask if we're in BIOS (non-efi) mode)
if not archinstall.arguments.get("bootloader", None):
archinstall.arguments["bootloader"] = archinstall.ask_for_bootloader(archinstall.arguments.get('advanced', False))
if not archinstall.arguments.get('swap', None):
archinstall.arguments['swap'] = archinstall.ask_for_swap()
def load_profiles():
if archinstall.arguments.get('profile', None) is not None:
if type(archinstall.arguments.get('profile', None)) is dict:
archinstall.arguments['profile'] = archinstall.Profile(None, archinstall.arguments.get('profile', None)['path'])
else:
archinstall.arguments['profile'] = archinstall.Profile(None, archinstall.arguments.get('profile', None))
def load_desktop_profiles():
# Temporary workaround to make Desktop Environments work
archinstall.storage['_desktop_profile'] = archinstall.arguments.get('desktop-environment', None)
def load_gfxdriver():
if archinstall.arguments.get('gfx_driver', None) is not None:
archinstall.storage['gfx_driver_packages'] = archinstall.AVAILABLE_GFX_DRIVERS.get(archinstall.arguments.get('gfx_driver', None), None)
def load_servers():
if archinstall.arguments.get('servers', None) is not None:
archinstall.storage['_selected_servers'] = archinstall.arguments.get('servers', None)
def load_config():
load_harddrives()
load_profiles()
load_desktop_profiles()
load_mirror()
load_localization()
load_gfxdriver()
load_servers()
load_disk_layouts()
def ask_user_questions():
"""
First, we'll ask the user for a bunch of user input.
Not until we're satisfied with what we want to install
will we continue with the actual installation steps.
"""
ask_harddrives()
def write_config_files():
print()
print('This is your chosen configuration:')
archinstall.log("-- Guided template chosen (with below config) --", level=logging.DEBUG)
user_configuration = json.dumps(archinstall.arguments, indent=4, sort_keys=True, cls=archinstall.JSON)
archinstall.log(user_configuration, level=logging.INFO)
with open("/var/log/archinstall/user_configuration.json", "w") as config_file:
config_file.write(user_configuration)
if archinstall.storage.get('disk_layouts'):
user_disk_layout = json.dumps(archinstall.storage['disk_layouts'], indent=4, sort_keys=True, cls=archinstall.JSON)
archinstall.log(user_disk_layout, level=logging.INFO)
with open("/var/log/archinstall/user_disk_layout.json", "w") as disk_layout_file:
disk_layout_file.write(user_disk_layout)
print()
if archinstall.arguments.get('dry-run'):
exit(0)
# it is here so a dry run execution will not save the credentials file ¿?
user_credentials = {}
if archinstall.arguments.get('!users'):
user_credentials["!users"] = archinstall.arguments['!users']
if archinstall.arguments.get('!superusers'):
user_credentials["!superusers"] = archinstall.arguments['!superusers']
if archinstall.arguments.get('!encryption-password'):
user_credentials["!encryption-password"] = archinstall.arguments['!encryption-password']
with open("/var/log/archinstall/user_credentials.json", "w") as config_file:
config_file.write(json.dumps(user_credentials, indent=4, sort_keys=True, cls=archinstall.UNSAFE_JSON))
def perform_disk_operations():
"""
Issue a final warning before we continue with something un-revertable.
We mention the drive one last time, and count from 5 to 0.
"""
if archinstall.arguments.get('harddrives', None):
print(f" ! Formatting {archinstall.arguments['harddrives']} in ", end='')
archinstall.do_countdown()
"""
Setup the blockdevice, filesystem (and optionally encryption).
Once that's done, we'll hand over to perform_installation()
"""
mode = archinstall.GPT
if archinstall.has_uefi() is False:
mode = archinstall.MBR
for drive in archinstall.arguments.get('harddrives', []):
if dl_disk := archinstall.storage.get('disk_layouts', {}).get(drive.path):
with archinstall.Filesystem(drive, mode) as fs:
fs.load_layout(dl_disk)
def create_subvolume(installation_mountpoint, subvolume_location):
"""
This function uses btrfs to create a subvolume.
@installation: archinstall.Installer instance
@subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
"""
if type(installation_mountpoint) == str:
installation_mountpoint_path = pathlib.Path(installation_mountpoint)
else:
installation_mountpoint_path = installation_mountpoint
# Set up the required physical structure
if type(subvolume_location) == str:
subvolume_location = pathlib.Path(subvolume_location)
target = installation_mountpoint_path / subvolume_location.relative_to(subvolume_location.anchor)
# Difference from mount_subvolume:
# We only check if the parent exists, since we'll run in to "target path already exists" otherwise
if not target.parent.exists():
target.parent.mkdir(parents=True)
if glob.glob(str(target / '*')):
raise archinstall.DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)")
# Remove the target if it exists. It is nor incompatible to the previous
if target.exists():
target.rmdir()
archinstall.log(f"Creating a subvolume on {target}", level=logging.INFO)
if (cmd := archinstall.SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise archinstall.DiskError(f"Could not create a subvolume at {target}: {cmd}")
def perform_installation(mountpoint):
"""
Performs the installation steps on a block device.
Only requirement is that the block devices are
formatted and setup prior to entering this function.
"""
with archinstall.Installer(mountpoint, kernels=archinstall.arguments.get('kernels', 'linux')) as installation:
# Mount all the drives to the desired mountpoint
# This *can* be done outside of the installation, but the installer can deal with it.
if archinstall.storage.get('disk_layouts'):
installation.mount_ordered_layout(archinstall.storage['disk_layouts'])
# Placing /boot check during installation because this will catch both re-use and wipe scenarios.
for partition in installation.partitions:
if partition.mountpoint == installation.target + '/boot':
if partition.size <= 0.25: # in GB
raise archinstall.DiskError(f"The selected /boot partition in use is not large enough to properly install a boot loader. Please resize it to at least 256MB and re-run the installation.")
# For support reasons, we'll log the disk layout post installation (crash or no crash)
archinstall.log(f"Disk states after installing: {archinstall.disk_layouts()}", level=logging.DEBUG)
def log_execution_environment():
# Log various information about hardware before starting the installation. This might assist in troubleshooting
archinstall.log(f"Hardware model detected: {archinstall.sys_vendor()} {archinstall.product_name()}; UEFI mode: {archinstall.has_uefi()}", level=logging.DEBUG)
archinstall.log(f"Processor model detected: {archinstall.cpu_model()}", level=logging.DEBUG)
archinstall.log(f"Memory statistics: {archinstall.mem_available()} available out of {archinstall.mem_total()} total installed", level=logging.DEBUG)
archinstall.log(f"Virtualization detected: {archinstall.virtualization()}; is VM: {archinstall.is_vm()}", level=logging.DEBUG)
archinstall.log(f"Graphics devices detected: {archinstall.graphics_devices().keys()}", level=logging.DEBUG)
# For support reasons, we'll log the disk layout pre installation to match against post-installation layout
archinstall.log(f"Disk states before installing: {archinstall.disk_layouts()}", level=logging.DEBUG)
if archinstall.arguments.get('help'):
print("See `man archinstall` for help.")
exit(0)
if os.getuid() != 0:
print("Archinstall requires root privileges to run. See --help for more.")
exit(1)
log_execution_environment()
if not archinstall.check_mirror_reachable():
log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None))
archinstall.log(f"Arch Linux mirrors are not reachable. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
exit(1)
load_config()
if not archinstall.arguments.get('silent'):
ask_user_questions()
# YEP write_config_files()
if not archinstall.arguments.get('silent'):
input('Press Enter to continue.')
perform_disk_operations()
perform_installation(archinstall.storage.get('MOUNT_POINT', '/mnt'))