Added support for .py profiles. Added a simple 'desktop.py' for now that is just a mock to make sure it's working.
This commit is contained in:
parent
a695607326
commit
3ed8db5ef0
|
|
@ -1,4 +1,5 @@
|
||||||
import os, urllib.request, urllib.parse, ssl, json
|
import os, urllib.request, urllib.parse, ssl, json
|
||||||
|
import importlib.util, sys
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from .general import multisplit, sys_command, log
|
from .general import multisplit, sys_command, log
|
||||||
from .exceptions import *
|
from .exceptions import *
|
||||||
|
|
@ -13,49 +14,19 @@ def grab_url_data(path):
|
||||||
response = urllib.request.urlopen(safe_path, context=ssl_context)
|
response = urllib.request.urlopen(safe_path, context=ssl_context)
|
||||||
return response.read()
|
return response.read()
|
||||||
|
|
||||||
def get_application_instructions(target):
|
class Imported():
|
||||||
instructions = {}
|
def __init__(self, spec, imported):
|
||||||
|
self.spec = spec
|
||||||
|
self.imported = imported
|
||||||
|
|
||||||
for path in ['./', './profiles', '/etc/archinstall', '/etc/archinstall/profiles']:
|
def __enter__(self, *args, **kwargs):
|
||||||
if os.path.isfile(f'{path}/applications/{target}.json'):
|
self.spec.loader.exec_module(self.imported)
|
||||||
return os.path.abspath(f'{path}/{self.name}.json')
|
return self
|
||||||
|
|
||||||
try:
|
def __exit__(self, *args, **kwargs):
|
||||||
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
|
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
|
||||||
self._cache = cache
|
if len(args) >= 2 and args[1]:
|
||||||
return f'{UPSTREAM_URL}/{self.name}.json'
|
raise args[1]
|
||||||
except urllib.error.HTTPError:
|
|
||||||
pass
|
|
||||||
try:
|
|
||||||
if (cache := grab_url_data(f'{UPSTREAM_URL}/applications/{self.name}.json')):
|
|
||||||
self._cache = cache
|
|
||||||
return f'{UPSTREAM_URL}/applications/{self.name}.json'
|
|
||||||
except urllib.error.HTTPError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
try:
|
|
||||||
instructions = grab_url_data(f'{UPSTREAM_URL}/applications/{target}.json').decode('UTF-8')
|
|
||||||
log('[N] Found application instructions for: {}'.format(target))
|
|
||||||
except urllib.error.HTTPError:
|
|
||||||
log('[N] Could not find remote instructions. yrying local instructions under ./profiles/applications')
|
|
||||||
local_path = './profiles/applications' if os.path.isfile('./archinstall.py') else './archinstall/profiles/applications' # Dangerous assumption
|
|
||||||
if os.path.isfile(f'{local_path}/{target}.json'):
|
|
||||||
with open(f'{local_path}/{target}.json', 'r') as fh:
|
|
||||||
instructions = fh.read()
|
|
||||||
|
|
||||||
log('[N] Found local application instructions for: {}'.format(target))
|
|
||||||
else:
|
|
||||||
log('[N] No instructions found for: {}'.format(target))
|
|
||||||
return instructions
|
|
||||||
|
|
||||||
try:
|
|
||||||
instructions = json.loads(instructions, object_pairs_hook=oDict)
|
|
||||||
except:
|
|
||||||
log('[E] JSON syntax error in {}'.format('{}/applications/{}.json'.format(args['profiles-path'], target)))
|
|
||||||
traceback.print_exc()
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
return instructions
|
|
||||||
|
|
||||||
class Profile():
|
class Profile():
|
||||||
def __init__(self, installer, name, args={}):
|
def __init__(self, installer, name, args={}):
|
||||||
|
|
@ -69,10 +40,18 @@ class Profile():
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def path(self, *args, **kwargs):
|
def path(self, *args, **kwargs):
|
||||||
for path in ['./', './profiles', '/etc/archinstall', '/etc/archinstall/profiles']:
|
for path in ['./profiles', '/etc/archinstall', '/etc/archinstall/profiles', os.path.abspath(f'{os.path.dirname(__file__)}/../profiles')]: # Step out of /lib
|
||||||
if os.path.isfile(f'{path}/{self.name}.json'):
|
if os.path.isfile(f'{path}/{self.name}.json'):
|
||||||
return os.path.abspath(f'{path}/{self.name}.json')
|
return os.path.abspath(f'{path}/{self.name}.json')
|
||||||
|
elif os.path.isfile(f'{path}/{self.name}.py'):
|
||||||
|
return os.path.abspath(f'{path}/{self.name}.py')
|
||||||
|
|
||||||
|
try:
|
||||||
|
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.py')):
|
||||||
|
self._cache = cache
|
||||||
|
return f'{UPSTREAM_URL}/{self.name}.py'
|
||||||
|
except urllib.error.HTTPError:
|
||||||
|
pass
|
||||||
try:
|
try:
|
||||||
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
|
if (cache := grab_url_data(f'{UPSTREAM_URL}/{self.name}.json')):
|
||||||
self._cache = cache
|
self._cache = cache
|
||||||
|
|
@ -88,9 +67,17 @@ class Profile():
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def py_exec_mock(self):
|
||||||
|
spec.loader.exec_module(imported)
|
||||||
|
|
||||||
def load_instructions(self):
|
def load_instructions(self):
|
||||||
if (absolute_path := self.path):
|
if (absolute_path := self.path):
|
||||||
if absolute_path[:4] == 'http':
|
if os.path.splitext(absolute_path)[1] == '.py':
|
||||||
|
spec = importlib.util.spec_from_file_location(absolute_path, absolute_path)
|
||||||
|
imported = importlib.util.module_from_spec(spec)
|
||||||
|
sys.modules[os.path.basename(absolute_path)] = imported
|
||||||
|
return Imported(spec, imported)
|
||||||
|
elif absolute_path[:4] == 'http':
|
||||||
return json.loads(self._cache)
|
return json.loads(self._cache)
|
||||||
|
|
||||||
with open(absolute_path, 'r') as fh:
|
with open(absolute_path, 'r') as fh:
|
||||||
|
|
@ -100,77 +87,81 @@ class Profile():
|
||||||
|
|
||||||
def install(self):
|
def install(self):
|
||||||
instructions = self.load_instructions()
|
instructions = self.load_instructions()
|
||||||
if 'args' in instructions:
|
if type(instructions) == Imported:
|
||||||
self.args = instructions['args']
|
with instructions as runtime:
|
||||||
if 'post' in instructions:
|
log(f'Profile {self.name} finished successfully.')
|
||||||
instructions = instructions['post']
|
else:
|
||||||
|
if 'args' in instructions:
|
||||||
|
self.args = instructions['args']
|
||||||
|
if 'post' in instructions:
|
||||||
|
instructions = instructions['post']
|
||||||
|
|
||||||
for title in instructions:
|
for title in instructions:
|
||||||
log(f'Running post installation step {title}')
|
log(f'Running post installation step {title}')
|
||||||
|
|
||||||
log('[N] Network Deploy: {}'.format(title))
|
log('[N] Network Deploy: {}'.format(title))
|
||||||
if type(instructions[title]) == str:
|
if type(instructions[title]) == str:
|
||||||
log('[N] Loading {} configuration'.format(instructions[title]))
|
log('[N] Loading {} configuration'.format(instructions[title]))
|
||||||
log(f'Loading {instructions[title]} configuration')
|
log(f'Loading {instructions[title]} configuration')
|
||||||
instructions[title] = Application(self.installer, instructions[title], args=self.args)
|
instructions[title] = Application(self.installer, instructions[title], args=self.args)
|
||||||
instructions[title].install()
|
instructions[title].install()
|
||||||
else:
|
else:
|
||||||
for command in instructions[title]:
|
for command in instructions[title]:
|
||||||
raw_command = command
|
raw_command = command
|
||||||
opts = instructions[title][command] if type(instructions[title][command]) in (dict, OrderedDict) else {}
|
opts = instructions[title][command] if type(instructions[title][command]) in (dict, OrderedDict) else {}
|
||||||
if len(opts):
|
if len(opts):
|
||||||
if 'pass-args' in opts or 'format' in opts:
|
if 'pass-args' in opts or 'format' in opts:
|
||||||
|
command = command.format(**self.args)
|
||||||
|
## FIXME: Instead of deleting the two options
|
||||||
|
## in order to mute command output further down,
|
||||||
|
## check for a 'debug' flag per command and delete these two
|
||||||
|
if 'pass-args' in opts:
|
||||||
|
del(opts['pass-args'])
|
||||||
|
elif 'format' in opts:
|
||||||
|
del(opts['format'])
|
||||||
|
|
||||||
|
if 'pass-args' in opts and opts['pass-args']:
|
||||||
command = command.format(**self.args)
|
command = command.format(**self.args)
|
||||||
## FIXME: Instead of deleting the two options
|
|
||||||
## in order to mute command output further down,
|
|
||||||
## check for a 'debug' flag per command and delete these two
|
|
||||||
if 'pass-args' in opts:
|
|
||||||
del(opts['pass-args'])
|
|
||||||
elif 'format' in opts:
|
|
||||||
del(opts['format'])
|
|
||||||
|
|
||||||
if 'pass-args' in opts and opts['pass-args']:
|
if 'runas' in opts and f'su - {opts["runas"]} -c' not in command:
|
||||||
command = command.format(**self.args)
|
command = command.replace('"', '\\"')
|
||||||
|
command = f'su - {opts["runas"]} -c "{command}"'
|
||||||
|
|
||||||
if 'runas' in opts and f'su - {opts["runas"]} -c' not in command:
|
if 'no-chroot' in opts and opts['no-chroot']:
|
||||||
command = command.replace('"', '\\"')
|
log(f'Executing {command} as simple command from live-cd.')
|
||||||
command = f'su - {opts["runas"]} -c "{command}"'
|
o = sys_command(command, opts)
|
||||||
|
elif 'chroot' in opts and opts['chroot']:
|
||||||
if 'no-chroot' in opts and opts['no-chroot']:
|
log(f'Executing {command} in chroot.')
|
||||||
log(f'Executing {command} as simple command from live-cd.')
|
## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces).
|
||||||
o = sys_command(command, opts)
|
## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option.
|
||||||
elif 'chroot' in opts and opts['chroot']:
|
## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command.
|
||||||
log(f'Executing {command} in chroot.')
|
o = sys_command(f"mount /dev/mapper/luksdev {self.installer.mountpoint}")
|
||||||
## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces).
|
o = sys_command(f"cd {self.installer.mountpoint}; cp /etc/resolv.conf etc")
|
||||||
## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option.
|
o = sys_command(f"cd {self.installer.mountpoint}; mount -t proc /proc proc")
|
||||||
## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command.
|
o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /sys sys")
|
||||||
o = sys_command(f"mount /dev/mapper/luksdev {self.installer.mountpoint}")
|
o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /dev dev")
|
||||||
o = sys_command(f"cd {self.installer.mountpoint}; cp /etc/resolv.conf etc")
|
o = sys_command(f'chroot {self.installer.mountpoint} /bin/bash -c "{command}"')
|
||||||
o = sys_command(f"cd {self.installer.mountpoint}; mount -t proc /proc proc")
|
o = sys_command(f"cd {self.installer.mountpoint}; umount -R dev")
|
||||||
o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /sys sys")
|
o = sys_command(f"cd {self.installer.mountpoint}; umount -R sys")
|
||||||
o = sys_command(f"cd {self.installer.mountpoint}; mount --make-rslave --rbind /dev dev")
|
o = sys_command(f"cd {self.installer.mountpoint}; umount -R proc")
|
||||||
o = sys_command(f'chroot {self.installer.mountpoint} /bin/bash -c "{command}"')
|
|
||||||
o = sys_command(f"cd {self.installer.mountpoint}; umount -R dev")
|
|
||||||
o = sys_command(f"cd {self.installer.mountpoint}; umount -R sys")
|
|
||||||
o = sys_command(f"cd {self.installer.mountpoint}; umount -R proc")
|
|
||||||
else:
|
|
||||||
if 'boot' in opts and opts['boot']:
|
|
||||||
log(f'Executing {command} in boot mode.')
|
|
||||||
defaults = {
|
|
||||||
'login:' : 'root\n',
|
|
||||||
'Password:' : self.args['password']+'\n',
|
|
||||||
f'[root@{self.args["hostname"]} ~]#' : command+'\n',
|
|
||||||
}
|
|
||||||
if not 'events' in opts: opts['events'] = {}
|
|
||||||
events = {**defaults, **opts['events']}
|
|
||||||
del(opts['events'])
|
|
||||||
o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} -b --machine temporary', events=events))
|
|
||||||
else:
|
else:
|
||||||
log(f'Executing {command} in with systemd-nspawn without boot.')
|
if 'boot' in opts and opts['boot']:
|
||||||
o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} --machine temporary {command}'))
|
log(f'Executing {command} in boot mode.')
|
||||||
if type(instructions[title][raw_command]) == bytes and len(instructions['post'][title][raw_command]) and not instructions['post'][title][raw_command] in o:
|
defaults = {
|
||||||
log(f'{command} failed: {o.decode("UTF-8")}')
|
'login:' : 'root\n',
|
||||||
log('[W] Post install command failed: {}'.format(o.decode('UTF-8')))
|
'Password:' : self.args['password']+'\n',
|
||||||
|
f'[root@{self.args["hostname"]} ~]#' : command+'\n',
|
||||||
|
}
|
||||||
|
if not 'events' in opts: opts['events'] = {}
|
||||||
|
events = {**defaults, **opts['events']}
|
||||||
|
del(opts['events'])
|
||||||
|
o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} -b --machine temporary', events=events))
|
||||||
|
else:
|
||||||
|
log(f'Executing {command} in with systemd-nspawn without boot.')
|
||||||
|
o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D {self.installer.mountpoint} --machine temporary {command}'))
|
||||||
|
if type(instructions[title][raw_command]) == bytes and len(instructions['post'][title][raw_command]) and not instructions['post'][title][raw_command] in o:
|
||||||
|
log(f'{command} failed: {o.decode("UTF-8")}')
|
||||||
|
log('[W] Post install command failed: {}'.format(o.decode('UTF-8')))
|
||||||
|
|
||||||
class Application(Profile):
|
class Application(Profile):
|
||||||
@property
|
@property
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
import archinstall
|
||||||
|
print('Installing desktop using:', archinstall)
|
||||||
Loading…
Reference in New Issue