Reworked final preparations for working with profiles and installing them.

This commit is contained in:
Anton Hvornum 2020-07-06 22:20:34 +02:00
parent db528d8676
commit f7d3022cc8
8 changed files with 303 additions and 79 deletions

View File

@ -3,3 +3,5 @@ from .lib.disk import *
from .lib.user_interaction import *
from .lib.exceptions import *
from .lib.installer import *
from .lib.profiles import *
from .lib.luks import *

View File

@ -78,12 +78,12 @@ class BlockDevice():
return self.info[key]
class Partition():
def __init__(self, path, part_id=None, size=-1):
def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None):
if not part_id: part_id = os.path.basename(path)
self.path = path
self.part_id = part_id
self.mountpoint = None
self.filesystem = None # TODO: Autodetect if we're reusing a partition
self.mountpoint = mountpoint
self.filesystem = filesystem # TODO: Autodetect if we're reusing a partition
self.size = size # TODO: Refresh?
def __repr__(self, *args, **kwargs):
@ -106,63 +106,19 @@ class Partition():
return True
def mount(self, target, fs=None, options=''):
print(f'Mounting {self} to {target}')
if not fs:
if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
## libc has some issues with loop devices, defaulting back to sys calls
# ret = libc.mount(self.path.encode(), target.encode(), fs.encode(), 0, options.encode())
# if ret < 0:
# errno = ctypes.get_errno()
# raise OSError(errno, f"Error mounting {self.path} ({fs}) on {target} with options '{options}': {os.strerror(errno)}")
if sys_command(f'/usr/bin/mount {self.path} {target}').exit_code == 0:
self.mountpoint = target
return True
class luks2():
def __init__(self, filesystem):
self.filesystem = filesystem
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
print(args)
return True
def encrypt(self, partition, password, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
print(f'Encrypting {partition}')
if not key_file: key_file = f'/tmp/{os.path.basename(self.filesystem.blockdevice.device)}.disk_pw' #TODO: Make disk-pw-file randomly unique?
if type(password) != bytes: password = bytes(password, 'UTF-8')
with open(key_file, 'wb') as fh:
fh.write(password)
o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition.path}'))
if not b'Command successful.' in o:
raise DiskError(f'Could not encrypt volume "{partition.path}": {o}')
return key_file
def unlock(self, partition, mountpoint, key_file):
"""
Mounts a lukts2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm.
:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
:type mountpoint: str
"""
if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead?
sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
if os.path.islink(f'/dev/mapper/{mountpoint}'):
return Partition(f'/dev/mapper/{mountpoint}')
def close(self, mountpoint):
sys_command(f'cryptsetup close /dev/mapper/{mountpoint}')
return os.path.islink(f'/dev/mapper/{mountpoint}') is False
if not self.mountpoint:
print(f'Mounting {self} to {target}')
if not fs:
if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
## libc has some issues with loop devices, defaulting back to sys calls
# ret = libc.mount(self.path.encode(), target.encode(), fs.encode(), 0, options.encode())
# if ret < 0:
# errno = ctypes.get_errno()
# raise OSError(errno, f"Error mounting {self.path} ({fs}) on {target} with options '{options}': {os.strerror(errno)}")
if sys_command(f'/usr/bin/mount {self.path} {target}').exit_code == 0:
self.mountpoint = target
return True
class Filesystem():
# TODO:
@ -185,7 +141,6 @@ class Filesystem():
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
print(args)
b''.join(sys_command(f'sync'))
return True
@ -211,7 +166,7 @@ class Filesystem():
if prep_mode == 'luks2':
self.add_partition('primary', start='513MiB', end='100%')
else:
self.add_partition('primary', start='1MiB', end='513MiB', format='ext4')
self.add_partition('primary', start='513MiB', end='513MiB', format='ext4')
def add_partition(self, type, start, end, format=None):
print(f'Adding partition to {self.blockdevice}')

View File

@ -1,4 +1,6 @@
class RequirementError(BaseException):
pass
class DiskError(BaseException):
pass
class ProfileError(BaseException):
pass

View File

@ -9,6 +9,20 @@ def log(*args, **kwargs):
def gen_uid(entropy_length=256):
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
def multisplit(s, splitters):
s = [s,]
for key in splitters:
ns = []
for obj in s:
x = obj.split(key)
for index, part in enumerate(x):
if len(part):
ns.append(part)
if index < len(x)-1:
ns.append(key)
s = ns
return s
class sys_command():#Thread):
"""
Stolen from archinstall_gui
@ -20,7 +34,10 @@ class sys_command():#Thread):
if kwargs['emulate']:
log(f"Starting command '{cmd}' in emulation mode.")
self.raw_cmd = cmd
self.cmd = shlex.split(cmd)
try:
self.cmd = shlex.split(cmd)
except Exception as e:
raise ValueError(f'Incorrect string to split: {cmd}\n{e}')
self.args = args
self.kwargs = kwargs
if not 'worker' in self.kwargs: self.kwargs['worker'] = None

View File

@ -4,6 +4,7 @@ from .exceptions import *
from .disk import *
from .general import *
from .user_interaction import *
from .profiles import Profile
class Installer():
def __init__(self, partition, *, profile=None, mountpoint='/mnt', hostname='ArchInstalled'):
@ -22,7 +23,7 @@ class Installer():
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
print(args)
print('Installation completed without any errors.')
return True
def pacstrap(self, *packages):
@ -75,8 +76,10 @@ class Installer():
self.pacstrap(*packages)
def install_profile(self, profile):
print(f'[STUB] Installing network profile {profile}')
pass
profile = Profile(self, profile)
print(f'Installing network profile {profile}')
profile.install()
def user_create(self, user :str, password=None, groups=[]):
print(f'Creating user {user}')

53
archinstall/lib/luks.py Normal file
View File

@ -0,0 +1,53 @@
import os
from .exceptions import *
from .general import sys_command
from .disk import Partition
class luks2():
def __init__(self, partition, mountpoint, password, *args, **kwargs):
self.password = password
self.partition = partition
self.mountpoint = mountpoint
self.args = args
self.kwargs = kwargs
def __enter__(self):
key_file = self.encrypt(self.partition, self.password, *self.args, **self.kwargs)
return self.unlock(self.partition, self.mountpoint, key_file)
def __exit__(self, *args, **kwargs):
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
return True
def encrypt(self, partition, password, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
print(f'Encrypting {partition}')
if not key_file: key_file = f'/tmp/{os.path.basename(self.partition.path)}.disk_pw' #TODO: Make disk-pw-file randomly unique?
if type(password) != bytes: password = bytes(password, 'UTF-8')
with open(key_file, 'wb') as fh:
fh.write(password)
o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition.path}'))
if not b'Command successful.' in o:
raise DiskError(f'Could not encrypt volume "{partition.path}": {o}')
return key_file
def unlock(self, partition, mountpoint, key_file):
"""
Mounts a lukts2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm.
:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
:type mountpoint: str
"""
if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead?
sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
if os.path.islink(f'/dev/mapper/{mountpoint}'):
return Partition(f'/dev/mapper/{mountpoint}')
def close(self, mountpoint):
sys_command(f'cryptsetup close /dev/mapper/{mountpoint}')
return os.path.islink(f'/dev/mapper/{mountpoint}') is False

195
archinstall/lib/profiles.py Normal file
View File

@ -0,0 +1,195 @@
import os, urllib.request, urllib.parse, ssl, json
from collections import OrderedDict
from .general import multisplit, sys_command, log
from .exceptions import *
UPSTREAM_URL = 'https://raw.githubusercontent.com/Torxed/archinstall/annotations/deployments'
def grab_url_data(path):
safe_path = path[:path.find(':')+1]+''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':')+1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode=ssl.CERT_NONE
response = urllib.request.urlopen(safe_path, context=ssl_context)
return response.read()
def get_application_instructions(target):
instructions = {}
for path in ['./', './profiles', '/etc/archinstall', '/etc/archinstall/profiles']:
if os.path.isfile(f'{path}/applications/{target}.json'):
return os.path.abspath(f'{path}/{self.name}.json')
try:
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
self._cache = cache
return f'{UPSTREAM_URL}/{self.name}.json'
except urllib.error.HTTPError:
pass
try:
if (cache := grab_url_data(f'{UPSTREAM_URL}/applications/{self.name}.json')):
self._cache = cache
return f'{UPSTREAM_URL}/applications/{self.name}.json'
except urllib.error.HTTPError:
pass
try:
instructions = grab_url_data(f'{UPSTREAM_URL}/applications/{target}.json').decode('UTF-8')
print('[N] Found application instructions for: {}'.format(target))
except urllib.error.HTTPError:
print('[N] Could not find remote instructions. yrying local instructions under ./deployments/applications')
local_path = './deployments/applications' if os.path.isfile('./archinstall.py') else './archinstall/deployments/applications' # Dangerous assumption
if os.path.isfile(f'{local_path}/{target}.json'):
with open(f'{local_path}/{target}.json', 'r') as fh:
instructions = fh.read()
print('[N] Found local application instructions for: {}'.format(target))
else:
print('[N] No instructions found for: {}'.format(target))
return instructions
try:
instructions = json.loads(instructions, object_pairs_hook=oDict)
except:
print('[E] JSON syntax error in {}'.format('{}/applications/{}.json'.format(args['profiles-path'], target)))
traceback.print_exc()
exit(1)
return instructions
class Profile():
def __init__(self, installer, name, args={}):
self.name = name
self.installer = installer
self._cache = None
self.args = args
def __repr__(self, *args, **kwargs):
return f'Profile({self.name} <"{self.path}">)'
@property
def path(self, *args, **kwargs):
for path in ['./', './profiles', '/etc/archinstall', '/etc/archinstall/profiles']:
if os.path.isfile(f'{path}/{self.name}.json'):
return os.path.abspath(f'{path}/{self.name}.json')
try:
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
self._cache = cache
return f'{UPSTREAM_URL}/{self.name}.json'
except urllib.error.HTTPError:
pass
try:
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
self._cache = cache
return f'{UPSTREAM_URL}/{self.name}.json'
except urllib.error.HTTPError:
pass
return None
def load_instructions(self):
if (absolute_path := self.path):
if absolute_path[:4] == 'http':
return json.loads(self._cache)
with open(absolute_path, 'r') as fh:
return json.load(fh)
raise ProfileError(f'No such profile ({self.name}) was found either locally or in {UPSTREAM_URL}')
def install(self):
instructions = self.load_instructions()
if 'args' in instructions:
self.args = instructions['args']
if 'post' in instructions:
instructions = instructions['post']
for title in instructions:
log(f'Running post installation step {title}')
print('[N] Network Deploy: {}'.format(title))
if type(instructions[title]) == str:
print('[N] Loading {} configuration'.format(instructions[title]))
log(f'Loading {instructions[title]} configuration')
instructions[title] = Application(self.installer, instructions[title], args=self.args)
instructions[title].install()
else:
for command in instructions[title]:
raw_command = command
opts = instructions[title][command] if type(instructions[title][command]) in (dict, OrderedDict) else {}
if len(opts):
if 'pass-args' in opts or 'format' in opts:
command = command.format(**self.args)
## FIXME: Instead of deleting the two options
## in order to mute command output further down,
## check for a 'debug' flag per command and delete these two
if 'pass-args' in opts:
del(opts['pass-args'])
elif 'format' in opts:
del(opts['format'])
if 'pass-args' in opts and opts['pass-args']:
command = command.format(**self.args)
if 'runas' in opts and f'su - {opts["runas"]} -c' not in command:
command = command.replace('"', '\\"')
command = f'su - {opts["runas"]} -c "{command}"'
if 'no-chroot' in opts and opts['no-chroot']:
log(f'Executing {command} as simple command from live-cd.')
o = sys_command(command, opts)
elif 'chroot' in opts and opts['chroot']:
log(f'Executing {command} in chroot.')
## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces).
## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option.
## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command.
o = sys_command(f"mount /dev/mapper/luksdev {self.installer.mountpoint}")
o = sys_command(f"cd {self.installer.mountpoint}; cp /etc/resolv.conf etc")
o = sys_command(f"cd {self.installer.mountpoint}; mount -t proc /proc proc")
o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /sys sys")
o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /dev dev")
o = sys_command(f'chroot {self.installer.mountpoint} /bin/bash -c "{command}"')
o = sys_command(f"cd {self.installer.mountpoint}; umount -R dev")
o = sys_command(f"cd {self.installer.mountpoint}; umount -R sys")
o = sys_command(f"cd {self.installer.mountpoint}; umount -R proc")
else:
if 'boot' in opts and opts['boot']:
log(f'Executing {command} in boot mode.')
defaults = {
'login:' : 'root\n',
'Password:' : self.args['password']+'\n',
f'[root@{self.args["hostname"]} ~]#' : command+'\n',
}
if not 'events' in opts: opts['events'] = {}
events = {**defaults, **opts['events']}
del(opts['events'])
o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} -b --machine temporary', events=events))
else:
log(f'Executing {command} in with systemd-nspawn without boot.')
o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} --machine temporary {command}'))
if type(instructions[title][raw_command]) == bytes and len(instructions['post'][title][raw_command]) and not instructions['post'][title][raw_command] in o:
log(f'{command} failed: {o.decode("UTF-8")}')
print('[W] Post install command failed: {}'.format(o.decode('UTF-8')))
class Application(Profile):
@property
def path(self, *args, **kwargs):
for path in ['./applications', './profiles/applications', '/etc/archinstall/applications', '/etc/archinstall/profiles/applications']:
if os.path.isfile(f'{path}/{self.name}.json'):
return os.path.abspath(f'{path}/{self.name}.json')
try:
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
self._cache = cache
return f'{UPSTREAM_URL}/{self.name}.json'
except urllib.error.HTTPError:
pass
try:
if (cache := grab_url_data(f'{UPSTREAM_URL}/applications/{self.name}.json')):
self._cache = cache
return f'{UPSTREAM_URL}/applications/{self.name}.json'
except urllib.error.HTTPError:
pass
return None

View File

@ -1,25 +1,22 @@
import archinstall, getpass
## dd if=/dev/zero of=test.img bs=1G count=4
## losetup -fP test.img
# Unmount and close previous runs
archinstall.sys_command(f'umount -R /mnt', surpress_errors=True)
archinstall.sys_command(f'cryptsetup close /dev/mapper/luksloop', surpress_errors=True)
#harddrive = archinstall.select_disk(archinstall.all_disks())
harddrive = archinstall.all_disks()['/dev/loop0']
disk_password = '1234' # getpass.getpass(prompt='Disk password (won\'t echo): ')
# Select a harddrive and a disk password
harddrive = archinstall.select_disk(archinstall.all_disks())
disk_password = getpass.getpass(prompt='Disk password (won\'t echo): ')
with archinstall.Filesystem(harddrive, archinstall.GPT) as fs:
# Use the entire disk instead of setting up partitions on your own
fs.use_entire_disk('luks2')
with archinstall.luks2(fs) as crypt:
if harddrive.partition[1].size == '512M':
raise OSError('Trying to encrypt the boot partition for petes sake..')
key_file = crypt.encrypt(harddrive.partition[1], password=disk_password, key_size=512, hash_type='sha512', iter_time=10000, key_file='./pwfile')
if harddrive.partition[1].size == '512M':
raise OSError('Trying to encrypt the boot partition for petes sake..')
harddrive.partition[0].format('fat32')
unlocked_device = crypt.unlock(harddrive.partition[1], 'luksloop', key_file)
harddrive.partition[0].format('fat32')
with archinstall.luks2(harddrive.partition[1], 'luksloop', disk_password) as unlocked_device:
unlocked_device.format('btrfs')
with archinstall.Installer(unlocked_device, hostname='testmachine') as installation:
@ -27,7 +24,7 @@ with archinstall.Filesystem(harddrive, archinstall.GPT) as fs:
installation.add_bootloader(harddrive.partition[0])
installation.add_additional_packages(['nano', 'wget', 'git'])
installation.install_profile('desktop')
installation.install_profile('workstation')
installation.user_create('anton', 'test')
installation.user_set_pw('root', 'toor')