Added type annotations to all functions (#845)

* Added type annotations for 1/5 of the files.

There's bound to be some issues with type miss-match, will sort that out later.

* Added type hints for 4/5 of the code

* Added type hints for 4.7/5 of the code

* Added type hints for 5/5 of the code base

* Split the linters into individual files

This should help with more clearly show which runner is breaking since they don't share a single common name any longer. Also moved mypy settings into pyproject.toml

* Fixed some of the last flake8 issues

* Missing parameter

* Fixed invalid lookahead types

* __future__ had to be at the top

* Fixed last flake8 issues
This commit is contained in:
Anton Hvornum 2022-01-06 22:01:15 +01:00 committed by GitHub
parent 015cd2a59f
commit e32cf71ae7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 524 additions and 362 deletions

12
.github/workflows/bandit.yaml vendored Normal file
View File

@ -0,0 +1,12 @@
on: [ push, pull_request ]
name: Bandit security checkup
jobs:
flake8:
runs-on: ubuntu-latest
container:
image: archlinux:latest
steps:
- uses: actions/checkout@v2
- run: pacman --noconfirm -Syu bandit
- name: Security checkup with Bandit
run: bandit -r archinstall || exit 0

14
.github/workflows/flake8.yaml vendored Normal file
View File

@ -0,0 +1,14 @@
on: [ push, pull_request ]
name: flake8 linting (15 ignores)
jobs:
flake8:
runs-on: ubuntu-latest
container:
image: archlinux:latest
steps:
- uses: actions/checkout@v2
- run: pacman --noconfirm -Syu python python-pip
- run: python -m pip install --upgrade pip
- run: pip install flake8
- name: Lint with flake8
run: flake8

View File

@ -1,36 +0,0 @@
on: [ push, pull_request ]
name: Lint Python and Find Syntax Errors
jobs:
mypy:
runs-on: ubuntu-latest
container:
image: archlinux:latest
steps:
- uses: actions/checkout@v2
- run: pacman --noconfirm -Syu python mypy
- name: run mypy
run: mypy . --ignore-missing-imports || exit 0
flake8:
runs-on: ubuntu-latest
container:
image: archlinux:latest
steps:
- uses: actions/checkout@v2
- run: pacman --noconfirm -Syu python python-pip
- run: python -m pip install --upgrade pip
- run: pip install flake8
- name: Lint with flake8
run: flake8 # See the .flake8 file for runtime parameters
pytest:
runs-on: ubuntu-latest
container:
image: archlinux:latest
steps:
- uses: actions/checkout@v2
- run: pacman --noconfirm -Syu python python-pip
- run: python -m pip install --upgrade pip
- run: pip install pytest
# TODO: Add tests and enable pytest checks.
# - name: Test with pytest
# run: |
# pytest

16
.github/workflows/mypy.yaml vendored Normal file
View File

@ -0,0 +1,16 @@
on: [ push, pull_request ]
name: mypy type checking
jobs:
mypy:
runs-on: ubuntu-latest
container:
image: archlinux:latest
steps:
- uses: actions/checkout@v2
- run: pacman --noconfirm -Syu python mypy python-pip
- run: python -m pip install --upgrade pip
- run: pip install fastapi pydantic
- run: python --version
- run: mypy --version
- name: run mypy
run: mypy --strict --module archinstall || exit 0

15
.github/workflows/pytest.yaml vendored Normal file
View File

@ -0,0 +1,15 @@
on: [ push, pull_request ]
name: pytest test validation
jobs:
pytest:
runs-on: ubuntu-latest
container:
image: archlinux:latest
options: --privileged
steps:
- uses: actions/checkout@v2
- run: pacman --noconfirm -Syu python python-pip qemu gcc
- run: python -m pip install --upgrade pip
- run: pip install pytest
- name: Test with pytest
run: python -m pytest || exit 0

View File

@ -3,7 +3,7 @@
<!-- </div> -->
# Arch Installer
[![Lint Python and Find Syntax Errors](https://github.com/archlinux/archinstall/actions/workflows/lint-python.yaml/badge.svg)](https://github.com/archlinux/archinstall/actions/workflows/lint-python.yaml)
[![Lint Python and Find Syntax Errors](https://github.com/archlinux/archinstall/actions/workflows/flake8.yaml/badge.svg)](https://github.com/archlinux/archinstall/actions/workflows/flake8.yaml)
Just another guided/automated [Arch Linux](https://wiki.archlinux.org/index.php/Arch_Linux) installer with a twist.
The installer also doubles as a python library to install Arch Linux and manage services, packages and other things inside the installed system *(Usually from a live medium)*.

View File

@ -1,14 +1,20 @@
from __future__ import annotations
import os
import json
import logging
import time
from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .partition import Partition
from ..exceptions import DiskError
from ..output import log
from ..general import SysCommand
from ..storage import storage
class BlockDevice:
def __init__(self, path, info=None):
def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
if not info:
from .helpers import all_disks
# If we don't give any information, we need to auto-fill it.
@ -24,32 +30,32 @@ class BlockDevice:
# It's actually partition-encryption, but for future-proofing this
# I'm placing the encryption password on a BlockDevice level.
def __repr__(self, *args, **kwargs):
def __repr__(self, *args :str, **kwargs :str) -> str:
return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
def __iter__(self):
def __iter__(self) -> Iterator[Partition]:
for partition in self.partitions:
yield self.partitions[partition]
def __getitem__(self, key, *args, **kwargs):
def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any:
if key not in self.info:
raise KeyError(f'{self} does not contain information: "{key}"')
return self.info[key]
def __len__(self):
def __len__(self) -> int:
return len(self.partitions)
def __lt__(self, left_comparitor):
def __lt__(self, left_comparitor :'BlockDevice') -> bool:
return self.path < left_comparitor.path
def json(self):
def json(self) -> str:
"""
json() has precedence over __dump__, so this is a way
to give less/partial information for user readability.
"""
return self.path
def __dump__(self):
def __dump__(self) -> Dict[str, Dict[str, Any]]:
return {
self.path : {
'partuuid' : self.uuid,
@ -59,14 +65,14 @@ class BlockDevice:
}
@property
def partition_type(self):
def partition_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['pttype']
@property
def device_or_backfile(self):
def device_or_backfile(self) -> str:
"""
Returns the actual device-endpoint of the BlockDevice.
If it's a loop-back-device it returns the back-file,
@ -82,7 +88,7 @@ class BlockDevice:
return self.device
@property
def device(self):
def device(self) -> str:
"""
Returns the device file of the BlockDevice.
If it's a loop-back-device it returns the /dev/X device,
@ -108,7 +114,7 @@ class BlockDevice:
# raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@property
def partitions(self):
def partitions(self) -> Dict[str, Partition]:
from .filesystem import Partition
self.partprobe()
@ -133,17 +139,19 @@ class BlockDevice:
return {k: self.part_cache[k] for k in sorted(self.part_cache)}
@property
def partition(self):
def partition(self) -> Partition:
all_partitions = self.partitions
return [all_partitions[k] for k in all_partitions]
@property
def partition_table_type(self):
def partition_table_type(self) -> int:
# TODO: Don't hardcode :)
# Remove if we don't use this function anywhere
from .filesystem import GPT
return GPT
@property
def uuid(self):
def uuid(self) -> str:
log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
Returns the disk UUID as returned by lsblk.
@ -153,7 +161,7 @@ class BlockDevice:
return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8')
@property
def size(self):
def size(self) -> float:
from .helpers import convert_size_to_gb
output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
@ -162,21 +170,21 @@ class BlockDevice:
return convert_size_to_gb(device['size'])
@property
def bus_type(self):
def bus_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['tran']
@property
def spinning(self):
def spinning(self) -> bool:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['rota'] is True
@property
def free_space(self):
def free_space(self) -> Tuple[str, str, str]:
# NOTE: parted -s will default to `cancel` on prompt, skipping any partition
# that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
# so the free will ignore the ESP partition and just give the "free" space.
@ -187,7 +195,7 @@ class BlockDevice:
yield (start, end, size)
@property
def largest_free_space(self):
def largest_free_space(self) -> List[str]:
info = []
for space_info in self.free_space:
if not info:
@ -199,7 +207,7 @@ class BlockDevice:
return info
@property
def first_free_sector(self):
def first_free_sector(self) -> str:
if info := self.largest_free_space:
start = info[0]
else:
@ -207,29 +215,29 @@ class BlockDevice:
return start
@property
def first_end_sector(self):
def first_end_sector(self) -> str:
if info := self.largest_free_space:
end = info[1]
else:
end = f"{self.size}GB"
return end
def partprobe(self):
SysCommand(['partprobe', self.path])
def partprobe(self) -> bool:
return SysCommand(['partprobe', self.path]).exit_code == 0
def has_partitions(self):
def has_partitions(self) -> int:
return len(self.partitions)
def has_mount_point(self, mountpoint):
def has_mount_point(self, mountpoint :str) -> bool:
for partition in self.partitions:
if self.partitions[partition].mountpoint == mountpoint:
return True
return False
def flush_cache(self):
def flush_cache(self) -> None:
self.part_cache = {}
def get_partition(self, uuid):
def get_partition(self, uuid :str) -> Partition:
count = 0
while count < 5:
for partition_uuid, partition in self.partitions.items():

View File

@ -1,7 +1,12 @@
from __future__ import annotations
import pathlib
import glob
import logging
from typing import Union
from typing import Union, Dict, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from ..installer import Installer
from .helpers import get_mount_info
from ..exceptions import DiskError
from ..general import SysCommand
@ -9,7 +14,7 @@ from ..output import log
from .partition import Partition
def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
"""
This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name.
@ -42,7 +47,7 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str],
return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0
def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) -> bool:
def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@ -75,7 +80,12 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str])
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, subvolumes :dict, unlocked_device :dict = None):
def manage_btrfs_subvolumes(installation :Installer,
partition :Dict[str, str],
mountpoints :Dict[str, str],
subvolumes :Dict[str, str],
unlocked_device :Dict[str, str] = None
) -> None:
""" we do the magic with subvolumes in a centralized place
parameters:
* the installation object

View File

@ -1,7 +1,13 @@
from __future__ import annotations
import time
import logging
import json
import pathlib
from typing import Optional, Dict, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .blockdevice import BlockDevice
from .partition import Partition
from .validators import valid_fs_type
from ..exceptions import DiskError
@ -16,24 +22,25 @@ class Filesystem:
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles.
def __init__(self, blockdevice, mode):
def __init__(self, blockdevice :BlockDevice, mode :int):
self.blockdevice = blockdevice
self.mode = mode
def __enter__(self, *args, **kwargs):
def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem':
return self
def __repr__(self):
def __repr__(self) -> str:
return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
def __exit__(self, *args, **kwargs):
def __exit__(self, *args :str, **kwargs :str) -> bool:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
SysCommand('sync')
return True
def partuuid_to_index(self, uuid):
def partuuid_to_index(self, uuid :str) -> Optional[int]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
time.sleep(5)
@ -50,7 +57,7 @@ class Filesystem:
raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}")
def load_layout(self, layout :dict):
def load_layout(self, layout :Dict[str, Any]) -> None:
from ..luks import luks2
# If the layout tells us to wipe the drive, we do so
@ -127,21 +134,21 @@ class Filesystem:
log(f"Marking partition {partition['device_instance']} as bootable.")
self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
def find_partition(self, mountpoint):
def find_partition(self, mountpoint :str) -> Partition:
for partition in self.blockdevice:
if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
return partition
def partprobe(self):
SysCommand(f'bash -c "partprobe"')
def partprobe(self) -> bool:
return SysCommand(f'bash -c "partprobe"').exit_code == 0
def raw_parted(self, string: str):
def raw_parted(self, string: str) -> SysCommand:
if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0:
log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red")
time.sleep(0.5)
return cmd_handle
def parted(self, string: str):
def parted(self, string: str) -> bool:
"""
Performs a parted execution of the given string
@ -149,16 +156,17 @@ class Filesystem:
:type string: str
"""
if (parted_handle := self.raw_parted(string)).exit_code == 0:
self.partprobe()
return True
if self.partprobe():
return True
return False
else:
raise DiskError(f"Parted failed to add a partition: {parted_handle}")
def use_entire_disk(self, root_filesystem_type='ext4') -> Partition:
def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
def add_partition(self, partition_type, start, end, partition_format=None):
def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
@ -197,14 +205,14 @@ class Filesystem:
log("Add partition is exiting due to excessive wait time",level=logging.INFO)
raise DiskError(f"New partition never showed up after adding new partition on {self}.")
def set_name(self, partition: int, name: str):
def set_name(self, partition: int, name: str) -> bool:
return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
def set(self, partition: int, string: str):
def set(self, partition: int, string: str) -> bool:
log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
def parted_mklabel(self, device: str, disk_label: str):
def parted_mklabel(self, device: str, disk_label: str) -> bool:
log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow")
# Try to unmount devices before attempting to run mklabel
try:

View File

@ -1,10 +1,15 @@
from __future__ import annotations
import json
import logging
import os
import pathlib
import re
import time
from typing import Union
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .partition import Partition
from .blockdevice import BlockDevice
from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
@ -14,10 +19,10 @@ from ..storage import storage
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GIGA = 2 ** 30
def convert_size_to_gb(size):
def convert_size_to_gb(size :Union[int, float]) -> float:
return round(size / GIGA,1)
def sort_block_devices_based_on_performance(block_devices):
def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]:
result = {device: 0 for device in block_devices}
for device, weight in result.items():
@ -35,12 +40,12 @@ def sort_block_devices_based_on_performance(block_devices):
return result
def filter_disks_below_size_in_gb(devices, gigabytes):
def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]:
for disk in devices:
if disk.size >= gigabytes:
yield disk
def select_largest_device(devices, gigabytes, filter_out=None):
def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
if not filter_out:
filter_out = []
@ -56,7 +61,7 @@ def select_largest_device(devices, gigabytes, filter_out=None):
return max(copy_devices, key=(lambda device : device.size))
def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None):
def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
if not filter_out:
filter_out = []
@ -70,7 +75,7 @@ def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None):
return min(copy_devices, key=(lambda device : abs(device.size - gigabytes)))
def convert_to_gigabytes(string):
def convert_to_gigabytes(string :str) -> float:
unit = string.strip()[-1]
size = float(string.strip()[:-1])
@ -81,7 +86,7 @@ def convert_to_gigabytes(string):
return size
def device_state(name, *args, **kwargs):
def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
# Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
@ -99,7 +104,7 @@ def device_state(name, *args, **kwargs):
return True
# lsblk --json -l -n -o path
def all_disks(*args, **kwargs):
def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]:
kwargs.setdefault("partitions", False)
drives = {}
@ -113,7 +118,7 @@ def all_disks(*args, **kwargs):
return drives
def harddrive(size=None, model=None, fuzzy=False):
def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
collection = all_disks()
for drive in collection:
if size and convert_to_gigabytes(collection[drive]['size']) != size:
@ -133,7 +138,7 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list:
bind_path = None
return device_path,bind_path
def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict:
def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
device_path,bind_path = split_bind_name(path)
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
@ -170,7 +175,7 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_p
return {}
def get_partitions_in_use(mountpoint) -> list:
def get_partitions_in_use(mountpoint :str) -> List[Partition]:
from .partition import Partition
try:
@ -193,7 +198,7 @@ def get_partitions_in_use(mountpoint) -> list:
return mounts
def get_filesystem_type(path):
def get_filesystem_type(path :str) -> Optional[str]:
device_name, bind_name = split_bind_name(path)
try:
return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip()
@ -201,10 +206,10 @@ def get_filesystem_type(path):
return None
def disk_layouts():
def disk_layouts() -> Optional[Dict[str, Any]]:
try:
if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0:
return json.loads(handle.decode('UTF-8'))
return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()}
else:
log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow")
return None
@ -216,20 +221,22 @@ def disk_layouts():
return None
def encrypted_partitions(blockdevices :dict) -> bool:
def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool:
for partition in blockdevices.values():
if partition.get('encrypted', False):
yield partition
def find_partition_by_mountpoint(block_devices, relative_mountpoint :str):
def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition:
for device in block_devices:
for partition in block_devices[device]['partitions']:
if partition.get('mountpoint', None) == relative_mountpoint:
return partition
def partprobe():
SysCommand(f'bash -c "partprobe"')
time.sleep(5)
def partprobe() -> bool:
if SysCommand(f'bash -c "partprobe"').exit_code == 0:
time.sleep(5) # TODO: Remove, we should be relying on blkid instead of lsblk
return True
return False
def convert_device_to_uuid(path :str) -> str:
device_name, bind_name = split_bind_name(path)

