Many more manual changes

This commit is contained in:
Dylan Taylor 2021-05-15 12:29:57 -04:00
parent 8eebc8ade3
commit 69d675f4aa
17 changed files with 232 additions and 150 deletions

View File

@ -1,12 +1,11 @@
from typing import Optional
import glob, re, os, json, time, hashlib
import pathlib, traceback, logging
import glob
import pathlib
import re
from collections import OrderedDict
from .exceptions import DiskError
from .general import *
from .output import log
from .storage import storage
from .hardware import hasUEFI
from .output import log
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GPT = 0b00000001
@ -172,7 +171,7 @@ class Partition():
self.mount(mountpoint)
mount_information = get_mount_info(self.path)
if self.mountpoint != mount_information.get('target', None) and mountpoint:
raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
@ -250,14 +249,14 @@ class Partition():
def has_content(self):
if not get_filesystem_type(self.path):
return False
temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest()
temporary_path = pathlib.Path(temporary_mountpoint)
temporary_path.mkdir(parents=True, exist_ok=True)
if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
files = len(glob.glob(f"{temporary_mountpoint}/*"))
sys_command(f'/usr/bin/umount {temporary_mountpoint}')
@ -385,7 +384,7 @@ class Partition():
sys_command(f'/usr/bin/mount {self.path} {target}')
except SysCallError as err:
raise err
self.mountpoint = target
return True
@ -446,7 +445,7 @@ class Filesystem():
raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos')
else:
raise DiskError(f'Unknown mode selected to format in: {self.mode}')
# TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed.
elif self.mode == self.blockdevice.partition_table_type:
log(f'Kept partition format {self.mode} for {self.blockdevice}', level=logging.DEBUG)
@ -513,7 +512,7 @@ class Filesystem():
def add_partition(self, type, start, end, format=None):
log(f'Adding partition to {self.blockdevice}', level=logging.INFO)
previous_partitions = self.blockdevice.partitions
if self.mode == MBR:
if len(self.blockdevice.partitions)>3:
@ -632,4 +631,4 @@ def disk_layouts():
return json.loads(b''.join(handle).decode('UTF-8'))
except SysCallError as err:
log(f"Could not return disk layouts: {err}")
return None
return None

View File

@ -1,23 +1,41 @@
class RequirementError(BaseException):
pass
class DiskError(BaseException):
pass
class UnknownFilesystemFormat(BaseException):
pass
class ProfileError(BaseException):
pass
class SysCallError(BaseException):
def __init__(self, message, exit_code):
super(SysCallError, self).__init__(message)
self.message = message
self.exit_code = exit_code
class ProfileNotFound(BaseException):
pass
class HardwareIncompatibilityError(BaseException):
pass
class PermissionError(BaseException):
pass
class UserError(BaseException):
pass
class ServiceException(BaseException):
pass
pass

View File

@ -1,11 +1,18 @@
import os, json, hashlib, shlex, sys
import time, pty, logging
import hashlib
import json
import logging
import os
import pty
import shlex
import sys
import time
from datetime import datetime, date
from subprocess import Popen, STDOUT, PIPE, check_output
from select import epoll, EPOLLIN, EPOLLHUP
from typing import Union
from .exceptions import *
from .output import log
from typing import Optional, Union
def gen_uid(entropy_length=256):
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
@ -37,16 +44,16 @@ class JSON_Encoder:
if isinstance(obj, dict):
## We'll need to iterate not just the value that default() usually gets passed
## But also iterate manually over each key: value pair in order to trap the keys.
copy = {}
for key, val in list(obj.items()):
if isinstance(val, dict):
val = json.loads(json.dumps(val, cls=JSON)) # This, is a EXTREMELY ugly hack..
# But it's the only quick way I can think of to
# But it's the only quick way I can think of to
# trigger a encoding of sub-dictionaries.
else:
val = JSON_Encoder._encode(val)
if type(key) == str and key[0] == '!':
copy[JSON_Encoder._encode(key)] = '******'
else:

