Merge pull request #451 from dylanmtaylor/formatting2

More formatting fixes and other quality of life improvements
This commit is contained in:
Anton Hvornum 2021-05-15 22:54:22 +00:00 committed by GitHub
commit e3c8692bfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 315 additions and 229 deletions

View File

@ -17,9 +17,9 @@ from .lib.user_interaction import *
__version__ = "2.2.0.dev1" __version__ = "2.2.0.dev1"
## Basic version of arg.parse() supporting: # Basic version of arg.parse() supporting:
## --key=value # --key=value
## --boolean # --boolean
arguments = {} arguments = {}
positionals = [] positionals = []
for arg in sys.argv[1:]: for arg in sys.argv[1:]:
@ -33,8 +33,8 @@ for arg in sys.argv[1:]:
positionals.append(arg) positionals.append(arg)
# TODO: Learn the dark arts of argparse... # TODO: Learn the dark arts of argparse... (I summon thee dark spawn of cPython)
# (I summon thee dark spawn of cPython)
def run_as_a_module(): def run_as_a_module():
""" """

View File

@ -2,9 +2,10 @@ import glob
import pathlib import pathlib
import re import re
from collections import OrderedDict from collections import OrderedDict
from typing import Optional
from .general import * from .general import *
from .hardware import hasUEFI from .hardware import has_uefi
from .output import log from .output import log
ROOT_DIR_PATTERN = re.compile('^.*?/devices') ROOT_DIR_PATTERN = re.compile('^.*?/devices')
@ -17,7 +18,8 @@ MBR = 0b00000010
# libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) # libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
# libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p) # libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
class BlockDevice():
class BlockDevice:
def __init__(self, path, info=None): def __init__(self, path, info=None):
if not info: if not info:
# If we don't give any information, we need to auto-fill it. # If we don't give any information, we need to auto-fill it.
@ -75,8 +77,9 @@ class BlockDevice():
raise DiskError(f'Could not locate backplane info for "{self.path}"') raise DiskError(f'Could not locate backplane info for "{self.path}"')
if self.info['type'] == 'loop': if self.info['type'] == 'loop':
for drive in json.loads(b''.join(sys_command(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']: for drive in json.loads(b''.join(SysCommand(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']:
if not drive['name'] == self.path: continue if not drive['name'] == self.path:
continue
return drive['back-file'] return drive['back-file']
elif self.info['type'] == 'disk': elif self.info['type'] == 'disk':
@ -91,15 +94,15 @@ class BlockDevice():
else: else:
log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG) log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
# if not stat.S_ISBLK(os.stat(full_path).st_mode): # if not stat.S_ISBLK(os.stat(full_path).st_mode):
# raise DiskError(f'Selected disk "{full_path}" is not a block device.') # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@property @property
def partitions(self): def partitions(self):
o = b''.join(sys_command(['partprobe', self.path])) o = b''.join(SysCommand(['partprobe', self.path]))
# o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev))) # o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)))
o = b''.join(sys_command(['/usr/bin/lsblk', '-J', self.path])) o = b''.join(SysCommand(['/usr/bin/lsblk', '-J', self.path]))
if b'not a block device' in o: if b'not a block device' in o:
raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}') raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}')
@ -113,7 +116,7 @@ class BlockDevice():
for part in r['blockdevices'][0]['children']: for part in r['blockdevices'][0]['children']:
part_id = part['name'][len(os.path.basename(self.path)):] part_id = part['name'][len(os.path.basename(self.path)):]
if part_id not in self.part_cache: if part_id not in self.part_cache:
## TODO: Force over-write even if in cache? # TODO: Force over-write even if in cache?
if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']: if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']:
self.part_cache[part_id] = Partition(root_path + part_id, self, part_id=part_id, size=part['size']) self.part_cache[part_id] = Partition(root_path + part_id, self, part_id=part_id, size=part['size'])
@ -136,7 +139,7 @@ class BlockDevice():
This is more reliable than relying on /dev/disk/by-partuuid as This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions. it doesn't seam to be able to detect md raid partitions.
""" """
lsblk = b''.join(sys_command(f'lsblk -J -o+UUID {self.path}')) lsblk = b''.join(SysCommand(f'lsblk -J -o+UUID {self.path}'))
for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
return partition.get('uuid', None) return partition.get('uuid', None)
@ -153,7 +156,7 @@ class BlockDevice():
self.part_cache = OrderedDict() self.part_cache = OrderedDict()
class Partition(): class Partition:
def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
if not part_id: if not part_id:
part_id = os.path.basename(path) part_id = os.path.basename(path)
@ -177,11 +180,11 @@ class Partition():
if self.mountpoint != mount_information.get('target', None) and mountpoint: if self.mountpoint != mount_information.get('target', None) and mountpoint:
raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}") raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
if (target := mount_information.get('target', None)): if target := mount_information.get('target', None):
self.mountpoint = target self.mountpoint = target
if not self.filesystem and autodetect_filesystem: if not self.filesystem and autodetect_filesystem:
if (fstype := mount_information.get('fstype', get_filesystem_type(path))): if fstype := mount_information.get('fstype', get_filesystem_type(path)):
self.filesystem = fstype self.filesystem = fstype
if self.filesystem == 'crypto_LUKS': if self.filesystem == 'crypto_LUKS':
@ -213,7 +216,7 @@ class Partition():
This is more reliable than relying on /dev/disk/by-partuuid as This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions. it doesn't seam to be able to detect md raid partitions.
""" """
lsblk = b''.join(sys_command(f'lsblk -J -o+PARTUUID {self.path}')) lsblk = b''.join(SysCommand(f'lsblk -J -o+PARTUUID {self.path}'))
for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
return partition.get('partuuid', None) return partition.get('partuuid', None)
return None return None
@ -233,10 +236,10 @@ class Partition():
@property @property
def real_device(self): def real_device(self):
for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']: for blockdevice in json.loads(b''.join(SysCommand('lsblk -J')).decode('UTF-8'))['blockdevices']:
if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))): if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)):
return f"/dev/{parent}" return f"/dev/{parent}"
# raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
return self.path return self.path
def detect_inner_filesystem(self, password): def detect_inner_filesystem(self, password):
@ -257,11 +260,11 @@ class Partition():
temporary_path = pathlib.Path(temporary_mountpoint) temporary_path = pathlib.Path(temporary_mountpoint)
temporary_path.mkdir(parents=True, exist_ok=True) temporary_path.mkdir(parents=True, exist_ok=True)
if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
files = len(glob.glob(f"{temporary_mountpoint}/*")) files = len(glob.glob(f"{temporary_mountpoint}/*"))
sys_command(f'/usr/bin/umount {temporary_mountpoint}') SysCommand(f'/usr/bin/umount {temporary_mountpoint}')
temporary_path.rmdir() temporary_path.rmdir()
@ -324,36 +327,36 @@ class Partition():
log(f'Formatting {path} -> {filesystem}', level=logging.INFO) log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
if filesystem == 'btrfs': if filesystem == 'btrfs':
o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}')) o = b''.join(SysCommand(f'/usr/bin/mkfs.btrfs -f {path}'))
if b'UUID' not in o: if b'UUID' not in o:
raise DiskError(f'Could not format {path} with {filesystem} because: {o}') raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
self.filesystem = 'btrfs' self.filesystem = 'btrfs'
elif filesystem == 'vfat': elif filesystem == 'vfat':
o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {path}')) o = b''.join(SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}'))
if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
raise DiskError(f'Could not format {path} with {filesystem} because: {o}') raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
self.filesystem = 'vfat' self.filesystem = 'vfat'
elif filesystem == 'ext4': elif filesystem == 'ext4':
if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0: if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'ext4' self.filesystem = 'ext4'
elif filesystem == 'xfs': elif filesystem == 'xfs':
if (handle := sys_command(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0: if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'xfs' self.filesystem = 'xfs'
elif filesystem == 'f2fs': elif filesystem == 'f2fs':
if (handle := sys_command(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0: if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'f2fs' self.filesystem = 'f2fs'
elif filesystem == 'crypto_LUKS': elif filesystem == 'crypto_LUKS':
# from .luks import luks2 # from .luks import luks2
# encrypted_partition = luks2(self, None, None) # encrypted_partition = luks2(self, None, None)
# encrypted_partition.format(path) # encrypted_partition.format(path)
self.filesystem = 'crypto_LUKS' self.filesystem = 'crypto_LUKS'
else: else:
@ -371,23 +374,24 @@ class Partition():
return parent return parent
elif 'children' in data: elif 'children' in data:
for child in data['children']: for child in data['children']:
if (parent := self.find_parent_of(child, name, parent=data['name'])): if parent := self.find_parent_of(child, name, parent=data['name']):
return parent return parent
def mount(self, target, fs=None, options=''): def mount(self, target, fs=None, options=''):
if not self.mountpoint: if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO) log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs: if not fs:
if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') if not self.filesystem:
raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem fs = self.filesystem
pathlib.Path(target).mkdir(parents=True, exist_ok=True) pathlib.Path(target).mkdir(parents=True, exist_ok=True)
try: try:
if options: if options:
sys_command(f'/usr/bin/mount -o {options} {self.path} {target}') SysCommand(f'/usr/bin/mount -o {options} {self.path} {target}')
else: else:
sys_command(f'/usr/bin/mount {self.path} {target}') SysCommand(f'/usr/bin/mount {self.path} {target}')
except SysCallError as err: except SysCallError as err:
raise err raise err
@ -396,14 +400,14 @@ class Partition():
def unmount(self): def unmount(self):
try: try:
exit_code = sys_command(f'/usr/bin/umount {self.path}').exit_code exit_code = SysCommand(f'/usr/bin/umount {self.path}').exit_code
except SysCallError as err: except SysCallError as err:
exit_code = err.exit_code exit_code = err.exit_code
# Without to much research, it seams that low error codes are errors. # Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.". # And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?). # So anything in between 0 and 8k are errors (?).
if exit_code > 0 and exit_code < 8000: if 0 < exit_code < 8000:
raise err raise err
self.mountpoint = None self.mountpoint = None
@ -416,8 +420,8 @@ class Partition():
""" """
The support for a filesystem (this partition) is tested by calling The support for a filesystem (this partition) is tested by calling
partition.format() with a path set to '/dev/null' which returns two exceptions: partition.format() with a path set to '/dev/null' which returns two exceptions:
1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported
2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
""" """
try: try:
self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True) self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
@ -428,7 +432,7 @@ class Partition():
return True return True
class Filesystem(): class Filesystem:
# TODO: # TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them # When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles. # as well as close any crypto handles.
@ -446,7 +450,7 @@ class Filesystem():
else: else:
raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
elif self.mode == MBR: elif self.mode == MBR:
if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0: if SysCommand(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0:
return self return self
else: else:
raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos') raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos')
@ -468,7 +472,7 @@ class Filesystem():
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]: if len(args) >= 2 and args[1]:
raise args[1] raise args[1]
b''.join(sys_command('sync')) b''.join(SysCommand('sync'))
return True return True
def find_partition(self, mountpoint): def find_partition(self, mountpoint):
@ -477,7 +481,7 @@ class Filesystem():
return partition return partition
def raw_parted(self, string: str): def raw_parted(self, string: str):
x = sys_command(f'/usr/bin/parted -s {string}') x = SysCommand(f'/usr/bin/parted -s {string}')
return x return x
def parted(self, string: str): def parted(self, string: str):
@ -491,8 +495,8 @@ class Filesystem():
def use_entire_disk(self, root_filesystem_type='ext4'): def use_entire_disk(self, root_filesystem_type='ext4'):
log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG) log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG)
if hasUEFI(): if has_uefi():
self.add_partition('primary', start='1MiB', end='513MiB', format='fat32') self.add_partition('primary', start='1MiB', end='513MiB', partition_format='fat32')
self.set_name(0, 'EFI') self.set_name(0, 'EFI')
self.set(0, 'boot on') self.set(0, 'boot on')
# TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"? # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
@ -517,17 +521,17 @@ class Filesystem():
self.blockdevice.partition[0].target_mountpoint = '/' self.blockdevice.partition[0].target_mountpoint = '/'
self.blockdevice.partition[0].allow_formatting = True self.blockdevice.partition[0].allow_formatting = True
def add_partition(self, type, start, end, format=None): def add_partition(self, partition_type, start, end, partition_format=None):
log(f'Adding partition to {self.blockdevice}', level=logging.INFO) log(f'Adding partition to {self.blockdevice}', level=logging.INFO)
previous_partitions = self.blockdevice.partitions previous_partitions = self.blockdevice.partitions
if self.mode == MBR: if self.mode == MBR:
if len(self.blockdevice.partitions) > 3: if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions") DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions")
if format: if partition_format:
partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {format} {start} {end}') == 0 partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}') == 0
else: else:
partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {start} {end}') == 0 partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {start} {end}') == 0
if partitioning: if partitioning:
start_wait = time.time() start_wait = time.time()
@ -568,8 +572,9 @@ def all_disks(*args, **kwargs):
kwargs.setdefault("partitions", False) kwargs.setdefault("partitions", False)
drives = OrderedDict() drives = OrderedDict()
# for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']: # for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
for drive in json.loads(b''.join(sys_command('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']: for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']:
if not kwargs['partitions'] and drive['type'] == 'part': continue if not kwargs['partitions'] and drive['type'] == 'part':
continue
drives[drive['path']] = BlockDevice(drive['path'], drive) drives[drive['path']] = BlockDevice(drive['path'], drive)
return drives return drives
@ -600,7 +605,7 @@ def harddrive(size=None, model=None, fuzzy=False):
def get_mount_info(path): def get_mount_info(path):
try: try:
output = b''.join(sys_command(f'/usr/bin/findmnt --json {path}')) output = b''.join(SysCommand(f'/usr/bin/findmnt --json {path}'))
except SysCallError: except SysCallError:
return {} return {}
@ -615,7 +620,7 @@ def get_mount_info(path):
def get_partitions_in_use(mountpoint): def get_partitions_in_use(mountpoint):
try: try:
output = b''.join(sys_command(f'/usr/bin/findmnt --json -R {mountpoint}')) output = b''.join(SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}'))
except SysCallError: except SysCallError:
return {} return {}
@ -634,7 +639,7 @@ def get_partitions_in_use(mountpoint):
def get_filesystem_type(path): def get_filesystem_type(path):
try: try:
handle = sys_command(f"blkid -o value -s TYPE {path}") handle = SysCommand(f"blkid -o value -s TYPE {path}")
return b''.join(handle).strip().decode('UTF-8') return b''.join(handle).strip().decode('UTF-8')
except SysCallError: except SysCallError:
return None return None
@ -642,7 +647,7 @@ def get_filesystem_type(path):
def disk_layouts(): def disk_layouts():
try: try:
handle = sys_command("lsblk -f -o+TYPE,SIZE -J") handle = SysCommand("lsblk -f -o+TYPE,SIZE -J")
return json.loads(b''.join(handle).decode('UTF-8')) return json.loads(b''.join(handle).decode('UTF-8'))
except SysCallError as err: except SysCallError as err:
log(f"Could not return disk layouts: {err}") log(f"Could not return disk layouts: {err}")

View File

@ -42,7 +42,7 @@ def locate_binary(name):
break # Don't recurse break # Don't recurse
class JSON_Encoder: class JsonEncoder:
def _encode(obj): def _encode(obj):
if isinstance(obj, dict): if isinstance(obj, dict):
# We'll need to iterate not just the value that default() usually gets passed # We'll need to iterate not just the value that default() usually gets passed
@ -54,12 +54,12 @@ class JSON_Encoder:
# This, is a EXTREMELY ugly hack.. but it's the only quick way I can think of to trigger a encoding of sub-dictionaries. # This, is a EXTREMELY ugly hack.. but it's the only quick way I can think of to trigger a encoding of sub-dictionaries.
val = json.loads(json.dumps(val, cls=JSON)) val = json.loads(json.dumps(val, cls=JSON))
else: else:
val = JSON_Encoder._encode(val) val = JsonEncoder._encode(val)
if type(key) == str and key[0] == '!': if type(key) == str and key[0] == '!':
copy[JSON_Encoder._encode(key)] = '******' copy[JsonEncoder._encode(key)] = '******'
else: else:
copy[JSON_Encoder._encode(key)] = val copy[JsonEncoder._encode(key)] = val
return copy return copy
elif hasattr(obj, 'json'): elif hasattr(obj, 'json'):
return obj.json() return obj.json()
@ -78,18 +78,20 @@ class JSON_Encoder:
class JSON(json.JSONEncoder, json.JSONDecoder): class JSON(json.JSONEncoder, json.JSONDecoder):
def _encode(self, obj): def _encode(self, obj):
return JSON_Encoder._encode(obj) return JsonEncoder._encode(obj)
def encode(self, obj): def encode(self, obj):
return super(JSON, self).encode(self._encode(obj)) return super(JSON, self).encode(self._encode(obj))
class sys_command: class SysCommand:
""" """
Stolen from archinstall_gui Stolen from archinstall_gui
""" """
def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars={}, *args, **kwargs): def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars=None, *args, **kwargs):
if environment_vars is None:
environment_vars = {}
kwargs.setdefault("worker_id", gen_uid()) kwargs.setdefault("worker_id", gen_uid())
kwargs.setdefault("emulate", False) kwargs.setdefault("emulate", False)
kwargs.setdefault("suppress_errors", False) kwargs.setdefault("suppress_errors", False)
@ -170,7 +172,7 @@ class sys_command:
'ended': self.ended, 'ended': self.ended,
'started_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)), 'started_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
'ended_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None, 'ended_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
'exit_code': self.exit_code 'exit_code': self.exit_code,
} }
def peak(self, output: Union[str, bytes]) -> bool: def peak(self, output: Union[str, bytes]) -> bool:
@ -256,7 +258,7 @@ class sys_command:
original = trigger original = trigger
trigger = bytes(original, 'UTF-8') trigger = bytes(original, 'UTF-8')
self.kwargs['events'][trigger] = self.kwargs['events'][original] self.kwargs['events'][trigger] = self.kwargs['events'][original]
del (self.kwargs['events'][original]) del self.kwargs['events'][original]
if type(self.kwargs['events'][trigger]) != bytes: if type(self.kwargs['events'][trigger]) != bytes:
self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8') self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8')
@ -269,7 +271,7 @@ class sys_command:
last_trigger_pos = trigger_pos last_trigger_pos = trigger_pos
os.write(child_fd, self.kwargs['events'][trigger]) os.write(child_fd, self.kwargs['events'][trigger])
del (self.kwargs['events'][trigger]) del self.kwargs['events'][trigger]
broke = True broke = True
break break
@ -334,4 +336,4 @@ def prerequisite_check():
def reboot(): def reboot():
o = b''.join(sys_command("/usr/bin/reboot")) o = b''.join(SysCommand("/usr/bin/reboot"))