View File

@ -5,7 +5,8 @@ import logging
import json
import os
import hashlib
from typing import Optional
from typing import Optional, Dict, Any, List, Union
from .blockdevice import BlockDevice
from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
@ -15,7 +16,15 @@ from ..general import SysCommand
class Partition:
def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
def __init__(self,
path: str,
block_device: BlockDevice,
part_id :Optional[str] = None,
filesystem :Optional[str] = None,
mountpoint :Optional[str] = None,
encrypted :bool = False,
autodetect_filesystem :bool = True):
if not part_id:
part_id = os.path.basename(path)
@ -50,14 +59,16 @@ class Partition:
if self.filesystem == 'crypto_LUKS':
self.encrypted = True
def __lt__(self, left_comparitor):
def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
else:
left_comparitor = str(left_comparitor)
return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct.
def __repr__(self, *args, **kwargs):
# The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
return self.path < left_comparitor
def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
if self.mountpoint:
mount_repr = f", mounted={self.mountpoint}"
@ -69,7 +80,7 @@ class Partition:
else:
return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
def __dump__(self):
def __dump__(self) -> Dict[str, Any]:
return {
'type': 'primary',
'PARTUUID': self._safe_uuid,
@ -86,14 +97,14 @@ class Partition:
}
@property
def sector_size(self):
def sector_size(self) -> Optional[int]:
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
for device in output['blockdevices']:
return device.get('log-sec', None)
@property
def start(self):
def start(self) -> Optional[str]:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
@ -101,7 +112,7 @@ class Partition:
return partition['start'] # * self.sector_size
@property
def end(self):
def end(self) -> Optional[str]:
# TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
@ -110,7 +121,7 @@ class Partition:
return partition['size'] # * self.sector_size
@property
def size(self):
def size(self) -> Optional[float]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
@ -123,7 +134,7 @@ class Partition:
time.sleep(storage['DISK_TIMEOUTS'])
@property
def boot(self):
def boot(self) -> bool:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
# Get the bootable flag from the sfdisk output:
@ -143,7 +154,7 @@ class Partition:
return False
@property
def partition_type(self):
def partition_type(self) -> Optional[str]:
lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
for device in lsblk['blockdevices']:
@ -179,19 +190,19 @@ class Partition:
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
@property
def encrypted(self):
def encrypted(self) -> Union[bool, None]:
return self._encrypted
@encrypted.setter
def encrypted(self, value: bool):
def encrypted(self, value: bool) -> None:
self._encrypted = value
@property
def parent(self):
def parent(self) -> str:
return self.real_device
@property
def real_device(self):
def real_device(self) -> str:
for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
return f"/dev/{parent}"
@ -199,25 +210,27 @@ class Partition:
return self.path
@property
def device_path(self):
def device_path(self) -> str:
""" for bind mounts returns the phisical path of the partition
"""
device_path, bind_name = split_bind_name(self.path)
return device_path
@property
def bind_name(self):
def bind_name(self) -> str:
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
device_path, bind_name = split_bind_name(self.path)
return bind_name
def partprobe(self):
SysCommand(f'bash -c "partprobe"')
time.sleep(1)
def partprobe(self) -> bool:
if SysCommand(f'bash -c "partprobe"').exit_code == 0:
time.sleep(1)
return True
return False
def detect_inner_filesystem(self, password):
def detect_inner_filesystem(self, password :str) -> Optional[str]:
log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO)
from ..luks import luks2
@ -227,7 +240,7 @@ class Partition:
except SysCallError:
return None
def has_content(self):
def has_content(self) -> bool:
fs_type = get_filesystem_type(self.path)
if not fs_type or "swap" in fs_type:
return False
@ -248,7 +261,7 @@ class Partition:
return True if files > 0 else False
def encrypt(self, *args, **kwargs):
def encrypt(self, *args :str, **kwargs :str) -> str:
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
@ -257,7 +270,7 @@ class Partition:
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
def format(self, filesystem=None, path=None, log_formatting=True, options=[]):
def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool:
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@ -342,7 +355,7 @@ class Partition:
return True
def find_parent_of(self, data, name, parent=None):
def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]:
if data['name'] == name:
return parent
elif 'children' in data:
@ -350,7 +363,7 @@ class Partition:
if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
def mount(self, target, fs=None, options=''):
def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
@ -386,25 +399,24 @@ class Partition:
self.mountpoint = target
return True
def unmount(self):
try:
SysCommand(f"/usr/bin/umount {self.path}")
except SysCallError as err:
exit_code = err.exit_code
return False
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
if 0 < exit_code < 8000:
raise err
def unmount(self) -> bool:
worker = SysCommand(f"/usr/bin/umount {self.path}")
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
if 0 < worker.exit_code < 8000:
raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
self.mountpoint = None
return True
def umount(self):
def umount(self) -> bool:
return self.unmount()
def filesystem_supported(self):
def filesystem_supported(self) -> bool:
"""
The support for a filesystem (this partition) is tested by calling
partition.format() with a path set to '/dev/null' which returns two exceptions:
@ -420,7 +432,7 @@ class Partition:
return True
def get_mount_fs_type(fs):
def get_mount_fs_type(fs :str) -> str:
if fs == 'ntfs':
return 'ntfs3' # Needed to use the Paragon R/W NTFS driver
elif fs == 'fat32':

View File

@ -1,8 +1,17 @@
from __future__ import annotations
import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .blockdevice import BlockDevice
from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to
from ..output import log
def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_options=False):
def suggest_single_disk_layout(block_device :BlockDevice,
default_filesystem :Optional[str] = None,
advanced_options :bool = False) -> Dict[str, Any]:
if not default_filesystem:
from ..user_interaction import ask_for_main_filesystem_format
default_filesystem = ask_for_main_filesystem_format(advanced_options)
@ -94,7 +103,10 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o
return layout
def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_options=False):
def suggest_multi_disk_layout(block_devices :List[BlockDevice],
default_filesystem :Optional[str] = None,
advanced_options :bool = False) -> Dict[str, Any]:
if not default_filesystem:
from ..user_interaction import ask_for_main_filesystem_format
default_filesystem = ask_for_main_filesystem_format(advanced_options)

View File

@ -1,4 +1,6 @@
def valid_parted_position(pos :str):
from typing import List
def valid_parted_position(pos :str) -> bool:
if not len(pos):
return False
@ -17,7 +19,7 @@ def valid_parted_position(pos :str):
return False
def fs_types():
def fs_types() -> List[str]:
# https://www.gnu.org/software/parted/manual/html_node/mkpart.html
# Above link doesn't agree with `man parted` /mkpart documentation:
"""