View File

@ -1,8 +1,11 @@
import os, subprocess, json
from .general import sys_command
from .networking import list_interfaces, enrichIfaceTypes
import json
import os
import subprocess
from typing import Optional
from .general import sys_command
from .networking import list_interfaces, enrich_iface_types
__packages__ = [
"mesa",
"xf86-video-amdgpu",
@ -53,7 +56,7 @@ AVAILABLE_GFX_DRIVERS = {
}
def hasWifi()->bool:
return 'WIRELESS' in enrichIfaceTypes(list_interfaces().values()).values()
return 'WIRELESS' in enrich_iface_types(list_interfaces().values()).values()
def hasAMDCPU()->bool:
if subprocess.check_output("lscpu | grep AMD", shell=True).strip().decode():

View File

@ -1,15 +1,11 @@
import os, stat, time, shutil, pathlib
import subprocess, logging
from .exceptions import *
from .disk import *
from .general import *
from .user_interaction import *
from .profiles import Profile
from .mirrors import *
from .systemd import Networkd
from .output import log
from .storage import storage
from .hardware import *
from .mirrors import *
from .output import log
from .profiles import Profile
from .storage import storage
from .systemd import Networkd
from .user_interaction import *
# Any package that the Installer() is responsible for (optional and the default ones)
__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
@ -47,7 +43,7 @@ class Installer():
'base' : False,
'bootloader' : False
}
self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
for kernel in kernels:
self.base_packages.append(kernel)
@ -100,10 +96,10 @@ class Installer():
self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
for step in missing_steps:
self.log(f' - {step}', fg='red', level=logging.WARNING)
self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING)
self.log(f"Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING)
self.sync_log_to_install_medium()
return False
@ -116,7 +112,7 @@ class Installer():
if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
return True
@ -124,7 +120,7 @@ class Installer():
def mount(self, partition, mountpoint, create_mountpoint=True):
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
partition.mount(f'{self.target}{mountpoint}')
def post_install_check(self, *args, **kwargs):
@ -147,7 +143,7 @@ class Installer():
def genfstab(self, flags='-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log
with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh:
fstab_fh.write(fstab)
@ -204,7 +200,7 @@ class Installer():
def arch_chroot(self, cmd, *args, **kwargs):
if 'runas' in kwargs:
cmd = f"su - {kwargs['runas']} -c \"{cmd}\""
return self.run_command(cmd)
def drop_to_shell(self):
@ -224,7 +220,7 @@ class Installer():
network["DNS"] = dns
conf = Networkd(Match={"Name": nic}, Network=network)
with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
@ -272,7 +268,7 @@ class Installer():
# Otherwise, we can go ahead and enable the services
else:
self.enable_service('systemd-networkd', 'systemd-resolved')
return True
@ -281,7 +277,7 @@ class Installer():
return partition
elif partition.parent not in partition.path and Partition(partition.parent, None, autodetect_filesystem=True).filesystem == 'crypto_LUKS':
return Partition(partition.parent, None, autodetect_filesystem=True)
return False
def mkinitcpio(self, *flags):
@ -298,7 +294,7 @@ class Installer():
## TODO: Perhaps this should be living in the function which dictates
## the partitioning. Leaving here for now.
for partition in self.partitions:
if partition.filesystem == 'btrfs':
@ -322,7 +318,7 @@ class Installer():
if not(hasUEFI()):
self.base_packages.append('grub')
if not isVM():
vendor = cpuVendor()
if vendor == "AuthenticAMD":
@ -331,7 +327,7 @@ class Installer():
self.base_packages.append("intel-ucode")
else:
self.log("Unknown cpu vendor not installing ucode")
self.pacstrap(self.base_packages)
self.helper_flags['base-strapped'] = True
@ -395,7 +391,7 @@ class Installer():
f"default {self.init_time}",
f"timeout 5"
]
with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader:
for line in loader_data:
if line[:8] == 'default ':
@ -500,7 +496,7 @@ class Installer():
o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\""))
pass
def user_set_shell(self, user, shell):
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)

View File

@ -4,6 +4,7 @@ import os
from .exceptions import *
# from .general import sys_command
def list_keyboard_languages():
locale_dir = '/usr/share/kbd/keymaps/'
@ -16,16 +17,19 @@ def list_keyboard_languages():
if os.path.splitext(file)[1] == '.gz':
yield file.strip('.gz').strip('.map')
def verify_keyboard_layout(layout):
for language in list_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
def search_keyboard_layout(filter):
for language in list_keyboard_languages():
if filter.lower() in language.lower():
yield language
def set_keyboard_language(locale):
return subprocess.call(['loadkeys', locale]) == 0

View File

@ -1,13 +1,9 @@
import os
import shlex
import time
import pathlib
import logging
from .exceptions import *
from .general import *
from .disk import Partition
from .general import *
from .output import log
from .storage import storage
class luks2():
def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
@ -22,12 +18,12 @@ class luks2():
self.mapdev = None
def __enter__(self):
#if self.partition.allow_formatting:
# self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
#else:
# if self.partition.allow_formatting:
# self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
# else:
if not self.key_file:
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
if type(self.password) != bytes:
self.password = bytes(self.password, 'UTF-8')
@ -112,7 +108,7 @@ class luks2():
if cmd_handle.exit_code != 0:
raise DiskError(f'Could not encrypt volume "{partition.path}": {cmd_output}')
return key_file
def unlock(self, partition, mountpoint, key_file):

View File

