Update `SysCommand()` calls in remaining files (#1707)

This commit is contained in:
Daemon Coder 2023-05-04 04:42:37 -04:00 committed by GitHub
parent adceed22ad
commit fd83f073f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 70 deletions

View File

@ -139,19 +139,18 @@ class DeviceHandler(object):
log(f'Failed to read btrfs subvolume information: {err}', level=logging.DEBUG)
return subvol_infos
if result.exit_code == 0:
try:
if decoded := result.decode('utf-8'):
# ID 256 gen 16 top level 5 path @
for line in decoded.splitlines():
# expected output format:
# ID 257 gen 8 top level 5 path @home
name = Path(line.split(' ')[-1])
sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None)
subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint))
except json.decoder.JSONDecodeError as err:
log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR)
raise err
try:
if decoded := result.decode('utf-8'):
# ID 256 gen 16 top level 5 path @
for line in decoded.splitlines():
# expected output format:
# ID 257 gen 8 top level 5 path @home
name = Path(line.split(' ')[-1])
sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None)
subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint))
except json.decoder.JSONDecodeError as err:
log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR)
raise err
if not lsblk_info.mountpoint:
self.umount(dev_path)
@ -206,9 +205,7 @@ class DeviceHandler(object):
log(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
try:
if (handle := SysCommand(f"/usr/bin/{command} {options_str} {path}")).exit_code != 0:
mkfs_error = handle.decode()
raise DiskError(f'Could not format {path} with {fs_type.value}: {mkfs_error}')
SysCommand(f"/usr/bin/{command} {options_str} {path}")
except SysCallError as error:
msg = f'Could not format {path} with {fs_type.value}: {error.message}'
log(msg, fg='red')
@ -408,12 +405,16 @@ class DeviceHandler(object):
SysCommand(f"btrfs subvolume create {subvol_path}")
if sub_vol.nodatacow:
if (result := SysCommand(f'chattr +C {subvol_path}')).exit_code != 0:
raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {result.decode()}')
try:
SysCommand(f'chattr +C {subvol_path}')
except SysCallError as error:
raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {error}')
if sub_vol.compress:
if (result := SysCommand(f'chattr +c {subvol_path}')).exit_code != 0:
raise DiskError(f'Could not set compress attribute at {subvol_path}: {result}')
try:
SysCommand(f'chattr +c {subvol_path}')
except SysCallError as error:
raise DiskError(f'Could not set compress attribute at {subvol_path}: {error}')
if luks_handler is not None and luks_handler.mapper_dev is not None:
self.umount(luks_handler.mapper_dev)
@ -518,9 +519,7 @@ class DeviceHandler(object):
log(f'Mounting {dev_path}: command', level=logging.DEBUG)
try:
result = SysCommand(command)
if result.exit_code != 0:
raise DiskError(f'Could not mount {dev_path}: {command}\n{result.decode()}')
SysCommand(command)
except SysCallError as err:
raise DiskError(f'Could not mount {dev_path}: {command}\n{err.message}')
@ -575,10 +574,7 @@ class DeviceHandler(object):
try:
log(f'Calling partprobe: {command}', level=logging.DEBUG)
result = SysCommand(command)
if result.exit_code != 0:
log(f'"{command}" returned a failure: {result.decode()}', level=logging.DEBUG)
SysCommand(command)
except SysCallError as error:
log(f'"{command}" failed to run: {error}', level=logging.DEBUG)

View File

@ -1,7 +1,7 @@
import logging
from typing import Iterator, List, Callable, Optional
from .exceptions import ServiceException
from .exceptions import ServiceException, SysCallError
from .general import SysCommand
from .output import log
from .storage import storage
@ -161,8 +161,10 @@ def set_keyboard_language(locale :str) -> bool:
log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR)
return False
if (output := SysCommand(f'localectl set-keymap {locale}')).exit_code != 0:
raise ServiceException(f"Unable to set locale '{locale}' for console: {output}")
try:
SysCommand(f'localectl set-keymap {locale}')
except SysCallError as error:
raise ServiceException(f"Unable to set locale '{locale}' for console: {error}")
return True

View File