View File

@ -3,7 +3,7 @@ import os
import subprocess import subprocess
from typing import Optional from typing import Optional
from .general import sys_command from .general import SysCommand
from .networking import list_interfaces, enrich_iface_types from .networking import list_interfaces, enrich_iface_types
__packages__ = [ __packages__ = [
@ -56,48 +56,48 @@ AVAILABLE_GFX_DRIVERS = {
} }
def hasWifi() -> bool: def has_wifi() -> bool:
return 'WIRELESS' in enrich_iface_types(list_interfaces().values()).values() return 'WIRELESS' in enrich_iface_types(list_interfaces().values()).values()
def hasAMDCPU() -> bool: def has_amd_cpu() -> bool:
if subprocess.check_output("lscpu | grep AMD", shell=True).strip().decode(): if subprocess.check_output("lscpu | grep AMD", shell=True).strip().decode():
return True return True
return False return False
def hasIntelCPU() -> bool: def has_intel_cpu() -> bool:
if subprocess.check_output("lscpu | grep Intel", shell=True).strip().decode(): if subprocess.check_output("lscpu | grep Intel", shell=True).strip().decode():
return True return True
return False return False
def hasUEFI() -> bool: def has_uefi() -> bool:
return os.path.isdir('/sys/firmware/efi') return os.path.isdir('/sys/firmware/efi')
def graphicsDevices() -> dict: def graphics_devices() -> dict:
cards = {} cards = {}
for line in sys_command("lspci"): for line in SysCommand("lspci"):
if b' VGA ' in line: if b' VGA ' in line:
_, identifier = line.split(b': ', 1) _, identifier = line.split(b': ', 1)
cards[identifier.strip().lower().decode('UTF-8')] = line cards[identifier.strip().lower().decode('UTF-8')] = line
return cards return cards
def hasNvidiaGraphics() -> bool: def has_nvidia_graphics() -> bool:
return any('nvidia' in x for x in graphicsDevices()) return any('nvidia' in x for x in graphics_devices())
def hasAmdGraphics() -> bool: def has_amd_graphics() -> bool:
return any('amd' in x for x in graphicsDevices()) return any('amd' in x for x in graphics_devices())
def hasIntelGraphics() -> bool: def has_intel_graphics() -> bool:
return any('intel' in x for x in graphicsDevices()) return any('intel' in x for x in graphics_devices())
def cpuVendor() -> Optional[str]: def cpu_vendor() -> Optional[str]:
cpu_info = json.loads(subprocess.check_output("lscpu -J", shell=True).decode('utf-8'))['lscpu'] cpu_info = json.loads(subprocess.check_output("lscpu -J", shell=True).decode('utf-8'))['lscpu']
for info in cpu_info: for info in cpu_info:
if info.get('field', None): if info.get('field', None):
@ -106,7 +106,7 @@ def cpuVendor() -> Optional[str]:
return None return None
def isVM() -> bool: def is_vm() -> bool:
try: try:
subprocess.check_call(["systemd-detect-virt"]) # systemd-detect-virt issues a non-zero exit code if it is not on a virtual machine subprocess.check_call(["systemd-detect-virt"]) # systemd-detect-virt issues a non-zero exit code if it is not on a virtual machine
return True return True

View File

@ -9,7 +9,7 @@ from .user_interaction import *
__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"] __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
class Installer(): class Installer:
""" """
`Installer()` is the wrapper for most basic installation steps. `Installer()` is the wrapper for most basic installation steps.
It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
@ -34,7 +34,11 @@ class Installer():
""" """
def __init__(self, target, *, base_packages=__packages__[:3], kernels=['linux']): def __init__(self, target, *, base_packages=None, kernels=None):
if base_packages is None:
base_packages = __packages__[:3]
if kernels is None:
kernels = ['linux']
self.target = target self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1]) self.milliseconds = int(str(time.time()).split('.')[1])
@ -107,7 +111,7 @@ class Installer():
# Copy over the install log (if there is one) to the install medium if # Copy over the install log (if there is one) to the install medium if
# at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
if self.helper_flags.get('base-strapped', False) is True: if self.helper_flags.get('base-strapped', False) is True:
if (filename := storage.get('LOG_FILE', None)): if filename := storage.get('LOG_FILE', None):
absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"): if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
@ -127,11 +131,12 @@ class Installer():
return [step for step, flag in self.helper_flags.items() if flag is False] return [step for step, flag in self.helper_flags.items() if flag is False]
def pacstrap(self, *packages, **kwargs): def pacstrap(self, *packages, **kwargs):
if type(packages[0]) in (list, tuple): packages = packages[0] if type(packages[0]) in (list, tuple):
packages = packages[0]
self.log(f'Installing packages: {packages}', level=logging.INFO) self.log(f'Installing packages: {packages}', level=logging.INFO)
if (sync_mirrors := sys_command('/usr/bin/pacman -Syy')).exit_code == 0: if (sync_mirrors := SysCommand('/usr/bin/pacman -Syy')).exit_code == 0:
if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0: if (pacstrap := SysCommand(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0:
return True return True
else: else:
self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.INFO) self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.INFO)
@ -144,7 +149,7 @@ class Installer():
def genfstab(self, flags='-pU'): def genfstab(self, flags='-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').trace_log
with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh: with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh:
fstab_fh.write(fstab) fstab_fh.write(fstab)
@ -158,22 +163,25 @@ class Installer():
fh.write(hostname + '\n') fh.write(hostname + '\n')
def set_locale(self, locale, encoding='UTF-8', *args, **kwargs): def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
if not len(locale): return True if not len(locale):
return True
with open(f'{self.target}/etc/locale.gen', 'a') as fh: with open(f'{self.target}/etc/locale.gen', 'a') as fh:
fh.write(f'{locale}.{encoding} {encoding}\n') fh.write(f'{locale}.{encoding} {encoding}\n')
with open(f'{self.target}/etc/locale.conf', 'w') as fh: with open(f'{self.target}/etc/locale.conf', 'w') as fh:
fh.write(f'LANG={locale}.{encoding}\n') fh.write(f'LANG={locale}.{encoding}\n')
return True if sys_command(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
def set_timezone(self, zone, *args, **kwargs): def set_timezone(self, zone, *args, **kwargs):
if not zone: return True if not zone:
if not len(zone): return True # Redundant return True
if not len(zone):
return True # Redundant
if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists(): if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists():
(pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True return True
else: else:
self.log( self.log(
@ -195,7 +203,7 @@ class Installer():
raise ServiceException(f"Unable to start service {service}: {output}") raise ServiceException(f"Unable to start service {service}: {output}")
def run_command(self, cmd, *args, **kwargs): def run_command(self, cmd, *args, **kwargs):
return sys_command(f'/usr/bin/arch-chroot {self.target} {cmd}') return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}')
def arch_chroot(self, cmd, *args, **kwargs): def arch_chroot(self, cmd, *args, **kwargs):
if 'runas' in kwargs: if 'runas' in kwargs:
@ -224,10 +232,10 @@ class Installer():
with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf: with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf)) netconf.write(str(conf))
def copy_ISO_network_config(self, enable_services=False): def copy_iso_network_config(self, enable_services=False):
# Copy (if any) iwd password and config files # Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'): if os.path.isdir('/var/lib/iwd/'):
if (psk_files := glob.glob('/var/lib/iwd/*.psk')): if psk_files := glob.glob('/var/lib/iwd/*.psk'):
if not os.path.isdir(f"{self.target}/var/lib/iwd"): if not os.path.isdir(f"{self.target}/var/lib/iwd"):
os.makedirs(f"{self.target}/var/lib/iwd") os.makedirs(f"{self.target}/var/lib/iwd")
@ -253,7 +261,7 @@ class Installer():
shutil.copy2(psk, f"{self.target}/var/lib/iwd/{os.path.basename(psk)}") shutil.copy2(psk, f"{self.target}/var/lib/iwd/{os.path.basename(psk)}")
# Copy (if any) systemd-networkd config files # Copy (if any) systemd-networkd config files
if (netconfigurations := glob.glob('/etc/systemd/network/*')): if netconfigurations := glob.glob('/etc/systemd/network/*'):
if not os.path.isdir(f"{self.target}/etc/systemd/network/"): if not os.path.isdir(f"{self.target}/etc/systemd/network/"):
os.makedirs(f"{self.target}/etc/systemd/network/") os.makedirs(f"{self.target}/etc/systemd/network/")
@ -263,6 +271,7 @@ class Installer():
if enable_services: if enable_services:
# If we haven't installed the base yet (function called pre-maturely) # If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False: if self.helper_flags.get('base', False) is False:
def post_install_enable_networkd_resolved(*args, **kwargs): def post_install_enable_networkd_resolved(*args, **kwargs):
self.enable_service('systemd-networkd', 'systemd-resolved') self.enable_service('systemd-networkd', 'systemd-resolved')
@ -288,13 +297,13 @@ class Installer():
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
sys_command(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
def minimal_installation(self): def minimal_installation(self):
## Add necessary packages if encrypting the drive # Add necessary packages if encrypting the drive
## (encrypted partitions default to btrfs for now, so we need btrfs-progs) # (encrypted partitions default to btrfs for now, so we need btrfs-progs)
## TODO: Perhaps this should be living in the function which dictates # TODO: Perhaps this should be living in the function which dictates
## the partitioning. Leaving here for now. # the partitioning. Leaving here for now.
for partition in self.partitions: for partition in self.partitions:
if partition.filesystem == 'btrfs': if partition.filesystem == 'btrfs':
@ -316,11 +325,11 @@ class Installer():
if 'encrypt' not in self.HOOKS: if 'encrypt' not in self.HOOKS:
self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt')
if not hasUEFI(): if not has_uefi():
self.base_packages.append('grub') self.base_packages.append('grub')
if not isVM(): if not is_vm():
vendor = cpuVendor() vendor = cpu_vendor()
if vendor == "AuthenticAMD": if vendor == "AuthenticAMD":
self.base_packages.append("amd-ucode") self.base_packages.append("amd-ucode")
elif vendor == "GenuineIntel": elif vendor == "GenuineIntel":
@ -332,11 +341,9 @@ class Installer():
self.helper_flags['base-strapped'] = True self.helper_flags['base-strapped'] = True
with open(f"{self.target}/etc/fstab", "a") as fstab: with open(f"{self.target}/etc/fstab", "a") as fstab:
fstab.write( fstab.write("\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n") # Redundant \n at the start? who knows?
"\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n"
) # Redundant \n at the start? who knows?
## TODO: Support locale and timezone # TODO: Support locale and timezone
# os.remove(f'{self.target}/etc/localtime') # os.remove(f'{self.target}/etc/localtime')
# sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime') # sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime')
# sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime') # sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime')
@ -344,7 +351,7 @@ class Installer():
self.set_locale('en_US') self.set_locale('en_US')
# TODO: Use python functions for this # TODO: Use python functions for this
sys_command(f'/usr/bin/arch-chroot {self.target} chmod 700 /root') SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
self.mkinitcpio('-P') self.mkinitcpio('-P')
@ -371,16 +378,16 @@ class Installer():
if bootloader == 'systemd-bootctl': if bootloader == 'systemd-bootctl':
self.pacstrap('efibootmgr') self.pacstrap('efibootmgr')
if not hasUEFI(): if not has_uefi():
raise HardwareIncompatibilityError raise HardwareIncompatibilityError
# TODO: Ideally we would want to check if another config # TODO: Ideally we would want to check if another config
# points towards the same disk and/or partition. # points towards the same disk and/or partition.
# And in which case we should do some clean up. # And in which case we should do some clean up.
# Install the boot loader # Install the boot loader
if sys_command(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0: if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0:
# Fallback, try creating the boot loader without touching the EFI variables # Fallback, try creating the boot loader without touching the EFI variables
sys_command(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install') SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
# Modify or create a loader.conf # Modify or create a loader.conf
if os.path.isfile(f'{self.target}/boot/loader/loader.conf'): if os.path.isfile(f'{self.target}/boot/loader/loader.conf'):
@ -402,8 +409,8 @@ class Installer():
else: else:
loader.write(f"{line}\n") loader.write(f"{line}\n")
## For some reason, blkid and /dev/disk/by-uuid are not getting along well. # For some reason, blkid and /dev/disk/by-uuid are not getting along well.
## And blkid is wrong in terms of LUKS. # And blkid is wrong in terms of LUKS.
# UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip() # UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
# Setup the loader entry # Setup the loader entry
with open(f'{self.target}/boot/loader/entries/{self.init_time}.conf', 'w') as entry: with open(f'{self.target}/boot/loader/entries/{self.init_time}.conf', 'w') as entry:
@ -411,8 +418,8 @@ class Installer():
entry.write(f'# Created on: {self.init_time}\n') entry.write(f'# Created on: {self.init_time}\n')
entry.write('title Arch Linux\n') entry.write('title Arch Linux\n')
entry.write('linux /vmlinuz-linux\n') entry.write('linux /vmlinuz-linux\n')
if not isVM(): if not is_vm():
vendor = cpuVendor() vendor = cpu_vendor()
if vendor == "AuthenticAMD": if vendor == "AuthenticAMD":
entry.write("initrd /amd-ucode.img\n") entry.write("initrd /amd-ucode.img\n")
elif vendor == "GenuineIntel": elif vendor == "GenuineIntel":
@ -420,10 +427,10 @@ class Installer():
else: else:
self.log("unknow cpu vendor, not adding ucode to systemd-boot config") self.log("unknow cpu vendor, not adding ucode to systemd-boot config")
entry.write('initrd /initramfs-linux.img\n') entry.write('initrd /initramfs-linux.img\n')
## blkid doesn't trigger on loopback devices really well, # blkid doesn't trigger on loopback devices really well,
## so we'll use the old manual method until we get that sorted out. # so we'll use the old manual method until we get that sorted out.
if (real_device := self.detect_encryption(root_partition)): if real_device := self.detect_encryption(root_partition):
# TODO: We need to detect if the encrypted device is a whole disk encryption, # TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have) # or simply a partition encryption. Right now we assume it's a partition (and we always have)
log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG) log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
@ -439,17 +446,17 @@ class Installer():
elif bootloader == "grub-install": elif bootloader == "grub-install":
self.pacstrap('grub') self.pacstrap('grub')
if hasUEFI(): if has_uefi():
self.pacstrap('efibootmgr') self.pacstrap('efibootmgr')
o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB')) o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB'))
sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg') SysCommand('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
return True return True
else: else:
root_device = subprocess.check_output(f'basename "$(readlink -f /sys/class/block/{root_partition.path.replace("/dev/", "")}/..)"', shell=True).decode().strip() root_device = subprocess.check_output(f'basename "$(readlink -f /sys/class/block/{root_partition.path.replace("/dev/", "")}/..)"', shell=True).decode().strip()
if root_device == "block": if root_device == "block":
root_device = f"{root_partition.path}" root_device = f"{root_partition.path}"
o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc /dev/{root_device}')) o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc /dev/{root_device}'))
sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg') SysCommand('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
self.helper_flags['bootloader'] = bootloader self.helper_flags['bootloader'] = bootloader
return True return True
else: else:
@ -473,15 +480,17 @@ class Installer():
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n') sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True return True
def user_create(self, user: str, password=None, groups=[], sudo=False): def user_create(self, user: str, password=None, groups=None, sudo=False):
if groups is None:
groups = []
self.log(f'Creating user {user}', level=logging.INFO) self.log(f'Creating user {user}', level=logging.INFO)
o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')) o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}'))
if password: if password:
self.user_set_pw(user, password) self.user_set_pw(user, password)
if groups: if groups:
for group in groups: for group in groups:
o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}')) o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}'))
if sudo and self.enable_sudo(user): if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True self.helper_flags['user'] = True
@ -493,13 +502,13 @@ class Installer():
# This means the root account isn't locked/disabled with * in /etc/passwd # This means the root account isn't locked/disabled with * in /etc/passwd
self.helper_flags['user'] = True self.helper_flags['user'] = True
o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"")) o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\""))
pass pass
def user_set_shell(self, user, shell): def user_set_shell(self, user, shell):
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")) o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\""))
pass pass
def set_keyboard_language(self, language): def set_keyboard_language(self, language):