@ -1,9 +1,8 @@
import urllib.request, logging
import urllib.request
from .exceptions import *
from .general import *
from .output import log
from .storage import storage
def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', tmp_dir='/root', *args, **kwargs):
"""
@ -19,9 +18,10 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', tm
o = b''.join(sys_command((f"/usr/bin/wget 'https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O {tmp_dir}/mirrorlist")))
o = b''.join(sys_command((f"/usr/bin/sed -i 's/#Server/Server/' {tmp_dir}/mirrorlist")))
o = b''.join(sys_command((f"/usr/bin/mv {tmp_dir}/mirrorlist {destination}")))
return True
def add_custom_mirrors(mirrors:list, *args, **kwargs):
"""
This will append custom mirror definitions in pacman.conf
@ -37,6 +37,7 @@ def add_custom_mirrors(mirrors:list, *args, **kwargs):
return True
def insert_mirrors(mirrors, *args, **kwargs):
"""
This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`.
@ -58,6 +59,7 @@ def insert_mirrors(mirrors, *args, **kwargs):
return True
def use_mirrors(regions :dict, destination='/etc/pacman.d/mirrorlist'):
log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)
for region, mirrors in regions.items():
@ -67,11 +69,13 @@ def use_mirrors(regions :dict, destination='/etc/pacman.d/mirrorlist'):
mirrorlist.write(f'Server = {mirror}\n')
return True
def re_rank_mirrors(top=10, *positionals, **kwargs):
if sys_command((f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist')).exit_code == 0:
return True
return False
def list_mirrors():
url = f"https://archlinux.org/mirrorlist/?protocol=https&ip_version=4&ip_version=6&use_mirror_status=on"
regions = {}
@ -97,4 +101,4 @@ def list_mirrors():
url = line.lstrip('#Server = ')
regions[region][url] = True
return regions
return regions

View File

@ -7,22 +7,25 @@ from .exceptions import *
from .general import sys_command
from .storage import storage
def getHwAddr(ifname):
def get_hw_addr(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
return ':'.join('%02x' % b for b in info[18:24])
def list_interfaces(skip_loopback=True):
interfaces = OrderedDict()
for index, iface in socket.if_nameindex():
if skip_loopback and iface == "lo":
continue
mac = getHwAddr(iface).replace(':', '-').lower()
mac = get_hw_addr(iface).replace(':', '-').lower()
interfaces[mac] = iface
return interfaces
def enrichIfaceTypes(interfaces :dict):
def enrich_iface_types(interfaces :dict):
result = {}
for iface in interfaces:
if os.path.isdir(f"/sys/class/net/{iface}/bridge/"):
@ -39,11 +42,13 @@ def enrichIfaceTypes(interfaces :dict):
result[iface] = 'UNKNOWN'
return result
def get_interface_from_mac(mac):
return list_interfaces().get(mac.lower(), None)
def wirelessScan(interface):
interfaces = enrichIfaceTypes(list_interfaces().values())
def wireless_scan(interface):
interfaces = enrich_iface_types(list_interfaces().values())
if interfaces[interface] != 'WIRELESS':
raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
@ -56,12 +61,13 @@ def wirelessScan(interface):
storage['_WIFI'][interface]['scanning'] = True
# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
def getWirelessNetworks(interface):
def get_wireless_networks(interface):
# TODO: Make this oneliner pritter to check if the interface is scanning or not.
if not '_WIFI' in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
import time
wirelessScan(interface)
wireless_scan(interface)
time.sleep(5)
for line in sys_command(f"iwctl station {interface} get-networks"):

View File

@ -5,16 +5,18 @@ import logging
from pathlib import Path
from .storage import storage
# TODO: use logging's built in levels instead.
# Although logging is threaded and I wish to avoid that.
# It's more Pythonistic or w/e you want to call it.
class LOG_LEVELS:
class LogLevels:
Critical = 0b001
Error = 0b010
Warning = 0b011
Info = 0b101
Debug = 0b111
class journald(dict):
@abc.abstractmethod
def log(message, level=logging.DEBUG):
@ -27,19 +29,19 @@ class journald(dict):
# to logging levels (and warn about deprecated usage)
# There's some code re-usage here but that should be fine.
# TODO: Remove these in a few versions:
if level == LOG_LEVELS.Critical:
if level == LogLevels.Critical:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.CRITICAL
elif level == LOG_LEVELS.Error:
elif level == LogLevels.Error:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.ERROR
elif level == LOG_LEVELS.Warning:
elif level == LogLevels.Warning:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.WARNING
elif level == LOG_LEVELS.Info:
elif level == LogLevels.Info:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.INFO
elif level == LOG_LEVELS.Debug:
elif level == LogLevels.Debug:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.DEBUG
@ -49,14 +51,16 @@ class journald(dict):
log_ch.setFormatter(log_fmt)
log_adapter.addHandler(log_ch)
log_adapter.setLevel(logging.DEBUG)
log_adapter.log(level, message)
# TODO: Replace log() for session based logging.
class SessionLogging():
class SessionLogging:
def __init__(self):
pass
# Found first reference here: https://stackoverflow.com/questions/7445658/how-to-detect-if-the-console-does-support-ansi-escape-codes-in-python
# And re-used this: https://github.com/django/django/blob/master/django/core/management/color.py#L12
def supports_color():
@ -70,6 +74,7 @@ def supports_color():
is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
return supported_platform and is_a_tty
# Heavily influenced by: https://github.com/django/django/blob/ae8338daf34fd746771e0678081999b656177bae/django/utils/termcolors.py#L13
# Color options here: https://askubuntu.com/questions/528928/how-to-do-underline-bold-italic-strikethrough-color-background-and-size-i
def stylize_output(text :str, *opts, **kwargs):
@ -94,6 +99,7 @@ def stylize_output(text :str, *opts, **kwargs):
text = '%s\x1b[%sm' % (text or '', RESET)
return '%s%s' % (('\x1b[%sm' % ';'.join(code_list)), text or '')
def log(*args, **kwargs):
string = orig_string = ' '.join([str(x) for x in args])
@ -132,19 +138,19 @@ def log(*args, **kwargs):
# to logging levels (and warn about deprecated usage)
# There's some code re-usage here but that should be fine.
# TODO: Remove these in a few versions:
if kwargs['level'] == LOG_LEVELS.Critical:
if kwargs['level'] == LogLevels.Critical:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.CRITICAL
elif kwargs['level'] == LOG_LEVELS.Error:
elif kwargs['level'] == LogLevels.Error:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.ERROR
elif kwargs['level'] == LOG_LEVELS.Warning:
elif kwargs['level'] == LogLevels.Warning:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.WARNING
elif kwargs['level'] == LOG_LEVELS.Info:
elif kwargs['level'] == LogLevels.Info:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.INFO
elif kwargs['level'] == LOG_LEVELS.Debug:
elif kwargs['level'] == LogLevels.Debug:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.DEBUG
@ -156,7 +162,7 @@ def log(*args, **kwargs):
try:
journald.log(string, level=kwargs.get('level', logging.INFO))
except ModuleNotFoundError:
pass # Ignore writing to journald
pass # Ignore writing to journald
# Finally, print the log unless we skipped it based on level.
# We use sys.stdout.write()+flush() instead of print() to try and

View File

@ -1,10 +1,14 @@
import urllib.request, urllib.parse
import ssl, json
import json
import ssl
import urllib.parse
import urllib.request
from .exceptions import *
BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}'
BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
def find_group(name):
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
@ -16,11 +20,12 @@ def find_group(name):
return False
else:
raise err
# Just to be sure some code didn't slip through the exception
if response.code == 200:
return True
def find_package(name):
"""
Finds a specific package via the package database.
@ -33,6 +38,7 @@ def find_package(name):
data = response.read().decode('UTF-8')
return json.loads(data)
def find_packages(*names):
"""
This function returns the search results for many packages.
@ -44,6 +50,7 @@ def find_packages(*names):
result[package] = find_package(package)
return result
def validate_package_list(packages :list):
"""
Validates a list of given packages.
@ -53,8 +60,8 @@ def validate_package_list(packages :list):
for package in packages:
if not find_package(package)['results'] and not find_group(package):
invalid_packages.append(package)
if invalid_packages:
raise RequirementError(f"Invalid package names: {invalid_packages}")
return True
return True

