216 lines
7.9 KiB
Python
216 lines
7.9 KiB
Python
import os, urllib.request, urllib.parse, ssl, json, re
|
|
import importlib.util, sys, glob, hashlib
|
|
from collections import OrderedDict
|
|
from .general import multisplit, sys_command, log
|
|
from .exceptions import *
|
|
from .networking import *
|
|
from .output import log, LOG_LEVELS
|
|
from .storage import storage
|
|
|
|
def grab_url_data(path):
|
|
safe_path = path[:path.find(':')+1]+''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':')+1:], ('/', '?', '=', '&'))])
|
|
ssl_context = ssl.create_default_context()
|
|
ssl_context.check_hostname = False
|
|
ssl_context.verify_mode=ssl.CERT_NONE
|
|
response = urllib.request.urlopen(safe_path, context=ssl_context)
|
|
return response.read()
|
|
|
|
def list_profiles(filter_irrelevant_macs=True):
|
|
# TODO: Grab from github page as well, not just local static files
|
|
if filter_irrelevant_macs:
|
|
local_macs = list_interfaces()
|
|
|
|
cache = {}
|
|
# Grab all local profiles found in PROFILE_PATH
|
|
for PATH_ITEM in storage['PROFILE_PATH']:
|
|
for root, folders, files in os.walk(os.path.abspath(os.path.expanduser(PATH_ITEM))):
|
|
for file in files:
|
|
if os.path.splitext(file)[1] == '.py':
|
|
tailored = False
|
|
if len(mac := re.findall('(([a-zA-z0-9]{2}[-:]){5}([a-zA-z0-9]{2}))', file)):
|
|
if filter_irrelevant_macs and mac[0][0].lower() not in local_macs:
|
|
continue
|
|
tailored = True
|
|
|
|
description = ''
|
|
with open(os.path.join(root, file), 'r') as fh:
|
|
first_line = fh.readline()
|
|
if first_line[0] == '#':
|
|
description = first_line[1:].strip()
|
|
|
|
cache[file[:-3]] = {'path' : os.path.join(root, file), 'description' : description, 'tailored' : tailored}
|
|
break
|
|
|
|
# Grab profiles from upstream URL
|
|
if storage['PROFILE_DB']:
|
|
profiles_url = os.path.join(storage["UPSTREAM_URL"], storage['PROFILE_DB'])
|
|
try:
|
|
profile_list = json.loads(grab_url_data(profiles_url))
|
|
except urllib.error.HTTPError as err:
|
|
print(f'Error: Listing profiles on URL "{profiles_url}" resulted in:', err)
|
|
return cache
|
|
except:
|
|
print(f'Error: Could not decode "{profiles_url}" result as JSON:', err)
|
|
return cache
|
|
|
|
for profile in profile_list:
|
|
if os.path.splitext(profile)[1] == '.py':
|
|
tailored = False
|
|
if len(mac := re.findall('(([a-zA-z0-9]{2}[-:]){5}([a-zA-z0-9]{2}))', profile)):
|
|
if filter_irrelevant_macs and mac[0][0] not in local_macs:
|
|
continue
|
|
tailored = True
|
|
|
|
cache[profile[:-3]] = {'path' : os.path.join(storage["UPSTREAM_URL"], profile), 'description' : profile_list[profile], 'tailored' : tailored}
|
|
|
|
return cache
|
|
|
|
def find_installation_script(profile):
|
|
parsed_url = urllib.parse.urlparse(profile)
|
|
if not parsed_url.scheme:
|
|
examples = list_profiles()
|
|
if f"{profile}.py" in examples:
|
|
with open(examples[f"{profile}.py"]) as file:
|
|
return Script(file.read(), filename=os.path.basename(profile)+".py")
|
|
try:
|
|
with open(profile, 'r') as file:
|
|
return Script(file.read(), filename=os.path.basename(profile))
|
|
except FileNotFoundError:
|
|
# We need to traverse backwards one step with /../ because
|
|
# We're living in src/lib/ and we're not executing from src/ anymore.
|
|
cwd = os.path.abspath(f'{os.path.dirname(__file__)}/../')
|
|
examples = f"{cwd}/examples"
|
|
raise ProfileNotFound(f"File {profile} does not exist in {examples}")
|
|
elif parsed_url.scheme in ('https', 'http'):
|
|
return Script(urllib.request.urlopen(profile).read().decode('utf-8'), filename=os.path.basename(profile))
|
|
else:
|
|
raise ProfileNotFound(f"Cannot handle scheme {parsed_url.scheme}")
|
|
|
|
|
|
class Imported():
|
|
def __init__(self, spec, imported):
|
|
self.spec = spec
|
|
self.imported = imported
|
|
|
|
def __enter__(self, *args, **kwargs):
|
|
self.spec.loader.exec_module(self.imported)
|
|
return self.imported
|
|
|
|
def __exit__(self, *args, **kwargs):
|
|
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
|
|
if len(args) >= 2 and args[1]:
|
|
raise args[1]
|
|
|
|
|
|
class Script():
|
|
def __init__(self, content, filename=''):
|
|
self.content = content
|
|
self.filename = filename
|
|
|
|
@property
|
|
def path(self):
|
|
temp_file_path = f"/tmp/{self.filename}_{hashlib.md5(os.urandom(12)).hexdigest()}.py"
|
|
|
|
with open(temp_file_path, "w") as temp_file:
|
|
temp_file.write(self.content)
|
|
|
|
return temp_file_path
|
|
|
|
def execute(self):
|
|
spec = importlib.util.spec_from_file_location(
|
|
"tempscript",
|
|
self.path
|
|
)
|
|
imported_path = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(imported_path)
|
|
sys.modules["tempscript"] = imported_path
|
|
|
|
|
|
class Profile():
|
|
def __init__(self, installer, path, args={}):
|
|
self._path = path
|
|
self.installer = installer
|
|
self._cache = None
|
|
self.args = args
|
|
|
|
def __dump__(self, *args, **kwargs):
|
|
return {'path' : self._path}
|
|
|
|
def __repr__(self, *args, **kwargs):
|
|
return f'Profile({self._path} <"{self.path}">)'
|
|
|
|
@property
|
|
def path(self, *args, **kwargs):
|
|
if os.path.isfile(f'{self._path}'):
|
|
return os.path.abspath(f'{self._path}')
|
|
|
|
for path in ['./profiles', '/etc/archinstall', '/etc/archinstall/profiles', os.path.abspath(f'{os.path.dirname(__file__)}/../profiles')]: # Step out of /lib
|
|
if os.path.isfile(f'{path}/{self._path}.py'):
|
|
return os.path.abspath(f'{path}/{self._path}.py')
|
|
|
|
try:
|
|
if (cache := grab_url_data(f'{storage["UPSTREAM_URL"]}/{self._path}.py')):
|
|
self._cache = cache
|
|
return f'{storage["UPSTREAM_URL"]}/{self._path}.py'
|
|
except urllib.error.HTTPError:
|
|
pass
|
|
|
|
return None
|
|
|
|
# def py_exec_mock(self):
|
|
# spec.loader.exec_module(imported)
|
|
|
|
def load_instructions(self, namespace=None):
|
|
if (absolute_path := self.path):
|
|
if os.path.splitext(absolute_path)[1] == '.py':
|
|
if not namespace:
|
|
namespace = os.path.splitext(os.path.basename(absolute_path))[0]
|
|
spec = importlib.util.spec_from_file_location(namespace, absolute_path)
|
|
imported = importlib.util.module_from_spec(spec)
|
|
sys.modules[namespace] = imported
|
|
return Imported(spec, imported)
|
|
else:
|
|
raise ProfileError(f'Extension {os.path.splitext(absolute_path)[1]} is not a supported profile model. Only .py is supported.')
|
|
|
|
raise ProfileError(f'No such profile ({self._path}) was found either locally or in {storage["UPSTREAM_URL"]}')
|
|
|
|
def install(self):
|
|
# To avoid profiles importing the wrong 'archinstall',
|
|
# we need to ensure that this current archinstall is in sys.path
|
|
archinstall_path = os.path.abspath(f'{os.path.dirname(__file__)}/../../')
|
|
if archinstall_path not in sys.path:
|
|
sys.path.insert(0, archinstall_path)
|
|
|
|
instructions = self.load_instructions()
|
|
if type(instructions) == Imported:
|
|
# There's no easy way to give the imported profile the installer instance unless we require the profile-programmer to create a certain function that must be the same for all..
|
|
# Which is a bit inconvenient so we'll make a a truly global installer for now, in the future archinstall main __init__.py should setup the 'installation' variable..
|
|
# but to avoid circular imports and other traps, this works for now.
|
|
# TODO: Remove
|
|
__builtins__['installation'] = self.installer
|
|
with instructions as runtime:
|
|
log(f'{self} finished successfully.', bg='black', fg='green', level=LOG_LEVELS.Info, file=storage.get('logfile', None))
|
|
|
|
return True
|
|
|
|
class Application(Profile):
|
|
def __repr__(self, *args, **kwargs):
|
|
return f'Application({self._path} <"{self.path}">)'
|
|
|
|
@property
|
|
def path(self, *args, **kwargs):
|
|
if os.path.isfile(f'{self._path}'):
|
|
return os.path.abspath(f'{self._path}')
|
|
|
|
for path in ['./applications', './profiles/applications', '/etc/archinstall/applications', '/etc/archinstall/profiles/applications', os.path.abspath(f'{os.path.dirname(__file__)}/../profiles/applications')]:
|
|
if os.path.isfile(f'{path}/{self._path}.py'):
|
|
return os.path.abspath(f'{path}/{self._path}.py')
|
|
|
|
try:
|
|
if (cache := grab_url_data(f'{storage["UPSTREAM_URL"]}/applications/{self._path}.py')):
|
|
self._cache = cache
|
|
return f'{storage["UPSTREAM_URL"]}/applications/{self._path}.py'
|
|
except urllib.error.HTTPError:
|
|
pass
|
|
|
|
return None |