archinstall/archinstall.py

1382 lines
53 KiB
Python

#!/usr/bin/python3
import traceback
import os, re, struct, sys, json, pty, shlex
import urllib.request, urllib.parse, ssl, signal
import time
from glob import glob
from select import epoll, EPOLLIN, EPOLLHUP
from socket import socket, inet_ntoa, AF_INET, AF_INET6, AF_PACKET
from collections import OrderedDict as oDict
from subprocess import Popen, STDOUT, PIPE, check_output
from random import choice
from string import ascii_uppercase, ascii_lowercase, digits
from hashlib import sha512
from threading import Thread, enumerate as tenum
if not os.path.isdir('/sys/firmware/efi'):
print('[E] This script only supports UEFI-booted machines.')
exit(1)
if os.path.isfile('./SAFETY_LOCK'):
SAFETY_LOCK = True
else:
SAFETY_LOCK = False
profiles_path = 'https://raw.githubusercontent.com/Torxed/archinstall/master/deployments'
rootdir_pattern = re.compile('^.*?/devices')
harddrives = oDict()
commandlog = []
worker_history = oDict()
instructions = oDict()
args = {}
import logging
from systemd.journal import JournalHandler
# Custom adapter to pre-pend the 'origin' key.
# TODO: Should probably use filters: https://docs.python.org/3/howto/logging-cookbook.html#using-filters-to-impart-contextual-information
class CustomAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return '[{}] {}'.format(self.extra['origin'], msg), kwargs
logger = logging.getLogger() # __name__
journald_handler = JournalHandler()
journald_handler.setFormatter(logging.Formatter('[{levelname}] {message}', style='{'))
logger.addHandler(journald_handler)
logger.setLevel(logging.DEBUG)
class LOG_LEVELS:
CRITICAL = 1
ERROR = 2
WARNING = 3
INFO = 4
DEBUG = 5
LOG_LEVEL = 4
def log(*msg, origin='UNKNOWN', level=5, **kwargs):
if level <= LOG_LEVEL:
msg = [item.decode('UTF-8', errors='backslashreplace') if type(item) == bytes else item for item in msg]
msg = [str(item) if type(item) != str else item for item in msg]
log_adapter = CustomAdapter(logger, {'origin': origin})
if level <= 1:
log_adapter.critical(' '.join(msg))
elif level <= 2:
log_adapter.error(' '.join(msg))
elif level <= 3:
log_adapter.warning(' '.join(msg))
elif level <= 4:
log_adapter.info(' '.join(msg))
else:
log_adapter.debug(' '.join(msg))
## == Profiles Path can be set via --profiles-path=/path
## This just sets the default path if the parameter is omitted.
try:
import psutil
except:
## Time to monkey patch in all the stats and psutil fuctions if it isn't installed.
class mem():
def __init__(self, free, percent=-1):
self.free = free
self.percent = percent
class disk():
def __init__(self, size, free, percent):
self.total = size
self.used = 0
self.free = free
self.percent = percent
class iostat():
def __init__(self, interface, bytes_sent=0, bytes_recv=0):
self.interface = interface
self.bytes_recv = int(bytes_recv)
self.bytes_sent = int(bytes_sent)
def __repr__(self, *positionals, **kwargs):
return f'iostat@{self.interface}[bytes_sent: {self.bytes_sent}, bytes_recv: {self.bytes_recv}]'
class psutil():
def cpu_percent(interval=0):
## This just counts the ammount of time the CPU has spent. Find a better way!
with cmd("grep 'cpu ' /proc/stat | awk '{usage=($2+$4)*100/($2+$4+$5)} END {print usage}'") as output:
for line in output:
return float(line.strip().decode('UTF-8'))
def virtual_memory():
with cmd("grep 'MemFree: ' /proc/meminfo | awk '{free=($2)} END {print free}'") as output:
for line in output:
return mem(float(line.strip().decode('UTF-8')))
def disk_usage(partition):
disk_stats = os.statvfs(partition)
free_size = disk_stats.f_bfree * disk_stats.f_bsize
disk_size = disk_stats.f_blocks * disk_stats.f_bsize
percent = (100/disk_size)*free_size
return disk(disk_size, free_size, percent)
def net_if_addrs():
interfaces = {}
for root, folders, files in os.walk('/sys/class/net/'):
for name in folders:
interfaces[name] = {}
return interfaces
def net_io_counters(pernic=True):
data = {}
for interface in psutil.net_if_addrs().keys():
with cmd("grep '{interface}:' /proc/net/dev | awk '{{recv=$2}}{{send=$10}} END {{print send,recv}}'".format(interface=interface)) as output:
for line in output:
data[interface] = iostat(interface, *line.strip().decode('UTF-8').split(' ',1))
return data
## FIXME: dependency checks (fdisk, lsblk etc)
def sig_handler(signal, frame):
print('\nAborting further installation steps!')
print(' Here\'s a summary of the commandline:')
print(f' {sys.argv}')
exit(0)
signal.signal(signal.SIGINT, sig_handler)
def gen_uid(entropy_length=256):
return sha512(os.urandom(entropy_length)).hexdigest()
def get_default_gateway_linux(*positionals, **kwargs):
"""Read the default gateway directly from /proc."""
with open("/proc/net/route") as fh:
for line in fh:
fields = line.strip().split()
if fields[1] != '00000000' or not int(fields[3], 16) & 2:
continue
return inet_ntoa(struct.pack("<L", int(fields[2], 16)))
def get_local_MACs():
macs = {}
for nic, opts in psutil.net_if_addrs().items():
for addr in opts:
#if addr.family in (AF_INET, AF_INET6) and addr.address:
if addr.family == AF_PACKET: # MAC
macs[addr.address] = nic
return macs
def gen_yubikey_password():
return None #TODO: Implement
def pid_exists(pid):
"""Check whether pid exists in the current process table."""
if pid < 0:
return False
try:
os.kill(pid, 0)
except (OSError, e):
return e.errno == errno.EPERMRM
else:
return True
def simple_command(cmd, opts=None, *positionals, **kwargs):
if not opts: opts = {}
if 'debug' in opts:
print('[!] {}'.format(cmd))
handle = Popen(cmd, shell='True', stdout=PIPE, stderr=STDOUT, stdin=PIPE)
output = b''
while handle.poll() is None:
data = handle.stdout.read()
if len(data):
if 'debug' in opts:
print(data.decode('UTF-8'), end='')
# print(data.decode('UTF-8'), end='')
output += data
data = handle.stdout.read()
if 'debug' in opts:
print(data.decode('UTF-8'), end='')
output += data
handle.stdin.close()
handle.stdout.close()
return output
class sys_command():#Thread):
def __init__(self, cmd, callback=None, start_callback=None, *positionals, **kwargs):
if not 'worker_id' in kwargs: kwargs['worker_id'] = gen_uid()
if not 'emulate' in kwargs: kwargs['emulate'] = SAFETY_LOCK
#Thread.__init__(self)
if kwargs['emulate']:
print(f"Starting command '{cmd}' in emulation mode.")
self.cmd = shlex.split(cmd)
self.args = args
self.kwargs = kwargs
self.callback = callback
self.pid = None
self.exit_code = None
self.started = time.time()
self.ended = None
self.worker_id = kwargs['worker_id']
self.trace_log = b''
self.status = 'starting'
user_catalogue = os.path.expanduser('~')
self.cwd = f"{user_catalogue}/archinstall/cache/workers/{kwargs['worker_id']}/"
self.exec_dir = f'{self.cwd}/{os.path.basename(self.cmd[0])}_workingdir'
if not self.cmd[0][0] == '/':
log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5)
o = check_output(['/usr/bin/which', self.cmd[0]])
log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5)
self.cmd[0] = o.decode('UTF-8').strip()
if not os.path.isdir(self.exec_dir):
os.makedirs(self.exec_dir)
if self.kwargs['emulate']:
commandlog.append(cmd + ' #emulated')
elif 'hide_from_log' in self.kwargs and self.kwargs['hide_from_log']:
pass
else:
commandlog.append(cmd)
if start_callback: start_callback(self, *positionals, **kwargs)
#self.start()
self.run()
def __iter__(self, *positionals, **kwargs):
for line in self.trace_log.split(b'\n'):
yield line
def __repr__(self, *positionals, **kwargs):
return f"{self.cmd, self.trace_log}"
def decode(self, fmt='UTF-8'):
return self.trace_log.decode(fmt)
def dump(self):
return {
'status' : self.status,
'worker_id' : self.worker_id,
'worker_result' : self.trace_log.decode('UTF-8'),
'started' : self.started,
'ended' : self.ended,
'started_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
'ended_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
'exit_code' : self.exit_code
}
def run(self):
#main = None
#for t in tenum():
# if t.name == 'MainThread':
# main = t
# break
#if not main:
# print('Main thread not existing')
# return
self.status = 'running'
old_dir = os.getcwd()
os.chdir(self.exec_dir)
self.pid, child_fd = pty.fork()
if not self.pid: # Child process
# Replace child process with our main process
if not self.kwargs['emulate']:
try:
os.execv(self.cmd[0], self.cmd)
except FileNotFoundError:
self.status = 'done'
log(f"{self.cmd[0]} does not exist.", origin='spawn', level=2)
self.exit_code = 1
return False
os.chdir(old_dir)
poller = epoll()
poller.register(child_fd, EPOLLIN | EPOLLHUP)
if 'events' in self.kwargs and 'debug' in self.kwargs:
print(f'[D] Using triggers for command: {self.cmd}')
print(json.dumps(self.kwargs['events']))
alive = True
last_trigger_pos = 0
while alive and not self.kwargs['emulate']:
for fileno, event in poller.poll(0.1):
try:
output = os.read(child_fd, 8192).strip()
self.trace_log += output
except OSError:
alive = False
break
if 'debug' in self.kwargs and self.kwargs['debug'] and len(output):
print(self.cmd[0], 'gave:', output.decode('UTF-8'))
log(self.cmd[0],'gave:', output.decode('UTF-8'), origin='spawn', level=4)
lower = output.lower()
broke = False
if 'events' in self.kwargs:
for trigger in list(self.kwargs['events']):
if type(trigger) != bytes:
original = trigger
trigger = bytes(original, 'UTF-8')
self.kwargs['events'][trigger] = self.kwargs['events'][original]
del(self.kwargs['events'][original])
if type(self.kwargs['events'][trigger]) != bytes:
self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8')
if trigger.lower() in self.trace_log[last_trigger_pos:].lower():
trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower())
if 'debug' in self.kwargs and self.kwargs['debug']:
print(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}")
log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", origin='spawn', level=5)
last_trigger_pos = trigger_pos
os.write(child_fd, self.kwargs['events'][trigger])
del(self.kwargs['events'][trigger])
broke = True
break
if broke:
continue
## Adding a exit trigger:
if len(self.kwargs['events']) == 0:
if 'debug' in self.kwargs and self.kwargs['debug']:
log(f"Waiting for last command {self.cmd[0]} to finish.", origin='spawn', level=4)
if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower():
if 'debug' in self.kwargs and self.kwargs['debug']:
log(f"{self.cmd[0]} has finished.", origin='spawn', level=4)
alive = False
break
self.status = 'done'
if 'debug' in self.kwargs and self.kwargs['debug']:
log(f"{self.cmd[0]} waiting for exit code.", origin='spawn', level=5)
if not self.kwargs['emulate']:
try:
self.exit_code = os.waitpid(self.pid, 0)[1]
except ChildProcessError:
try:
self.exit_code = os.waitpid(child_fd, 0)[1]
except ChildProcessError:
self.exit_code = 1
else:
self.exit_code = 0
if 'ignore_errors' in self.kwargs:
self.exit_code = 0
if self.exit_code != 0:
log(f"{self.cmd[0]} did not exit gracefully, exit code {self.exit_code}.", origin='spawn', level=3)
log(self.trace_log.decode('UTF-8'), origin='spawn', level=3)
else:
log(f"{self.cmd[0]} exit nicely.", origin='spawn', level=5)
self.ended = time.time()
with open(f'{self.cwd}/trace.log', 'wb') as fh:
fh.write(self.trace_log)
worker_history[self.worker_id] = self.dump()
if 'dependency' in self.kwargs:
pass # TODO: Not yet supported (steal it from archinstall_gui)
"""
dependency = self.kwargs['dependency']
if type(dependency) == str:
# Dependency is a progress-string. Wait for it to show up.
while main and main.isAlive() and dependency not in progress or progress[dependency] is None:
time.sleep(0.25)
dependency = progress[dependency]
if type(dependency) == str:
log(f"{self.func} waited for progress {dependency} which never showed up. Aborting.", level=2, origin='worker', function='run')
self.ended = time.time()
self.status = 'aborted'
return None
while main and main.isAlive() and dependency.ended is None:
time.sleep(0.25)
print(' *** Dependency released for:', self.func)
if dependency.data is None or not main or not main.isAlive():
log('Dependency:', dependency.func, 'did not exit clearly. There for,', self.func, 'can not execute.', level=2, origin='worker', function='run')
self.ended = time.time()
self.status = 'aborted'
return None
"""
if self.callback:
pass # TODO: Not yet supported (steal it from archinstall_gui)
#self.callback(self, *self.args, **self.kwargs)
def get_drive_from_uuid(drive):
if len(harddrives) <= 0: raise ValueError("No hard drives to iterate in order to find: {}".format(uuid))
for drive in harddrives:
#for partition in psutil.disk_partitions('/dev/{}'.format(name)):
# pass #blkid -s PARTUUID -o value /dev/sda2
o = simple_command(f'blkid -s PTUUID -o value /dev/{drive}')
if len(o) and o == uuid:
return drive
return None
def get_drive_from_part_uuid(partuuid, *positionals, **kwargs):
if len(harddrives) <= 0: raise ValueError("No hard drives to iterate in order to find: {}".format(uuid))
for drive in harddrives:
for partition in get_partitions(f'/dev/{drive}', *positionals, **kwargs):
o = simple_command(f'blkid -s PARTUUID -o value /dev/{drive}{partition}')
if len(o) and o == partuuid:
return drive
return None
def update_git(branch='master'):
default_gw = get_default_gateway_linux()
if(default_gw):
print('[N] Checking for updates...')
## Not the most elegant way to make sure git conflicts doesn't occur (yea fml)
if os.path.isfile('/root/archinstall/archinstall.py'):
os.remove('/root/archinstall/archinstall.py')
if os.path.isfile('/root/archinstall/README.md'):
os.remove('/root/archinstall/README.md')
output = simple_command('(cd /root/archinstall; git reset --hard origin/$(git branch | grep "*" | cut -d\' \' -f 2); git pull)')
if b'error:' in output:
print('[N] Could not update git source for some reason.')
return
# b'From github.com:Torxed/archinstall\n 339d687..80b97f3 master -> origin/master\nUpdating 339d687..80b97f3\nFast-forward\n README.md | 2 +-\n 1 file changed, 1 insertion(+), 1 deletion(-)\n'
if output != b'Already up to date' or branch != 'master':
#tmp = re.findall(b'[0-9]+ file changed', output)
#print(tmp)
#if len(tmp):
# num_changes = int(tmp[0].split(b' ',1)[0])
# if(num_changes):
if branch != 'master':
on_branch = simple_command('(cd /root/archinstall; git branch | grep "*" | cut -d\' \' -f 2)').decode('UTF-8').strip()
if on_branch.lower() != branch.lower():
print(f'[N] Changing branch from {on_branch} to {branch}')
output = simple_command(f'(cd /root/archinstall; git checkout {branch}; git pull)')
print('[N] Rebooting the new branch')
if not 'rebooted' in args:
os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rebooted','--rerun'])
else:
os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rerun',])
if not 'rebooted' in args:
## Reboot the script (in same context)
print('[N] Rebooting the script')
os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rebooted',])
extit(1)
def device_state(name, *positionals, **kwargs):
# Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
if f.read(1) == '1':
return
path = rootdir_pattern.sub('', os.readlink('/sys/block/{}'.format(name)))
hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire")
for bus in hotplug_buses:
if os.path.exists('/sys/bus/{}'.format(bus)):
for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)):
device_link = rootdir_pattern.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus)))
if re.search(device_link, path):
return
return True
def get_partitions(dev, *positionals, **kwargs):
drive_name = os.path.basename(dev)
parts = oDict()
#o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)))
o = b''.join(sys_command(f'/usr/bin/lsblk -J {dev}', hide_from_log=True))
if b'not a block device' in o:
## TODO: Replace o = sys_command() with code, o = sys_command()
## and make sys_command() return the exit-code, way safer than checking output strings :P
return {}
if not o[:1] == b'{':
print('[E] Error in getting blk devices:', o)
exit(1)
r = json.loads(o.decode('UTF-8'))
if len(r['blockdevices']) and 'children' in r['blockdevices'][0]:
for part in r['blockdevices'][0]['children']:
#size = os.statvfs(dev + part['name'][len(drive_name):])
parts[part['name'][len(drive_name):]] = {
#'size' : size.f_frsize * size.f_bavail,
#'blocksize' : size.f_frsize * size.f_blocks
'size' : part['size']
}
return parts
def get_disk_model(drive):
with open(f'/sys/block/{os.path.basename(drive)}/device/model', 'rb') as fh:
return fh.read().decode('UTF-8').strip()
def get_disk_size(drive):
dev_short_name = os.path.basename(drive)
with open(f'/sys/block/{dev_short_name}/device/block/{dev_short_name}/size', 'rb') as fh:
return ''.join(human_readable_size(fh.read().decode('UTF-8').strip()))
def disk_info(drive, *positionals, **kwargs):
lkwargs = {**kwargs}
lkwargs['emulate'] = False # This is a emulate-safe function. Does not alter filesystem.
info = json.loads(b''.join(sys_command(f'lsblk -J -o "NAME,SIZE,FSTYPE,LABEL" {drive}', *positionals, **lkwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices'][0]
fileformats = []
labels = []
if 'children' in info: ## Might not be partitioned yet
for child in info['children']:
if child['fstype'] != None:
fileformats.append(child['fstype'])
if child['label'] != None:
labels.append(child['label'])
else:
fileformats = ['*Empty Drive*']
labels = ['(no partitions)']
info['fileformats'] = fileformats
info['labels'] = labels
info['model'] = get_disk_model(drive)
return info
def cleanup_args():
for key in args:
if args[key] == '<STDIN>':
if not args['unattended']:
args[key] = input(f'Enter a value for {key}: ')
else:
args[key] = random_string(32)
elif args[key] == '<RND_STR>': args[key] = random_string(32)
elif args[key] == '<YUBIKEY>':
args[key] = gen_yubikey_password()
if not args[key]:
print('[E] Failed to setup a yubikey password, is it plugged in?')
exit(1)
def merge_in_includes(instructions, *positionals, **kwargs):
if 'args' in instructions:
## == Recursively fetch instructions if "include" is found under {args: ...}
while 'include' in instructions['args']:
includes = instructions['args']['include']
print('[!] Importing net-deploy target: {}'.format(includes))
del(instructions['args']['include'])
if type(includes) in (dict, list):
for include in includes:
instructions = merge_dicts(instructions, get_instructions(include, *positionals, **kwargs), before=True)
else:
instructions = merge_dicts(instructions, get_instructions(includes), *positionals, **kwargs, before=True)
## Update arguments if we found any
for key, val in instructions['args'].items():
args[key] = val
if 'args' in instructions:
## TODO: Reuseable code, there's to many get_instructions, merge_dictgs and args updating going on.
## Update arguments if we found any
for key, val in instructions['args'].items():
args[key] = val
if 'user_args' in kwargs:
for key, val in kwargs['user_args'].items():
args[key] = val
return instructions
def update_drive_list(*positionals, **kwargs):
# https://github.com/karelzak/util-linux/blob/f920f73d83f8fd52e7a14ec0385f61fab448b491/disk-utils/fdisk-list.c#L52
for path in glob('/sys/block/*/device'):
name = re.sub('.*/(.*?)/device', '\g<1>', path)
if device_state(name, *positionals, **kwargs):
harddrives[f'/dev/{name}'] = disk_info(f'/dev/{name}', *positionals, **kwargs)
def human_readable_size(bits, sizes=[{8 : 'b'}, {1024 : 'kb'}, {1024 : 'mb'}, {1024 : 'gb'}, {1024 : 'tb'}, {1024 : 'zb?'}]):
# Not needed if using lsblk.
end_human = None
for pair in sizes:
size, human = list(pair.items())[0]
if bits / size > 1:
bits = bits/size
end_human = human
else:
break
return bits, end_human
def human_disk_info(drive):
return {
'size' : harddrives[drive]['size'],
'fileformat' : harddrives[drive]['fileformats'],
'labels' : harddrives[drive]['labels']
}
def close_disks():
o = simple_command('/usr/bin/umount -R /mnt/boot')
o = simple_command('/usr/bin/umount -R /mnt')
o = simple_command('/usr/bin/cryptsetup close /dev/mapper/luksdev')
def format_disk(drive='drive', start='start', end='size', emulate=False, *positionals, **kwargs):
drive = args[drive]
start = args[start]
end = args[end]
if not drive:
raise ValueError('Need to supply a drive path, for instance: /dev/sdx')
# dd if=/dev/random of=args['drive'] bs=4096 status=progress
# https://github.com/dcantrell/pyparted would be nice, but isn't officially in the repo's #SadPanda
if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0:
return None
if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0:
return None
if sys_command(f'/usr/bin/parted -s {drive} mkpart primary FAT32 1MiB {start}', emulate=emulate, *positionals, **kwargs).exit_code != 0:
return None
if sys_command(f'/usr/bin/parted -s {drive} name 1 "EFI"', emulate=emulate, *positionals, **kwargs).exit_code != 0:
return None
if sys_command(f'/usr/bin/parted -s {drive} set 1 esp on', emulate=emulate, *positionals, **kwargs).exit_code != 0:
return None
if sys_command(f'/usr/bin/parted -s {drive} set 1 boot on', emulate=emulate, *positionals, **kwargs).exit_code != 0:
return None
if sys_command(f'/usr/bin/parted -s {drive} mkpart primary {start} {end}', emulate=emulate, *positionals, **kwargs).exit_code != 0:
return None
# TODO: grab partitions after each parted/partition step instead of guessing which partiton is which later on.
# Create one, grab partitions - dub that to "boot" or something. do the next partition, grab that and dub it "system".. or something..
# This "assumption" has bit me in the ass so many times now I've stoped counting.. Jerker is right.. Don't do it like this :P
return True
def multisplit(s, splitters):
s = [s,]
for key in splitters:
ns = []
for obj in s:
x = obj.split(key)
for index, part in enumerate(x):
if len(part):
ns.append(part)
if index < len(x)-1:
ns.append(key)
s = ns
return s
def grab_url_data(path):
safe_path = path[:path.find(':')+1]+''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':')+1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode=ssl.CERT_NONE
response = urllib.request.urlopen(safe_path, context=ssl_context)
return response.read()
def get_application_instructions(target):
instructions = {}
try:
instructions = grab_url_data('{}/applications/{}.json'.format(args['profiles-path'], target)).decode('UTF-8')
print('[N] Found application instructions for: {}'.format(target))
except urllib.error.HTTPError:
print('[N] Could not find remote instructions. yrying local instructions under ./deployments/applications')
local_path = './deployments/applications' if os.path.isfile('./archinstall.py') else './archinstall/deployments/applications' # Dangerous assumption
if os.path.isfile(f'{local_path}/{target}.json'):
with open(f'{local_path}/{target}.json', 'r') as fh:
instructions = fh.read()
print('[N] Found local application instructions for: {}'.format(target))
else:
print('[N] No instructions found for: {}'.format(target))
return instructions
try:
instructions = json.loads(instructions, object_pairs_hook=oDict)
except:
print('[E] JSON syntax error in {}'.format('{}/applications/{}.json'.format(args['profiles-path'], target)))
traceback.print_exc()
exit(1)
return instructions
def get_local_instructions(target):
instructions = oDict()
local_path = './deployments' if os.path.isfile('./archinstall.py') else './archinstall/deployments' # Dangerous assumption
if os.path.isfile(f'{local_path}/{target}.json'):
with open(f'{local_path}/{target}.json', 'r') as fh:
instructions = fh.read()
print('[N] Found local instructions called: {}'.format(target))
else:
print('[N] No instructions found called: {}'.format(target))
return instructions
def get_instructions(target, *positionals, **kwargs):
instructions = oDict()
if get_default_gateway_linux():
try:
instructions = grab_url_data('{}/{}.json'.format(args['profiles-path'], target)).decode('UTF-8')
print('[N] Found net-deploy instructions called: {}'.format(target))
except urllib.error.HTTPError:
print('[N] Could not find remote instructions. Trying local instructions under ./deployments')
isntructions = get_local_instructions(target, *positionals)
else:
isntructions = get_local_instructions(target, *positionals)
if type(instructions) not in (dict, oDict,):
try:
instructions = json.loads(instructions, object_pairs_hook=oDict)
except:
print('[E] JSON syntax error in {}'.format('{}/{}.json'.format(args['profiles-path'], target)))
traceback.print_exc()
exit(1)
return instructions
def merge_dicts(d1, d2, before=True, overwrite=False):
""" Merges d2 into d1 """
if before:
d1, d2 = d2.copy(), d1.copy()
overwrite = True
for key, val in d2.items():
if key in d1:
if type(d1[key]) in [dict, oDict] and type(d2[key]) in [dict, oDict]:
d1[key] = merge_dicts(d1[key] if not before else d2[key], d2[key] if not before else d1[key], before=before, overwrite=overwrite)
elif overwrite:
d1[key] = val
else:
d1[key] = val
return d1
def random_string(l):
return ''.join(choice(ascii_uppercase + ascii_lowercase + digits) for i in range(l))
def phone_home(url):
payload = json.dumps({"hostname": args['hostname'],
"done" : time.time(),
"profile": args['profile'],
"drive": args['drive'],
"base_status": base_return_code}).encode('utf8')
request = urllib.request.Request(url, data=payload, headers={'content-type': 'application/json'})
response = urllib.request.urlopen(request)
def get_external_ip(*positionals, **kwargs):
result = urllib.request.urlopen("https://hvornum.se/ip/?f=json").read().decode('UTF-8')
return json.loads(result)['ip']
def guess_country(ip, *positionals, **kwargs):
# python-pygeoip
# geoip-database
result = None
GEOIP_DB = '/usr/share/GeoIP/GeoIP.dat'
if os.path.isfile(GEOIP_DB):
try:
import pygeoip
except:
## TODO: Do a best-effort-guess based off the hostname given off the IP instead, if GoeIP doesn't exist.
return result
gi = pygeoip.GeoIP(GEOIP_DB)
result = gi.country_code_by_addr(ip)
else:
log(f'Missing GeoIP database: {GEOIP_DB}', origin='guess_country', level=LOG_LEVELS.ERROR)
return result
def setup_args_defaults(args, *positionals, **kwargs):
if not 'size' in args: args['size'] = '100%'
if not 'mirrors' in args: args['mirrors'] = True
if not 'start' in args: args['start'] = '513MiB'
if not 'pwfile' in args: args['pwfile'] = '/tmp/diskpw'
if not 'hostname' in args: args['hostname'] = 'Archinstall'
if not 'packages' in args: args['packages'] = '' # extra packages other than default
if not 'post' in args: args['post'] = 'reboot'
if not 'password' in args: args['password'] = '0000' # Default disk passord, can be <STDIN> or a fixed string
if not 'minimal' in args: args['minimal'] = False
if not 'unattended' in args: args['unattended'] = False
if not 'profile' in args: args['profile'] = None
if not 'skip-encrypt' in args: args['skip-encrypt'] = False
if not 'profiles-path' in args: args['profiles-path'] = profiles_path
if not 'rerun' in args: args['rerun'] = None
if not 'aur-keep' in args: args['aur-keep'] = False
if not 'aur-support' in args: args['aur-support'] = True # Support adds yay (https://github.com/Jguer/yay) in installation steps.
if not 'ignore-rerun' in args: args['ignore-rerun'] = False
if not 'phone-home' in args: args['phone-home'] = False
# Setup locales if we didn't get one.
if not 'country' in args:
country = None
if get_default_gateway_linux():
ip = get_external_ip()
country = guess_country(ip)
args['country'] = 'all' if not country else country
if not 'localtime' in args: args['localtime'] = 'Europe/Stockholm' if args['country'] == 'SE' else 'GMT+0' # TODO: Arbitrary for now
return args
def load_automatic_instructions(*positionals, **kwargs):
instructions = oDict()
if get_default_gateway_linux(*positionals, **kwargs):
locmac = get_local_MACs()
if not len(locmac):
print('[N] No network interfaces - No net deploy.')
else:
for mac in locmac:
instructions = get_instructions(mac, *positionals, **kwargs)
if 'args' in instructions:
## == Recursively fetch instructions if "include" is found under {args: ...}
while 'include' in instructions['args']:
includes = instructions['args']['include']
print('[!] Importing net-deploy target: {}'.format(includes))
del(instructions['args']['include'])
if type(includes) in (dict, list):
for include in includes:
instructions = merge_dicts(instructions, get_instructions(include, *positionals, **kwargs), before=True)
else:
instructions = merge_dicts(instructions, get_instructions(includes, *positionals, **kwargs), before=True)
## Update arguments if we found any
for key, val in instructions['args'].items():
args[key] = val
if 'user_args' in kwargs:
for key, val in kwargs['user_args'].items():
args[key] = val
else:
print('[N] No gateway - No net deploy')
return instructions
def cache_diskpw_on_disk():
if not os.path.isfile(args['pwfile']):
#PIN = '0000'
with open(args['pwfile'], 'w') as pw:
pw.write(args['password'])
def refresh_partition_list(drive, *positionals, **kwargs):
drive = args[drive]
if not 'partitions' in args:
args['partitions'] = oDict()
for index, part_name in enumerate(sorted(get_partitions(drive, *positionals, **kwargs).keys())):
args['partitions'][str(index+1)] = part_name
return True
def mkfs_fat32(drive, partition, *positionals, **kwargs):
drive = args[drive]
partition = args['partitions'][partition]
o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {drive}{partition}'))
if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
return None
return True
def is_luksdev_mounted(*positionals, **kwargs):
o = b''.join(sys_command('/usr/bin/file /dev/mapper/luksdev', hide_from_log=True)) # /dev/dm-0
if b'cannot open' in o:
return False
return True
def mount_luktsdev(drive, partition, keyfile, *positionals, **kwargs):
drive = args[drive]
partition = args['partitions'][partition]
keyfile = args[keyfile]
if not is_luksdev_mounted():
o = b''.join(sys_command(f'/usr/bin/cryptsetup open {drive}{partition} luksdev --key-file {keyfile} --type luks2'.format(**args)))
return is_luksdev_mounted()
def encrypt_partition(drive, partition, keyfile='/tmp/diskpw', *positionals, **kwargs):
drive = args[drive]
partition = args['partitions'][partition]
keyfile = args[keyfile]
o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash sha512 --key-size 512 --iter-time 10000 --key-file {keyfile} --use-urandom luksFormat {drive}{partition}'))
if not b'Command successful.' in o:
return False
return True
def mkfs_btrfs(drive='/dev/mapper/luksdev', *positionals, **kwargs):
o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {drive}'))
if not b'UUID' in o:
return False
return True
def mount_luksdev(where='/dev/mapper/luksdev', to='/mnt', *positionals, **kwargs):
check_mounted = simple_command('/usr/bin/mount | /usr/bin/grep /mnt', *positionals, **kwargs).decode('UTF-8').strip()# /dev/dm-0
if len(check_mounted):
return False
o = b''.join(sys_command('/usr/bin/mount /dev/mapper/luksdev /mnt', *positionals, **kwargs))
return True
def mount_part(drive, partition, mountpoint='/mnt', *positionals, **kwargs):
os.makedirs(mountpoint, exist_ok=True)
#o = b''.join(sys_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs)) # /dev/dm-0
check_mounted = simple_command(f'/usr/bin/mount | /usr/bin/grep {mountpoint}', *positionals, **kwargs).decode('UTF-8').strip()
if len(check_mounted):
return False
o = b''.join(sys_command(f'/usr/bin/mount {drive}{partition} {mountpoint}', *positionals, **kwargs))
return True
def mount_boot(drive, partition, mountpoint='/mnt/boot', *positionals, **kwargs):
os.makedirs('/mnt/boot', exist_ok=True)
#o = b''.join(sys_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs)) # /dev/dm-0
check_mounted = simple_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs).decode('UTF-8').strip()
if len(check_mounted):
return False
o = b''.join(sys_command(f'/usr/bin/mount {drive}{partition} {mountpoint}', *positionals, **kwargs))
return True
def mount_mountpoints(drive, bootpartition, mountpoint='/mnt', *positionals, **kwargs):
drive = args[drive]
if args['skip-encrypt']:
mount_part(drive, args['partitions']["2"], mountpoint, *positionals, **kwargs)
else:
mount_luksdev(*positionals, **kwargs)
mount_boot(drive, args['partitions'][bootpartition], mountpoint=f'{mountpoint}/boot', *positionals, **kwargs)
return True
def re_rank_mirrors(top=10, *positionals, **kwargs):
if sys_command(('/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist')).exit_code == 0:
return True
return False
def filter_mirrors_by_country_list(countries, top=None, *positionals, **kwargs):
## TODO: replace wget with urllib.request (no point in calling syscommand)
country_list = []
for country in countries.split(','):
country_list.append(f'country={country}')
o = b''.join(sys_command((f"/usr/bin/wget 'https://www.archlinux.org/mirrorlist/?{'&'.join(country_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O /root/mirrorlist")))
o = b''.join(sys_command(("/usr/bin/sed -i 's/#Server/Server/' /root/mirrorlist")))
o = b''.join(sys_command(("/usr/bin/mv /root/mirrorlist /etc/pacman.d/")))
if top:
re_rank_mirrors(top, *positionals, **kwargs) or not os.path.isfile('/etc/pacman.d/mirrorlist')
return True
def strap_in_base(*positionals, **kwargs):
if args['aur-support']:
args['packages'] += ' git'
if sys_command('/usr/bin/pacman -Syy', *positionals, **kwargs).exit_code == 0:
x = sys_command('/usr/bin/pacstrap /mnt base base-devel linux linux-firmware btrfs-progs efibootmgr nano wpa_supplicant dialog {packages}'.format(**args), *positionals, **kwargs)
if x.exit_code == 0:
return True
return False
def configure_base_system(*positionals, **kwargs):
## TODO: Replace a lot of these syscalls with just python native operations.
o = b''.join(sys_command('/usr/bin/genfstab -pU /mnt >> /mnt/etc/fstab'))
with open('/mnt/etc/fstab', 'a') as fstab:
fstab.write('\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n') # Redundant \n at the start? who knoes?
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt rm -f /etc/localtime'))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt ln -s /usr/share/zoneinfo/{localtime} /etc/localtime'.format(**args)))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime'))
#o = sys_command('arch-chroot /mnt echo "{hostname}" > /etc/hostname'.format(**args))
#o = sys_command("arch-chroot /mnt sed -i 's/#\(en_US\.UTF-8\)/\1/' /etc/locale.gen")
o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo '{hostname}' > /etc/hostname\"".format(**args)))
o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo 'en_US.UTF-8 UTF-8' > /etc/locale.gen\""))
o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo 'LANG=en_US.UTF-8' > /etc/locale.conf\""))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt locale-gen'))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt chmod 700 /root'))
with open('/mnt/etc/mkinitcpio.conf', 'w') as mkinit:
## TODO: Don't replace it, in case some update in the future actually adds something.
mkinit.write('MODULES=(btrfs)\n')
mkinit.write('BINARIES=(/usr/bin/btrfs)\n')
mkinit.write('FILES=()\n')
mkinit.write('HOOKS=(base udev autodetect modconf block encrypt filesystems keyboard fsck)\n')
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt mkinitcpio -p linux'))
return True
def setup_bootloader(*positionals, **kwargs):
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt bootctl --no-variables --path=/boot install'))
with open('/mnt/boot/loader/loader.conf', 'w') as loader:
loader.write('default arch\n')
loader.write('timeout 5\n')
## For some reason, blkid and /dev/disk/by-uuid are not getting along well.
## And blkid is wrong in terms of LUKS.
#UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
with open('/mnt/boot/loader/entries/arch.conf', 'w') as entry:
entry.write('title Arch Linux\n')
entry.write('linux /vmlinuz-linux\n')
entry.write('initrd /initramfs-linux.img\n')
if args['skip-encrypt']:
## NOTE: We could use /dev/disk/by-partuuid but blkid does the same and a lot cleaner
UUID = simple_command(f"blkid -s PARTUUID -o value /dev/{os.path.basename(args['drive'])}{args['partitions']['2']}").decode('UTF-8').strip()
entry.write('options root=PARTUUID={UUID} rw intel_pstate=no_hwp\n'.format(UUID=UUID))
else:
UUID = simple_command(f"ls -l /dev/disk/by-uuid/ | grep {os.path.basename(args['drive'])}{args['partitions']['2']} | awk '{{print $9}}'").decode('UTF-8').strip()
entry.write('options cryptdevice=UUID={UUID}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n'.format(UUID=UUID))
return True
def add_AUR_support(*positionals, **kwargs):
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "useradd -m -G wheel aibuilder"'))
o = b''.join(sys_command("/usr/bin/sed -i 's/# %wheel ALL=(ALL) NO/%wheel ALL=(ALL) NO/' /mnt/etc/sudoers"))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder; git clone https://aur.archlinux.org/yay.git)\\""'))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "chown -R aibuilder.aibuilder /home/aibuilder/yay"'))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder/yay; makepkg -si --noconfirm)\\" >/dev/null"'))
## Do not remove aibuilder just yet, can be used later for aur packages.
#o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers'))
#o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"'))
#o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"'))
return True
def run_post_install_steps(*positionals, **kwargs):
conf = {}
if 'post' in instructions:
conf = instructions['post']
elif not 'args' in instructions and len(instructions):
conf = instructions
if 'git-branch' in conf:
update_git(conf['git-branch'])
del(conf['git-branch'])
rerun = args['ignore-rerun']
for title in conf:
if args['rerun'] and args['rerun'] != title and not rerun:
continue
else:
rerun = True
print('[N] Network Deploy: {}'.format(title))
if type(conf[title]) == str:
print('[N] Loading {} configuration'.format(conf[title]))
conf[title] = get_application_instructions(conf[title])
for command in conf[title]:
raw_command = command
opts = conf[title][command] if type(conf[title][command]) in (dict, oDict) else {}
if len(opts):
if 'pass-args' in opts or 'format' in opts:
command = command.format(**args)
## FIXME: Instead of deleting the two options
## in order to mute command output further down,
## check for a 'debug' flag per command and delete these two
if 'pass-args' in opts:
del(opts['pass-args'])
elif 'format' in opts:
del(opts['format'])
elif ('debug' in opts and opts['debug']) or ('debug' in conf and conf['debug']):
print('[-] Options: {}'.format(opts))
if 'pass-args' in opts and opts['pass-args']:
command = command.format(**args)
if 'runas' in opts and f'su - {opts["runas"]} -c' not in command:
command = command.replace('"', '\\"')
command = f'su - {opts["runas"]} -c "{command}"'
#print('[N] Command: {} ({})'.format(command, opts))
## https://superuser.com/questions/1242978/start-systemd-nspawn-and-execute-commands-inside
## !IMPORTANT
##
## arch-chroot mounts /run into the chroot environment, this breaks name resolves for some reason.
## Either skipping mounting /run and using traditional chroot is an option, but using
## `systemd-nspawn -D /mnt --machine temporary` might be a more flexible solution in case of file structure changes.
if 'no-chroot' in opts and opts['no-chroot']:
o = simple_command(command, opts)
elif 'chroot' in opts and opts['chroot']:
## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces).
## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option.
## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command.
o = simple_command("mount /dev/mapper/luksdev /mnt")
o = simple_command("cd /mnt; cp /etc/resolv.conf etc")
o = simple_command("cd /mnt; mount -t proc /proc proc")
o = simple_command("cd /mnt; mount --make-rslave --rbind /sys sys")
o = simple_command("cd /mnt; mount --make-rslave --rbind /dev dev")
o = simple_command('chroot /mnt /bin/bash -c "{c}"'.format(c=command), opts=opts)
o = simple_command("cd /mnt; umount -R dev")
o = simple_command("cd /mnt; umount -R sys")
o = simple_command("cd /mnt; umount -R proc")
else:
if 'boot' in opts and opts['boot']:
## So, if we're going to boot this maddafakker up, we'll need to
## be able to login. The quickest way is to just add automatic login.. so lessgo!
## Turns out.. that didn't work exactly as planned..
##
# if not os.path.isdir('/mnt/etc/systemd/system/console-getty.service.d/'):
# os.makedirs('/mnt/etc/systemd/system/console-getty.service.d/')
# with open('/mnt/etc/systemd/system/console-getty.service.d/override.conf', 'w') as fh:
# fh.write('[Service]\n')
# fh.write('ExecStart=\n')
# fh.write('ExecStart=-/usr/bin/agetty --autologin root -s %I 115200,38400,9600 vt102\n')
## So we'll add a bunch of triggers instead and let the sys_command manually react to them.
## "<hostname> login" followed by "Passwodd" in case it's been set in a previous step.. usually this shouldn't be nessecary
## since we set the password as the last step. And then the command itself which will be executed by looking for:
## [root@<hostname> ~]#
defaults = {
'login:' : 'root\n',
#'Password:' : args['password']+'\n',
'[root@{args["hostname"]} ~]#' : command+'\n',
}
if not 'events' in opts: opts['events'] = {}
events = {**defaults, **opts['events']}
del(opts['events'])
o = b''.join(sys_command('/usr/bin/systemd-nspawn -D /mnt -b --machine temporary', events=events, **opts))
## Not needed anymore: And cleanup after out selves.. Don't want to leave any residue..
# os.remove('/mnt/etc/systemd/system/console-getty.service.d/override.conf')
else:
o = b''.join(sys_command('/usr/bin/systemd-nspawn -D /mnt --machine temporary {c}'.format(c=command), **opts))
if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in o:
print('[W] Post install command failed: {}'.format(o.decode('UTF-8')))
#print(o)
if __name__ == '__main__':
## Setup some defaults
# (in case no command-line parameters or netdeploy-params were given)
args = setup_args_defaults(args)
user_args = {}
positionals = []
for arg in sys.argv[1:]:
if '--' == arg[:2]:
if '=' in arg:
key, val = [x.strip() for x in arg[2:].split('=')]
else:
key, val = arg[2:], True
args[key] = val
user_args[key] = val
else:
positionals.append(arg)
update_git() # Breaks and restarts the script if an update was found.
update_drive_list()
## == If we got networking,
# Try fetching instructions for this box unless a specific profile was given, and execute them.
if args['profile'] is None and not args['minimal']:
instructions = load_automatic_instructions(user_args=user_args)
elif args['profile'] and not args['minimal']:
instructions = get_instructions(args['profile'])
if len(instructions) <= 0:
print('[E] No instructions by the name of {} was found.'.format(args['profile']))
print(' Installation won\'t continue until a valid profile is given.')
print(' (this is because --profile was given and a --default is not given)')
exit(1)
first = True
while not args['minimal'] and not args['profile'] and len(instructions) <= 0:
profile = input('What template do you want to install: ')
instructions = get_instructions(profile)
if first and len(instructions) <= 0:
print('[E] No instructions by the name of {} was found.'.format(profile))
print(' Installation won\'t continue until a valid profile is given.')
print(' (this is because --default is not instructed and no --profile given)')
first = False
# TODO: Might not need to return anything here, passed by reference?
instructions = merge_in_includes(instructions, user_args=user_args)
cleanup_args()
## If no drive was found in args, select one.
if not 'drive' in args:
if len(harddrives):
drives = sorted(list(harddrives.keys()))
if len(drives) > 1 and 'force' not in args and not 'unattended' in args and ('minimal' in args and 'first-drive' not in args):
for index, drive in enumerate(drives):
print(f"{index}: {drive} ({harddrives[drive]['size'], harddrives[drive]['fstype'], harddrives[drive]['label']})")
drive = input('Select one of the above disks (by number): ')
if not drive.isdigit():
raise KeyError("Multiple disks found, --drive=/dev/X not specified (or --force/--first-drive)")
drives = [drives[int(drive)]] # Make sure only the selected drive is in the list of options
args['drive'] = drives[0] # First drive found
else:
args['drive'] = None
if args['drive'] and args['drive'][0] != '/':
## Remap the selected UUID to the device to be formatted.
drive = get_drive_from_uuid(args['drive'])
if not drive:
print(f'[N] Could not map UUID "{args["drive"]}" to a device. Trying to match via PARTUUID instead!')
drive = get_drive_from_part_uuid(args['drive'])
if not drive:
print(f'[E] Could not map UUID "{args["drive"]}" to a device. Aborting!')
exit(1)
args['drive'] = drive
print(json.dumps(args, indent=4))
if args['minimal'] and not 'force' in args and not 'unattended' in args:
if(input('Are these settings OK? (No return beyond this point) N/y: ').lower() != 'y'):
exit(1)
cache_diskpw_on_disk()
#else:
# ## TODO: Convert to `rb` instead.
# # We shouldn't discriminate \xfu from being a passwd phrase.
# with open(args['pwfile'], 'r') as pw:
# PIN = pw.read().strip()
print()
if not args['skip-encrypt']:
print('[!] Disk & root PASSWORD is: {}'.format(args['password']))
else:
print('[!] root PASSWORD is: {}'.format(args['password']))
print()
if not args['rerun'] or args['ignore-rerun']:
for i in range(5, 0, -1):
print(f'Formatting {args["drive"]} in {i}...')
time.sleep(1)
close_disks()
print(f'[N] Setting up {args["drive"]}.')
if not format_disk('drive', start='start', end='size', debug=True):
print(f'[E] Coult not format drive {args["drive"]}')
exit(1)
refresh_partition_list('drive')
print(f'[N] Partitions: {len(args["partitions"])} (Boot: {list(args["partitions"].keys())[0]})')
if len(args['partitions']) <= 0:
print(f'[E] No partitions were created on {args["drive"]}', o)
exit(1)
if not args['rerun'] or args['ignore-rerun']:
if not mkfs_fat32('drive', '1'):
print(f'[E] Could not setup {args["drive"]}{args["partitions"]["1"]}')
exit(1)
if not args['skip-encrypt']:
# "--cipher sha512" breaks the shit.
# TODO: --use-random instead of --use-urandom
print(f'[N] Adding encryption to {args["drive"]}{args["partitions"]["2"]}.')
if not encrypt_partition('drive', '2', 'pwfile'):
print('[E] Failed to setup disk encryption.', o)
exit(1)
if not args['skip-encrypt']:
if not mount_luktsdev('drive', '2', 'pwfile'):
print('[E] Could not open encrypted device.', o)
exit(1)
if not args['rerun'] or args['ignore-rerun']:
print(f'[N] Creating btrfs filesystem inside {args["drive"]}{args["partitions"]["2"]}')
on_part = '/dev/mapper/luksdev'
if args['skip-encrypt']:
on_part = f'{args["drive"]}{args["partitions"]["2"]}'
if not mkfs_btrfs(on_part):
print('[E] Could not setup btrfs filesystem.')
exit(1)
mount_mountpoints('drive', '1')
if 'mirrors' in args and args['mirrors'] and 'country' in args and get_default_gateway_linux():
print('[N] Reordering mirrors.')
filter_mirrors_by_country_list(args['country'])
pre_conf = {}
if 'pre' in instructions:
pre_conf = instructions['pre']
elif 'prerequisits' in instructions:
pre_conf = instructions['prerequisits']
if 'git-branch' in pre_conf:
update_git(pre_conf['git-branch'])
del(pre_conf['git-branch'])
## Prerequisit steps needs to NOT be executed in arch-chroot.
## Mainly because there's no root structure to chroot into.
## But partly because some configurations need to be done against the live CD.
## (For instance, modifying mirrors are done on LiveCD and replicated intwards)
for title in pre_conf:
print('[N] Network prerequisit step: {}'.format(title))
if args['rerun'] and args['rerun'] != title and not rerun:
continue
else:
rerun = True
for command in pre_conf[title]:
raw_command = command
opts = pre_conf[title][raw_command] if type(pre_conf[title][raw_command]) in (dict, oDict) else {}
if len(opts):
if 'pass-args' in opts or 'format' in opts:
command = command.format(**args)
## FIXME: Instead of deleting the two options
## in order to mute command output further down,
## check for a 'debug' flag per command and delete these two
if 'pass-args' in opts:
del(opts['pass-args'])
elif 'format' in opts:
del(opts['format'])
elif 'debug' in opts and opts['debug']:
print('[N] Complete command-string: '.format(command))
else:
print('[-] Options: {}'.format(opts))
#print('[N] Command: {} ({})'.format(raw_command, opts))
o = b''.join(sys_command('{c}'.format(c=command), opts))
if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in b''.join(o):
print('[W] Prerequisit step failed: {}'.format(b''.join(o).decode('UTF-8')))
#print(o)
if not args['rerun'] or rerun:
print('[N] Straping in packages.')
base_return_code = strap_in_base() # TODO: check return here? we return based off pacstrap exit code.. Never tired it tho.
else:
base_return_code = None
if not os.path.isdir('/mnt/etc'): # TODO: This might not be the most long term stable thing to rely on...
print('[E] Failed to strap in packages', o)
exit(1)
if not args['rerun'] or rerun:
print('[N] Configuring base system.')
configure_base_system()
## WORKAROUND: https://github.com/systemd/systemd/issues/13603#issuecomment-552246188
print('[N] Setting up bootloader.')
setup_bootloader()
if args['aur-support']:
print('[N] AUR support demanded, building "yay" before running POST steps.')
add_AUR_support()
print('[N] AUR support added. use "yay -Syy --noconfirm <package>" to deploy in POST.')
print('[N] Running post installation steps.')
run_post_install_steps()
time.sleep(2)
if args['aur-support'] and not args['aur-keep']:
o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers'))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"'))
o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"'))
if args['phone-home']:
phone_home(args['phone-home'])
## == Passwords
# o = sys_command('arch-chroot /mnt usermod --password {} root'.format(args['password']))
# o = sys_command("arch-chroot /mnt sh -c 'echo {pin} | passwd --stdin root'".format(pin='"{pin}"'.format(**args, pin=args['password'])), echo=True)
o = simple_command("/usr/bin/arch-chroot /mnt sh -c \"echo 'root:{pin}' | chpasswd\"".format(**args, pin=args['password']))
if 'user' in args:
o = ('/usr/bin/arch-chroot /mnt useradd -m -G wheel {user}'.format(**args))
o = ("/usr/bin/arch-chroot /mnt sh -c \"echo '{user}:{pin}' | chpasswd\"".format(**args, pin=args['password']))
if args['post'] == 'reboot':
o = simple_command('/usr/bin/umount -R /mnt')
o = simple_command('/usr/bin/reboot now')
else:
print('Done. "umount -R /mnt; reboot" when you\'re done tinkering.')