View File

@ -27,9 +27,9 @@ def verify_keyboard_layout(layout):
return False return False
def search_keyboard_layout(filter): def search_keyboard_layout(layout_filter):
for language in list_keyboard_languages(): for language in list_keyboard_languages():
if filter.lower() in language.lower(): if layout_filter.lower() in language.lower():
yield language yield language

View File

@ -5,7 +5,7 @@ from .general import *
from .output import log from .output import log
class luks2(): class luks2:
def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs): def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
self.password = password self.password = password
self.partition = partition self.partition = partition
@ -78,7 +78,7 @@ class luks2():
try: try:
# Try to setup the crypt-device # Try to setup the crypt-device
cmd_handle = sys_command(cryptsetup_args) cmd_handle = SysCommand(cryptsetup_args)
except SysCallError as err: except SysCallError as err:
if err.exit_code == 256: if err.exit_code == 256:
log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG) log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
@ -87,7 +87,7 @@ class luks2():
# Get crypt-information about the device by doing a reverse lookup starting with the partition path # Get crypt-information about the device by doing a reverse lookup starting with the partition path
# For instance: /dev/sda # For instance: /dev/sda
devinfo = json.loads(b''.join(sys_command(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0] devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0]
# For each child (sub-partition/sub-device) # For each child (sub-partition/sub-device)
if len(children := devinfo.get('children', [])): if len(children := devinfo.get('children', [])):
@ -95,14 +95,14 @@ class luks2():
# Unmount the child location # Unmount the child location
if child_mountpoint := child.get('mountpoint', None): if child_mountpoint := child.get('mountpoint', None):
log(f'Unmounting {child_mountpoint}', level=logging.DEBUG) log(f'Unmounting {child_mountpoint}', level=logging.DEBUG)
sys_command(f"umount -R {child_mountpoint}") SysCommand(f"umount -R {child_mountpoint}")
# And close it if possible. # And close it if possible.
log(f"Closing crypt device {child['name']}", level=logging.DEBUG) log(f"Closing crypt device {child['name']}", level=logging.DEBUG)
sys_command(f"cryptsetup close {child['name']}") SysCommand(f"cryptsetup close {child['name']}")
# Then try again to set up the crypt-device # Then try again to set up the crypt-device
cmd_handle = sys_command(cryptsetup_args) cmd_handle = SysCommand(cryptsetup_args)
else: else:
raise err raise err
@ -120,6 +120,7 @@ class luks2():
:type mountpoint: str :type mountpoint: str
""" """
from .disk import get_filesystem_type from .disk import get_filesystem_type
if '/' in mountpoint: if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead? os.path.basename(mountpoint) # TODO: Raise exception instead?
@ -127,7 +128,7 @@ class luks2():
while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10: while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10:
time.sleep(0.025) time.sleep(0.025)
sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') SysCommand(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
if os.path.islink(f'/dev/mapper/{mountpoint}'): if os.path.islink(f'/dev/mapper/{mountpoint}'):
self.mapdev = f'/dev/mapper/{mountpoint}' self.mapdev = f'/dev/mapper/{mountpoint}'
unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False) unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
@ -138,9 +139,9 @@ class luks2():
if not mountpoint: if not mountpoint:
mountpoint = self.mapdev mountpoint = self.mapdev
sys_command(f'/usr/bin/cryptsetup close {self.mapdev}') SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
return os.path.islink(self.mapdev) is False return os.path.islink(self.mapdev) is False
def format(self, path): def format(self, path):
if (handle := sys_command(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}') raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')

View File

@ -15,9 +15,9 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', tm
region_list = [] region_list = []
for region in regions.split(','): for region in regions.split(','):
region_list.append(f'country={region}') region_list.append(f'country={region}')
o = b''.join(sys_command((f"/usr/bin/wget 'https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O {tmp_dir}/mirrorlist"))) o = b''.join(SysCommand(f"/usr/bin/wget 'https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O {tmp_dir}/mirrorlist"))
o = b''.join(sys_command((f"/usr/bin/sed -i 's/#Server/Server/' {tmp_dir}/mirrorlist"))) o = b''.join(SysCommand(f"/usr/bin/sed -i 's/#Server/Server/' {tmp_dir}/mirrorlist"))
o = b''.join(sys_command((f"/usr/bin/mv {tmp_dir}/mirrorlist {destination}"))) o = b''.join(SysCommand(f"/usr/bin/mv {tmp_dir}/mirrorlist {destination}"))
return True return True
@ -71,7 +71,7 @@ def use_mirrors(regions: dict, destination='/etc/pacman.d/mirrorlist'):
def re_rank_mirrors(top=10, *positionals, **kwargs): def re_rank_mirrors(top=10, *positionals, **kwargs):
if sys_command((f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist')).exit_code == 0: if SysCommand(f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist').exit_code == 0:
return True return True
return False return False

View File

@ -5,7 +5,7 @@ import struct
from collections import OrderedDict from collections import OrderedDict
from .exceptions import * from .exceptions import *
from .general import sys_command from .general import SysCommand
from .storage import storage from .storage import storage
@ -53,11 +53,11 @@ def wireless_scan(interface):
if interfaces[interface] != 'WIRELESS': if interfaces[interface] != 'WIRELESS':
raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}") raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
sys_command(f"iwctl station {interface} scan") SysCommand(f"iwctl station {interface} scan")
if not '_WIFI' in storage: if '_WIFI' not in storage:
storage['_WIFI'] = {} storage['_WIFI'] = {}
if not interface in storage['_WIFI']: if interface not in storage['_WIFI']:
storage['_WIFI'][interface] = {} storage['_WIFI'][interface] = {}
storage['_WIFI'][interface]['scanning'] = True storage['_WIFI'][interface]['scanning'] = True
@ -66,10 +66,11 @@ def wireless_scan(interface):
# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25 # TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
def get_wireless_networks(interface): def get_wireless_networks(interface):
# TODO: Make this oneliner pritter to check if the interface is scanning or not. # TODO: Make this oneliner pritter to check if the interface is scanning or not.
if not '_WIFI' in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False: if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
import time import time
wireless_scan(interface) wireless_scan(interface)
time.sleep(5) time.sleep(5)
for line in sys_command(f"iwctl station {interface} get-networks"): for line in SysCommand(f"iwctl station {interface} get-networks"):
print(line) print(line)

