Mergining in PR #112. Reworked partitioning and added new functions
This commit is contained in:
commit
e2aeb3a32f
|
|
@ -12,4 +12,19 @@ from .lib.services import *
|
|||
from .lib.packages import *
|
||||
from .lib.output import *
|
||||
from .lib.storage import *
|
||||
from .lib.hardware import *
|
||||
from .lib.hardware import *
|
||||
|
||||
## Basic version of arg.parse() supporting:
|
||||
## --key=value
|
||||
## --boolean
|
||||
arguments = {}
|
||||
positionals = []
|
||||
for arg in sys.argv[1:]:
|
||||
if '--' == arg[:2]:
|
||||
if '=' in arg:
|
||||
key, val = [x.strip() for x in arg[2:].split('=', 1)]
|
||||
else:
|
||||
key, val = arg[2:], True
|
||||
arguments[key] = val
|
||||
else:
|
||||
positionals.append(arg)
|
||||
|
|
@ -1,4 +1,5 @@
|
|||
import glob, re, os, json, time # Time is only used to gracefully wait for new paritions to come online
|
||||
import glob, re, os, json, time, hashlib
|
||||
import pathlib
|
||||
from collections import OrderedDict
|
||||
from .exceptions import DiskError
|
||||
from .general import *
|
||||
|
|
@ -7,6 +8,7 @@ from .storage import storage
|
|||
|
||||
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
|
||||
GPT = 0b00000001
|
||||
MBR = 0b00000010
|
||||
|
||||
#import ctypes
|
||||
#import ctypes.util
|
||||
|
|
@ -14,14 +16,27 @@ GPT = 0b00000001
|
|||
#libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
|
||||
|
||||
class BlockDevice():
|
||||
def __init__(self, path, info):
|
||||
def __init__(self, path, info=None):
|
||||
if not info:
|
||||
# If we don't give any information, we need to auto-fill it.
|
||||
# Otherwise any subsequent usage will break.
|
||||
info = all_disks()[path].info
|
||||
|
||||
self.path = path
|
||||
self.info = info
|
||||
self.part_cache = OrderedDict()
|
||||
# TODO: Currently disk encryption is a BIT missleading.
|
||||
# It's actually partition-encryption, but for future-proofing this
|
||||
# I'm placing the encryption password on a BlockDevice level.
|
||||
self.encryption_passwoed = None
|
||||
|
||||
def __repr__(self, *args, **kwargs):
|
||||
return f"BlockDevice({self.device})"
|
||||
|
||||
def __iter__(self):
|
||||
for partition in self.partitions:
|
||||
yield self.partitions[partition]
|
||||
|
||||
def __getitem__(self, key, *args, **kwargs):
|
||||
if key not in self.info:
|
||||
raise KeyError(f'{self} does not contain information: "{key}"')
|
||||
|
|
@ -91,7 +106,8 @@ class BlockDevice():
|
|||
part_id = part['name'][len(os.path.basename(self.path)):]
|
||||
if part_id not in self.part_cache:
|
||||
## TODO: Force over-write even if in cache?
|
||||
self.part_cache[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size'])
|
||||
if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']:
|
||||
self.part_cache[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size'])
|
||||
|
||||
return {k: self.part_cache[k] for k in sorted(self.part_cache)}
|
||||
|
||||
|
|
@ -100,59 +116,68 @@ class BlockDevice():
|
|||
all_partitions = self.partitions
|
||||
return [all_partitions[k] for k in all_partitions]
|
||||
|
||||
@property
|
||||
def partition_table_type(self):
|
||||
return GPT
|
||||
|
||||
def has_partitions(self):
|
||||
return len(self.partitions)
|
||||
|
||||
def has_mount_point(self, mountpoint):
|
||||
for partition in self.partitions:
|
||||
if self.partitions[partition].mountpoint == mountpoint:
|
||||
return True
|
||||
return False
|
||||
|
||||
class Partition():
|
||||
def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False):
|
||||
def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
|
||||
if not part_id:
|
||||
part_id = os.path.basename(path)
|
||||
self.path = path
|
||||
self.part_id = part_id
|
||||
self.mountpoint = mountpoint
|
||||
self.filesystem = filesystem # TODO: Autodetect if we're reusing a partition
|
||||
self.target_mountpoint = mountpoint
|
||||
self.filesystem = filesystem
|
||||
self.size = size # TODO: Refresh?
|
||||
self.encrypted = encrypted
|
||||
self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions.
|
||||
|
||||
if mountpoint:
|
||||
self.mount(mountpoint)
|
||||
|
||||
mount_information = get_mount_info(self.path)
|
||||
|
||||
if self.mountpoint != mount_information.get('target', None) and mountpoint:
|
||||
raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
|
||||
|
||||
if (target := mount_information.get('target', None)):
|
||||
self.mountpoint = target
|
||||
|
||||
if not self.filesystem and autodetect_filesystem:
|
||||
if (fstype := mount_information.get('fstype', get_filesystem_type(self.real_device))):
|
||||
self.filesystem = fstype
|
||||
|
||||
if self.filesystem == 'crypto_LUKS':
|
||||
self.encrypted = True
|
||||
|
||||
def __lt__(self, left_comparitor):
|
||||
if type(left_comparitor) == Partition:
|
||||
left_comparitor = left_comparitor.path
|
||||
else:
|
||||
left_comparitor = str(left_comparitor)
|
||||
return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct.
|
||||
|
||||
def __repr__(self, *args, **kwargs):
|
||||
mount_repr = ''
|
||||
if self.mountpoint:
|
||||
mount_repr = f", mounted={self.mountpoint}"
|
||||
elif self.target_mountpoint:
|
||||
mount_repr = f", rel_mountpoint={self.target_mountpoint}"
|
||||
|
||||
if self.encrypted:
|
||||
return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}, mounted={self.mountpoint})'
|
||||
return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})'
|
||||
else:
|
||||
return f'Partition(path={self.path}, fs={self.filesystem}, mounted={self.mountpoint})'
|
||||
|
||||
def format(self, filesystem):
|
||||
log(f'Formatting {self} -> {filesystem}', level=LOG_LEVELS.Info)
|
||||
if filesystem == 'btrfs':
|
||||
o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {self.path}'))
|
||||
if b'UUID' not in o:
|
||||
raise DiskError(f'Could not format {self.path} with {filesystem} because: {o}')
|
||||
self.filesystem = 'btrfs'
|
||||
elif filesystem == 'fat32':
|
||||
o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {self.path}'))
|
||||
if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
|
||||
raise DiskError(f'Could not format {self.path} with {filesystem} because: {o}')
|
||||
self.filesystem = 'fat32'
|
||||
elif filesystem == 'ext4':
|
||||
if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {self.path}')).exit_code != 0:
|
||||
raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}')
|
||||
self.filesystem = 'ext4'
|
||||
elif filesystem == 'xfs':
|
||||
if (handle:= sys_command(f'/usr/bin/mkfs.xfs -f {self.path}')).exit_code != 0:
|
||||
raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}')
|
||||
self.filesystem = 'xfs'
|
||||
elif filesystem == 'f2fs':
|
||||
if (handle:= sys_command(f'/usr/bin/mkfs.f2fs -f {self.path}')).exit_code != 0:
|
||||
raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}')
|
||||
self.filesystem = 'f2fs'
|
||||
else:
|
||||
raise DiskError(f'Fileformat {filesystem} is not yet implemented.')
|
||||
return True
|
||||
|
||||
def find_parent_of(self, data, name, parent=None):
|
||||
if data['name'] == name:
|
||||
return parent
|
||||
elif 'children' in data:
|
||||
for child in data['children']:
|
||||
if (parent := self.find_parent_of(child, name, parent=data['name'])):
|
||||
return parent
|
||||
return f'Partition(path={self.path}, fs={self.filesystem}{mount_repr})'
|
||||
|
||||
@property
|
||||
def real_device(self):
|
||||
|
|
@ -164,6 +189,114 @@ class Partition():
|
|||
return f"/dev/{parent}"
|
||||
raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
|
||||
|
||||
def detect_inner_filesystem(self, password):
|
||||
log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=LOG_LEVELS.Info)
|
||||
from .luks import luks2
|
||||
with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device:
|
||||
return unlocked_device.filesystem
|
||||
|
||||
def has_content(self):
|
||||
temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest()
|
||||
temporary_path = pathlib.Path(temporary_mountpoint)
|
||||
|
||||
temporary_path.mkdir(parents=True, exist_ok=True)
|
||||
if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
|
||||
raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
|
||||
|
||||
files = len(glob.glob(f"{temporary_mountpoint}/*"))
|
||||
sys_command(f'/usr/bin/umount {temporary_mountpoint}')
|
||||
|
||||
temporary_path.rmdir()
|
||||
|
||||
return True if files > 0 else False
|
||||
|
||||
def safe_to_format(self):
|
||||
if self.allow_formatting is False:
|
||||
return False
|
||||
elif self.target_mountpoint == '/boot' and self.has_content():
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def encrypt(self, *args, **kwargs):
|
||||
"""
|
||||
A wrapper function for luks2() instances and the .encrypt() method of that instance.
|
||||
"""
|
||||
from .luks import luks2
|
||||
|
||||
if not self.encrypted:
|
||||
raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}")
|
||||
|
||||
if not self.safe_to_format():
|
||||
return False
|
||||
|
||||
handle = luks2(self, None, None)
|
||||
return handle.encrypt(self, *args, **kwargs)
|
||||
|
||||
def format(self, filesystem=None, path=None, allow_formatting=None, log_formating=True):
|
||||
"""
|
||||
Format can be given an overriding path, for instance /dev/null to test
|
||||
the formating functionality and in essence the support for the given filesystem.
|
||||
"""
|
||||
if filesystem is None:
|
||||
filesystem = self.filesystem
|
||||
|
||||
if path is None:
|
||||
path = self.path
|
||||
if allow_formatting is None:
|
||||
allow_formatting = self.allow_formatting
|
||||
|
||||
if not allow_formatting:
|
||||
raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})")
|
||||
|
||||
if log_formating:
|
||||
log(f'Formatting {path} -> {filesystem}', level=LOG_LEVELS.Info)
|
||||
|
||||
if filesystem == 'btrfs':
|
||||
o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}'))
|
||||
if b'UUID' not in o:
|
||||
raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
|
||||
self.filesystem = 'btrfs'
|
||||
|
||||
elif filesystem == 'vfat':
|
||||
o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {path}'))
|
||||
if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
|
||||
raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
|
||||
self.filesystem = 'vfat'
|
||||
|
||||
elif filesystem == 'ext4':
|
||||
if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
|
||||
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
|
||||
self.filesystem = 'ext4'
|
||||
|
||||
elif filesystem == 'xfs':
|
||||
if (handle := sys_command(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
|
||||
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
|
||||
self.filesystem = 'xfs'
|
||||
|
||||
elif filesystem == 'f2fs':
|
||||
if (handle := sys_command(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
|
||||
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
|
||||
self.filesystem = 'f2fs'
|
||||
|
||||
elif filesystem == 'crypto_LUKS':
|
||||
# from .luks import luks2
|
||||
# encrypted_partition = luks2(self, None, None)
|
||||
# encrypted_partition.format(path)
|
||||
self.filesystem = 'crypto_LUKS'
|
||||
|
||||
else:
|
||||
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
|
||||
return True
|
||||
|
||||
def find_parent_of(self, data, name, parent=None):
|
||||
if data['name'] == name:
|
||||
return parent
|
||||
elif 'children' in data:
|
||||
for child in data['children']:
|
||||
if (parent := self.find_parent_of(child, name, parent=data['name'])):
|
||||
return parent
|
||||
|
||||
def mount(self, target, fs=None, options=''):
|
||||
if not self.mountpoint:
|
||||
log(f'Mounting {self} to {target}', level=LOG_LEVELS.Info)
|
||||
|
|
@ -179,6 +312,21 @@ class Partition():
|
|||
self.mountpoint = target
|
||||
return True
|
||||
|
||||
def filesystem_supported(self):
|
||||
"""
|
||||
The support for a filesystem (this partition) is tested by calling
|
||||
partition.format() with a path set to '/dev/null' which returns two exceptions:
|
||||
1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported
|
||||
2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
|
||||
"""
|
||||
try:
|
||||
self.format(self.filesystem, '/dev/null', log_formating=False, allow_formatting=True)
|
||||
except SysCallError:
|
||||
pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code
|
||||
except UnknownFilesystemFormat as err:
|
||||
raise err
|
||||
return True
|
||||
|
||||
class Filesystem():
|
||||
# TODO:
|
||||
# When instance of a HDD is selected, check all usages and gracefully unmount them
|
||||
|
|
@ -188,13 +336,23 @@ class Filesystem():
|
|||
self.mode = mode
|
||||
|
||||
def __enter__(self, *args, **kwargs):
|
||||
if self.mode == GPT:
|
||||
if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt',).exit_code == 0:
|
||||
return self
|
||||
if self.blockdevice.keep_partitions is False:
|
||||
log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=LOG_LEVELS.Debug)
|
||||
if self.mode == GPT:
|
||||
if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt',).exit_code == 0:
|
||||
return self
|
||||
else:
|
||||
raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
|
||||
else:
|
||||
raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
|
||||
raise DiskError(f'Unknown mode selected to format in: {self.mode}')
|
||||
|
||||
# TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed.
|
||||
elif self.mode == self.blockdevice.partition_table_type:
|
||||
log(f'Kept partition format {self.mode} for {self.blockdevice}', level=LOG_LEVELS.Debug)
|
||||
else:
|
||||
raise DiskError(f'Unknown mode selected to format in: {self.mode}')
|
||||
raise DiskError(f'The selected partition table format {self.mode} does not match that of {self.blockdevice}.')
|
||||
|
||||
return self
|
||||
|
||||
def __repr__(self):
|
||||
return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
|
||||
|
|
@ -206,6 +364,11 @@ class Filesystem():
|
|||
b''.join(sys_command(f'sync'))
|
||||
return True
|
||||
|
||||
def find_partition(self, mountpoint):
|
||||
for partition in self.blockdevice:
|
||||
if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
|
||||
return partition
|
||||
|
||||
def raw_parted(self, string:str):
|
||||
x = sys_command(f'/usr/bin/parted -s {string}')
|
||||
o = b''.join(x)
|
||||
|
|
@ -220,15 +383,23 @@ class Filesystem():
|
|||
"""
|
||||
return self.raw_parted(string).exit_code
|
||||
|
||||
def use_entire_disk(self, prep_mode=None):
|
||||
self.add_partition('primary', start='1MiB', end='513MiB', format='fat32')
|
||||
def use_entire_disk(self, root_filesystem_type='ext4', encrypt_root_partition=True):
|
||||
self.add_partition('primary', start='1MiB', end='513MiB', format='vfat')
|
||||
self.set_name(0, 'EFI')
|
||||
self.set(0, 'boot on')
|
||||
self.set(0, 'esp on') # TODO: Redundant, as in GPT mode it's an alias for "boot on"? https://www.gnu.org/software/parted/manual/html_node/set.html
|
||||
if prep_mode == 'luks2':
|
||||
self.add_partition('primary', start='513MiB', end='100%')
|
||||
else:
|
||||
self.add_partition('primary', start='513MiB', end='100%', format='ext4')
|
||||
# TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
|
||||
# https://www.gnu.org/software/parted/manual/html_node/set.html
|
||||
self.set(0, 'esp on')
|
||||
self.add_partition('primary', start='513MiB', end='100%')
|
||||
|
||||
self.blockdevice.partition[0].filesystem = 'vfat'
|
||||
self.blockdevice.partition[1].filesystem = root_filesystem_type
|
||||
|
||||
self.blockdevice.partition[0].target_mountpoint = '/boot'
|
||||
self.blockdevice.partition[1].target_mountpoint = '/'
|
||||
|
||||
if encrypt_root_partition:
|
||||
self.blockdevice.partition[1].encrypted = True
|
||||
|
||||
def add_partition(self, type, start, end, format=None):
|
||||
log(f'Adding partition to {self.blockdevice}', level=LOG_LEVELS.Info)
|
||||
|
|
@ -302,3 +473,24 @@ def harddrive(size=None, model=None, fuzzy=False):
|
|||
continue
|
||||
|
||||
return collection[drive]
|
||||
|
||||
def get_mount_info(path):
|
||||
try:
|
||||
output = b''.join(sys_command(f'/usr/bin/findmnt --json {path}'))
|
||||
except SysCallError:
|
||||
return {}
|
||||
|
||||
output = output.decode('UTF-8')
|
||||
output = json.loads(output)
|
||||
if 'filesystems' in output:
|
||||
if len(output['filesystems']) > 1:
|
||||
raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
|
||||
|
||||
return output['filesystems'][0]
|
||||
|
||||
def get_filesystem_type(path):
|
||||
try:
|
||||
handle = sys_command(f"blkid -o value -s TYPE {path}")
|
||||
return b''.join(handle).strip().decode('UTF-8')
|
||||
except SysCallError:
|
||||
return None
|
||||
|
|
@ -2,6 +2,8 @@ class RequirementError(BaseException):
|
|||
pass
|
||||
class DiskError(BaseException):
|
||||
pass
|
||||
class UnknownFilesystemFormat(BaseException):
|
||||
pass
|
||||
class ProfileError(BaseException):
|
||||
pass
|
||||
class SysCallError(BaseException):
|
||||
|
|
@ -9,4 +11,8 @@ class SysCallError(BaseException):
|
|||
class ProfileNotFound(BaseException):
|
||||
pass
|
||||
class HardwareIncompatibilityError(BaseException):
|
||||
pass
|
||||
class PermissionError(BaseException):
|
||||
pass
|
||||
class UserError(BaseException):
|
||||
pass
|
||||
|
|
@ -37,6 +37,7 @@ class JSON_Encoder:
|
|||
## We'll need to iterate not just the value that default() usually gets passed
|
||||
## But also iterate manually over each key: value pair in order to trap the keys.
|
||||
|
||||
copy = {}
|
||||
for key, val in list(obj.items()):
|
||||
if isinstance(val, dict):
|
||||
val = json.loads(json.dumps(val, cls=JSON)) # This, is a EXTREMELY ugly hack..
|
||||
|
|
@ -44,9 +45,12 @@ class JSON_Encoder:
|
|||
# trigger a encoding of sub-dictionaries.
|
||||
else:
|
||||
val = JSON_Encoder._encode(val)
|
||||
del(obj[key])
|
||||
obj[JSON_Encoder._encode(key)] = val
|
||||
return obj
|
||||
|
||||
if type(key) == str and key[0] == '!':
|
||||
copy[JSON_Encoder._encode(key)] = '******'
|
||||
else:
|
||||
copy[JSON_Encoder._encode(key)] = val
|
||||
return copy
|
||||
elif hasattr(obj, 'json'):
|
||||
return obj.json()
|
||||
elif hasattr(obj, '__dump__'):
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ class Installer():
|
|||
|
||||
if len(args) >= 2 and args[1]:
|
||||
#self.log(self.trace_log.decode('UTF-8'), level=LOG_LEVELS.Debug)
|
||||
self.log(args[1], level=LOG_LEVELS.Error)
|
||||
self.log(args[1], level=LOG_LEVELS.Error, fg='red')
|
||||
|
||||
self.sync_log_to_install_medium()
|
||||
|
||||
|
|
|
|||
|
|
@ -6,31 +6,60 @@ from .output import log, LOG_LEVELS
|
|||
from .storage import storage
|
||||
|
||||
class luks2():
|
||||
def __init__(self, partition, mountpoint, password, *args, **kwargs):
|
||||
def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
|
||||
self.password = password
|
||||
self.partition = partition
|
||||
self.mountpoint = mountpoint
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
self.key_file = key_file
|
||||
self.auto_unmount = auto_unmount
|
||||
self.filesystem = 'crypto_LUKS'
|
||||
self.mapdev = None
|
||||
|
||||
def __enter__(self):
|
||||
key_file = self.encrypt(self.partition, self.password, *self.args, **self.kwargs)
|
||||
return self.unlock(self.partition, self.mountpoint, key_file)
|
||||
#if self.partition.allow_formatting:
|
||||
# self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
|
||||
#else:
|
||||
if not self.key_file:
|
||||
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
|
||||
|
||||
if type(self.password) != bytes:
|
||||
self.password = bytes(self.password, 'UTF-8')
|
||||
|
||||
with open(self.key_file, 'wb') as fh:
|
||||
fh.write(self.password)
|
||||
|
||||
return self.unlock(self.partition, self.mountpoint, self.key_file)
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
|
||||
if self.auto_unmount:
|
||||
self.close()
|
||||
|
||||
if len(args) >= 2 and args[1]:
|
||||
raise args[1]
|
||||
return True
|
||||
|
||||
def encrypt(self, partition, password, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
|
||||
def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
|
||||
# TODO: We should be able to integrate this into the main log some how.
|
||||
# Perhaps post-mortem?
|
||||
if not self.partition.allow_formatting:
|
||||
raise DiskError(f'Could not encrypt volume {self.partition} due to it having a formatting lock.')
|
||||
|
||||
log(f'Encrypting {partition} (This might take a while)', level=LOG_LEVELS.Info)
|
||||
|
||||
if not key_file:
|
||||
key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
|
||||
if type(password) != bytes: password = bytes(password, 'UTF-8')
|
||||
if self.key_file:
|
||||
key_file = self.key_file
|
||||
else:
|
||||
key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
|
||||
|
||||
if not password:
|
||||
password = self.password
|
||||
|
||||
if type(password) != bytes:
|
||||
password = bytes(password, 'UTF-8')
|
||||
|
||||
with open(key_file, 'wb') as fh:
|
||||
fh.write(password)
|
||||
|
|
@ -49,12 +78,23 @@ class luks2():
|
|||
:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
|
||||
:type mountpoint: str
|
||||
"""
|
||||
from .disk import get_filesystem_type
|
||||
if '/' in mountpoint:
|
||||
os.path.basename(mountpoint) # TODO: Raise exception instead?
|
||||
sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
|
||||
if os.path.islink(f'/dev/mapper/{mountpoint}'):
|
||||
return Partition(f'/dev/mapper/{mountpoint}', encrypted=True)
|
||||
self.mapdev = f'/dev/mapper/{mountpoint}'
|
||||
unlocked_partition = Partition(self.mapdev, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
|
||||
unlocked_partition.allow_formatting = self.partition.allow_formatting
|
||||
return unlocked_partition
|
||||
|
||||
def close(self, mountpoint):
|
||||
sys_command(f'cryptsetup close /dev/mapper/{mountpoint}')
|
||||
return os.path.islink(f'/dev/mapper/{mountpoint}') is False
|
||||
def close(self, mountpoint=None):
|
||||
if not mountpoint:
|
||||
mountpoint = self.mapdev
|
||||
|
||||
sys_command(f'/usr/bin/cryptsetup close {self.mapdev}')
|
||||
return os.path.islink(self.mapdev) is False
|
||||
|
||||
def format(self, path):
|
||||
if (handle := sys_command(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
|
||||
raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')
|
||||
|
|
@ -5,6 +5,9 @@ import logging
|
|||
from pathlib import Path
|
||||
from .storage import storage
|
||||
|
||||
# TODO: use logging's built in levels instead.
|
||||
# Altough logging is threaded and I wish to avoid that.
|
||||
# It's more Pythonistic or w/e you want to call it.
|
||||
class LOG_LEVELS:
|
||||
Critical = 0b001
|
||||
Error = 0b010
|
||||
|
|
@ -108,10 +111,10 @@ def log(*args, **kwargs):
|
|||
# In that case, we'll drop it.
|
||||
return None
|
||||
|
||||
try:
|
||||
journald.log(string, level=kwargs['level'])
|
||||
except ModuleNotFoundError:
|
||||
pass # Ignore writing to journald
|
||||
try:
|
||||
journald.log(string, level=kwargs.get('level', LOG_LEVELS.Info))
|
||||
except ModuleNotFoundError:
|
||||
pass # Ignore writing to journald
|
||||
|
||||
# Finally, print the log unless we skipped it based on level.
|
||||
# We use sys.stdout.write()+flush() instead of print() to try and
|
||||
|
|
|
|||
|
|
@ -157,6 +157,23 @@ class Profile(Script):
|
|||
def install(self):
|
||||
return self.execute()
|
||||
|
||||
def has_prep_function(self):
|
||||
with open(self.path, 'r') as source:
|
||||
source_data = source.read()
|
||||
|
||||
# Some crude safety checks, make sure the imported profile has
|
||||
# a __name__ check and if so, check if it's got a _prep_function()
|
||||
# we can call to ask for more user input.
|
||||
#
|
||||
# If the requirements are met, import with .py in the namespace to not
|
||||
# trigger a traditional:
|
||||
# if __name__ == 'moduleName'
|
||||
if '__name__' in source_data and '_prep_function' in source_data:
|
||||
with self.load_instructions(namespace=f"{self.namespace}.py") as imported:
|
||||
if hasattr(imported, '_prep_function'):
|
||||
return True
|
||||
return False
|
||||
|
||||
class Application(Profile):
|
||||
def __repr__(self, *args, **kwargs):
|
||||
return f'Application({os.path.basename(self.profile)})'
|
||||
|
|
|
|||
|
|
@ -17,5 +17,6 @@ storage = {
|
|||
'UPSTREAM_URL' : 'https://raw.githubusercontent.com/Torxed/archinstall/master/profiles',
|
||||
'PROFILE_DB' : None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
|
||||
'LOG_PATH' : '/var/log/archinstall',
|
||||
'LOG_FILE' : 'install.log'
|
||||
'LOG_FILE' : 'install.log',
|
||||
'MOUNT_POINT' : '/mnt'
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,114 @@
|
|||
import getpass
|
||||
from .exceptions import *
|
||||
from .profiles import Profile
|
||||
from .locale_helpers import search_keyboard_layout
|
||||
from .output import log, LOG_LEVELS
|
||||
from .storage import storage
|
||||
from .networking import list_interfaces
|
||||
|
||||
## TODO: Some inconsistencies between the selection processes.
|
||||
## Some return the keys from the options, some the values?
|
||||
|
||||
def get_password(prompt="Enter a password: "):
|
||||
while (passwd := getpass.getpass(prompt)):
|
||||
passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
|
||||
if passwd != passwd_verification:
|
||||
log(' * Passwords did not match * ', bg='black', fg='red')
|
||||
continue
|
||||
return passwd
|
||||
return None
|
||||
|
||||
def ask_for_superuser_account(prompt='Create a required super-user with sudo privileges: ', forced=False):
|
||||
while 1:
|
||||
new_user = input(prompt).strip(' ')
|
||||
|
||||
if not new_user and forced:
|
||||
# TODO: make this text more generic?
|
||||
# It's only used to create the first sudo user when root is disabled in guided.py
|
||||
log(' * Since root is disabled, you need to create a least one (super) user!', bg='black', fg='red')
|
||||
continue
|
||||
elif not new_user and not forced:
|
||||
raise UserError("No superuser was created.")
|
||||
|
||||
password = get_password(prompt=f'Password for user {new_user}: ')
|
||||
return {new_user: password}
|
||||
|
||||
def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
|
||||
users = {}
|
||||
super_users = {}
|
||||
|
||||
while 1:
|
||||
new_user = input(prompt).strip(' ')
|
||||
if not new_user:
|
||||
break
|
||||
password = get_password(prompt=f'Password for user {new_user}: ')
|
||||
|
||||
if input("Should this user be a sudo (super) user (y/N): ").strip(' ').lower() in ('y', 'yes'):
|
||||
super_users[new_user] = password
|
||||
else:
|
||||
users[new_user] = password
|
||||
|
||||
return users, super_users
|
||||
|
||||
def ask_to_configure_network():
|
||||
# Optionally configure one network interface.
|
||||
#while 1:
|
||||
# {MAC: Ifname}
|
||||
interfaces = {'ISO-CONFIG' : 'Copy ISO network configuration to installation', **list_interfaces()}
|
||||
|
||||
nic = generic_select(interfaces.values(), "Select one network interface to configure (leave blank to skip): ")
|
||||
if nic and nic != 'Copy ISO network configuration to installation':
|
||||
mode = generic_select(['DHCP (auto detect)', 'IP (static)'], f"Select which mode to configure for {nic}: ")
|
||||
if mode == 'IP (static)':
|
||||
while 1:
|
||||
ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip()
|
||||
if ip:
|
||||
break
|
||||
else:
|
||||
log(
|
||||
"You need to enter a valid IP in IP-config mode.",
|
||||
level=LOG_LEVELS.Warning,
|
||||
bg='black',
|
||||
fg='red'
|
||||
)
|
||||
|
||||
if not len(gateway := input('Enter your gateway (router) IP address or leave blank for none: ').strip()):
|
||||
gateway = None
|
||||
|
||||
dns = None
|
||||
if len(dns_input := input('Enter your DNS servers (space separated, blank for none): ').strip()):
|
||||
dns = dns_input.split(' ')
|
||||
|
||||
return {'nic': nic, 'dhcp': False, 'ip': ip, 'gateway' : gateway, 'dns' : dns}
|
||||
else:
|
||||
return {'nic': nic}
|
||||
elif nic:
|
||||
return nic
|
||||
|
||||
return None
|
||||
|
||||
def ask_for_disk_layout():
|
||||
options = {
|
||||
'keep-existing' : 'Keep existing partition layout and select which ones to use where.',
|
||||
'format-all' : 'Format entire drive and setup a basic partition scheme.',
|
||||
'abort' : 'Abort the installation.'
|
||||
}
|
||||
|
||||
value = generic_select(options.values(), "Found partitions on the selected drive, (select by number) what you want to do: ")
|
||||
return next((key for key, val in options.items() if val == value), None)
|
||||
|
||||
def ask_for_main_filesystem_format():
|
||||
options = {
|
||||
'btrfs' : 'btrfs',
|
||||
'ext4' : 'ext4',
|
||||
'xfs' : 'xfs',
|
||||
'f2fs' : 'f2fs',
|
||||
'vfat' : 'vfat'
|
||||
}
|
||||
|
||||
value = generic_select(options.values(), "Select your main partitions filesystem by number or free-text: ")
|
||||
return next((key for key, val in options.items() if val == value), None)
|
||||
|
||||
def generic_select(options, input_text="Select one of the above by index or absolute value: ", sort=True):
|
||||
"""
|
||||
A generic select function that does not output anything
|
||||
|
|
@ -93,23 +197,7 @@ def select_profile(options):
|
|||
else:
|
||||
RequirementError("Selected profile does not exist.")
|
||||
|
||||
profile = Profile(None, selected_profile)
|
||||
with open(profile.path, 'r') as source:
|
||||
source_data = source.read()
|
||||
|
||||
# Some crude safety checks, make sure the imported profile has
|
||||
# a __name__ check and if so, check if it's got a _prep_function()
|
||||
# we can call to ask for more user input.
|
||||
#
|
||||
# If the requirements are met, import with .py in the namespace to not
|
||||
# trigger a traditional:
|
||||
# if __name__ == 'moduleName'
|
||||
if '__name__' in source_data and '_prep_function' in source_data:
|
||||
with profile.load_instructions(namespace=f"{selected_profile}.py") as imported:
|
||||
if hasattr(imported, '_prep_function'):
|
||||
return profile, imported
|
||||
|
||||
return selected_profile
|
||||
return Profile(None, selected_profile)
|
||||
|
||||
raise RequirementError("Selecting profiles require a least one profile to be given as an option.")
|
||||
|
||||
|
|
|
|||
|
|
@ -1,14 +1,10 @@
|
|||
import getpass, time, json, sys, signal, os
|
||||
import archinstall
|
||||
|
||||
# Create a storage structure for all our information.
|
||||
# We'll print this right before the user gets informed about the formatting timer.
|
||||
archinstall.storage['_guided'] = {}
|
||||
archinstall.storage['_guided_hidden'] = {} # This will simply be hidden from printouts and things.
|
||||
|
||||
"""
|
||||
This signal-handler chain (and global variable)
|
||||
is used to trigger the "Are you sure you want to abort?" question.
|
||||
is used to trigger the "Are you sure you want to abort?" question further down.
|
||||
It might look a bit odd, but have a look at the line: "if SIG_TRIGGER:"
|
||||
"""
|
||||
SIG_TRIGGER = False
|
||||
def kill_handler(sig, frame):
|
||||
|
|
@ -23,13 +19,263 @@ def sig_handler(sig, frame):
|
|||
original_sigint_handler = signal.getsignal(signal.SIGINT)
|
||||
signal.signal(signal.SIGINT, sig_handler)
|
||||
|
||||
|
||||
def ask_user_questions():
|
||||
"""
|
||||
First, we'll ask the user for a bunch of user input.
|
||||
Not until we're satisfied with what we want to install
|
||||
will we continue with the actual installation steps.
|
||||
"""
|
||||
if not archinstall.arguments.get('keyboard-language', None):
|
||||
archinstall.arguments['keyboard-language'] = archinstall.select_language(archinstall.list_keyboard_languages()).strip()
|
||||
|
||||
# Before continuing, set the preferred keyboard layout/language in the current terminal.
|
||||
# This will just help the user with the next following questions.
|
||||
if len(archinstall.arguments['keyboard-language']):
|
||||
archinstall.set_keyboard_language(archinstall.arguments['keyboard-language'])
|
||||
|
||||
# Set which region to download packages from during the installation
|
||||
if not archinstall.arguments.get('mirror-region', None):
|
||||
archinstall.arguments['mirror-region'] = archinstall.select_mirror_regions(archinstall.list_mirrors())
|
||||
else:
|
||||
selected_region = archinstall.arguments['mirror-region']
|
||||
archinstall.arguments['mirror-region'] = {selected_region : archinstall.list_mirrors()[selected_region]}
|
||||
|
||||
|
||||
# Ask which harddrive/block-device we will install to
|
||||
if archinstall.arguments.get('harddrive', None):
|
||||
archinstall.arguments['harddrive'] = archinstall.BlockDevice(archinstall.arguments['harddrive'])
|
||||
else:
|
||||
archinstall.arguments['harddrive'] = archinstall.select_disk(archinstall.all_disks())
|
||||
|
||||
# Perform a quick sanity check on the selected harddrive.
|
||||
# 1. Check if it has partitions
|
||||
# 3. Check that we support the current partitions
|
||||
# 2. If so, ask if we should keep them or wipe everything
|
||||
if archinstall.arguments['harddrive'].has_partitions():
|
||||
archinstall.log(f"{archinstall.arguments['harddrive']} contains the following partitions:", fg='red')
|
||||
|
||||
# We curate a list pf supported paritions
|
||||
# and print those that we don't support.
|
||||
partition_mountpoints = {}
|
||||
for partition in archinstall.arguments['harddrive']:
|
||||
try:
|
||||
if partition.filesystem_supported():
|
||||
archinstall.log(f" {partition}")
|
||||
partition_mountpoints[partition] = None
|
||||
except archinstall.UnknownFilesystemFormat as err:
|
||||
archinstall.log(f" {partition} (Filesystem not supported)", fg='red')
|
||||
|
||||
# We then ask what to do with the paritions.
|
||||
if (option := archinstall.ask_for_disk_layout()) == 'abort':
|
||||
archinstall.log(f"Safely aborting the installation. No changes to the disk or system has been made.")
|
||||
exit(1)
|
||||
elif option == 'keep-existing':
|
||||
archinstall.arguments['harddrive'].keep_partitions = True
|
||||
|
||||
archinstall.log(f" ** You will now select which partitions to use by selecting mount points (inside the installation). **")
|
||||
archinstall.log(f" ** The root would be a simple / and the boot partition /boot (as all paths are relative inside the installation). **")
|
||||
while True:
|
||||
# Select a partition
|
||||
partition = archinstall.generic_select(partition_mountpoints.keys(),
|
||||
"Select a partition by number that you want to set a mount-point for (leave blank when done): ")
|
||||
if not partition:
|
||||
break
|
||||
|
||||
# Select a mount-point
|
||||
mountpoint = input(f"Enter a mount-point for {partition}: ").strip(' ')
|
||||
if len(mountpoint):
|
||||
|
||||
# Get a valid & supported filesystem for the parition:
|
||||
while 1:
|
||||
new_filesystem = input(f"Enter a valid filesystem for {partition} (leave blank for {partition.filesystem}): ").strip(' ')
|
||||
if len(new_filesystem) <= 0:
|
||||
if partition.encrypted and partition.filesystem == 'crypto_LUKS':
|
||||
if (autodetected_filesystem := partition.detect_inner_filesystem(archinstall.arguments.get('!encryption-password', None))):
|
||||
new_filesystem = autodetected_filesystem
|
||||
else:
|
||||
archinstall.log(f"Could not auto-detect the filesystem inside the encrypted volume.", fg='red')
|
||||
archinstall.log(f"A filesystem must be defined for the unlocked encrypted partition.")
|
||||
continue
|
||||
break
|
||||
|
||||
# Since the potentially new filesystem is new
|
||||
# we have to check if we support it. We can do this by formatting /dev/null with the partitions filesystem.
|
||||
# There's a nice wrapper for this on the partition object itself that supports a path-override during .format()
|
||||
try:
|
||||
partition.format(new_filesystem, path='/dev/null', log_formating=False, allow_formatting=True)
|
||||
except archinstall.UnknownFilesystemFormat:
|
||||
archinstall.log(f"Selected filesystem is not supported yet. If you want archinstall to support '{new_filesystem}', please create a issue-ticket suggesting it on github at https://github.com/Torxed/archinstall/issues.")
|
||||
archinstall.log(f"Until then, please enter another supported filesystem.")
|
||||
continue
|
||||
except archinstall.SysCallError:
|
||||
pass # Expected exception since mkfs.<format> can not format /dev/null.
|
||||
# But that means our .format() function supported it.
|
||||
break
|
||||
|
||||
# When we've selected all three criterias,
|
||||
# We can safely mark the partition for formatting and where to mount it.
|
||||
# TODO: allow_formatting might be redundant since target_mountpoint should only be
|
||||
# set if we actually want to format it anyway.
|
||||
partition.allow_formatting = True
|
||||
partition.target_mountpoint = mountpoint
|
||||
# Only overwrite the filesystem definition if we selected one:
|
||||
if len(new_filesystem):
|
||||
partition.filesystem = new_filesystem
|
||||
|
||||
archinstall.log('Using existing partition table reported above.')
|
||||
elif option == 'format-all':
|
||||
archinstall.arguments['filesystem'] = archinstall.ask_for_main_filesystem_format()
|
||||
archinstall.arguments['harddrive'].keep_partitions = False
|
||||
|
||||
# Get disk encryption password (or skip if blank)
|
||||
if not archinstall.arguments.get('!encryption-password', None):
|
||||
archinstall.arguments['!encryption-password'] = archinstall.get_password(prompt='Enter disk encryption password (leave blank for no encryption): ')
|
||||
archinstall.arguments['harddrive'].encryption_password = archinstall.arguments['!encryption-password']
|
||||
|
||||
# Get the hostname for the machine
|
||||
if not archinstall.arguments.get('hostname', None):
|
||||
archinstall.arguments['hostname'] = input('Desired hostname for the installation: ').strip(' ')
|
||||
|
||||
# Ask for a root password (optional, but triggers requirement for super-user if skipped)
|
||||
if not archinstall.arguments.get('!root-password', None):
|
||||
archinstall.arguments['!root-password'] = archinstall.get_password(prompt='Enter root password (Recommended: leave blank to leave root disabled): ')
|
||||
|
||||
# Ask for additional users (super-user if root pw was not set)
|
||||
archinstall.arguments['users'] = {}
|
||||
archinstall.arguments['superusers'] = {}
|
||||
if not archinstall.arguments.get('!root-password', None):
|
||||
archinstall.arguments['superusers'] = archinstall.ask_for_superuser_account('Create a required super-user with sudo privileges: ', forced=True)
|
||||
|
||||
users, superusers = archinstall.ask_for_additional_users('Any additional users to install (leave blank for no users): ')
|
||||
archinstall.arguments['users'] = users
|
||||
archinstall.arguments['superusers'] = {**archinstall.arguments['superusers'], **superusers}
|
||||
|
||||
# Ask for archinstall-specific profiles (such as desktop environments etc)
|
||||
if not archinstall.arguments.get('profile', None):
|
||||
archinstall.arguments['profile'] = archinstall.select_profile(archinstall.list_profiles())
|
||||
else:
|
||||
archinstall.arguments['profile'] = archinstall.list_profiles()[archinstall.arguments['profile']]
|
||||
|
||||
# Check the potentially selected profiles preperations to get early checks if some additional questions are needed.
|
||||
if archinstall.arguments['profile'] and archinstall.arguments['profile'].has_prep_function():
|
||||
with archinstall.arguments['profile'].load_instructions(namespace=f"{archinstall.arguments['profile'].namespace}.py") as imported:
|
||||
if not imported._prep_function():
|
||||
archinstall.log(
|
||||
' * Profile\'s preparation requirements was not fulfilled.',
|
||||
bg='black',
|
||||
fg='red'
|
||||
)
|
||||
exit(1)
|
||||
|
||||
# Additional packages (with some light weight error handling for invalid package names)
|
||||
if not archinstall.arguments.get('packages', None):
|
||||
archinstall.arguments['packages'] = [package for package in input('Additional packages aside from base (space separated): ').split(' ') if len(package)]
|
||||
|
||||
# Verify packages that were given
|
||||
try:
|
||||
archinstall.validate_package_list(archinstall.arguments['packages'])
|
||||
except archinstall.RequirementError as e:
|
||||
archinstall.log(e, fg='red')
|
||||
exit(1)
|
||||
|
||||
# Ask or Call the helper function that asks the user to optionally configure a network.
|
||||
if not archinstall.arguments.get('nic', None):
|
||||
archinstall.arguments['nic'] = archinstall.ask_to_configure_network()
|
||||
|
||||
|
||||
def perform_installation_steps():
|
||||
global SIG_TRIGGER
|
||||
|
||||
print()
|
||||
print('This is your chosen configuration:')
|
||||
archinstall.log("-- Guided template chosen (with below config) --", level=archinstall.LOG_LEVELS.Debug)
|
||||
archinstall.log(json.dumps(archinstall.arguments, indent=4, sort_keys=True, cls=archinstall.JSON), level=archinstall.LOG_LEVELS.Info)
|
||||
print()
|
||||
|
||||
input('Press Enter to continue.')
|
||||
|
||||
"""
|
||||
Issue a final warning before we continue with something un-revertable.
|
||||
We mention the drive one last time, and count from 5 to 0.
|
||||
"""
|
||||
|
||||
print(f" ! Formatting {archinstall.arguments['harddrive']} in ", end='')
|
||||
|
||||
for i in range(5, 0, -1):
|
||||
print(f"{i}", end='')
|
||||
|
||||
for x in range(4):
|
||||
sys.stdout.flush()
|
||||
time.sleep(0.25)
|
||||
print(".", end='')
|
||||
|
||||
if SIG_TRIGGER:
|
||||
abort = input('\nDo you really want to abort (y/n)? ')
|
||||
if abort.strip() != 'n':
|
||||
exit(0)
|
||||
|
||||
if SIG_TRIGGER is False:
|
||||
sys.stdin.read()
|
||||
SIG_TRIGGER = False
|
||||
signal.signal(signal.SIGINT, sig_handler)
|
||||
|
||||
# Put back the default/original signal handler now that we're done catching
|
||||
# and interrupting SIGINT with "Do you really want to abort".
|
||||
print()
|
||||
signal.signal(signal.SIGINT, original_sigint_handler)
|
||||
|
||||
"""
|
||||
Setup the blockdevice, filesystem (and optionally encryption).
|
||||
Once that's done, we'll hand over to perform_installation()
|
||||
"""
|
||||
with archinstall.Filesystem(archinstall.arguments['harddrive'], archinstall.GPT) as fs:
|
||||
# Wipe the entire drive if the disk flag `keep_partitions`is False.
|
||||
if archinstall.arguments['harddrive'].keep_partitions is False:
|
||||
fs.use_entire_disk(root_filesystem_type=archinstall.arguments.get('filesystem', 'btrfs'),
|
||||
encrypt_root_partition=archinstall.arguments.get('!encryption-password', False))
|
||||
# Otherwise, check if encryption is desired and mark the root partition as encrypted.
|
||||
elif archinstall.arguments.get('!encryption-password', None):
|
||||
root_partition = fs.find_partition('/')
|
||||
root_partition.encrypted = True
|
||||
|
||||
# After the disk is ready, iterate the partitions and check
|
||||
# which ones are safe to format, and format those.
|
||||
for partition in archinstall.arguments['harddrive']:
|
||||
if partition.safe_to_format():
|
||||
if partition.encrypted:
|
||||
partition.encrypt(password=archinstall.arguments.get('!encryption-password', None))
|
||||
else:
|
||||
partition.format()
|
||||
else:
|
||||
archinstall.log(f"Did not format {partition} because .safe_to_format() returned False or .allow_formatting was False.", level=archinstall.LOG_LEVELS.Debug)
|
||||
|
||||
if archinstall.arguments.get('!encryption-password', None):
|
||||
# First encrypt and unlock, then format the desired partition inside the encrypted part.
|
||||
# archinstall.luks2() encrypts the partition when entering the with context manager, and
|
||||
# unlocks the drive so that it can be used as a normal block-device within archinstall.
|
||||
with archinstall.luks2(fs.find_partition('/'), 'luksloop', archinstall.arguments.get('!encryption-password', None)) as unlocked_device:
|
||||
unlocked_device.format(fs.find_partition('/').filesystem)
|
||||
|
||||
perform_installation(device=unlocked_device,
|
||||
boot_partition=fs.find_partition('/boot'),
|
||||
language=archinstall.arguments['keyboard-language'],
|
||||
mirrors=archinstall.arguments['mirror-region'])
|
||||
else:
|
||||
archinstall.arguments['harddrive'].partition[1].format('ext4')
|
||||
perform_installation(device=fs.find_partition('/'),
|
||||
boot_partition=fs.find_partition('/boot'),
|
||||
language=archinstall.arguments['keyboard-language'],
|
||||
mirrors=archinstall.arguments['mirror-region'])
|
||||
|
||||
|
||||
def perform_installation(device, boot_partition, language, mirrors):
|
||||
"""
|
||||
Performs the installation steps on a block device.
|
||||
Only requirement is that the block devices are
|
||||
formatted and setup prior to entering this function.
|
||||
"""
|
||||
with archinstall.Installer(device, boot_partition=boot_partition, hostname=archinstall.storage['_guided']['hostname']) as installation:
|
||||
with archinstall.Installer(device, boot_partition=boot_partition, hostname=archinstall.arguments.get('hostname', 'Archinstall')) as installation:
|
||||
## if len(mirrors):
|
||||
# Certain services might be running that affects the system during installation.
|
||||
# Currently, only one such service is "reflector.service" which updates /etc/pacman.d/mirrorlist
|
||||
|
|
@ -46,242 +292,35 @@ def perform_installation(device, boot_partition, language, mirrors):
|
|||
|
||||
# If user selected to copy the current ISO network configuration
|
||||
# Perform a copy of the config
|
||||
if archinstall.storage['_guided']['network'] == 'Copy ISO network configuration to installation':
|
||||
if archinstall.arguments.get('nic', None) == 'Copy ISO network configuration to installation':
|
||||
installation.copy_ISO_network_config(enable_services=True) # Sources the ISO network configuration to the install medium.
|
||||
|
||||
# Otherwise, if a interface was selected, configure that interface
|
||||
elif archinstall.storage['_guided']['network']:
|
||||
installation.configure_nic(**archinstall.storage['_guided']['network'])
|
||||
elif archinstall.arguments.get('nic', None):
|
||||
installation.configure_nic(**archinstall.arguments.get('nic', {}))
|
||||
installation.enable_service('systemd-networkd')
|
||||
installation.enable_service('systemd-resolved')
|
||||
|
||||
|
||||
if archinstall.storage['_guided']['packages'] and archinstall.storage['_guided']['packages'][0] != '':
|
||||
installation.add_additional_packages(archinstall.storage['_guided']['packages'])
|
||||
if archinstall.arguments.get('packages', None) and archinstall.arguments.get('packages', None)[0] != '':
|
||||
installation.add_additional_packages(archinstall.arguments.get('packages', None))
|
||||
|
||||
if 'profile' in archinstall.storage['_guided'] and len(profile := archinstall.storage['_guided']['profile']['path'].strip()):
|
||||
if archinstall.arguments.get('profile', None) and len(profile := archinstall.arguments.get('profile').strip()):
|
||||
installation.install_profile(profile)
|
||||
|
||||
if archinstall.storage['_guided']['users']:
|
||||
for user in archinstall.storage['_guided']['users']:
|
||||
if archinstall.arguments.get('users', None):
|
||||
for user in archinstall.arguments.get('users'):
|
||||
password = users[user]
|
||||
|
||||
sudo = False
|
||||
if 'root_pw' not in archinstall.storage['_guided_hidden'] or len(archinstall.storage['_guided_hidden']['root_pw'].strip()) == 0:
|
||||
sudo = True
|
||||
|
||||
installation.user_create(user, password, sudo=sudo)
|
||||
|
||||
if 'root_pw' in archinstall.storage['_guided_hidden'] and archinstall.storage['_guided_hidden']['root_pw']:
|
||||
installation.user_set_pw('root', archinstall.storage['_guided_hidden']['root_pw'])
|
||||
|
||||
# Unmount and close previous runs (in case the installer is restarted)
|
||||
archinstall.sys_command(f'umount -R /mnt', suppress_errors=True)
|
||||
archinstall.sys_command(f'cryptsetup close /dev/mapper/luksloop', suppress_errors=True)
|
||||
installation.user_create(user, password, sudo=False)
|
||||
if archinstall.arguments.get('superusers', None):
|
||||
for user in archinstall.arguments.get('users'):
|
||||
password = users[user]
|
||||
installation.user_create(user, password, sudo=Tru)
|
||||
|
||||
|
||||
"""
|
||||
First, we'll ask the user for a bunch of user input.
|
||||
Not until we're satisfied with what we want to install
|
||||
will we continue with the actual installation steps.
|
||||
"""
|
||||
if (root_pw := archinstall.arguments.get('!root-password', None)) and len(root_pw):
|
||||
installation.user_set_pw('root', root_pw)
|
||||
|
||||
if len(keyboard_language := archinstall.select_language(archinstall.list_keyboard_languages()).strip()):
|
||||
archinstall.set_keyboard_language(keyboard_language)
|
||||
archinstall.storage['_guided']['keyboard_layout'] = keyboard_language
|
||||
|
||||
# Set which region to download packages from during the installation
|
||||
mirror_regions = archinstall.select_mirror_regions(archinstall.list_mirrors())
|
||||
archinstall.storage['_guided']['mirrors'] = mirror_regions
|
||||
|
||||
# Ask which harddrive/block-device we will install to
|
||||
harddrive = archinstall.select_disk(archinstall.all_disks())
|
||||
while (disk_password := getpass.getpass(prompt='Enter disk encryption password (leave blank for no encryption): ')):
|
||||
disk_password_verification = getpass.getpass(prompt='And one more time for verification: ')
|
||||
if disk_password != disk_password_verification:
|
||||
archinstall.log(' * Passwords did not match * ', bg='black', fg='red')
|
||||
continue
|
||||
archinstall.storage['_guided']['disk_encryption'] = True
|
||||
break
|
||||
archinstall.storage['_guided']['harddrive'] = harddrive
|
||||
|
||||
# Ask for a hostname
|
||||
hostname = input('Desired hostname for the installation: ')
|
||||
if len(hostname) == 0:
|
||||
hostname = 'ArchInstall'
|
||||
archinstall.storage['_guided']['hostname'] = hostname
|
||||
|
||||
# Ask for a root password (optional, but triggers requirement for super-user if skipped)
|
||||
while (root_pw := getpass.getpass(prompt='Enter root password (leave blank to leave root disabled): ')):
|
||||
root_pw_verification = getpass.getpass(prompt='And one more time for verification: ')
|
||||
if root_pw != root_pw_verification:
|
||||
archinstall.log(' * Passwords did not match * ', bg='black', fg='red')
|
||||
continue
|
||||
|
||||
# Storing things in _guided_hidden helps us avoid printing it
|
||||
# when echoing user configuration: archinstall.storage['_guided']
|
||||
archinstall.storage['_guided_hidden']['root_pw'] = root_pw
|
||||
archinstall.storage['_guided']['root_unlocked'] = True
|
||||
break
|
||||
|
||||
# Ask for additional users (super-user if root pw was not set)
|
||||
users = {}
|
||||
new_user_text = 'Any additional users to install (leave blank for no users): '
|
||||
if len(root_pw.strip()) == 0:
|
||||
new_user_text = 'Create a super-user with sudo privileges: '
|
||||
|
||||
archinstall.storage['_guided']['users'] = None
|
||||
while 1:
|
||||
new_user = input(new_user_text)
|
||||
if len(new_user.strip()) == 0:
|
||||
if len(root_pw.strip()) == 0:
|
||||
archinstall.log(' * Since root is disabled, you need to create a least one (super) user!', bg='black', fg='red')
|
||||
continue
|
||||
break
|
||||
|
||||
if not archinstall.storage['_guided']['users']:
|
||||
archinstall.storage['_guided']['users'] = []
|
||||
archinstall.storage['_guided']['users'].append(new_user)
|
||||
|
||||
new_user_passwd = getpass.getpass(prompt=f'Password for user {new_user}: ')
|
||||
new_user_passwd_verify = getpass.getpass(prompt=f'Enter password again for verification: ')
|
||||
if new_user_passwd != new_user_passwd_verify:
|
||||
archinstall.log(' * Passwords did not match * ', bg='black', fg='red')
|
||||
continue
|
||||
|
||||
users[new_user] = new_user_passwd
|
||||
break
|
||||
|
||||
# Ask for archinstall-specific profiles (such as desktop environments etc)
|
||||
while 1:
|
||||
profile = archinstall.select_profile(archinstall.list_profiles())
|
||||
if profile:
|
||||
archinstall.storage['_guided']['profile'] = profile
|
||||
|
||||
if type(profile) != str: # Got a imported profile
|
||||
archinstall.storage['_guided']['profile'] = profile[0] # The second return is a module, and not a handle/object.
|
||||
if not profile[1]._prep_function():
|
||||
# TODO: See how we can incorporate this into
|
||||
# the general log flow. As this is pre-installation
|
||||
# session setup. Which creates the installation.log file.
|
||||
archinstall.log(
|
||||
' * Profile\'s preparation requirements was not fulfilled.',
|
||||
bg='black',
|
||||
fg='red'
|
||||
)
|
||||
continue
|
||||
break
|
||||
else:
|
||||
break
|
||||
|
||||
# Additional packages (with some light weight error handling for invalid package names)
|
||||
archinstall.storage['_guided']['packages'] = None
|
||||
while 1:
|
||||
packages = [package for package in input('Additional packages aside from base (space separated): ').split(' ') if len(package)]
|
||||
|
||||
if not packages:
|
||||
break
|
||||
|
||||
try:
|
||||
if archinstall.validate_package_list(packages):
|
||||
archinstall.storage['_guided']['packages'] = packages
|
||||
break
|
||||
except archinstall.RequirementError as e:
|
||||
print(e)
|
||||
|
||||
# Optionally configure one network interface.
|
||||
#while 1:
|
||||
# {MAC: Ifname}
|
||||
interfaces = {'ISO-CONFIG' : 'Copy ISO network configuration to installation', **archinstall.list_interfaces()}
|
||||
archinstall.storage['_guided']['network'] = None
|
||||
|
||||
nic = archinstall.generic_select(interfaces.values(), "Select one network interface to configure (leave blank to skip): ")
|
||||
if nic and nic != 'Copy ISO network configuration to installation':
|
||||
mode = archinstall.generic_select(['DHCP (auto detect)', 'IP (static)'], f"Select which mode to configure for {nic}: ")
|
||||
if mode == 'IP (static)':
|
||||
while 1:
|
||||
ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip()
|
||||
if ip:
|
||||
break
|
||||
else:
|
||||
ArchInstall.log(
|
||||
"You need to enter a valid IP in IP-config mode.",
|
||||
level=archinstall.LOG_LEVELS.Warning,
|
||||
bg='black',
|
||||
fg='red'
|
||||
)
|
||||
|
||||
if not len(gateway := input('Enter your gateway (router) IP address or leave blank for none: ').strip()):
|
||||
gateway = None
|
||||
|
||||
dns = None
|
||||
if len(dns_input := input('Enter your DNS servers (space separated, blank for none): ').strip()):
|
||||
dns = dns_input.split(' ')
|
||||
|
||||
archinstall.storage['_guided']['network'] = {'nic': nic, 'dhcp': False, 'ip': ip, 'gateway' : gateway, 'dns' : dns}
|
||||
else:
|
||||
archinstall.storage['_guided']['network'] = {'nic': nic}
|
||||
elif nic:
|
||||
archinstall.storage['_guided']['network'] = nic
|
||||
|
||||
print()
|
||||
print('This is your chosen configuration:')
|
||||
archinstall.log("-- Guided template chosen (with below config) --", level=archinstall.LOG_LEVELS.Debug)
|
||||
archinstall.log(json.dumps(archinstall.storage['_guided'], indent=4, sort_keys=True, cls=archinstall.JSON), level=archinstall.LOG_LEVELS.Info)
|
||||
print()
|
||||
|
||||
input('Press Enter to continue.')
|
||||
|
||||
"""
|
||||
Issue a final warning before we continue with something un-revertable.
|
||||
We mention the drive one last time, and count from 5 to 0.
|
||||
"""
|
||||
|
||||
print(f' ! Formatting {harddrive} in ', end='')
|
||||
|
||||
for i in range(5, 0, -1):
|
||||
print(f"{i}", end='')
|
||||
|
||||
for x in range(4):
|
||||
sys.stdout.flush()
|
||||
time.sleep(0.25)
|
||||
print(".", end='')
|
||||
|
||||
if SIG_TRIGGER:
|
||||
abort = input('\nDo you really want to abort (y/n)? ')
|
||||
if abort.strip() != 'n':
|
||||
exit(0)
|
||||
|
||||
if SIG_TRIGGER is False:
|
||||
sys.stdin.read()
|
||||
SIG_TRIGGER = False
|
||||
signal.signal(signal.SIGINT, sig_handler)
|
||||
print()
|
||||
signal.signal(signal.SIGINT, original_sigint_handler)
|
||||
|
||||
"""
|
||||
Setup the blockdevice, filesystem (and optionally encryption).
|
||||
Once that's done, we'll hand over to perform_installation()
|
||||
"""
|
||||
with archinstall.Filesystem(harddrive, archinstall.GPT) as fs:
|
||||
# Use partitioning helper to set up the disk partitions.
|
||||
if disk_password:
|
||||
fs.use_entire_disk('luks2')
|
||||
else:
|
||||
fs.use_entire_disk('ext4')
|
||||
|
||||
if harddrive.partition[1].size == '512M':
|
||||
raise OSError('Trying to encrypt the boot partition for petes sake..')
|
||||
harddrive.partition[0].format('fat32')
|
||||
|
||||
if disk_password:
|
||||
# First encrypt and unlock, then format the desired partition inside the encrypted part.
|
||||
# archinstall.luks2() encrypts the partition when entering the with context manager, and
|
||||
# unlocks the drive so that it can be used as a normal block-device within archinstall.
|
||||
with archinstall.luks2(harddrive.partition[1], 'luksloop', disk_password) as unlocked_device:
|
||||
unlocked_device.format('btrfs')
|
||||
|
||||
perform_installation(unlocked_device, harddrive.partition[0], keyboard_language, mirror_regions)
|
||||
else:
|
||||
harddrive.partition[1].format('ext4')
|
||||
perform_installation(harddrive.partition[1], harddrive.partition[0], keyboard_language, mirror_regions)
|
||||
ask_user_questions()
|
||||
perform_installation_steps()
|
||||
18
make.sh
18
make.sh
|
|
@ -1,18 +0,0 @@
|
|||
#!/bin/bash
|
||||
# Description: Binary builder for https://archlinux.life/bin/
|
||||
|
||||
VERSION=$(cat VERSION)
|
||||
|
||||
rm -rf archinstall.egg-info/ build/ src/ pkg/ dist/ archinstall.build/ "archinstall-v${VERSION}-x86_64/" *.pkg.*.xz archinstall-*.tar.gz
|
||||
|
||||
#nuitka3 --standalone --show-progress archinstall
|
||||
#cp -r examples/ archinstall.dist/
|
||||
#mv archinstall.dist "archinstall-v${VERSION}-x86_64"
|
||||
#tar -czvf "archinstall-v${VERSION}.tar.gz" "archinstall-v${VERSION}-x86_64"
|
||||
|
||||
# makepkg -f
|
||||
python3 setup.py sdist bdist_wheel
|
||||
echo 'python3 -m twine upload dist/* && rm -rf dist/'
|
||||
python3 -m twine upload dist/* && rm -rf dist/
|
||||
|
||||
rm -rf archinstall.egg-info/ build/ src/ pkg/ archinstall.build/ "archinstall-v${VERSION}-x86_64/"
|
||||
Loading…
Reference in New Issue