@ -88,30 +88,28 @@ class Luks2:
'luksFormat', str(self.luks_dev_path),
])
try:
# Retry formatting the volume because archinstall can some times be too quick
# which generates a "Device /dev/sdX does not exist or access denied." between
# setting up partitions and us trying to encrypt it.
cmd_handle = None
for i in range(storage['DISK_RETRY_ATTEMPTS']):
if (cmd_handle := SysCommand(cryptsetup_args)).exit_code != 0:
time.sleep(storage['DISK_TIMEOUTS'])
else:
break
if cmd_handle is not None and cmd_handle.exit_code != 0:
output = str(b''.join(cmd_handle))
raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {output}')
except SysCallError as err:
if err.exit_code == 1:
log(f'luks2 partition currently in use: {self.luks_dev_path}')
log('Attempting to unmount, crypt-close and trying encryption again')
self.lock()
# Then try again to set up the crypt-device
# Retry formatting the volume because archinstall can some times be too quick
# which generates a "Device /dev/sdX does not exist or access denied." between
# setting up partitions and us trying to encrypt it.
for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']):
try:
SysCommand(cryptsetup_args)
else:
raise err
break
except SysCallError as error:
time.sleep(storage['DISK_TIMEOUTS'])
if retry_attempt != storage['DISK_RETRY_ATTEMPTS'] - 1:
continue
if error.exit_code == 1:
log(f'luks2 partition currently in use: {self.luks_dev_path}')
log('Attempting to unmount, crypt-close and trying encryption again')
self.lock()
# Then try again to set up the crypt-device
SysCommand(cryptsetup_args)
else:
raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {error}')
return key_file
@ -119,11 +117,7 @@ class Luks2:
command = f'/usr/bin/cryptsetup luksUUID {self.luks_dev_path}'
try:
result = SysCommand(command)
if result.exit_code != 0:
raise DiskError(f'Unable to get UUID for Luks device: {result.decode()}')
return result.decode() # type: ignore
return SysCommand(command).decode().strip() # type: ignore
except SysCallError as err:
log(f'Unable to get UUID for Luks device: {self.luks_dev_path}', level=logging.INFO)
raise err

View File

@ -7,6 +7,7 @@ from dataclasses import dataclass
from .general import SysCommand
from .output import log
from .exceptions import SysCallError
from .storage import storage
@ -148,8 +149,9 @@ def re_rank_mirrors(
src: str = '/etc/pacman.d/mirrorlist',
dst: str = '/etc/pacman.d/mirrorlist',
) -> bool:
cmd = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}")
if cmd.exit_code != 0:
try:
cmd = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}")
except SysCallError:
return False
with open(dst, 'w') as f:
f.write(str(cmd))

View File

@ -38,11 +38,11 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
def check_mirror_reachable() -> bool:
log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO)
try:
if run_pacman("-Sy").exit_code == 0:
return True
elif os.geteuid() != 0:
log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
run_pacman("-Sy")
return True
except SysCallError as err:
if os.geteuid() != 0:
log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
log(f'exit_code: {err.exit_code}, Error: {err.message}', level=logging.DEBUG)
return False
@ -50,11 +50,12 @@ def check_mirror_reachable() -> bool:
def update_keyring() -> bool:
log("Updating archlinux-keyring ...", level=logging.INFO)
if run_pacman("-Sy --noconfirm archlinux-keyring").exit_code == 0:
try:
run_pacman("-Sy --noconfirm archlinux-keyring")
return True
elif os.geteuid() != 0:
log("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.", level=logging.ERROR, fg="red")
except SysCallError:
if os.geteuid() != 0:
log("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.", level=logging.ERROR, fg="red")
return False
@ -84,8 +85,10 @@ def wireless_scan(interface :str) -> None:
if interfaces[interface] != 'WIRELESS':
raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
if not (output := SysCommand(f"iwctl station {interface} scan")).exit_code == 0:
raise SystemError(f"Could not scan for wireless networks: {output}")
try:
SysCommand(f"iwctl station {interface} scan")
except SysCallError as error:
raise SystemError(f"Could not scan for wireless networks: {error}")
if '_WIFI' not in storage:
storage['_WIFI'] = {}