View File

@ -18,7 +18,7 @@ class LogLevels:
Debug = 0b111 Debug = 0b111
class journald(dict): class Journald(dict):
@abc.abstractmethod @abc.abstractmethod
def log(message, level=logging.DEBUG): def log(message, level=logging.DEBUG):
try: try:
@ -83,11 +83,11 @@ def stylize_output(text: str, *opts, **kwargs):
color_names = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white') color_names = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
foreground = {color_names[x]: '3%s' % x for x in range(8)} foreground = {color_names[x]: '3%s' % x for x in range(8)}
background = {color_names[x]: '4%s' % x for x in range(8)} background = {color_names[x]: '4%s' % x for x in range(8)}
RESET = '0' reset = '0'
code_list = [] code_list = []
if text == '' and len(opts) == 1 and opts[0] == 'reset': if text == '' and len(opts) == 1 and opts[0] == 'reset':
return '\x1b[%sm' % RESET return '\x1b[%sm' % reset
for k, v in kwargs.items(): for k, v in kwargs.items():
if k == 'fg': if k == 'fg':
code_list.append(foreground[v]) code_list.append(foreground[v])
@ -97,7 +97,7 @@ def stylize_output(text: str, *opts, **kwargs):
if o in opt_dict: if o in opt_dict:
code_list.append(opt_dict[o]) code_list.append(opt_dict[o])
if 'noreset' not in opts: if 'noreset' not in opts:
text = '%s\x1b[%sm' % (text or '', RESET) text = '%s\x1b[%sm' % (text or '', reset)
return '%s%s' % (('\x1b[%sm' % ';'.join(code_list)), text or '') return '%s%s' % (('\x1b[%sm' % ';'.join(code_list)), text or '')
@ -112,7 +112,7 @@ def log(*args, **kwargs):
# If a logfile is defined in storage, # If a logfile is defined in storage,
# we use that one to output everything # we use that one to output everything
if (filename := storage.get('LOG_FILE', None)): if filename := storage.get('LOG_FILE', None):
absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
try: try:
@ -155,13 +155,13 @@ def log(*args, **kwargs):
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True) log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.DEBUG kwargs['level'] = logging.DEBUG
if kwargs['level'] > storage.get('LOG_LEVEL', logging.INFO) and not 'force' in kwargs: if kwargs['level'] > storage.get('LOG_LEVEL', logging.INFO) and 'force' not in kwargs:
# Level on log message was Debug, but output level is set to Info. # Level on log message was Debug, but output level is set to Info.
# In that case, we'll drop it. # In that case, we'll drop it.
return None return None
try: try:
journald.log(string, level=kwargs.get('level', logging.INFO)) Journald.log(string, level=kwargs.get('level', logging.INFO))
except ModuleNotFoundError: except ModuleNotFoundError:
pass # Ignore writing to journald pass # Ignore writing to journald

View File

@ -14,7 +14,7 @@ from .storage import storage
def grab_url_data(path): def grab_url_data(path):
safe_path = path[:path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))]) safe_path = path[: path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context() ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE ssl_context.verify_mode = ssl.CERT_NONE
@ -75,7 +75,7 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
if filter_top_level_profiles: if filter_top_level_profiles:
for profile in list(cache.keys()): for profile in list(cache.keys()):
if Profile(None, profile).is_top_level_profile() is False: if Profile(None, profile).is_top_level_profile() is False:
del (cache[profile]) del cache[profile]
return cache return cache
@ -154,7 +154,7 @@ class Script:
return self return self
def execute(self): def execute(self):
if not self.namespace in sys.modules or self.spec is None: if self.namespace not in sys.modules or self.spec is None:
self.load_instructions() self.load_instructions()
self.spec.loader.exec_module(sys.modules[self.namespace]) self.spec.loader.exec_module(sys.modules[self.namespace])
@ -163,8 +163,10 @@ class Script:
class Profile(Script): class Profile(Script):
def __init__(self, installer, path, args={}): def __init__(self, installer, path, args=None):
super(Profile, self).__init__(path, installer) super(Profile, self).__init__(path, installer)
if args is None:
args = {}
def __dump__(self, *args, **kwargs): def __dump__(self, *args, **kwargs):
return {'path': self.path} return {'path': self.path}

View File

@ -5,6 +5,6 @@ def service_state(service_name: str):
if os.path.splitext(service_name)[1] != '.service': if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe service_name += '.service' # Just to be safe
state = b''.join(sys_command(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'})) state = b''.join(SysCommand(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'}))
return state.strip().decode('UTF-8') return state.strip().decode('UTF-8')

View File

@ -18,5 +18,5 @@ storage = {
'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing. 'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
'LOG_PATH': '/var/log/archinstall', 'LOG_PATH': '/var/log/archinstall',
'LOG_FILE': 'install.log', 'LOG_FILE': 'install.log',
'MOUNT_POINT': '/mnt' 'MOUNT_POINT': '/mnt',
} }

View File

@ -12,8 +12,8 @@ import time
import tty import tty
from .exceptions import * from .exceptions import *
from .general import sys_command from .general import SysCommand
from .hardware import AVAILABLE_GFX_DRIVERS, hasUEFI from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi
from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout
from .networking import list_interfaces from .networking import list_interfaces
from .output import log from .output import log
@ -23,6 +23,7 @@ from .profiles import Profile
# TODO: Some inconsistencies between the selection processes. # TODO: Some inconsistencies between the selection processes.
# Some return the keys from the options, some the values? # Some return the keys from the options, some the values?
def get_terminal_height(): def get_terminal_height():
return shutil.get_terminal_size().lines return shutil.get_terminal_size().lines
@ -84,7 +85,7 @@ def do_countdown():
def get_password(prompt="Enter a password: "): def get_password(prompt="Enter a password: "):
while (passwd := getpass.getpass(prompt)): while passwd := getpass.getpass(prompt):
passwd_verification = getpass.getpass(prompt='And one more time for verification: ') passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
if passwd != passwd_verification: if passwd != passwd_verification:
log(' * Passwords did not match * ', fg='red') log(' * Passwords did not match * ', fg='red')
@ -104,7 +105,7 @@ def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
max_num_of_columns = get_terminal_width() // longest_line max_num_of_columns = get_terminal_width() // longest_line
max_options_in_cells = max_num_of_columns * (get_terminal_height() - margin_bottom) max_options_in_cells = max_num_of_columns * (get_terminal_height() - margin_bottom)
if (len(options) > max_options_in_cells): if len(options) > max_options_in_cells:
for index, option in enumerate(options): for index, option in enumerate(options):
print(f"{index}: {option}") print(f"{index}: {option}")
return 1, index return 1, index
@ -214,8 +215,10 @@ class MiniCurses:
self._cursor_x += len(text) self._cursor_x += len(text)
def clear(self, x, y): def clear(self, x, y):
if x < 0: x = 0 if x < 0:
if y < 0: y = 0 x = 0
if y < 0:
y = 0
# import time # import time
# sys.stdout.write(f"Clearing from: {x, y}") # sys.stdout.write(f"Clearing from: {x, y}")
@ -243,7 +246,7 @@ class MiniCurses:
return True return True
# Move back to the current known position (BACKSPACE doesn't updated x-pos) # Move back to the current known position (BACKSPACE doesn't updated x-pos)
sys.stdout.flush() sys.stdout.flush()
sys.stdout.write("\033[%dG" % (self._cursor_x)) sys.stdout.write("\033[%dG" % self._cursor_x)
sys.stdout.flush() sys.stdout.flush()
# Write a blank space # Write a blank space
@ -253,7 +256,7 @@ class MiniCurses:
# And move back again # And move back again
sys.stdout.flush() sys.stdout.flush()
sys.stdout.write("\033[%dG" % (self._cursor_x)) sys.stdout.write("\033[%dG" % self._cursor_x)
sys.stdout.flush() sys.stdout.flush()
self._cursor_x -= 1 self._cursor_x -= 1
@ -360,7 +363,7 @@ def ask_for_a_timezone():
def ask_for_bootloader() -> str: def ask_for_bootloader() -> str:
bootloader = "systemd-bootctl" bootloader = "systemd-bootctl"
if hasUEFI() == False: if not has_uefi():
bootloader = "grub-install" bootloader = "grub-install"
else: else:
bootloader_choice = input("Would you like to use GRUB as a bootloader instead of systemd-boot? [y/N] ").lower() bootloader_choice = input("Would you like to use GRUB as a bootloader instead of systemd-boot? [y/N] ").lower()
@ -401,8 +404,7 @@ def ask_to_configure_network():
for index, mode in enumerate(modes): for index, mode in enumerate(modes):
print(f"{index}: {mode}") print(f"{index}: {mode}")
mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ", mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ", options_output=False)
options_output=False)
if mode == 'IP': if mode == 'IP':
while 1: while 1:
ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip() ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip()
@ -450,11 +452,10 @@ def ask_for_disk_layout():
options = { options = {
'keep-existing': 'Keep existing partition layout and select which ones to use where', 'keep-existing': 'Keep existing partition layout and select which ones to use where',
'format-all': 'Format entire drive and setup a basic partition scheme', 'format-all': 'Format entire drive and setup a basic partition scheme',
'abort': 'Abort the installation' 'abort': 'Abort the installation',
} }
value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ", value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ", allow_empty_input=False, sort=True)
allow_empty_input=False, sort=True)
return next((key for key, val in options.items() if val == value), None) return next((key for key, val in options.items() if val == value), None)
@ -466,8 +467,7 @@ def ask_for_main_filesystem_format():
'f2fs': 'f2fs' 'f2fs': 'f2fs'
} }
value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ", value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ", allow_empty_input=False)
allow_empty_input=False)
return next((key for key, val in options.items() if val == value), None) return next((key for key, val in options.items() if val == value), None)
@ -584,8 +584,7 @@ def select_profile(options):
print(' -- They might make it easier to install things like desktop environments. --') print(' -- They might make it easier to install things like desktop environments. --')
print(' -- (Leave blank and hit enter to skip this step and continue) --') print(' -- (Leave blank and hit enter to skip this step and continue) --')
selected_profile = generic_select(profiles, 'Enter a pre-programmed profile name if you want to install one: ', selected_profile = generic_select(profiles, 'Enter a pre-programmed profile name if you want to install one: ', options_output=False)
options_output=False)
if selected_profile: if selected_profile:
return Profile(None, selected_profile) return Profile(None, selected_profile)
else: else:
@ -675,9 +674,7 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
print_large_list(regions, margin_bottom=4) print_large_list(regions, margin_bottom=4)
print(' -- You can skip this step by leaving the option blank --') print(' -- You can skip this step by leaving the option blank --')
selected_mirror = generic_select(regions, selected_mirror = generic_select(regions, 'Select one of the above regions to download packages from (by number or full name): ', options_output=False)
'Select one of the above regions to download packages from (by number or full name): ',
options_output=False)
if not selected_mirror: if not selected_mirror:
# Returning back empty options which can be both used to # Returning back empty options which can be both used to
# do "if x:" logic as well as do `x.get('mirror', {}).get('sub', None)` chaining # do "if x:" logic as well as do `x.get('mirror', {}).get('sub', None)` chaining
@ -708,7 +705,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
default_option = options["All open-source (default)"] default_option = options["All open-source (default)"]
if drivers: if drivers:
lspci = sys_command('/usr/bin/lspci') lspci = SysCommand('/usr/bin/lspci')
for line in lspci.trace_log.split(b'\r\n'): for line in lspci.trace_log.split(b'\r\n'):
if b' vga ' in line.lower(): if b' vga ' in line.lower():
if b'nvidia' in line.lower(): if b'nvidia' in line.lower():

View File

@ -1,7 +1,7 @@
.. _archinstall.helpers: .. _archinstall.helpers:
.. warning:: .. warning::
All these helper functions are mostly, if not all, related to outside-installation-instructions. Meaning the calls will affect your current running system - and not touch your installed system. All these helper functions are mostly, if not all, related to outside-installation-instructions. Meaning the calls will affect your current running system - and not touch your installed system.
Profile related helpers Profile related helpers
======================= =======================

View File

@ -8,8 +8,8 @@ sys.path.insert(0, os.path.abspath('..'))
def process_docstring(app, what, name, obj, options, lines): def process_docstring(app, what, name, obj, options, lines):
spaces_pat = re.compile(r"( {8})") spaces_pat = re.compile(r"( {8})")
ll = [] ll = []
for l in lines: for line in lines:
ll.append(spaces_pat.sub(" ", l)) ll.append(spaces_pat.sub(" ", line))
lines[:] = ll lines[:] = ll
@ -123,8 +123,5 @@ man_pages = [("index", "archinstall", u"archinstall Documentation", [u"Anton Hvo
# (source start file, target name, title, author, # (source start file, target name, title, author,
# dir menu entry, description, category) # dir menu entry, description, category)
texinfo_documents = [ texinfo_documents = [
( ("index", "archinstall", u"archinstall Documentation", u"Anton Hvornum", "archinstall", "Simple and minimal HTTP server."),
"index", "archinstall", u"archinstall Documentation",
u"Anton Hvornum", "archinstall", "Simple and minimal HTTP server."
),
] ]

View File

@ -3,7 +3,7 @@ import logging
import time import time
import archinstall import archinstall
from archinstall.lib.hardware import hasUEFI from archinstall.lib.hardware import has_uefi
if archinstall.arguments.get('help'): if archinstall.arguments.get('help'):
print("See `man archinstall` for help.") print("See `man archinstall` for help.")
@ -84,8 +84,7 @@ def ask_user_questions():
# Select a partition # Select a partition
# If we provide keys as options, it's better to convert them to list and sort before passing # If we provide keys as options, it's better to convert them to list and sort before passing
mountpoints_list = sorted(list(partition_mountpoints.keys())) mountpoints_list = sorted(list(partition_mountpoints.keys()))
partition = archinstall.generic_select(mountpoints_list, partition = archinstall.generic_select(mountpoints_list, "Select a partition by number that you want to set a mount-point for (leave blank when done): ")
"Select a partition by number that you want to set a mount-point for (leave blank when done): ")
if not partition: if not partition:
if set(mountpoints_set) & {'/', '/boot'} == {'/', '/boot'}: if set(mountpoints_set) & {'/', '/boot'} == {'/', '/boot'}:
break break
@ -105,7 +104,7 @@ def ask_user_questions():
if not old_password: if not old_password:
old_password = input(f'Enter the old encryption password for {partition}: ') old_password = input(f'Enter the old encryption password for {partition}: ')
if (autodetected_filesystem := partition.detect_inner_filesystem(old_password)): if autodetected_filesystem := partition.detect_inner_filesystem(old_password):
new_filesystem = autodetected_filesystem new_filesystem = autodetected_filesystem
else: else:
archinstall.log("Could not auto-detect the filesystem inside the encrypted volume.", fg='red') archinstall.log("Could not auto-detect the filesystem inside the encrypted volume.", fg='red')
@ -258,7 +257,7 @@ def perform_installation_steps():
Once that's done, we'll hand over to perform_installation() Once that's done, we'll hand over to perform_installation()
""" """
mode = archinstall.GPT mode = archinstall.GPT
if hasUEFI() is False: if has_uefi() is False:
mode = archinstall.MBR mode = archinstall.MBR
with archinstall.Filesystem(archinstall.arguments['harddrive'], mode) as fs: with archinstall.Filesystem(archinstall.arguments['harddrive'], mode) as fs:
@ -295,7 +294,7 @@ def perform_installation_steps():
else: else:
fs.find_partition('/').mount('/mnt') fs.find_partition('/').mount('/mnt')
if hasUEFI(): if has_uefi():
fs.find_partition('/boot').mount('/mnt/boot') fs.find_partition('/boot').mount('/mnt/boot')
perform_installation('/mnt') perform_installation('/mnt')
@ -322,7 +321,7 @@ def perform_installation(mountpoint):
installation.set_hostname(archinstall.arguments['hostname']) installation.set_hostname(archinstall.arguments['hostname'])
if archinstall.arguments['mirror-region'].get("mirrors", None) is not None: if archinstall.arguments['mirror-region'].get("mirrors", None) is not None:
installation.set_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors in the installation medium installation.set_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors in the installation medium
if archinstall.arguments["bootloader"] == "grub-install" and hasUEFI() == True: if archinstall.arguments["bootloader"] == "grub-install" and has_uefi():
installation.add_additional_packages("grub") installation.add_additional_packages("grub")
installation.set_keyboard_language(archinstall.arguments['keyboard-language']) installation.set_keyboard_language(archinstall.arguments['keyboard-language'])
installation.add_bootloader(archinstall.arguments["bootloader"]) installation.add_bootloader(archinstall.arguments["bootloader"])
@ -330,7 +329,7 @@ def perform_installation(mountpoint):
# If user selected to copy the current ISO network configuration # If user selected to copy the current ISO network configuration
# Perform a copy of the config # Perform a copy of the config
if archinstall.arguments.get('nic', {}) == 'Copy ISO network configuration to installation': if archinstall.arguments.get('nic', {}) == 'Copy ISO network configuration to installation':
installation.copy_ISO_network_config(enable_services=True) # Sources the ISO network configuration to the install medium. installation.copy_iso_network_config(enable_services=True) # Sources the ISO network configuration to the install medium.
elif archinstall.arguments.get('nic', {}).get('NetworkManager', False): elif archinstall.arguments.get('nic', {}).get('NetworkManager', False):
installation.add_additional_packages("networkmanager") installation.add_additional_packages("networkmanager")
installation.enable_service('NetworkManager.service') installation.enable_service('NetworkManager.service')
@ -373,10 +372,7 @@ def perform_installation(mountpoint):
if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_post_install(): if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_post_install():
with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported: with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported:
if not imported._post_install(): if not imported._post_install():
archinstall.log( archinstall.log(' * Profile\'s post configuration requirements was not fulfilled.', fg='red')
' * Profile\'s post configuration requirements was not fulfilled.',
fg='red'
)
exit(1) exit(1)
installation.log("For post-installation tips, see https://wiki.archlinux.org/index.php/Installation_guide#Post-installation", fg="yellow") installation.log("For post-installation tips, see https://wiki.archlinux.org/index.php/Installation_guide#Post-installation", fg="yellow")

View File

@ -23,7 +23,7 @@ def install_on(mountpoint):
# Optionally enable networking: # Optionally enable networking:
if archinstall.arguments.get('network', None): if archinstall.arguments.get('network', None):
installation.copy_ISO_network_config(enable_services=True) installation.copy_iso_network_config(enable_services=True)
installation.add_additional_packages(['nano', 'wget', 'git']) installation.add_additional_packages(['nano', 'wget', 'git'])
installation.install_profile('minimal') installation.install_profile('minimal')

View File

@ -1,6 +1,16 @@
import archinstall import archinstall
__packages__ = ["awesome", "xorg-xrandr", "xterm", "feh", "slock", "terminus-font", "gnu-free-fonts", "ttf-liberation", "xsel"] __packages__ = [
"awesome",
"xorg-xrandr",
"xterm",
"feh",
"slock",
"terminus-font",
"gnu-free-fonts",
"ttf-liberation",
"xsel",
]
archinstall.storage['installation_session'].install_profile('xorg') archinstall.storage['installation_session'].install_profile('xorg')

View File

@ -2,7 +2,11 @@ import archinstall
# Define the package list in order for lib to source # Define the package list in order for lib to source
# which packages will be installed by this profile # which packages will be installed by this profile
__packages__ = ["cockpit", "udisks2", "packagekit"] __packages__ = [
"cockpit",
"udisks2",
"packagekit",
]
archinstall.storage['installation_session'].add_additional_packages(__packages__) archinstall.storage['installation_session'].add_additional_packages(__packages__)

View File

@ -6,7 +6,12 @@ is_top_level_profile = False
# New way of defining packages for a profile, which is iterable and can be used out side # New way of defining packages for a profile, which is iterable and can be used out side
# of the profile to get a list of "what packages will be installed". # of the profile to get a list of "what packages will be installed".
__packages__ = ['nemo', 'gpicview', 'main', 'alacritty'] __packages__ = [
"nemo",
"gpicview",
"main",
"alacritty",
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):

View File

@ -5,7 +5,12 @@ import archinstall
is_top_level_profile = False is_top_level_profile = False
# "It is recommended also to install the gnome group, which contains applications required for the standard GNOME experience." - Arch Wiki # "It is recommended also to install the gnome group, which contains applications required for the standard GNOME experience." - Arch Wiki
__packages__ = ["budgie-desktop", "lightdm", "lightdm-gtk-greeter", "gnome"] __packages__ = [
"budgie-desktop",
"gnome",
"lightdm",
"lightdm-gtk-greeter",
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):

View File

@ -4,7 +4,13 @@ import archinstall
is_top_level_profile = False is_top_level_profile = False
__packages__ = ["deepin", "deepin-terminal", "deepin-editor", "lightdm", "lightdm-gtk-greeter"] __packages__ = [
"deepin",
"deepin-terminal",
"deepin-editor",
"lightdm",
"lightdm-gtk-greeter",
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):

View File

@ -43,9 +43,7 @@ def _prep_function(*args, **kwargs):
'enlightenment', 'enlightenment',
] ]
desktop = archinstall.generic_select( desktop = archinstall.generic_select(supported_desktops, 'Select your desired desktop environment: ', allow_empty_input=False, sort=True)
supported_desktops, 'Select your desired desktop environment: ', allow_empty_input=False, sort=True
)
# Temporarily store the selected desktop profile # Temporarily store the selected desktop profile
# in a session-safe location, since this module will get reloaded # in a session-safe location, since this module will get reloaded

View File

@ -4,7 +4,12 @@ import archinstall
is_top_level_profile = False is_top_level_profile = False
__packages__ = ["enlightenment", "terminology", "lightdm", "lightdm-gtk-greeter"] __packages__ = [
"enlightenment",
"terminology",
"lightdm",
"lightdm-gtk-greeter",
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):

View File

@ -5,7 +5,11 @@ import archinstall
is_top_level_profile = False is_top_level_profile = False
# Note: GDM should be part of the gnome group, but adding it here for clarity # Note: GDM should be part of the gnome group, but adding it here for clarity
__packages__ = ["gnome", "gnome-tweaks", "gdm"] __packages__ = [
"gnome",
"gnome-tweaks",
"gdm",
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):

View File

@ -6,7 +6,15 @@ is_top_level_profile = False
# New way of defining packages for a profile, which is iterable and can be used out side # New way of defining packages for a profile, which is iterable and can be used out side
# of the profile to get a list of "what packages will be installed". # of the profile to get a list of "what packages will be installed".
__packages__ = ['i3lock', 'i3status', 'i3blocks', 'xterm', 'lightdm-gtk-greeter', 'lightdm', 'dmenu'] __packages__ = [
'i3lock',
'i3status',
'i3blocks',
'xterm',
'lightdm-gtk-greeter',
'lightdm',
'dmenu',
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):
@ -18,9 +26,7 @@ def _prep_function(*args, **kwargs):
""" """
supported_configurations = ['i3-wm', 'i3-gaps'] supported_configurations = ['i3-wm', 'i3-gaps']
desktop = archinstall.generic_select( desktop = archinstall.generic_select(supported_configurations, 'Select your desired configuration: ', allow_empty_input=False, sort=True)
supported_configurations, 'Select your desired configuration: ', allow_empty_input=False, sort=True
)
# Temporarily store the selected desktop profile # Temporarily store the selected desktop profile
# in a session-safe location, since this module will get reloaded # in a session-safe location, since this module will get reloaded

View File

@ -4,7 +4,15 @@ import archinstall
is_top_level_profile = False is_top_level_profile = False
__packages__ = ["plasma-meta", "konsole", "kate", "dolphin", "sddm", "plasma-wayland-session", "egl-wayland"] __packages__ = [
"plasma-meta",
"konsole",
"kate",
"dolphin",
"sddm",
"plasma-wayland-session",
"egl-wayland",
]
# TODO: Remove hard dependency of bash (due to .bash_profile) # TODO: Remove hard dependency of bash (due to .bash_profile)

View File

@ -4,7 +4,12 @@ import archinstall
is_top_level_profile = False is_top_level_profile = False
__packages__ = ["mate", "mate-extra", "lightdm", "lightdm-gtk-greeter"] __packages__ = [
"mate",
"mate-extra",
"lightdm",
"lightdm-gtk-greeter",
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):

View File

@ -6,7 +6,17 @@ import archinstall
is_top_level_profile = True is_top_level_profile = True
available_servers = ["cockpit", "docker", "httpd", "lighttpd", "mariadb", "nginx", "postgresql", "sshd", "tomcat"] available_servers = [
"cockpit",
"docker",
"httpd",
"lighttpd",
"mariadb",
"nginx",
"postgresql",
"sshd",
"tomcat",
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):

View File

@ -35,9 +35,7 @@ def _prep_function(*args, **kwargs):
# or through conventional import sway # or through conventional import sway
if __name__ == "sway": if __name__ == "sway":
if "nvidia" in _gfx_driver_packages: if "nvidia" in _gfx_driver_packages:
choice = input( choice = input("The proprietary Nvidia driver is not supported by Sway. It is likely that you will run into issues. Continue anyways? [y/N] ")
"The proprietary Nvidia driver is not supported by Sway. It is likely that you will run into issues. Continue anyways? [y/N] "
)
if choice.lower() in ("n", ""): if choice.lower() in ("n", ""):
raise archinstall.lib.exceptions.HardwareIncompatibilityError("Sway does not support the proprietary nvidia drivers.") raise archinstall.lib.exceptions.HardwareIncompatibilityError("Sway does not support the proprietary nvidia drivers.")

View File

@ -4,7 +4,12 @@ import archinstall
is_top_level_profile = False is_top_level_profile = False
__packages__ = ["xfce4", "xfce4-goodies", "lightdm", "lightdm-gtk-greeter"] __packages__ = [
"xfce4",
"xfce4-goodies",
"lightdm",
"lightdm-gtk-greeter",
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):

View File

@ -4,7 +4,14 @@ import archinstall
is_top_level_profile = True is_top_level_profile = True
__packages__ = ['dkms', 'xorg-server', 'xorg-xinit', 'nvidia-dkms', 'xorg-server', *archinstall.lib.hardware.__packages__] __packages__ = [
'dkms',
'xorg-server',
'xorg-xinit',
'nvidia-dkms',
'xorg-server',
*archinstall.lib.hardware.__packages__,
]
def _prep_function(*args, **kwargs): def _prep_function(*args, **kwargs):