View File

@ -17,7 +17,7 @@ class ProfileError(BaseException):
class SysCallError(BaseException):
def __init__(self, message :str, exit_code :Optional[int]) -> None:
def __init__(self, message :str, exit_code :Optional[int] = None) -> None:
super(SysCallError, self).__init__(message)
self.message = message
self.exit_code = exit_code

View File

@ -1,3 +1,4 @@
from __future__ import annotations
import hashlib
import json
import logging
@ -9,7 +10,10 @@ import string
import sys
import time
from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .installer import Installer
if sys.platform == 'linux':
from select import epoll, EPOLLIN, EPOLLHUP
@ -46,14 +50,14 @@ from .exceptions import RequirementError, SysCallError
from .output import log
from .storage import storage
def gen_uid(entropy_length=256):
def gen_uid(entropy_length :int = 256) -> str:
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
def generate_password(length=64):
def generate_password(length :int = 64) -> str:
haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace
return ''.join(secrets.choice(haystack) for i in range(length))
def multisplit(s, splitters):
def multisplit(s :str, splitters :List[str]) -> str:
s = [s, ]
for key in splitters:
ns = []
@ -77,12 +81,12 @@ def locate_binary(name :str) -> str:
raise RequirementError(f"Binary {name} does not exist.")
def json_dumps(*args, **kwargs):
def json_dumps(*args :str, **kwargs :str) -> str:
return json.dumps(*args, **{**kwargs, 'cls': JSON})
class JsonEncoder:
@staticmethod
def _encode(obj):
def _encode(obj :Any) -> Any:
"""
This JSON encoder function will try it's best to convert
any archinstall data structures, instances or variables into
@ -119,7 +123,7 @@ class JsonEncoder:
return obj
@staticmethod
def _unsafe_encode(obj):
def _unsafe_encode(obj :Any) -> Any:
"""
Same as _encode() but it keeps dictionary keys starting with !
"""
@ -141,20 +145,20 @@ class JSON(json.JSONEncoder, json.JSONDecoder):
"""
A safe JSON encoder that will omit private information in dicts (starting with !)
"""
def _encode(self, obj):
def _encode(self, obj :Any) -> Any:
return JsonEncoder._encode(obj)
def encode(self, obj):
def encode(self, obj :Any) -> Any:
return super(JSON, self).encode(self._encode(obj))
class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder):
"""
UNSAFE_JSON will call/encode and keep private information in dicts (starting with !)
"""
def _encode(self, obj):
def _encode(self, obj :Any) -> Any:
return JsonEncoder._unsafe_encode(obj)
def encode(self, obj):
def encode(self, obj :Any) -> Any:
return super(UNSAFE_JSON, self).encode(self._encode(obj))
class SysCommandWorker:
@ -455,13 +459,17 @@ class SysCommand:
return None
def prerequisite_check():
if not os.path.isdir("/sys/firmware/efi"):
raise RequirementError("Archinstall only supports machines in UEFI mode.")
def prerequisite_check() -> bool:
"""
This function is used as a safety check before
continuing with an installation.
Could be anything from checking that /boot is big enough
to check if nvidia hardware exists when nvidia driver was chosen.
"""
return True
def reboot():
SysCommand("/usr/bin/reboot")
@ -473,12 +481,15 @@ def pid_exists(pid: int) -> bool:
return False
def run_custom_user_commands(commands, installation):
def run_custom_user_commands(commands :List[str], installation :Installer) -> None:
for index, command in enumerate(commands):
log(f'Executing custom command "{command}" ...', fg='yellow')
log(f'Executing custom command "{command}" ...', level=logging.INFO)
with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script:
temp_script.write(command)
execution_output = SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh")
log(execution_output)
os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh")

View File

@ -1,5 +1,4 @@
import time
from typing import Union
import logging
import os
import shutil
@ -7,6 +6,7 @@ import shlex
import pathlib
import subprocess
import glob
from typing import Union, Dict, Any, List, ModuleType, Optional, Iterator, Mapping
from .disk import get_partitions_in_use, Partition
from .general import SysCommand, generate_password
from .hardware import has_uefi, is_vm, cpu_vendor
@ -30,29 +30,29 @@ __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"]
class InstallationFile:
def __init__(self, installation, filename, owner, mode="w"):
def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"):
self.installation = installation
self.filename = filename
self.owner = owner
self.mode = mode
self.fh = None
def __enter__(self):
def __enter__(self) -> 'InstallationFile':
self.fh = open(self.filename, self.mode)
return self
def __exit__(self, *args):
def __exit__(self, *args :str) -> None:
self.fh.close()
self.installation.chown(self.owner, self.filename)
def write(self, data: Union[str, bytes]):
def write(self, data: Union[str, bytes]) -> int:
return self.fh.write(data)
def read(self, *args):
def read(self, *args) -> Union[str, bytes]:
return self.fh.read(*args)
def poll(self, *args):
return self.fh.poll(*args)
# def poll(self, *args) -> bool:
# return self.fh.poll(*args)
def accessibility_tools_in_use() -> bool:
@ -84,11 +84,12 @@ class Installer:
"""
def __init__(self, target, *, base_packages=None, kernels=None):
def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None):
if base_packages is None:
base_packages = __packages__[:3]
if kernels is None:
kernels = ['linux']
self.kernels = kernels
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
@ -119,18 +120,17 @@ class Installer:
self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
self.KERNEL_PARAMS = []
def log(self, *args, level=logging.DEBUG, **kwargs):
def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str):
"""
installer.log() wraps output.log() mainly to set a default log-level for this install session.
Any manual override can be done per log() call.
"""
log(*args, level=level, **kwargs)
def __enter__(self, *args, **kwargs):
def __enter__(self, *args :str, **kwargs :str) -> 'Installer':
return self
def __exit__(self, *args, **kwargs):
# b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
def __exit__(self, *args :str, **kwargs :str) -> None:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
@ -163,10 +163,10 @@ class Installer:
return False
@property
def partitions(self):
def partitions(self) -> List[Partition]:
return get_partitions_in_use(self.target)
def sync_log_to_install_medium(self):
def sync_log_to_install_medium(self) -> bool:
# Copy over the install log (if there is one) to the install medium if
# at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
if self.helper_flags.get('base-strapped', False) is True:
@ -180,7 +180,7 @@ class Installer:
return True
def mount_ordered_layout(self, layouts: dict):
def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
from .luks import luks2
mountpoints = {}
@ -254,16 +254,16 @@ class Installer:
except DiskError:
raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
def mount(self, partition, mountpoint, create_mountpoint=True):
def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None:
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
partition.mount(f'{self.target}{mountpoint}')
def post_install_check(self, *args, **kwargs):
def post_install_check(self, *args :str, **kwargs :str) -> List[bool]:
return [step for step, flag in self.helper_flags.items() if flag is False]
def pacstrap(self, *packages, **kwargs):
def pacstrap(self, *packages :str, **kwargs :str) -> bool:
if type(packages[0]) in (list, tuple):
packages = packages[0]
@ -284,7 +284,7 @@ class Installer:
else:
self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO)
def set_mirrors(self, mirrors):
def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None:
for plugin in plugins.values():
if hasattr(plugin, 'on_mirrors'):
if result := plugin.on_mirrors(mirrors):
@ -292,7 +292,7 @@ class Installer:
return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
def genfstab(self, flags='-pU'):
def genfstab(self, flags :str = '-pU') -> bool:
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
@ -307,11 +307,11 @@ class Installer:
return True
def set_hostname(self, hostname: str, *args, **kwargs):
def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n')
def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool:
if not len(locale):
return True
@ -322,7 +322,7 @@ class Installer:
return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
def set_timezone(self, zone, *args, **kwargs):
def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool:
if not zone:
return True
if not len(zone):
@ -337,6 +337,7 @@ class Installer:
(pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
else:
self.log(
f"Time zone {zone} does not exist, continuing with system default.",
@ -344,11 +345,13 @@ class Installer:
fg='red'
)
def activate_ntp(self):
return False
def activate_ntp(self) -> None:
log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO)
self.activate_time_syncronization()
def activate_time_syncronization(self):
def activate_time_syncronization(self) -> None:
self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO)
self.enable_service('systemd-timesyncd')
@ -361,11 +364,11 @@ class Installer:
with Boot(self) as session:
session.SysCommand(["timedatectl", "set-ntp", 'true'])
def enable_espeakup(self):
def enable_espeakup(self) -> None:
self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO)
self.enable_service('espeakup')
def enable_service(self, *services):
def enable_service(self, *services :str) -> None:
for service in services:
self.log(f'Enabling service {service}', level=logging.INFO)
if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
@ -375,19 +378,27 @@ class Installer:
if hasattr(plugin, 'on_service'):
plugin.on_service(service)
def run_command(self, cmd, *args, **kwargs):
def run_command(self, cmd :str, *args :str, **kwargs :str) -> None:
return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}')
def arch_chroot(self, cmd, run_as=None):
def arch_chroot(self, cmd :str, run_as :Optional[str] = None):
if run_as:
cmd = f"su - {run_as} -c {shlex.quote(cmd)}"
return self.run_command(cmd)
def drop_to_shell(self):
def drop_to_shell(self) -> None:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs):
def configure_nic(self,
nic :str,
dhcp :bool = True,
ip :Optional[str] = None,
gateway :Optional[str] = None,
dns :Optional[str] = None,
*args :str,
**kwargs :str
) -> None:
from .systemd import Networkd
if dhcp:
@ -412,7 +423,7 @@ class Installer:
with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
def copy_iso_network_config(self, enable_services=False):
def copy_iso_network_config(self, enable_services :bool = False) -> bool:
# Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'):
if psk_files := glob.glob('/var/lib/iwd/*.psk'):
@ -427,7 +438,7 @@ class Installer:
# This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
# base is not installed yet.
def post_install_enable_iwd_service(*args, **kwargs):
def post_install_enable_iwd_service(*args :str, **kwargs :str):
self.enable_service('iwd')
self.post_base_install.append(post_install_enable_iwd_service)
@ -452,7 +463,7 @@ class Installer:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
def post_install_enable_networkd_resolved(*args, **kwargs):
def post_install_enable_networkd_resolved(*args :str, **kwargs :str):
self.enable_service('systemd-networkd', 'systemd-resolved')
self.post_base_install.append(post_install_enable_networkd_resolved)
@ -462,7 +473,7 @@ class Installer:
return True
def detect_encryption(self, partition):
def detect_encryption(self, partition :Partition) -> bool:
part = Partition(partition.parent, None, autodetect_filesystem=True)
if partition.encrypted:
return partition
@ -471,7 +482,7 @@ class Installer:
return False
def mkinitcpio(self, *flags):
def mkinitcpio(self, *flags :str) -> bool:
for plugin in plugins.values():
if hasattr(plugin, 'on_mkinitcpio'):
# Allow plugins to override the usage of mkinitcpio altogether.
@ -483,9 +494,10 @@ class Installer:
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
def minimal_installation(self):
return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0
def minimal_installation(self) -> bool:
# Add necessary packages if encrypting the drive
# (encrypted partitions default to btrfs for now, so we need btrfs-progs)
# TODO: Perhaps this should be living in the function which dictates
@ -562,7 +574,7 @@ class Installer:
return True
def setup_swap(self, kind='zram'):
def setup_swap(self, kind :str = 'zram') -> bool:
if kind == 'zram':
self.log(f"Setting up swap on zram")
self.pacstrap('zram-generator')
@ -578,7 +590,18 @@ class Installer:
else:
raise ValueError(f"Archinstall currently only supports setting up swap on zram")
def add_bootloader(self, bootloader='systemd-bootctl'):
def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool:
"""
Adds a bootloader to the installation instance.
Archinstall supports one of three types:
* systemd-bootctl
* grub
* efistub (beta)
:param bootloader: Can be one of the three strings
'systemd-bootctl', 'grub' or 'efistub' (beta)
"""
for plugin in plugins.values():
if hasattr(plugin, 'on_add_bootloader'):
# Allow plugins to override the boot-loader handling.
@ -757,10 +780,19 @@ class Installer:
return True
def add_additional_packages(self, *packages):
def add_additional_packages(self, *packages :str) -> bool:
return self.pacstrap(*packages)
def install_profile(self, profile):
def install_profile(self, profile :str) -> ModuleType:
"""
Installs a archinstall profile script (.py file).
This profile can be either local, remote or part of the library.
:param profile: Can be a local path or a remote path (URL)
:return: Returns the imported script as a module, this way
you can access any remaining functions exposed by the profile.
:rtype: module
"""
storage['installation_session'] = self
if type(profile) == str:
@ -769,13 +801,13 @@ class Installer:
self.log(f'Installing network profile {profile}', level=logging.INFO)
return profile.install()
def enable_sudo(self, entity: str, group=False):
def enable_sudo(self, entity: str, group :bool = False) -> bool:
self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
def user_create(self, user: str, password=None, groups=None, sudo=False):
def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None:
if groups is None:
groups = []
@ -789,7 +821,8 @@ class Installer:
if not handled_by_plugin:
self.log(f'Creating user {user}', level=logging.INFO)
SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')
if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0:
raise SystemError(f"Could not create user inside installation: {output}")
for plugin in plugins.values():
if hasattr(plugin, 'on_user_created'):
@ -806,24 +839,24 @@ class Installer:
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
def user_set_pw(self, user, password):
def user_set_pw(self, user :str, password :str) -> bool:
self.log(f'Setting password for {user}', level=logging.INFO)
if user == 'root':
# This means the root account isn't locked/disabled with * in /etc/passwd
self.helper_flags['user'] = True
SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"")
return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"").exit_code == 0
def user_set_shell(self, user, shell):
def user_set_shell(self, user :str, shell :str) -> bool:
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")
return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0
def chown(self, owner, path, options=[]):
return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}")
def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0
def create_file(self, filename, owner=None):
def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
return InstallationFile(self, filename, owner)
def set_keyboard_language(self, language: str) -> bool:

View File

@ -1,41 +1,42 @@
import logging
from typing import Iterator
from .exceptions import ServiceException
from .general import SysCommand
from .output import log
def list_keyboard_languages():
def list_keyboard_languages() -> Iterator[str]:
for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
def list_x11_keyboard_languages():
def list_x11_keyboard_languages() -> Iterator[str]:
for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
def verify_keyboard_layout(layout):
def verify_keyboard_layout(layout :str) -> bool:
for language in list_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
def verify_x11_keyboard_layout(layout):
def verify_x11_keyboard_layout(layout :str) -> bool:
for language in list_x11_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
def search_keyboard_layout(layout):
def search_keyboard_layout(layout :str) -> Iterator[str]:
for language in list_keyboard_languages():
if layout.lower() in language.lower():
yield language
def set_keyboard_language(locale):
def set_keyboard_language(locale :str) -> bool:
if len(locale.strip()):
if not verify_keyboard_layout(locale):
log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR)
@ -49,6 +50,6 @@ def set_keyboard_language(locale):
return False
def list_timezones():
def list_timezones() -> Iterator[str]:
for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()

View File

@ -1,9 +1,15 @@
from __future__ import annotations
import json
import logging
import os
import pathlib
import shlex
import time
from typing import Optional, List,TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .installer import Installer
from .disk import Partition, convert_device_to_uuid
from .general import SysCommand, SysCommandWorker
from .output import log
@ -11,7 +17,15 @@ from .exceptions import SysCallError, DiskError
from .storage import storage
class luks2:
def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
def __init__(self,
partition :Partition,
mountpoint :str,
password :str,
key_file :Optional[str] = None,
auto_unmount :bool = False,
*args :str,
**kwargs :str):
self.password = password
self.partition = partition
self.mountpoint = mountpoint
@ -22,7 +36,7 @@ class luks2:
self.filesystem = 'crypto_LUKS'
self.mapdev = None
def __enter__(self):
def __enter__(self) -> Partition:
if not self.key_file:
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
@ -34,16 +48,23 @@ class luks2:
return self.unlock(self.partition, self.mountpoint, self.key_file)
def __exit__(self, *args, **kwargs):
def __exit__(self, *args :str, **kwargs :str) -> bool:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if self.auto_unmount:
self.close()
if len(args) >= 2 and args[1]:
raise args[1]
return True
def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
def encrypt(self, partition :Partition,
password :Optional[str] = None,
key_size :int = 512,
hash_type :str = 'sha512',
iter_time :int = 10000,
key_file :Optional[str] = None) -> str:
log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
if not key_file:
@ -119,7 +140,7 @@ class luks2:
return key_file
def unlock(self, partition, mountpoint, key_file):
def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition:
"""
Mounts a luks2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm.
@ -142,18 +163,18 @@ class luks2:
unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
return unlocked_partition
def close(self, mountpoint=None):
def close(self, mountpoint :Optional[str] = None) -> bool:
if not mountpoint:
mountpoint = self.mapdev
SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
return os.path.islink(self.mapdev) is False
def format(self, path):
def format(self, path :str) -> None:
if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')
def add_key(self, path :pathlib.Path, password :str):
def add_key(self, path :pathlib.Path, password :str) -> bool:
if not path.exists():
raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path))
@ -169,7 +190,9 @@ class luks2:
if worker.exit_code != 0:
raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}')
def crypttab(self, installation, key_path :str, options=["luks", "key-slot=1"]):
return True
def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None:
log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO)
with open(f"{installation.target}/etc/crypttab", "a") as crypttab:
crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n")

View File

@ -1,7 +1,7 @@
import logging
import urllib.error
import urllib.request
from typing import Union, Mapping, Iterable
from typing import Union, Mapping, Iterable, Dict, Any, List
from .general import SysCommand
from .output import log
@ -51,7 +51,12 @@ def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes:
return new_raw_data
def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', sort_order=["https", "http"], *args, **kwargs) -> Union[bool, bytes]:
def filter_mirrors_by_region(regions :str,
destination :str = '/etc/pacman.d/mirrorlist',
sort_order :List[str] = ["https", "http"],
*args :str,
**kwargs :str
) -> Union[bool, bytes]:
"""
This function will change the active mirrors on the live medium by
filtering which regions are active based on `regions`.
@ -75,7 +80,7 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', so
return new_list.decode('UTF-8')
def add_custom_mirrors(mirrors: list, *args, **kwargs):
def add_custom_mirrors(mirrors: List[str], *args :str, **kwargs :str) -> bool:
"""
This will append custom mirror definitions in pacman.conf
@ -91,7 +96,7 @@ def add_custom_mirrors(mirrors: list, *args, **kwargs):
return True
def insert_mirrors(mirrors, *args, **kwargs):
def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool:
"""
This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`.
It will not flush any other mirrors, just insert new ones.
@ -138,7 +143,7 @@ def re_rank_mirrors(
return True
def list_mirrors(sort_order=["https", "http"]):
def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"
regions = {}

View File

@ -2,7 +2,7 @@ import logging
import os
import socket
import struct
from collections import OrderedDict
from typing import Union, Dict, Any, List
from .exceptions import HardwareIncompatibilityError
from .general import SysCommand
@ -10,36 +10,40 @@ from .output import log
from .storage import storage
def get_hw_addr(ifname):
def get_hw_addr(ifname :str) -> str:
import fcntl
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
return ':'.join('%02x' % b for b in info[18:24])
def list_interfaces(skip_loopback=True):
interfaces = OrderedDict()
def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
interfaces = {}
for index, iface in socket.if_nameindex():
if skip_loopback and iface == "lo":
continue
mac = get_hw_addr(iface).replace(':', '-').lower()
interfaces[mac] = iface
return interfaces
def check_mirror_reachable():
def check_mirror_reachable() -> bool:
log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO)
if SysCommand("pacman -Sy").exit_code == 0:
return True
elif os.geteuid() != 0:
log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
return False
def enrich_iface_types(interfaces: dict):
def enrich_iface_types(interfaces: Union[Dict[str, Any], List[str]]) -> Dict[str, str]:
result = {}
for iface in interfaces:
if os.path.isdir(f"/sys/class/net/{iface}/bridge/"):
result[iface] = 'BRIDGE'
@ -53,19 +57,21 @@ def enrich_iface_types(interfaces: dict):
result[iface] = 'PHYSICAL'
else:
result[iface] = 'UNKNOWN'
return result
def get_interface_from_mac(mac):
def get_interface_from_mac(mac :str) -> str:
return list_interfaces().get(mac.lower(), None)
def wireless_scan(interface):
def wireless_scan(interface :str) -> None:
interfaces = enrich_iface_types(list_interfaces().values())
if interfaces[interface] != 'WIRELESS':
raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
SysCommand(f"iwctl station {interface} scan")
if not (output := SysCommand(f"iwctl station {interface} scan")).exit_code == 0:
raise SystemError(f"Could not scan for wireless networks: {output}")
if '_WIFI' not in storage:
storage['_WIFI'] = {}
@ -76,8 +82,9 @@ def wireless_scan(interface):
# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
def get_wireless_networks(interface):
def get_wireless_networks(interface :str) -> None:
# TODO: Make this oneliner pritter to check if the interface is scanning or not.
# TODO: Rename this to list_wireless_networks() as it doesn't return anything
if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
import time

View File

@ -3,6 +3,7 @@ import ssl
import urllib.error
import urllib.parse
import urllib.request
from typing import Dict, Any
from .exceptions import RequirementError
@ -10,7 +11,7 @@ BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}'
BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
def find_group(name):
def find_group(name :str) -> bool:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
@ -27,7 +28,7 @@ def find_group(name):
return True
def find_package(name):
def find_package(name :str) -> Any:
"""
Finds a specific package via the package database.
It makes a simple web-request, which might be a bit slow.
@ -40,7 +41,7 @@ def find_package(name):
return json.loads(data)
def find_packages(*names):
def find_packages(*names :str) -> Dict[str, Any]:
"""
This function returns the search results for many packages.
The function itself is rather slow, so consider not sending to
@ -49,7 +50,7 @@ def find_packages(*names):
return {package: find_package(package) for package in names}
def validate_package_list(packages: list):
def validate_package_list(packages: list) -> bool:
"""
Validates a list of given packages.
Raises `RequirementError` if one or more packages are not found.

View File

@ -7,6 +7,7 @@ import pathlib
import urllib.parse
import urllib.request
from importlib import metadata
from typing import ModuleType, Optional, List
from .output import log
from .storage import storage
@ -38,7 +39,7 @@ def localize_path(profile_path :str) -> str:
return profile_path
def import_via_path(path :str, namespace=None): # -> module (not sure how to write that in type definitions)
def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType:
if not namespace:
namespace = os.path.basename(path)
@ -62,14 +63,14 @@ def import_via_path(path :str, namespace=None): # -> module (not sure how to wri
except:
pass
def find_nth(haystack, needle, n):
def find_nth(haystack :List[str], needle :str, n :int) -> int:
start = haystack.find(needle)
while start >= 0 and n > 1:
start = haystack.find(needle, start + len(needle))
n -= 1
return start
def load_plugin(path :str): # -> module (not sure how to write that in type definitions)
def load_plugin(path :str) -> ModuleType:
parsed_url = urllib.parse.urlparse(path)
# The Profile was not a direct match on a remote URL

View File

@ -1,3 +1,4 @@
from __future__ import annotations
import hashlib
import importlib.util
import json
@ -8,7 +9,10 @@ import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Optional
from typing import Optional, ModuleType, Dict, Union, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .installer import Installer
from .general import multisplit
from .networking import list_interfaces
@ -16,16 +20,16 @@ from .storage import storage
from .exceptions import ProfileNotFound
def grab_url_data(path):
def grab_url_data(path :str) -> str:
safe_path = path[: path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
response = urllib.request.urlopen(safe_path, context=ssl_context)
return response.read()
return response.read() # bytes?
def is_desktop_profile(profile) -> bool:
def is_desktop_profile(profile :str) -> bool:
if str(profile) == 'Profile(desktop)':
return True
@ -42,8 +46,13 @@ def is_desktop_profile(profile) -> bool:
return False
def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_profiles=False):
def list_profiles(
filter_irrelevant_macs :bool = True,
subpath :str = '',
filter_top_level_profiles :bool = False
) -> Dict[str, Dict[str, Union[str, bool]]]:
# TODO: Grab from github page as well, not just local static files
if filter_irrelevant_macs:
local_macs = list_interfaces()
@ -101,23 +110,27 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
class Script:
def __init__(self, profile, installer=None):
# profile: https://hvornum.se/something.py
# profile: desktop
# profile: /path/to/profile.py
def __init__(self, profile :str, installer :Optional[Installer] = None):
"""
:param profile: A string representing either a boundled profile, a local python file
or a remote path (URL) to a python script-profile. Three examples:
* profile: https://archlinux.org/some_profile.py
* profile: desktop
* profile: /path/to/profile.py
"""
self.profile = profile
self.installer = installer
self.installer = installer # TODO: Appears not to be used anymore?
self.converted_path = None
self.spec = None
self.examples = None
self.namespace = os.path.splitext(os.path.basename(self.path))[0]
self.original_namespace = self.namespace
def __enter__(self, *args, **kwargs):
def __enter__(self, *args :str, **kwargs :str) -> ModuleType:
self.execute()
return sys.modules[self.namespace]
def __exit__(self, *args, **kwargs):
def __exit__(self, *args :str, **kwargs :str) -> None:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
@ -125,7 +138,7 @@ class Script:
if self.original_namespace:
self.namespace = self.original_namespace
def localize_path(self, profile_path):
def localize_path(self, profile_path :str) -> str:
if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'):
if not self.converted_path:
self.converted_path = f"/tmp/{os.path.basename(self.profile).replace('.py', '')}_{hashlib.md5(os.urandom(12)).hexdigest()}.py"
@ -138,7 +151,7 @@ class Script:
return profile_path
@property
def path(self):
def path(self) -> str:
parsed_url = urllib.parse.urlparse(self.profile)
# The Profile was not a direct match on a remote URL
@ -163,7 +176,7 @@ class Script:
else:
raise ProfileNotFound(f"Cannot handle scheme {parsed_url.scheme}")
def load_instructions(self, namespace=None):
def load_instructions(self, namespace :Optional[str] = None) -> 'Script':
if namespace:
self.namespace = namespace
@ -173,7 +186,7 @@ class Script:
return self
def execute(self):
def execute(self) -> ModuleType:
if self.namespace not in sys.modules or self.spec is None:
self.load_instructions()
@ -183,25 +196,23 @@ class Script:
class Profile(Script):
def __init__(self, installer, path, args=None):
def __init__(self, installer :Installer, path :str):
super(Profile, self).__init__(path, installer)
if args is None:
args = {}
def __dump__(self, *args, **kwargs):
def __dump__(self, *args :str, **kwargs :str) -> Dict[str, str]:
return {'path': self.path}
def __repr__(self, *args, **kwargs):
def __repr__(self, *args :str, **kwargs :str) -> str:
return f'Profile({os.path.basename(self.profile)})'
def install(self):
def install(self) -> ModuleType:
# Before installing, revert any temporary changes to the namespace.
# This ensures that the namespace during installation is the original initiation namespace.
# (For instance awesome instead of aweosme.py or app-awesome.py)
self.namespace = self.original_namespace
return self.execute()
def has_prep_function(self):
def has_prep_function(self) -> bool:
with open(self.path, 'r') as source:
source_data = source.read()
@ -218,7 +229,7 @@ class Profile(Script):
return True
return False
def has_post_install(self):
def has_post_install(self) -> bool:
with open(self.path, 'r') as source:
source_data = source.read()
@ -234,7 +245,7 @@ class Profile(Script):
if hasattr(imported, '_post_install'):
return True
def is_top_level_profile(self):
def is_top_level_profile(self) -> bool:
with open(self.path, 'r') as source:
source_data = source.read()
@ -247,7 +258,7 @@ class Profile(Script):
# since developers like less code - omitting it should assume they want to present it.
return True
def get_profile_description(self):
def get_profile_description(self) -> str:
with open(self.path, 'r') as source:
source_data = source.read()
@ -282,11 +293,11 @@ class Profile(Script):
class Application(Profile):
def __repr__(self, *args, **kwargs):
def __repr__(self, *args :str, **kwargs :str):
return f'Application({os.path.basename(self.profile)})'
@property
def path(self):
def path(self) -> str:
parsed_url = urllib.parse.urlparse(self.profile)
# The Profile was not a direct match on a remote URL
@ -311,7 +322,7 @@ class Application(Profile):
else:
raise ProfileNotFound(f"Application cannot handle scheme {parsed_url.scheme}")
def install(self):
def install(self) -> ModuleType:
# Before installing, revert any temporary changes to the namespace.
# This ensures that the namespace during installation is the original initiation namespace.
# (For instance awesome instead of aweosme.py or app-awesome.py)

View File

@ -2,7 +2,7 @@ import os
from .general import SysCommand
def service_state(service_name: str):
def service_state(service_name: str) -> str:
if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe

View File

@ -1,5 +1,6 @@
import logging
import time
from typing import Interator
from .exceptions import SysCallError
from .general import SysCommand, SysCommandWorker, locate_binary
from .installer import Installer
@ -8,14 +9,14 @@ from .storage import storage
class Ini:
def __init__(self, *args, **kwargs):
def __init__(self, *args :str, **kwargs :str):
"""
Limited INI handler for now.
Supports multiple keywords through dictionary list items.
"""
self.kwargs = kwargs
def __str__(self):
def __str__(self) -> str:
result = ''
first_row_done = False
for top_level in self.kwargs:
@ -54,7 +55,7 @@ class Boot:
self.session = None
self.ready = False
def __enter__(self):
def __enter__(self) -> 'Boot':
if (existing_session := storage.get('active_boot', None)) and existing_session.instance != self.instance:
raise KeyError("Archinstall only supports booting up one instance, and a active session is already active and it is not this one.")
@ -81,7 +82,7 @@ class Boot:
storage['active_boot'] = self
return self
def __exit__(self, *args, **kwargs):
def __exit__(self, *args :str, **kwargs :str) -> None:
# b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
@ -98,24 +99,24 @@ class Boot:
else:
raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {shutdown}", exit_code=shutdown.exit_code)
def __iter__(self):
def __iter__(self) -> Interator[str]:
if self.session:
for value in self.session:
yield value
def __contains__(self, key: bytes):
def __contains__(self, key: bytes) -> bool:
if self.session is None:
return False
return key in self.session
def is_alive(self):
def is_alive(self) -> bool:
if self.session is None:
return False
return self.session.is_alive()
def SysCommand(self, cmd: list, *args, **kwargs):
def SysCommand(self, cmd: list, *args, **kwargs) -> SysCommand:
if cmd[0][0] != '/' and cmd[0][:2] != './':
# This check is also done in SysCommand & SysCommandWorker.
# However, that check is done for `machinectl` and not for our chroot command.
@ -125,7 +126,7 @@ class Boot:
return SysCommand(["systemd-run", f"--machine={self.container_name}", "--pty", *cmd], *args, **kwargs)
def SysCommandWorker(self, cmd: list, *args, **kwargs):
def SysCommandWorker(self, cmd: list, *args, **kwargs) -> SysCommandWorker:
if cmd[0][0] != '/' and cmd[0][:2] != './':
cmd[0] = locate_binary(cmd[0])

View File

@ -1,3 +1,4 @@
from __future__ import annotations
import getpass
import ipaddress
import logging
@ -7,6 +8,11 @@ import shutil
import signal
import sys
import time
from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .disk.partition import Partition
from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_disks
from .exceptions import RequirementError, UserError, DiskError
@ -23,20 +29,20 @@ from .mirrors import list_mirrors
# Some return the keys from the options, some the values?
from .. import fs_types
def get_terminal_height():
# TODO: These can be removed after the move to simple_menu.py
def get_terminal_height() -> int:
return shutil.get_terminal_size().lines
def get_terminal_width():
def get_terminal_width() -> int:
return shutil.get_terminal_size().columns
def get_longest_option(options):
def get_longest_option(options :List[Any]) -> int:
return max([len(x) for x in options])
def check_for_correct_username(username):
def check_for_correct_username(username :str) -> bool:
if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32:
return True
log(
@ -47,14 +53,14 @@ def check_for_correct_username(username):
return False
def do_countdown():
def do_countdown() -> bool:
SIG_TRIGGER = False
def kill_handler(sig, frame):
def kill_handler(sig :int, frame :Any) -> None:
print()
exit(0)
def sig_handler(sig, frame):
def sig_handler(sig :int, frame :Any) -> None:
global SIG_TRIGGER
SIG_TRIGGER = True
signal.signal(signal.SIGINT, kill_handler)
@ -79,12 +85,14 @@ def do_countdown():
sys.stdin.read()
SIG_TRIGGER = False
signal.signal(signal.SIGINT, sig_handler)
print()
signal.signal(signal.SIGINT, original_sigint_handler)
return True
def get_password(prompt="Enter a password: "):
def get_password(prompt :str = "Enter a password: ") -> Optional[str]:
while passwd := getpass.getpass(prompt):
passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
if passwd != passwd_verification:
@ -98,7 +106,7 @@ def get_password(prompt="Enter a password: "):
return None
def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
def print_large_list(options :List[str], padding :int = 5, margin_bottom :int = 0, separator :str = ': ') -> List[int]:
highest_index_number_length = len(str(len(options)))
longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding
spaces_without_option = longest_line - (len(separator) + highest_index_number_length)
@ -136,6 +144,7 @@ def select_encrypted_partitions(block_devices :dict, password :str) -> dict:
# Users might want to single out a partition for non-encryption to share between dualboot etc.
# TODO: This can be removed once we have simple_menu everywhere
class MiniCurses:
def __init__(self, width, height):
self.width = width
@ -255,11 +264,11 @@ class MiniCurses:
return response
def ask_for_swap(prompt='Would you like to use swap on zram? (Y/n): ', forced=False):
def ask_for_swap(prompt :str = 'Would you like to use swap on zram? (Y/n): ', forced :bool = False) -> bool:
return True if input(prompt).strip(' ').lower() not in ('n', 'no') else False
def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False):
def ask_for_superuser_account(prompt :str = 'Username for required superuser with sudo privileges: ', forced :bool = False) -> Dict[str, Dict[str, str]]:
while 1:
new_user = input(prompt).strip(' ')
@ -277,7 +286,7 @@ def ask_for_superuser_account(prompt='Username for required superuser with sudo
return {new_user: {"!password": password}}
def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
def ask_for_additional_users(prompt :str = 'Any additional users to install (leave blank for no users): ') -> List[Dict[str, Dict[str, str]]]:
users = {}
superusers = {}
@ -297,7 +306,7 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
return users, superusers
def ask_for_a_timezone():
def ask_for_a_timezone() -> str:
timezones = list_timezones()
default = 'UTC'
@ -311,7 +320,7 @@ def ask_for_a_timezone():
return selected_tz
def ask_for_bootloader(advanced_options=False) -> str:
def ask_for_bootloader(advanced_options :bool = False) -> str:
bootloader = "systemd-bootctl" if has_uefi() else "grub-install"
if has_uefi():
if not advanced_options:
@ -333,14 +342,14 @@ def ask_for_bootloader(advanced_options=False) -> str:
return bootloader
def ask_for_audio_selection(desktop=True):
def ask_for_audio_selection(desktop :bool = True) -> str:
audio = 'pipewire' if desktop else 'none'
choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none']
selected_audio = Menu(f'Choose an audio server or leave blank to use "{audio}"', choices, default_option=audio).run()
return selected_audio
def ask_to_configure_network():
def ask_to_configure_network() -> Dict[str, Any]:
# Optionally configure one network interface.
# while 1:
# {MAC: Ifname}
@ -435,7 +444,7 @@ def ask_for_main_filesystem_format(advanced_options=False):
return Menu('Select which filesystem your main partition should use', options, skip=False).run()
def current_partition_layout(partitions, with_idx=False):
def current_partition_layout(partitions :List[Partition], with_idx :bool = False) -> Dict[str, Any]:
def do_padding(name, max_len):
spaces = abs(len(str(name)) - max_len) + 2
pad_left = int(spaces / 2)
@ -479,7 +488,7 @@ def current_partition_layout(partitions, with_idx=False):
return f'\n\nCurrent partition layout:\n\n{current_layout}'
def select_partition(title, partitions, multiple=False):
def select_partition(title :str, partitions :List[Partition], multiple :bool = False) -> Union[int, List[int], None]:
partition_indexes = list(map(str, range(len(partitions))))
partition = Menu(title, partition_indexes, multi=multiple).run()
@ -491,47 +500,18 @@ def select_partition(title, partitions, multiple=False):
return None
def get_default_partition_layout(block_devices, advanced_options=False):
def get_default_partition_layout(
block_devices :Union[BlockDevice, List[BlockDevice]],
advanced_options :bool = False
) -> Dict[str, Any]:
if len(block_devices) == 1:
return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options)
else:
return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)
def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict:
# if has_uefi():
# partition_type = 'gpt'
# else:
# partition_type = 'msdos'
# log(f"Selecting which partitions to re-use on {block_device}...", fg="yellow", level=logging.INFO)
# partitions = generic_multi_select(block_device.partitions.values(), "Select which partitions to re-use (the rest will be left alone): ", sort=True)
# partitions_to_wipe = generic_multi_select(partitions, "Which partitions do you wish to wipe (multiple can be selected): ", sort=True)
# mountpoints = {}
# struct = {
# "partitions" : []
# }
# for partition in partitions:
# mountpoint = input(f"Select a mountpoint (or skip) for {partition}: ").strip()
# part_struct = {}
# if mountpoint:
# part_struct['mountpoint'] = mountpoint
# if mountpoint == '/boot':
# part_struct['boot'] = True
# if has_uefi():
# part_struct['ESP'] = True
# elif mountpoint == '/' and
# if partition.uuid:
# part_struct['PARTUUID'] = partition.uuid
# if partition in partitions_to_wipe:
# part_struct['wipe'] = True
# struct['partitions'].append(part_struct)
# return struct
def manage_new_and_existing_partitions(block_device :BlockDevice) -> Dict[str, Any]:
block_device_struct = {
"partitions": [partition.__dump__() for partition in block_device.partitions.values()]
}
@ -689,7 +669,7 @@ def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict:
return block_device_struct
def select_individual_blockdevice_usage(block_devices: list):
def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
result = {}
for device in block_devices:
@ -700,7 +680,7 @@ def select_individual_blockdevice_usage(block_devices: list):
return result
def select_disk_layout(block_devices :list, advanced_options=False):
def select_disk_layout(block_devices :list, advanced_options=False) -> Dict[str, Any]:
modes = [
"Wipe all selected drives and use a best-effort default partition layout",
"Select what to do with each individual drive (followed by partition usage)"
@ -714,7 +694,7 @@ def select_disk_layout(block_devices :list, advanced_options=False):
return select_individual_blockdevice_usage(block_devices)
def select_disk(dict_o_disks):
def select_disk(dict_o_disks :Dict[str, BlockDevice]) -> BlockDevice:
"""
Asks the user to select a harddrive from the `dict_o_disks` selection.
Usually this is combined with :ref:`archinstall.list_drives`.
@ -742,7 +722,7 @@ def select_disk(dict_o_disks):
raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.')
def select_profile():
def select_profile() -> Optional[str]:
"""
# Asks the user to select a profile from the available profiles.
#
@ -770,7 +750,7 @@ def select_profile():
return None
def select_language():
def select_language() -> str:
"""
Asks the user to select a language
Usually this is combined with :ref:`archinstall.list_keyboard_languages`.
@ -788,7 +768,7 @@ def select_language():
return selected_lang
def select_mirror_regions():
def select_mirror_regions() -> Dict[str, Any]:
"""
Asks the user to select a mirror or region
Usually this is combined with :ref:`archinstall.list_mirrors`.
@ -810,7 +790,7 @@ def select_mirror_regions():
return {}
def select_harddrives():
def select_harddrives() -> Optional[str]:
"""
Asks the user to select one or multiple hard drives
@ -832,7 +812,7 @@ def select_harddrives():
return None
def select_driver(options=AVAILABLE_GFX_DRIVERS):
def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str:
"""
Some what convoluted function, whose job is simple.
Select a graphics driver from a pre-defined set of popular options.
@ -866,7 +846,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
raise RequirementError("Selecting drivers require a least one profile to be given as an option.")
def select_kernel():
def select_kernel() -> List[str]:
"""
Asks the user to select a kernel for system.

View File

View File

@ -29,3 +29,11 @@ exclude = ["docs/*.html", "docs/_static","docs/*.png","docs/*.psd"]
[tool.flit.metadata.requires-extra]
doc = ["sphinx"]
[tool.mypy]
python_version = "3.10"
exclude = "tests"
[tool.bandit]
targets = ["ourkvm"]
exclude = ["/tests"]