View File

@ -1,13 +1,17 @@
import hashlib
import importlib.util
import json
import re
import ssl
import sys
import urllib.parse
import urllib.request
from typing import Optional
import os, urllib.request, urllib.parse, ssl, json, re
import importlib.util, sys, glob, hashlib, logging
from collections import OrderedDict
from .general import multisplit, sys_command
from .exceptions import *
from .general import multisplit
from .networking import *
from .output import log
from .storage import storage
def grab_url_data(path):
safe_path = path[:path.find(':')+1]+''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':')+1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context()
@ -16,6 +20,7 @@ def grab_url_data(path):
response = urllib.request.urlopen(safe_path, context=ssl_context)
return response.read()
def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_profiles=False):
# TODO: Grab from github page as well, not just local static files
if filter_irrelevant_macs:
@ -55,7 +60,7 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
except json.decoder.JSONDecodeError as err:
print(f'Error: Could not decode "{profiles_url}" result as JSON:', err)
return cache
for profile in profile_list:
if os.path.splitext(profile)[1] == '.py':
tailored = False
@ -73,7 +78,8 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
return cache
class Script():
class Script:
def __init__(self, profile, installer=None):
# profile: https://hvornum.se/something.py
# profile: desktop
@ -154,6 +160,7 @@ class Script():
return sys.modules[self.namespace]
class Profile(Script):
def __init__(self, installer, path, args={}):
super(Profile, self).__init__(path, installer)
@ -238,6 +245,7 @@ class Profile(Script):
return imported.__packages__
return None
class Application(Profile):
def __repr__(self, *args, **kwargs):
return f'Application({os.path.basename(self.profile)})'

View File

@ -1,8 +1,6 @@
import os
from .exceptions import *
from .general import *
def service_state(service_name: str):
if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe

View File

@ -12,7 +12,7 @@ storage = {
'./profiles',
'~/.config/archinstall/profiles',
os.path.join(os.path.dirname(os.path.abspath(__file__)), 'profiles'),
#os.path.abspath(f'{os.path.dirname(__file__)}/../examples')
# os.path.abspath(f'{os.path.dirname(__file__)}/../examples')
],
'UPSTREAM_URL' : 'https://raw.githubusercontent.com/archlinux/archinstall/master/profiles',
'PROFILE_DB' : None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.

View File

@ -1,4 +1,4 @@
class Ini():
class Ini:
def __init__(self, *args, **kwargs):
"""
Limited INI handler for now.
@ -25,11 +25,13 @@ class Ini():
return result
class Systemd(Ini):
"""
Placeholder class to do systemd specific setups.
"""
class Networkd(Systemd):
"""
Placeholder class to do systemd-network specific setups.

View File

@ -1,27 +1,40 @@
import getpass, pathlib, os, shutil, re, time
import sys, time, signal, ipaddress, logging
import termios, tty, select # Used for char by char polling of sys.stdin
import getpass
import ipaddress
import logging
import pathlib
import re
import select # Used for char by char polling of sys.stdin
import shutil
import signal
import sys
import termios
import time
import tty
from .exceptions import *
from .profiles import Profile
from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout
from .output import log
from .storage import storage
from .networking import list_interfaces
from .general import sys_command
from .hardware import AVAILABLE_GFX_DRIVERS, hasUEFI
from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout
from .networking import list_interfaces
from .output import log
from .profiles import Profile
## TODO: Some inconsistencies between the selection processes.
## Some return the keys from the options, some the values?
# TODO: Some inconsistencies between the selection processes.
# Some return the keys from the options, some the values?
def get_terminal_height():
return shutil.get_terminal_size().lines
def get_terminal_width():
return shutil.get_terminal_size().columns
def get_longest_option(options):
return max([len(x) for x in options])
def check_for_correct_username(username):
if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32:
return True
@ -32,6 +45,7 @@ def check_for_correct_username(username):
)
return False
def do_countdown():
SIG_TRIGGER = False
def kill_handler(sig, frame):
@ -67,6 +81,7 @@ def do_countdown():
signal.signal(signal.SIGINT, original_sigint_handler)
return True
def get_password(prompt="Enter a password: "):
while (passwd := getpass.getpass(prompt)):
passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
@ -80,6 +95,7 @@ def get_password(prompt="Enter a password: "):
return passwd
return None
def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
highest_index_number_length = len(str(len(options)))
longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding
@ -140,7 +156,7 @@ def generic_multi_select(options, text="Select one or more of the options above
section.input_pos = section._cursor_x
selected_option = section.get_keyboard_input(end=None)
# This string check is necessary to correct work with it
# Without this, Python will raise AttributeError because of stripping `None`
# Without this, Python will raise AttributeError because of stripping `None`
# It also allows to remove empty spaces if the user accidentally entered them.
if isinstance(selected_option, str):
selected_option = selected_option.strip()
@ -173,7 +189,7 @@ def generic_multi_select(options, text="Select one or more of the options above
return selected_options
class MiniCurses():
class MiniCurses:
def __init__(self, width, height):
self.width = width
self.height = height
@ -200,10 +216,10 @@ class MiniCurses():
if x < 0: x = 0
if y < 0: y = 0
#import time
#sys.stdout.write(f"Clearing from: {x, y}")
#sys.stdout.flush()
#time.sleep(2)
# import time
# sys.stdout.write(f"Clearing from: {x, y}")
# sys.stdout.flush()
# time.sleep(2)
sys.stdout.flush()
sys.stdout.write('\033[%d;%df' % (y, x))
@ -259,16 +275,16 @@ class MiniCurses():
poller.register(sys.stdin.fileno(), select.EPOLLIN)
EOF = False
while EOF is False:
eof = False
while eof is False:
for fileno, event in poller.poll(0.025):
char = sys.stdin.read(1)
#sys.stdout.write(f"{[char]}")
#sys.stdout.flush()
# sys.stdout.write(f"{[char]}")
# sys.stdout.flush()
if (newline := (char in ('\n', '\r'))):
EOF = True
if newline := (char in ('\n', '\r')):
eof = True
if not newline or strip_rowbreaks is False:
response += char
@ -287,6 +303,7 @@ class MiniCurses():
if response:
return response
def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False):
while 1:
new_user = input(prompt).strip(' ')
@ -304,6 +321,7 @@ def ask_for_superuser_account(prompt='Username for required superuser with sudo
password = get_password(prompt=f'Password for user {new_user}: ')
return {new_user: {"!password" : password}}
def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
users = {}
superusers = {}
@ -315,7 +333,7 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
if not check_for_correct_username(new_user):
continue
password = get_password(prompt=f'Password for user {new_user}: ')
if input("Should this user be a superuser (sudoer) [y/N]: ").strip(' ').lower() in ('y', 'yes'):
superusers[new_user] = {"!password" : password}
else:
@ -323,6 +341,7 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
return users, superusers
def ask_for_a_timezone():
while True:
timezone = input('Enter a valid timezone (examples: Europe/Stockholm, US/Eastern) or press enter to use UTC: ').strip().strip('*.')
@ -337,6 +356,7 @@ def ask_for_a_timezone():
fg='red'
)
def ask_for_bootloader() -> str:
bootloader = "systemd-bootctl"
if hasUEFI()==False:
@ -347,6 +367,7 @@ def ask_for_bootloader() -> str:
bootloader="grub-install"
return bootloader
def ask_for_audio_selection():
audio = "pulseaudio" # Default for most desktop environments
pipewire_choice = input("Would you like to install pipewire instead of pulseaudio as the default audio server? [Y/n] ").lower()
@ -355,6 +376,7 @@ def ask_for_audio_selection():
return audio
def ask_to_configure_network():
# Optionally configure one network interface.
#while 1:
@ -422,6 +444,7 @@ def ask_to_configure_network():
return {}
def ask_for_disk_layout():
options = {
'keep-existing' : 'Keep existing partition layout and select which ones to use where',
@ -433,6 +456,7 @@ def ask_for_disk_layout():
allow_empty_input=False, sort=True)
return next((key for key, val in options.items() if val == value), None)
def ask_for_main_filesystem_format():
options = {
'btrfs' : 'btrfs',
@ -445,6 +469,7 @@ def ask_for_main_filesystem_format():
allow_empty_input=False)
return next((key for key, val in options.items() if val == value), None)
def generic_select(options, input_text="Select one of the above by index or absolute value: ", allow_empty_input=True, options_output=True, sort=False):
"""
A generic select function that does not output anything
@ -477,7 +502,6 @@ def generic_select(options, input_text="Select one of the above by index or abso
# As we pass only list and dict (converted to list), we can skip converting to list
options = sorted(options)
# Added ability to disable the output of options items,
# if another function displays something different from this
if options_output:
@ -510,6 +534,7 @@ def generic_select(options, input_text="Select one of the above by index or abso
return selected_option
def select_disk(dict_o_disks):
"""
Asks the user to select a harddrive from the `dict_o_disks` selection.
@ -525,18 +550,18 @@ def select_disk(dict_o_disks):
if len(drives) >= 1:
for index, drive in enumerate(drives):
print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})")
log(f"You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", fg="yellow")
drive = generic_select(drives, 'Select one of the above disks (by name or number) or leave blank to use /mnt: ',
options_output=False)
drive = generic_select(drives, 'Select one of the above disks (by name or number) or leave blank to use /mnt: ', options_output=False)
if not drive:
return drive
drive = dict_o_disks[drive]
return drive
raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.')
def select_profile(options):
"""
Asks the user to select a profile from the `options` dictionary parameter.
@ -565,6 +590,7 @@ def select_profile(options):
else:
raise RequirementError("Selecting profiles require a least one profile to be given as an option.")
def select_language(options, show_only_country_codes=True):
"""
Asks the user to select a language from the `options` dictionary parameter.
@ -579,8 +605,8 @@ def select_language(options, show_only_country_codes=True):
:return: The language/dictionary key of the selected language
:rtype: str
"""
DEFAULT_KEYBOARD_LANGUAGE = 'us'
default_keyboard_language = 'us'
if show_only_country_codes:
languages = sorted([language for language in list(options) if len(language) == 2])
else:
@ -596,7 +622,7 @@ def select_language(options, show_only_country_codes=True):
while True:
selected_language = input('Select one of the above keyboard languages (by name or full name): ')
if not selected_language:
return DEFAULT_KEYBOARD_LANGUAGE
return default_keyboard_language
elif selected_language.lower() in ('?', 'help'):
while True:
filter_string = input("Search for layout containing (example: \"sv-\") or enter 'exit' to exit from search: ")
@ -624,6 +650,7 @@ def select_language(options, show_only_country_codes=True):
raise RequirementError("Selecting languages require a least one language to be given as an option.")
def select_mirror_regions(mirrors, show_top_mirrors=True):
"""
Asks the user to select a mirror or region from the `mirrors` dictionary parameter.
@ -665,6 +692,7 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
raise RequirementError("Selecting mirror region require a least one region to be given as an option.")
def select_driver(options=AVAILABLE_GFX_DRIVERS):
"""
Some what convoluted function, which's job is simple.
@ -673,10 +701,10 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
(The template xorg is for beginner users, not advanced, and should
there for appeal to the general public first and edge cases later)
"""
drivers = sorted(list(options))
default_option = options["All open-source (default)"]
if drivers:
lspci = sys_command(f'/usr/bin/lspci')
for line in lspci.trace_log.split(b'\r\n'):
@ -696,8 +724,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
if type(selected_driver) == dict:
driver_options = sorted(list(selected_driver))
driver_package_group = generic_select(driver_options, f'Which driver-type do you want for {initial_option}: ',
allow_empty_input=False)
driver_package_group = generic_select(driver_options, f'Which driver-type do you want for {initial_option}: ', allow_empty_input=False)
driver_package_group = selected_driver[driver_package_group]
return driver_package_group
@ -706,6 +733,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
raise RequirementError("Selecting drivers require a least one profile to be given as an option.")
def select_kernel(options):
"""
Asks the user to select a kernel for system.
@ -716,12 +744,12 @@ def select_kernel(options):
:return: The string as a selected kernel
:rtype: string
"""
DEFAULT_KERNEL = "linux"
default_kernel = "linux"
kernels = sorted(list(options))
if kernels:
return generic_multi_select(kernels, f"Choose which kernels to use (leave blank for default: {DEFAULT_KERNEL}): ", default=DEFAULT_KERNEL, sort=False)
return generic_multi_select(kernels, f"Choose which kernels to use (leave blank for default: {default_kernel}): ", default=default_kernel, sort=False)
raise RequirementError("Selecting kernels require a least one kernel to be given as an option.")

View File

@ -10,7 +10,7 @@ if archinstall.arguments.get('help'):
exit(0)
# For support reasons, we'll log the disk layout pre installation to match against post-installation layout
archinstall.log(f"Disk states before installing: {archinstall.disk_layouts()}", level=archinstall.LOG_LEVELS.Debug)
archinstall.log(f"Disk states before installing: {archinstall.disk_layouts()}", level=archinstall.LogLevels.Debug)
def ask_user_questions():
@ -387,7 +387,7 @@ def perform_installation(mountpoint):
pass
# For support reasons, we'll log the disk layout post installation (crash or no crash)
archinstall.log(f"Disk states after installing: {archinstall.disk_layouts()}", level=archinstall.LOG_LEVELS.Debug)
archinstall.log(f"Disk states after installing: {archinstall.disk_layouts()}", level=archinstall.LogLevels.Debug)
ask_user_questions()
perform_installation_steps()