Add custom mirror support (#1816)
Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
This commit is contained in:
parent
d65359896a
commit
8a292a163e
|
|
@ -225,12 +225,8 @@ def load_config():
|
||||||
if profile_config := arguments.get('profile_config', None):
|
if profile_config := arguments.get('profile_config', None):
|
||||||
arguments['profile_config'] = profile.ProfileConfiguration.parse_arg(profile_config)
|
arguments['profile_config'] = profile.ProfileConfiguration.parse_arg(profile_config)
|
||||||
|
|
||||||
if arguments.get('mirror-region', None) is not None:
|
if mirror_config := arguments.get('mirror_config', None):
|
||||||
if type(arguments.get('mirror-region', None)) is dict:
|
arguments['mirror_config'] = mirrors.MirrorConfiguration.parse_args(mirror_config)
|
||||||
arguments['mirror-region'] = arguments.get('mirror-region', None)
|
|
||||||
else:
|
|
||||||
selected_region = arguments.get('mirror-region', None)
|
|
||||||
arguments['mirror-region'] = {selected_region: mirrors.list_mirrors()[selected_region]}
|
|
||||||
|
|
||||||
if arguments.get('servers', None) is not None:
|
if arguments.get('servers', None) is not None:
|
||||||
storage['_selected_servers'] = arguments.get('servers', None)
|
storage['_selected_servers'] = arguments.get('servers', None)
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ from typing import Any, List, Optional, Union, Dict, TYPE_CHECKING
|
||||||
from . import disk
|
from . import disk
|
||||||
from .general import secret
|
from .general import secret
|
||||||
from .menu import Selector, AbstractMenu
|
from .menu import Selector, AbstractMenu
|
||||||
|
from .mirrors import MirrorConfiguration, MirrorMenu
|
||||||
from .models import NetworkConfiguration
|
from .models import NetworkConfiguration
|
||||||
from .models.bootloader import Bootloader
|
from .models.bootloader import Bootloader
|
||||||
from .models.users import User
|
from .models.users import User
|
||||||
|
|
@ -26,7 +27,6 @@ from .interactions import select_kernel
|
||||||
from .interactions import select_language
|
from .interactions import select_language
|
||||||
from .interactions import select_locale_enc
|
from .interactions import select_locale_enc
|
||||||
from .interactions import select_locale_lang
|
from .interactions import select_locale_lang
|
||||||
from .interactions import select_mirror_regions
|
|
||||||
from .interactions import ask_ntp
|
from .interactions import ask_ntp
|
||||||
from .interactions.disk_conf import select_disk_config
|
from .interactions.disk_conf import select_disk_config
|
||||||
|
|
||||||
|
|
@ -51,12 +51,13 @@ class GlobalMenu(AbstractMenu):
|
||||||
_('Keyboard layout'),
|
_('Keyboard layout'),
|
||||||
lambda preset: select_language(preset),
|
lambda preset: select_language(preset),
|
||||||
default='us')
|
default='us')
|
||||||
self._menu_options['mirror-region'] = \
|
self._menu_options['mirror_config'] = \
|
||||||
Selector(
|
Selector(
|
||||||
_('Mirror region'),
|
_('Mirrors'),
|
||||||
lambda preset: select_mirror_regions(preset),
|
lambda preset: self._mirror_configuration(preset),
|
||||||
display_func=lambda x: list(x.keys()) if x else '[]',
|
display_func=lambda x: str(_('Defined')) if x else '',
|
||||||
default={})
|
preview_func=self._prev_mirror_config
|
||||||
|
)
|
||||||
self._menu_options['sys-language'] = \
|
self._menu_options['sys-language'] = \
|
||||||
Selector(
|
Selector(
|
||||||
_('Locale language'),
|
_('Locale language'),
|
||||||
|
|
@ -354,3 +355,24 @@ class GlobalMenu(AbstractMenu):
|
||||||
def _create_user_account(self, defined_users: List[User]) -> List[User]:
|
def _create_user_account(self, defined_users: List[User]) -> List[User]:
|
||||||
users = ask_for_additional_users(defined_users=defined_users)
|
users = ask_for_additional_users(defined_users=defined_users)
|
||||||
return users
|
return users
|
||||||
|
|
||||||
|
def _mirror_configuration(self, preset: Optional[MirrorConfiguration] = None) -> Optional[MirrorConfiguration]:
|
||||||
|
data_store: Dict[str, Any] = {}
|
||||||
|
mirror_configuration = MirrorMenu(data_store, preset=preset).run()
|
||||||
|
return mirror_configuration
|
||||||
|
|
||||||
|
def _prev_mirror_config(self) -> Optional[str]:
|
||||||
|
selector = self._menu_options['mirror_config']
|
||||||
|
|
||||||
|
if selector.has_selection():
|
||||||
|
mirror_config: MirrorConfiguration = selector.current_selection # type: ignore
|
||||||
|
output = ''
|
||||||
|
if mirror_config.regions:
|
||||||
|
output += '{}: {}\n\n'.format(str(_('Mirror regions')), mirror_config.regions)
|
||||||
|
if mirror_config.custom_mirrors:
|
||||||
|
table = FormattedOutput.as_table(mirror_config.custom_mirrors)
|
||||||
|
output += '{}\n{}'.format(str(_('Custom mirrors')), table)
|
||||||
|
|
||||||
|
return output.strip()
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable, Iterable
|
from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable
|
||||||
|
|
||||||
from . import disk
|
from . import disk
|
||||||
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
|
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
|
||||||
|
|
@ -14,7 +14,7 @@ from .general import SysCommand
|
||||||
from .hardware import SysInfo
|
from .hardware import SysInfo
|
||||||
from .locale import verify_keyboard_layout, verify_x11_keyboard_layout
|
from .locale import verify_keyboard_layout, verify_x11_keyboard_layout
|
||||||
from .luks import Luks2
|
from .luks import Luks2
|
||||||
from .mirrors import use_mirrors
|
from .mirrors import use_mirrors, MirrorConfiguration, add_custom_mirrors
|
||||||
from .models.bootloader import Bootloader
|
from .models.bootloader import Bootloader
|
||||||
from .models.network_configuration import NetworkConfiguration
|
from .models.network_configuration import NetworkConfiguration
|
||||||
from .models.users import User
|
from .models.users import User
|
||||||
|
|
@ -383,14 +383,17 @@ class Installer:
|
||||||
|
|
||||||
raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
|
raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
|
||||||
|
|
||||||
def set_mirrors(self, mirrors: Dict[str, Iterable[str]]):
|
def set_mirrors(self, mirror_config: MirrorConfiguration):
|
||||||
for plugin in plugins.values():
|
for plugin in plugins.values():
|
||||||
if hasattr(plugin, 'on_mirrors'):
|
if hasattr(plugin, 'on_mirrors'):
|
||||||
if result := plugin.on_mirrors(mirrors):
|
if result := plugin.on_mirrors(mirror_config):
|
||||||
mirrors = result
|
mirror_config = result
|
||||||
|
|
||||||
destination = f'{self.target}/etc/pacman.d/mirrorlist'
|
destination = f'{self.target}/etc/pacman.d/mirrorlist'
|
||||||
use_mirrors(mirrors, destination=destination)
|
if mirror_config.mirror_regions:
|
||||||
|
use_mirrors(mirror_config.mirror_regions, destination)
|
||||||
|
if mirror_config.custom_mirrors:
|
||||||
|
add_custom_mirrors(mirror_config.custom_mirrors)
|
||||||
|
|
||||||
def genfstab(self, flags :str = '-pU'):
|
def genfstab(self, flags :str = '-pU'):
|
||||||
info(f"Updating {self.target}/etc/fstab")
|
info(f"Updating {self.target}/etc/fstab")
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ from .disk_conf import (
|
||||||
|
|
||||||
from .general_conf import (
|
from .general_conf import (
|
||||||
ask_ntp, ask_hostname, ask_for_a_timezone, ask_for_audio_selection, select_language,
|
ask_ntp, ask_hostname, ask_for_a_timezone, ask_for_audio_selection, select_language,
|
||||||
select_mirror_regions, select_archinstall_language, ask_additional_packages_to_install,
|
select_archinstall_language, ask_additional_packages_to_install,
|
||||||
add_number_of_parrallel_downloads, select_additional_repositories
|
add_number_of_parrallel_downloads, select_additional_repositories
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import pathlib
|
import pathlib
|
||||||
from typing import List, Any, Optional, Dict, TYPE_CHECKING
|
from typing import List, Any, Optional, TYPE_CHECKING
|
||||||
|
|
||||||
from ..locale import list_keyboard_languages, list_timezones
|
from ..locale import list_keyboard_languages, list_timezones
|
||||||
from ..menu import MenuSelectionType, Menu, TextInput
|
from ..menu import MenuSelectionType, Menu, TextInput
|
||||||
from ..mirrors import list_mirrors
|
|
||||||
from ..output import warn
|
from ..output import warn
|
||||||
from ..packages.packages import validate_package_list
|
from ..packages.packages import validate_package_list
|
||||||
from ..storage import storage
|
from ..storage import storage
|
||||||
|
|
@ -96,40 +95,6 @@ def select_language(preset: Optional[str] = None) -> Optional[str]:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Asks the user to select a mirror or region
|
|
||||||
Usually this is combined with :ref:`archinstall.list_mirrors`.
|
|
||||||
|
|
||||||
:return: The dictionary information about a mirror/region.
|
|
||||||
:rtype: dict
|
|
||||||
"""
|
|
||||||
if preset_values is None:
|
|
||||||
preselected = None
|
|
||||||
else:
|
|
||||||
preselected = list(preset_values.keys())
|
|
||||||
|
|
||||||
mirrors = list_mirrors()
|
|
||||||
|
|
||||||
choice = Menu(
|
|
||||||
_('Select one of the regions to download packages from'),
|
|
||||||
list(mirrors.keys()),
|
|
||||||
preset_values=preselected,
|
|
||||||
multi=True,
|
|
||||||
allow_reset=True
|
|
||||||
).run()
|
|
||||||
|
|
||||||
match choice.type_:
|
|
||||||
case MenuSelectionType.Reset:
|
|
||||||
return {}
|
|
||||||
case MenuSelectionType.Skip:
|
|
||||||
return preset_values
|
|
||||||
case MenuSelectionType.Selection:
|
|
||||||
return {selected: mirrors[selected] for selected in choice.multi_value}
|
|
||||||
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def select_archinstall_language(languages: List[Language], preset: Language) -> Language:
|
def select_archinstall_language(languages: List[Language], preset: Language) -> Language:
|
||||||
# these are the displayed language names which can either be
|
# these are the displayed language names which can either be
|
||||||
# the english name of a language or, if present, the
|
# the english name of a language or, if present, the
|
||||||
|
|
|
||||||
|
|
@ -1,99 +1,279 @@
|
||||||
import pathlib
|
import pathlib
|
||||||
import urllib.error
|
from dataclasses import dataclass, field
|
||||||
import urllib.request
|
from enum import Enum
|
||||||
from typing import Union, Iterable, Dict, Any, List
|
from typing import Dict, Any, List, Optional, TYPE_CHECKING
|
||||||
from dataclasses import dataclass
|
|
||||||
|
|
||||||
from .general import SysCommand
|
from .menu import AbstractSubMenu, Selector, MenuSelectionType, Menu, ListManager, TextInput
|
||||||
from .output import info, warn
|
from .networking import fetch_data_from_url
|
||||||
from .exceptions import SysCallError
|
from .output import info, warn, FormattedOutput
|
||||||
from .storage import storage
|
from .storage import storage
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
_: Any
|
||||||
|
|
||||||
|
|
||||||
|
class SignCheck(Enum):
|
||||||
|
Never = 'Never'
|
||||||
|
Optional = 'Optional'
|
||||||
|
Required = 'Required'
|
||||||
|
|
||||||
|
|
||||||
|
class SignOption(Enum):
|
||||||
|
TrustedOnly = 'TrustedOnly'
|
||||||
|
TrustAll = 'TrustAll'
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class CustomMirror:
|
class CustomMirror:
|
||||||
url: str
|
|
||||||
signcheck: str
|
|
||||||
signoptions: str
|
|
||||||
name: str
|
name: str
|
||||||
|
url: str
|
||||||
|
sign_check: SignCheck
|
||||||
|
sign_option: SignOption
|
||||||
|
|
||||||
|
def as_json(self) -> Dict[str, str]:
|
||||||
|
return {
|
||||||
|
'Name': self.name,
|
||||||
|
'Url': self.url,
|
||||||
|
'Sign check': self.sign_check.value,
|
||||||
|
'Sign options': self.sign_option.value
|
||||||
|
}
|
||||||
|
|
||||||
|
def json(self) -> Dict[str, str]:
|
||||||
|
return {
|
||||||
|
'name': self.name,
|
||||||
|
'url': self.url,
|
||||||
|
'sign_check': self.sign_check.value,
|
||||||
|
'sign_option': self.sign_option.value
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse_args(cls, args: List[Dict[str, str]]) -> List['CustomMirror']:
|
||||||
|
configs = []
|
||||||
|
for arg in args:
|
||||||
|
configs.append(
|
||||||
|
CustomMirror(
|
||||||
|
arg['name'],
|
||||||
|
arg['url'],
|
||||||
|
SignCheck(arg['sign_check']),
|
||||||
|
SignOption(arg['sign_option'])
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return configs
|
||||||
|
|
||||||
|
|
||||||
def sort_mirrorlist(raw_data :bytes, sort_order: List[str] = ['https', 'http']) -> bytes:
|
@dataclass
|
||||||
|
class MirrorConfiguration:
|
||||||
|
mirror_regions: Dict[str, List[str]] = field(default_factory=dict)
|
||||||
|
custom_mirrors: List[CustomMirror] = field(default_factory=list)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def regions(self) -> str:
|
||||||
|
return ', '.join(self.mirror_regions.keys())
|
||||||
|
|
||||||
|
def json(self) -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
|
'mirror_regions': self.mirror_regions,
|
||||||
|
'custom_mirrors': [c.json() for c in self.custom_mirrors]
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse_args(cls, args: Dict[str, Any]) -> 'MirrorConfiguration':
|
||||||
|
config = MirrorConfiguration()
|
||||||
|
|
||||||
|
if 'mirror_regions' in args:
|
||||||
|
config.mirror_regions = args['mirror_regions']
|
||||||
|
|
||||||
|
if 'custom_mirrors' in args:
|
||||||
|
config.custom_mirrors = CustomMirror.parse_args(args['custom_mirrors'])
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
|
|
||||||
|
class CustomMirrorList(ListManager):
|
||||||
|
def __init__(self, prompt: str, custom_mirrors: List[CustomMirror]):
|
||||||
|
self._actions = [
|
||||||
|
str(_('Add a custom mirror')),
|
||||||
|
str(_('Change custom mirror')),
|
||||||
|
str(_('Delete custom mirror'))
|
||||||
|
]
|
||||||
|
super().__init__(prompt, custom_mirrors, [self._actions[0]], self._actions[1:])
|
||||||
|
|
||||||
|
def reformat(self, data: List[CustomMirror]) -> Dict[str, Any]:
|
||||||
|
table = FormattedOutput.as_table(data)
|
||||||
|
rows = table.split('\n')
|
||||||
|
|
||||||
|
# these are the header rows of the table and do not map to any User obviously
|
||||||
|
# we're adding 2 spaces as prefix because the menu selector '> ' will be put before
|
||||||
|
# the selectable rows so the header has to be aligned
|
||||||
|
display_data: Dict[str, Optional[CustomMirror]] = {f' {rows[0]}': None, f' {rows[1]}': None}
|
||||||
|
|
||||||
|
for row, user in zip(rows[2:], data):
|
||||||
|
row = row.replace('|', '\\|')
|
||||||
|
display_data[row] = user
|
||||||
|
|
||||||
|
return display_data
|
||||||
|
|
||||||
|
def selected_action_display(self, mirror: CustomMirror) -> str:
|
||||||
|
return mirror.name
|
||||||
|
|
||||||
|
def handle_action(
|
||||||
|
self,
|
||||||
|
action: str,
|
||||||
|
entry: Optional[CustomMirror],
|
||||||
|
data: List[CustomMirror]
|
||||||
|
) -> List[CustomMirror]:
|
||||||
|
if action == self._actions[0]: # add
|
||||||
|
new_mirror = self._add_custom_mirror()
|
||||||
|
if new_mirror is not None:
|
||||||
|
data = [d for d in data if d.name != new_mirror.name]
|
||||||
|
data += [new_mirror]
|
||||||
|
elif action == self._actions[1] and entry: # modify mirror
|
||||||
|
new_mirror = self._add_custom_mirror(entry)
|
||||||
|
if new_mirror is not None:
|
||||||
|
data = [d for d in data if d.name != entry.name]
|
||||||
|
data += [new_mirror]
|
||||||
|
elif action == self._actions[2] and entry: # delete
|
||||||
|
data = [d for d in data if d != entry]
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def _add_custom_mirror(self, mirror: Optional[CustomMirror] = None) -> Optional[CustomMirror]:
|
||||||
|
prompt = '\n\n' + str(_('Enter name (leave blank to skip): '))
|
||||||
|
existing_name = mirror.name if mirror else ''
|
||||||
|
|
||||||
|
while True:
|
||||||
|
name = TextInput(prompt, existing_name).run()
|
||||||
|
if not name:
|
||||||
|
return mirror
|
||||||
|
break
|
||||||
|
|
||||||
|
prompt = '\n' + str(_('Enter url (leave blank to skip): '))
|
||||||
|
existing_url = mirror.url if mirror else ''
|
||||||
|
|
||||||
|
while True:
|
||||||
|
url = TextInput(prompt, existing_url).run()
|
||||||
|
if not url:
|
||||||
|
return mirror
|
||||||
|
break
|
||||||
|
|
||||||
|
sign_check_choice = Menu(
|
||||||
|
str(_('Select signature check option')),
|
||||||
|
[s.value for s in SignCheck],
|
||||||
|
skip=False,
|
||||||
|
clear_screen=False,
|
||||||
|
preset_values=mirror.sign_check.value if mirror else None
|
||||||
|
).run()
|
||||||
|
|
||||||
|
sign_option_choice = Menu(
|
||||||
|
str(_('Select signature option')),
|
||||||
|
[s.value for s in SignOption],
|
||||||
|
skip=False,
|
||||||
|
clear_screen=False,
|
||||||
|
preset_values=mirror.sign_option.value if mirror else None
|
||||||
|
).run()
|
||||||
|
|
||||||
|
return CustomMirror(
|
||||||
|
name,
|
||||||
|
url,
|
||||||
|
SignCheck(sign_check_choice.single_value),
|
||||||
|
SignOption(sign_option_choice.single_value)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class MirrorMenu(AbstractSubMenu):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
data_store: Dict[str, Any],
|
||||||
|
preset: Optional[MirrorConfiguration] = None
|
||||||
|
):
|
||||||
|
if preset:
|
||||||
|
self._preset = preset
|
||||||
|
else:
|
||||||
|
self._preset = MirrorConfiguration()
|
||||||
|
|
||||||
|
super().__init__(data_store=data_store)
|
||||||
|
|
||||||
|
def setup_selection_menu_options(self):
|
||||||
|
self._menu_options['mirror_regions'] = \
|
||||||
|
Selector(
|
||||||
|
_('Mirror region'),
|
||||||
|
lambda preset: select_mirror_regions(preset),
|
||||||
|
display_func=lambda x: ', '.join(x.keys()) if x else '',
|
||||||
|
default=self._preset.mirror_regions,
|
||||||
|
enabled=True)
|
||||||
|
self._menu_options['custom_mirrors'] = \
|
||||||
|
Selector(
|
||||||
|
_('Custom mirrors'),
|
||||||
|
lambda preset: select_custom_mirror(preset=preset),
|
||||||
|
display_func=lambda x: str(_('Defined')) if x else '',
|
||||||
|
preview_func=self._prev_custom_mirror,
|
||||||
|
default=self._preset.custom_mirrors,
|
||||||
|
enabled=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def _prev_custom_mirror(self) -> Optional[str]:
|
||||||
|
selector = self._menu_options['custom_mirrors']
|
||||||
|
|
||||||
|
if selector.has_selection():
|
||||||
|
custom_mirrors: List[CustomMirror] = selector.current_selection # type: ignore
|
||||||
|
output = FormattedOutput.as_table(custom_mirrors)
|
||||||
|
return output.strip()
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def run(self, allow_reset: bool = True) -> Optional[MirrorConfiguration]:
|
||||||
|
super().run(allow_reset=allow_reset)
|
||||||
|
|
||||||
|
if self._data_store.get('mirror_regions', None) or self._data_store.get('custom_mirrors', None):
|
||||||
|
return MirrorConfiguration(
|
||||||
|
mirror_regions=self._data_store['mirror_regions'],
|
||||||
|
custom_mirrors=self._data_store['custom_mirrors'],
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def select_mirror_regions(preset_values: Dict[str, List[str]] = {}) -> Dict[str, List[str]]:
|
||||||
"""
|
"""
|
||||||
This function can sort /etc/pacman.d/mirrorlist according to the
|
Asks the user to select a mirror or region
|
||||||
mirror's URL prefix. By default places HTTPS before HTTP but it also
|
Usually this is combined with :ref:`archinstall.list_mirrors`.
|
||||||
preserves the country/rank-order.
|
|
||||||
|
|
||||||
This assumes /etc/pacman.d/mirrorlist looks like the following:
|
:return: The dictionary information about a mirror/region.
|
||||||
|
:rtype: dict
|
||||||
## Comment
|
|
||||||
Server = url
|
|
||||||
|
|
||||||
or
|
|
||||||
|
|
||||||
## Comment
|
|
||||||
#Server = url
|
|
||||||
|
|
||||||
But the Comments need to start with double-hashmarks to be distringuished
|
|
||||||
from server url definitions (commented or uncommented).
|
|
||||||
"""
|
"""
|
||||||
comments_and_whitespaces = b""
|
if preset_values is None:
|
||||||
sort_order += ['Unknown']
|
preselected = None
|
||||||
categories: Dict[str, List] = {key: [] for key in sort_order}
|
|
||||||
|
|
||||||
for line in raw_data.split(b"\n"):
|
|
||||||
if line[0:2] in (b'##', b''):
|
|
||||||
comments_and_whitespaces += line + b'\n'
|
|
||||||
elif line[:6].lower() == b'server' or line[:7].lower() == b'#server':
|
|
||||||
opening, url = line.split(b'=', 1)
|
|
||||||
opening, url = opening.strip(), url.strip()
|
|
||||||
if (category := url.split(b'://',1)[0].decode('UTF-8')) in categories:
|
|
||||||
categories[category].append(comments_and_whitespaces)
|
|
||||||
categories[category].append(opening + b' = ' + url + b'\n')
|
|
||||||
else:
|
|
||||||
categories["Unknown"].append(comments_and_whitespaces)
|
|
||||||
categories["Unknown"].append(opening + b' = ' + url + b'\n')
|
|
||||||
|
|
||||||
comments_and_whitespaces = b""
|
|
||||||
|
|
||||||
new_raw_data = b''
|
|
||||||
for category in sort_order + ["Unknown"]:
|
|
||||||
for line in categories[category]:
|
|
||||||
new_raw_data += line
|
|
||||||
|
|
||||||
return new_raw_data
|
|
||||||
|
|
||||||
|
|
||||||
def filter_mirrors_by_region(regions :str,
|
|
||||||
destination :str = '/etc/pacman.d/mirrorlist',
|
|
||||||
sort_order :List[str] = ["https", "http"],
|
|
||||||
*args :str,
|
|
||||||
**kwargs :str
|
|
||||||
) -> Union[bool, bytes]:
|
|
||||||
"""
|
|
||||||
This function will change the active mirrors on the live medium by
|
|
||||||
filtering which regions are active based on `regions`.
|
|
||||||
|
|
||||||
:param regions: A series of country codes separated by `,`. For instance `SE,US` for sweden and United States.
|
|
||||||
:type regions: str
|
|
||||||
"""
|
|
||||||
region_list = [f'country={region}' for region in regions.split(',')]
|
|
||||||
response = urllib.request.urlopen(urllib.request.Request(f"https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on'", headers={'User-Agent': 'ArchInstall'}))
|
|
||||||
new_list = response.read().replace(b"#Server", b"Server")
|
|
||||||
|
|
||||||
if sort_order:
|
|
||||||
new_list = sort_mirrorlist(new_list, sort_order=sort_order)
|
|
||||||
|
|
||||||
if destination:
|
|
||||||
with open(destination, "wb") as mirrorlist:
|
|
||||||
mirrorlist.write(new_list)
|
|
||||||
|
|
||||||
return True
|
|
||||||
else:
|
else:
|
||||||
return new_list.decode('UTF-8')
|
preselected = list(preset_values.keys())
|
||||||
|
|
||||||
|
mirrors = list_mirrors()
|
||||||
|
|
||||||
|
choice = Menu(
|
||||||
|
_('Select one of the regions to download packages from'),
|
||||||
|
list(mirrors.keys()),
|
||||||
|
preset_values=preselected,
|
||||||
|
multi=True,
|
||||||
|
allow_reset=True
|
||||||
|
).run()
|
||||||
|
|
||||||
|
match choice.type_:
|
||||||
|
case MenuSelectionType.Reset:
|
||||||
|
return {}
|
||||||
|
case MenuSelectionType.Skip:
|
||||||
|
return preset_values
|
||||||
|
case MenuSelectionType.Selection:
|
||||||
|
return {selected: mirrors[selected] for selected in choice.multi_value}
|
||||||
|
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool:
|
def select_custom_mirror(prompt: str = '', preset: List[CustomMirror] = []):
|
||||||
|
custom_mirrors = CustomMirrorList(prompt, preset).run()
|
||||||
|
return custom_mirrors
|
||||||
|
|
||||||
|
|
||||||
|
def add_custom_mirrors(mirrors: List[CustomMirror]):
|
||||||
"""
|
"""
|
||||||
This will append custom mirror definitions in pacman.conf
|
This will append custom mirror definitions in pacman.conf
|
||||||
|
|
||||||
|
|
@ -102,99 +282,57 @@ def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool:
|
||||||
"""
|
"""
|
||||||
with open('/etc/pacman.conf', 'a') as pacman:
|
with open('/etc/pacman.conf', 'a') as pacman:
|
||||||
for mirror in mirrors:
|
for mirror in mirrors:
|
||||||
pacman.write(f"[{mirror.name}]\n")
|
pacman.write(f"\n\n[{mirror.name}]\n")
|
||||||
pacman.write(f"SigLevel = {mirror.signcheck} {mirror.signoptions}\n")
|
pacman.write(f"SigLevel = {mirror.sign_check.value} {mirror.sign_option.value}\n")
|
||||||
pacman.write(f"Server = {mirror.url}\n")
|
pacman.write(f"Server = {mirror.url}\n")
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool:
|
|
||||||
"""
|
|
||||||
This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`.
|
|
||||||
It will not flush any other mirrors, just insert new ones.
|
|
||||||
|
|
||||||
:param mirrors: A dictionary of `{'url' : 'country', 'url2' : 'country'}`
|
|
||||||
:type mirrors: dict
|
|
||||||
"""
|
|
||||||
original_mirrorlist = ''
|
|
||||||
with open('/etc/pacman.d/mirrorlist', 'r') as original:
|
|
||||||
original_mirrorlist = original.read()
|
|
||||||
|
|
||||||
with open('/etc/pacman.d/mirrorlist', 'w') as new_mirrorlist:
|
|
||||||
for mirror, country in mirrors.items():
|
|
||||||
new_mirrorlist.write(f'## {country}\n')
|
|
||||||
new_mirrorlist.write(f'Server = {mirror}\n')
|
|
||||||
new_mirrorlist.write('\n')
|
|
||||||
new_mirrorlist.write(original_mirrorlist)
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def use_mirrors(
|
def use_mirrors(
|
||||||
regions: Dict[str, Iterable[str]],
|
regions: Dict[str, List[str]],
|
||||||
destination: str = '/etc/pacman.d/mirrorlist'
|
destination: str = '/etc/pacman.d/mirrorlist'
|
||||||
):
|
):
|
||||||
info(f'A new package mirror-list has been created: {destination}')
|
with open(destination, 'w') as fp:
|
||||||
with open(destination, 'w') as mirrorlist:
|
|
||||||
for region, mirrors in regions.items():
|
for region, mirrors in regions.items():
|
||||||
for mirror in mirrors:
|
for mirror in mirrors:
|
||||||
mirrorlist.write(f'## {region}\n')
|
fp.write(f'## {region}\n')
|
||||||
mirrorlist.write(f'Server = {mirror}\n')
|
fp.write(f'Server = {mirror}\n')
|
||||||
|
|
||||||
|
info(f'A new package mirror-list has been created: {destination}')
|
||||||
|
|
||||||
|
|
||||||
def re_rank_mirrors(
|
def _parse_mirror_list(mirrorlist: str) -> Dict[str, List[str]]:
|
||||||
top: int = 10,
|
file_content = mirrorlist.split('\n')
|
||||||
src: str = '/etc/pacman.d/mirrorlist',
|
file_content = list(filter(lambda x: x, file_content)) # filter out empty lines
|
||||||
dst: str = '/etc/pacman.d/mirrorlist',
|
first_srv_idx = [idx for idx, line in enumerate(file_content) if 'server' in line.lower()][0]
|
||||||
) -> bool:
|
mirrors = file_content[first_srv_idx - 1:]
|
||||||
try:
|
|
||||||
cmd = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}")
|
mirror_list: Dict[str, List[str]] = {}
|
||||||
except SysCallError:
|
|
||||||
return False
|
for idx in range(0, len(mirrors), 2):
|
||||||
with open(dst, 'w') as f:
|
region = mirrors[idx].removeprefix('## ')
|
||||||
f.write(str(cmd))
|
url = mirrors[idx + 1].removeprefix('#').removeprefix('Server = ')
|
||||||
return True
|
mirror_list.setdefault(region, []).append(url)
|
||||||
|
|
||||||
|
return mirror_list
|
||||||
|
|
||||||
|
|
||||||
def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
|
def list_mirrors() -> Dict[str, List[str]]:
|
||||||
regions: Dict[str, Dict[str, Any]] = {}
|
regions: Dict[str, List[str]] = {}
|
||||||
|
|
||||||
if storage['arguments']['offline']:
|
if storage['arguments']['offline']:
|
||||||
with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh:
|
with pathlib.Path('/etc/pacman.d/mirrorlist').open('r') as fp:
|
||||||
mirrorlist = fh.read()
|
mirrorlist = fp.read()
|
||||||
else:
|
else:
|
||||||
url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"
|
url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = urllib.request.urlopen(url)
|
mirrorlist = fetch_data_from_url(url)
|
||||||
except urllib.error.URLError as err:
|
except ValueError as err:
|
||||||
warn(f'Could not fetch an active mirror-list: {err}')
|
warn(f'Could not fetch an active mirror-list: {err}')
|
||||||
return regions
|
return regions
|
||||||
|
|
||||||
mirrorlist = response.read()
|
regions = _parse_mirror_list(mirrorlist)
|
||||||
|
sorted_regions = {}
|
||||||
|
for region, urls in regions.items():
|
||||||
|
sorted_regions[region] = sorted(urls, reverse=True)
|
||||||
|
|
||||||
if sort_order:
|
return sorted_regions
|
||||||
mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order)
|
|
||||||
|
|
||||||
region = 'Unknown region'
|
|
||||||
for line in mirrorlist.split(b'\n'):
|
|
||||||
if len(line.strip()) == 0:
|
|
||||||
continue
|
|
||||||
|
|
||||||
clean_line = line.decode('UTF-8').strip('\n').strip('\r')
|
|
||||||
|
|
||||||
if clean_line[:3] == '## ':
|
|
||||||
region = clean_line[3:]
|
|
||||||
elif clean_line[:10] == '#Server = ':
|
|
||||||
regions.setdefault(region, {})
|
|
||||||
|
|
||||||
url = clean_line.lstrip('#Server = ')
|
|
||||||
regions[region][url] = True
|
|
||||||
elif clean_line.startswith('Server = '):
|
|
||||||
regions.setdefault(region, {})
|
|
||||||
|
|
||||||
url = clean_line.lstrip('Server = ')
|
|
||||||
regions[region][url] = True
|
|
||||||
|
|
||||||
return regions
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ from archinstall.default_profiles.applications.pipewire import PipewireProfile
|
||||||
from archinstall.lib.configuration import ConfigurationOutput
|
from archinstall.lib.configuration import ConfigurationOutput
|
||||||
from archinstall.lib.installer import Installer
|
from archinstall.lib.installer import Installer
|
||||||
from archinstall.lib.menu import Menu
|
from archinstall.lib.menu import Menu
|
||||||
from archinstall.lib.mirrors import use_mirrors
|
from archinstall.lib.mirrors import use_mirrors, add_custom_mirrors
|
||||||
from archinstall.lib.models.bootloader import Bootloader
|
from archinstall.lib.models.bootloader import Bootloader
|
||||||
from archinstall.lib.models.network_configuration import NetworkConfigurationHandler
|
from archinstall.lib.models.network_configuration import NetworkConfigurationHandler
|
||||||
from archinstall.lib.networking import check_mirror_reachable
|
from archinstall.lib.networking import check_mirror_reachable
|
||||||
|
|
@ -45,7 +45,7 @@ def ask_user_questions():
|
||||||
global_menu.enable('keyboard-layout')
|
global_menu.enable('keyboard-layout')
|
||||||
|
|
||||||
# Set which region to download packages from during the installation
|
# Set which region to download packages from during the installation
|
||||||
global_menu.enable('mirror-region')
|
global_menu.enable('mirror_config')
|
||||||
|
|
||||||
global_menu.enable('sys-language')
|
global_menu.enable('sys-language')
|
||||||
|
|
||||||
|
|
@ -137,8 +137,11 @@ def perform_installation(mountpoint: Path):
|
||||||
installation.generate_key_files()
|
installation.generate_key_files()
|
||||||
|
|
||||||
# Set mirrors used by pacstrap (outside of installation)
|
# Set mirrors used by pacstrap (outside of installation)
|
||||||
if archinstall.arguments.get('mirror-region', None):
|
if mirror_config := archinstall.arguments.get('mirror_config', None):
|
||||||
use_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors for the live medium
|
if mirror_config.mirror_regions:
|
||||||
|
use_mirrors(mirror_config.mirror_regions)
|
||||||
|
if mirror_config.custom_mirrors:
|
||||||
|
add_custom_mirrors(mirror_config.custom_mirrors)
|
||||||
|
|
||||||
installation.minimal_installation(
|
installation.minimal_installation(
|
||||||
testing=enable_testing,
|
testing=enable_testing,
|
||||||
|
|
@ -147,9 +150,8 @@ def perform_installation(mountpoint: Path):
|
||||||
locales=[locale]
|
locales=[locale]
|
||||||
)
|
)
|
||||||
|
|
||||||
if archinstall.arguments.get('mirror-region') is not None:
|
if mirror_config := archinstall.arguments.get('mirror_config', None):
|
||||||
if archinstall.arguments.get("mirrors", None) is not None:
|
installation.set_mirrors(mirror_config) # Set the mirrors in the installation medium
|
||||||
installation.set_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors in the installation medium
|
|
||||||
|
|
||||||
if archinstall.arguments.get('swap'):
|
if archinstall.arguments.get('swap'):
|
||||||
installation.setup_swap('zram')
|
installation.setup_swap('zram')
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Any, Dict
|
||||||
|
|
||||||
import archinstall
|
import archinstall
|
||||||
from archinstall import SysInfo, info, debug
|
from archinstall import SysInfo, info, debug
|
||||||
from archinstall.lib.mirrors import use_mirrors
|
from archinstall.lib import mirrors
|
||||||
from archinstall.lib import models
|
from archinstall.lib import models
|
||||||
from archinstall.lib import disk
|
from archinstall.lib import disk
|
||||||
from archinstall.lib.networking import check_mirror_reachable
|
from archinstall.lib.networking import check_mirror_reachable
|
||||||
|
|
@ -92,7 +92,7 @@ class SwissMainMenu(GlobalMenu):
|
||||||
match self._execution_mode:
|
match self._execution_mode:
|
||||||
case ExecutionMode.Full | ExecutionMode.Lineal:
|
case ExecutionMode.Full | ExecutionMode.Lineal:
|
||||||
options_list = [
|
options_list = [
|
||||||
'keyboard-layout', 'mirror-region', 'disk_config',
|
'keyboard-layout', 'mirror_config', 'disk_config',
|
||||||
'disk_encryption', 'swap', 'bootloader', 'hostname', '!root-password',
|
'disk_encryption', 'swap', 'bootloader', 'hostname', '!root-password',
|
||||||
'!users', 'profile_config', 'audio', 'kernels', 'packages', 'additional-repositories', 'nic',
|
'!users', 'profile_config', 'audio', 'kernels', 'packages', 'additional-repositories', 'nic',
|
||||||
'timezone', 'ntp'
|
'timezone', 'ntp'
|
||||||
|
|
@ -107,7 +107,7 @@ class SwissMainMenu(GlobalMenu):
|
||||||
mandatory_list = ['disk_config']
|
mandatory_list = ['disk_config']
|
||||||
case ExecutionMode.Only_OS:
|
case ExecutionMode.Only_OS:
|
||||||
options_list = [
|
options_list = [
|
||||||
'keyboard-layout', 'mirror-region','bootloader', 'hostname',
|
'keyboard-layout', 'mirror_config','bootloader', 'hostname',
|
||||||
'!root-password', '!users', 'profile_config', 'audio', 'kernels',
|
'!root-password', '!users', 'profile_config', 'audio', 'kernels',
|
||||||
'packages', 'additional-repositories', 'nic', 'timezone', 'ntp'
|
'packages', 'additional-repositories', 'nic', 'timezone', 'ntp'
|
||||||
]
|
]
|
||||||
|
|
@ -196,8 +196,11 @@ def perform_installation(mountpoint: Path, exec_mode: ExecutionMode):
|
||||||
installation.generate_key_files()
|
installation.generate_key_files()
|
||||||
|
|
||||||
# Set mirrors used by pacstrap (outside of installation)
|
# Set mirrors used by pacstrap (outside of installation)
|
||||||
if archinstall.arguments.get('mirror-region', None):
|
if mirror_config := archinstall.arguments.get('mirror_config', None):
|
||||||
use_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors for the live medium
|
if mirror_config.mirror_regions:
|
||||||
|
mirrors.use_mirrors(mirror_config.mirror_regions)
|
||||||
|
if mirror_config.custom_mirrors:
|
||||||
|
mirrors.add_custom_mirrors(mirror_config.custom_mirrors)
|
||||||
|
|
||||||
installation.minimal_installation(
|
installation.minimal_installation(
|
||||||
testing=enable_testing,
|
testing=enable_testing,
|
||||||
|
|
@ -206,10 +209,8 @@ def perform_installation(mountpoint: Path, exec_mode: ExecutionMode):
|
||||||
locales=[locale]
|
locales=[locale]
|
||||||
)
|
)
|
||||||
|
|
||||||
if archinstall.arguments.get('mirror-region') is not None:
|
if mirror_config := archinstall.arguments.get('mirror_config', None):
|
||||||
if archinstall.arguments.get("mirrors", None) is not None:
|
installation.set_mirrors(mirror_config) # Set the mirrors in the installation medium
|
||||||
installation.set_mirrors(
|
|
||||||
archinstall.arguments['mirror-region']) # Set the mirrors in the installation medium
|
|
||||||
|
|
||||||
if archinstall.arguments.get('swap'):
|
if archinstall.arguments.get('swap'):
|
||||||
installation.setup_swap('zram')
|
installation.setup_swap('zram')
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ def ask_user_questions():
|
||||||
global_menu.enable('keyboard-layout')
|
global_menu.enable('keyboard-layout')
|
||||||
|
|
||||||
# Set which region to download packages from during the installation
|
# Set which region to download packages from during the installation
|
||||||
global_menu.enable('mirror-region')
|
global_menu.enable('mirror_config')
|
||||||
|
|
||||||
global_menu.enable('sys-language')
|
global_menu.enable('sys-language')
|
||||||
|
|
||||||
|
|
@ -116,8 +116,11 @@ def perform_installation(mountpoint: Path):
|
||||||
installation.generate_key_files()
|
installation.generate_key_files()
|
||||||
|
|
||||||
# Set mirrors used by pacstrap (outside of installation)
|
# Set mirrors used by pacstrap (outside of installation)
|
||||||
if archinstall.arguments.get('mirror-region', None):
|
if mirror_config := archinstall.arguments.get('mirror_config', None):
|
||||||
mirrors.use_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors for the live medium
|
if mirror_config.mirror_regions:
|
||||||
|
mirrors.use_mirrors(mirror_config.mirror_regions)
|
||||||
|
if mirror_config.custom_mirrors:
|
||||||
|
mirrors.add_custom_mirrors(mirror_config.custom_mirrors)
|
||||||
|
|
||||||
installation.minimal_installation(
|
installation.minimal_installation(
|
||||||
testing=enable_testing,
|
testing=enable_testing,
|
||||||
|
|
@ -126,9 +129,8 @@ def perform_installation(mountpoint: Path):
|
||||||
locales=[locale]
|
locales=[locale]
|
||||||
)
|
)
|
||||||
|
|
||||||
if archinstall.arguments.get('mirror-region') is not None:
|
if mirror_config := archinstall.arguments.get('mirror_config', None):
|
||||||
if archinstall.arguments.get("mirrors", None) is not None:
|
installation.set_mirrors(mirror_config) # Set the mirrors in the installation medium
|
||||||
installation.set_mirrors(archinstall.arguments['mirror-region']) # Set the mirrors in the installation medium
|
|
||||||
|
|
||||||
if archinstall.arguments.get('swap'):
|
if archinstall.arguments.get('swap'):
|
||||||
installation.setup_swap('zram')
|
installation.setup_swap('zram')
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue