Simplify SysCommand decoding (#2121)

This commit is contained in:
Daniel Girtler 2023-10-02 21:01:23 +11:00 committed by GitHub
parent a095e393d8
commit 5c903df55f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 62 additions and 109 deletions

View File

@ -154,20 +154,19 @@ class DeviceHandler(object):
mountpoint = Path(common_prefix)
try:
result = SysCommand(f'btrfs subvolume list {mountpoint}')
result = SysCommand(f'btrfs subvolume list {mountpoint}').decode()
except SysCallError as err:
debug(f'Failed to read btrfs subvolume information: {err}')
return subvol_infos
try:
if decoded := result.decode('utf-8'):
# ID 256 gen 16 top level 5 path @
for line in decoded.splitlines():
# expected output format:
# ID 257 gen 8 top level 5 path @home
name = Path(line.split(' ')[-1])
sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None)
subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint))
# ID 256 gen 16 top level 5 path @
for line in result.splitlines():
# expected output format:
# ID 257 gen 8 top level 5 path @home
name = Path(line.split(' ')[-1])
sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None)
subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint))
except json.decoder.JSONDecodeError as err:
error(f"Could not decode lsblk JSON: {result}")
raise err

View File

@ -1111,12 +1111,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int =
for retry_attempt in range(retry):
try:
result = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}')
result = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}').decode()
break
except SysCallError as err:
# Get the output minus the message/info from lsblk if it returns a non-zero exit code.
if err.worker:
err_str = err.worker.decode('UTF-8')
err_str = err.worker.decode()
debug(f'Error calling lsblk: {err_str}')
else:
raise err
@ -1127,10 +1127,9 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int =
time.sleep(1)
try:
if decoded := result.decode('utf-8'):
block_devices = json.loads(decoded)
blockdevices = block_devices['blockdevices']
return [LsblkInfo.from_json(device) for device in blockdevices]
block_devices = json.loads(result)
blockdevices = block_devices['blockdevices']
return [LsblkInfo.from_json(device) for device in blockdevices]
except json.decoder.JSONDecodeError as err:
error(f"Could not decode lsblk JSON: {result}")
raise err

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import getpass
from pathlib import Path
from typing import List, Optional
from typing import List
from .device_model import PartitionModification, Fido2Device
from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
@ -38,14 +38,11 @@ class Fido2:
# down moving the cursor in the menu
if not cls._loaded or reload:
try:
ret: Optional[str] = SysCommand("systemd-cryptenroll --fido2-device=list").decode('UTF-8')
ret = SysCommand("systemd-cryptenroll --fido2-device=list").decode()
except SysCallError:
error('fido2 support is most likely not installed')
raise ValueError('HSM devices can not be detected, is libfido2 installed?')
if not ret:
return []
fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore
manufacturer_pos = 0

View File

@ -430,10 +430,15 @@ class SysCommand:
return True
def decode(self, *args, **kwargs) -> Optional[str]:
if self.session:
return self.session._trace_log.decode(*args, **kwargs)
return None
def decode(self, encoding: str = 'utf-8', errors='backslashreplace', strip: bool = True) -> str:
if not self.session:
raise ValueError('No session available to decode')
val = self.session._trace_log.decode(encoding, errors=errors)
if strip:
return val.strip()
return val
@property
def exit_code(self) -> Optional[int]:

View File

@ -131,7 +131,7 @@ class Installer:
We need to wait for it before we continue since we opted in to use a custom mirror/region.
"""
info('Waiting for time sync (systemd-timesyncd.service) to complete.')
while SysCommand('timedatectl show --property=NTPSynchronized --value').decode().rstrip() != 'yes':
while SysCommand('timedatectl show --property=NTPSynchronized --value').decode() != 'yes':
time.sleep(1)
info('Waiting for automatic mirror selection (reflector) to complete.')
@ -282,7 +282,7 @@ class Installer:
if enable_resume:
resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip()
resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip()
resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode().split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip()
self._hooks.append('resume')
self._kernel_params.append(f'resume=UUID={resume_uuid}')
@ -312,9 +312,6 @@ class Installer:
except SysCallError as err:
raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {err}')
if not gen_fstab:
raise RequirementError(f'Generating fstab returned empty value')
with open(fstab_path, 'a') as fp:
fp.write(gen_fstab)
@ -1318,17 +1315,21 @@ TIMEOUT=5
if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'):
service_name += '.service' # Just to be safe
last_execution_time = b''.join(SysCommand(f"systemctl show --property=ActiveEnterTimestamp --no-pager {service_name}", environment_vars={'SYSTEMD_COLORS': '0'}))
last_execution_time = last_execution_time.lstrip(b'ActiveEnterTimestamp=').strip()
last_execution_time = SysCommand(
f"systemctl show --property=ActiveEnterTimestamp --no-pager {service_name}",
environment_vars={'SYSTEMD_COLORS': '0'}
).decode().lstrip('ActiveEnterTimestamp=')
if not last_execution_time:
return None
return last_execution_time.decode('UTF-8')
return last_execution_time
def _service_state(self, service_name: str) -> str:
if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'):
service_name += '.service' # Just to be safe
state = b''.join(SysCommand(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'}))
return state.strip().decode('UTF-8')
return SysCommand(
f'systemctl show --no-pager -p SubState --value {service_name}',
environment_vars={'SYSTEMD_COLORS': '0'}
).decode()

View File

@ -1,61 +0,0 @@
from itertools import takewhile
from pathlib import Path
from typing import Iterator, List
from .exceptions import ServiceException, SysCallError
from .general import SysCommand
from .output import error
def list_keyboard_languages() -> Iterator[str]:
for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
def list_locales() -> List[str]:
entries = Path('/etc/locale.gen').read_text().splitlines()
# Before the list of locales begins there's an empty line with a '#' in front
# so we'll collect the locales from bottom up and halt when we're done.
locales = list(takewhile(bool, map(lambda entry: entry.strip('\n\t #'), reversed(entries))))
locales.reverse()
return locales
def list_x11_keyboard_languages() -> Iterator[str]:
for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
def verify_keyboard_layout(layout :str) -> bool:
for language in list_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
def verify_x11_keyboard_layout(layout :str) -> bool:
for language in list_x11_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
def set_keyboard_language(locale :str) -> bool:
if len(locale.strip()):
if not verify_keyboard_layout(locale):
error(f"Invalid keyboard locale specified: {locale}")
return False
try:
SysCommand(f'localectl set-keymap {locale}')
except SysCallError as err:
raise ServiceException(f"Unable to set locale '{locale}' for console: {err}")
return True
return False
def list_timezones() -> Iterator[str]:
for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()

View File

@ -1,6 +1,10 @@
from .locale_menu import LocaleConfiguration
from .locale import (
list_keyboard_languages, list_locales, list_x11_keyboard_languages,
verify_keyboard_layout, verify_x11_keyboard_layout, set_kb_layout,
list_timezones
from .utils import (
list_keyboard_languages,
list_locales,
list_x11_keyboard_languages,
verify_keyboard_layout,
verify_x11_keyboard_layout,
list_timezones,
set_kb_layout
)

View File

@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Dict, Any, TYPE_CHECKING, Optional
from .locale import set_kb_layout, list_keyboard_languages, list_locales
from .utils import list_keyboard_languages, list_locales, set_kb_layout
from ..menu import Selector, AbstractSubMenu, MenuSelectionType, Menu
if TYPE_CHECKING:

View File

@ -6,8 +6,11 @@ from ..output import error
def list_keyboard_languages() -> Iterator[str]:
for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
for line in SysCommand(
"localectl --no-pager list-keymaps",
environment_vars={'SYSTEMD_COLORS': '0'}
).decode():
yield line
def list_locales() -> List[str]:
@ -22,8 +25,11 @@ def list_locales() -> List[str]:
def list_x11_keyboard_languages() -> Iterator[str]:
for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
for line in SysCommand(
"localectl --no-pager list-x11-keymap-layouts",
environment_vars={'SYSTEMD_COLORS': '0'}
).decode():
yield line
def verify_keyboard_layout(layout :str) -> bool:
@ -57,5 +63,8 @@ def set_kb_layout(locale :str) -> bool:
def list_timezones() -> Iterator[str]:
for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
for line in SysCommand(
"timedatectl --no-pager list-timezones",
environment_vars={'SYSTEMD_COLORS': '0'}
).decode():
yield line

View File

@ -116,7 +116,7 @@ class Luks2:
command = f'/usr/bin/cryptsetup luksUUID {self.luks_dev_path}'
try:
return SysCommand(command).decode().strip() # type: ignore
return SysCommand(command).decode()
except SysCallError as err:
info(f'Unable to get UUID for Luks device: {self.luks_dev_path}')
raise err

View File

@ -37,7 +37,7 @@ def group_search(name :str) -> List[PackageSearchResult]:
raise err
# Just to be sure some code didn't slip through the exception
data = response.read().decode('UTF-8')
data = response.read().decode('utf-8')
return [PackageSearchResult(**package) for package in json.loads(data)['results']]