Moving away from custom log levels, to something that's well defined. (#360)

* Moving away from custom log levels, to something that's well defined.

* Added backward compability to log() as well.

* Added an option to force log messages out on screen even if the level is below the log level threashold.

* Added force log messages when wrong notation is used.

* Added some more length to the deprecated message

* Swapped all log levels to use logging.<level> instead.

Co-authored-by: Anton Hvornum <anton.feeds@gmail.com>
This commit is contained in:
Anton Hvornum 2021-04-27 14:43:17 +00:00 committed by GitHub
parent a29eea26db
commit 090b98b830
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 139 additions and 104 deletions

View File

@ -1,9 +1,9 @@
import glob, re, os, json, time, hashlib
import pathlib, traceback
import pathlib, traceback, logging
from collections import OrderedDict
from .exceptions import DiskError
from .general import *
from .output import log, LOG_LEVELS
from .output import log
from .storage import storage
from .hardware import hasUEFI
@ -88,7 +88,7 @@ class BlockDevice():
raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
else:
log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=LOG_LEVELS.Debug)
log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
# if not stat.S_ISBLK(os.stat(full_path).st_mode):
# raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@ -129,7 +129,7 @@ class BlockDevice():
@property
def uuid(self):
log(f'BlockDevice().uuid is untested!', level=LOG_LEVELS.Warning, fg='yellow')
log(f'BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
Returns the disk UUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
@ -222,8 +222,8 @@ class Partition():
@encrypted.setter
def encrypted(self, value :bool):
if value:
log(f'Marking {self} as encrypted: {value}', level=LOG_LEVELS.Debug)
log(f"Callstrack when marking the partition: {''.join(traceback.format_stack())}", level=LOG_LEVELS.Debug)
log(f'Marking {self} as encrypted: {value}', level=logging.DEBUG)
log(f"Callstrack when marking the partition: {''.join(traceback.format_stack())}", level=logging.DEBUG)
self._encrypted = value
@ -240,7 +240,7 @@ class Partition():
return self.path
def detect_inner_filesystem(self, password):
log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=LOG_LEVELS.Info)
log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO)
from .luks import luks2
try:
@ -269,16 +269,16 @@ class Partition():
def safe_to_format(self):
if self.allow_formatting is False:
log(f"Partition {self} is not marked for formatting.", level=LOG_LEVELS.Debug)
log(f"Partition {self} is not marked for formatting.", level=logging.DEBUG)
return False
elif self.target_mountpoint == '/boot':
try:
if self.has_content():
log(f"Partition {self} is a boot partition and has content inside.", level=LOG_LEVELS.Debug)
log(f"Partition {self} is a boot partition and has content inside.", level=logging.DEBUG)
return False
except SysCallError as err:
log(err.message, LOG_LEVELS.Debug)
log(f"Partition {self} was identified as /boot but we could not mount to check for content, continuing!", level=LOG_LEVELS.Debug)
log(err.message, logging.DEBUG)
log(f"Partition {self} was identified as /boot but we could not mount to check for content, continuing!", level=logging.DEBUG)
pass
return True
@ -293,7 +293,7 @@ class Partition():
raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}")
if not self.safe_to_format():
log(f"Partition {self} was marked as protected but encrypt() was called on it!", level=LOG_LEVELS.Error, fg="red")
log(f"Partition {self} was marked as protected but encrypt() was called on it!", level=logging.ERROR, fg="red")
return False
handle = luks2(self, None, None)
@ -321,7 +321,7 @@ class Partition():
raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})")
if log_formatting:
log(f'Formatting {path} -> {filesystem}', level=LOG_LEVELS.Info)
log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
if filesystem == 'btrfs':
o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}'))
@ -376,7 +376,7 @@ class Partition():
def mount(self, target, fs=None, options=''):
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=LOG_LEVELS.Info)
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
@ -434,7 +434,7 @@ class Filesystem():
def __enter__(self, *args, **kwargs):
if self.blockdevice.keep_partitions is False:
log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=LOG_LEVELS.Debug)
log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=logging.DEBUG)
if self.mode == GPT:
if self.raw_parted(f'{self.blockdevice.device} mklabel gpt').exit_code == 0:
self.blockdevice.flush_cache()
@ -451,7 +451,7 @@ class Filesystem():
# TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed.
elif self.mode == self.blockdevice.partition_table_type:
log(f'Kept partition format {self.mode} for {self.blockdevice}', level=LOG_LEVELS.Debug)
log(f'Kept partition format {self.mode} for {self.blockdevice}', level=logging.DEBUG)
else:
raise DiskError(f'The selected partition table format {self.mode} does not match that of {self.blockdevice}.')
@ -474,7 +474,7 @@ class Filesystem():
def raw_parted(self, string:str):
x = sys_command(f'/usr/bin/parted -s {string}')
log(f"'parted -s {string}' returned: {b''.join(x)}", level=LOG_LEVELS.Debug)
log(f"'parted -s {string}' returned: {b''.join(x)}", level=logging.DEBUG)
return x
def parted(self, string:str):
@ -487,7 +487,7 @@ class Filesystem():
return self.raw_parted(string).exit_code
def use_entire_disk(self, root_filesystem_type='ext4'):
log(f"Using and formatting the entire {self.blockdevice}.", level=LOG_LEVELS.Debug)
log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG)
if hasUEFI():
self.add_partition('primary', start='1MiB', end='513MiB', format='fat32')
self.set_name(0, 'EFI')
@ -499,7 +499,7 @@ class Filesystem():
self.blockdevice.partition[0].filesystem = 'vfat'
self.blockdevice.partition[1].filesystem = root_filesystem_type
log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=LOG_LEVELS.Debug)
log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
self.blockdevice.partition[0].target_mountpoint = '/boot'
self.blockdevice.partition[1].target_mountpoint = '/'
@ -510,12 +510,12 @@ class Filesystem():
#we don't need a seprate boot partition it would be a waste of space
self.add_partition('primary', start='1MB', end='100%')
self.blockdevice.partition[0].filesystem=root_filesystem_type
log(f"Set the root partition {self.blockdevice.partition[0]} to use filesystem {root_filesystem_type}.", level=LOG_LEVELS.Debug)
log(f"Set the root partition {self.blockdevice.partition[0]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
self.blockdevice.partition[0].target_mountpoint = '/'
self.blockdevice.partition[0].allow_formatting = True
def add_partition(self, type, start, end, format=None):
log(f'Adding partition to {self.blockdevice}', level=LOG_LEVELS.Info)
log(f'Adding partition to {self.blockdevice}', level=logging.INFO)
previous_partitions = self.blockdevice.partitions
if self.mode == MBR:

View File

@ -1,10 +1,10 @@
import os, json, hashlib, shlex, sys
import time, pty
import time, pty, logging
from datetime import datetime, date
from subprocess import Popen, STDOUT, PIPE, check_output
from select import epoll, EPOLLIN, EPOLLHUP
from .exceptions import *
from .output import log, LOG_LEVELS
from .output import log
def gen_uid(entropy_length=256):
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
@ -84,7 +84,7 @@ class sys_command():#Thread):
self.log = kwargs.get('log', log)
if kwargs['emulate']:
self.log(f"Starting command '{cmd}' in emulation mode.", level=LOG_LEVELS.Debug)
self.log(f"Starting command '{cmd}' in emulation mode.", level=logging.DEBUG)
if type(cmd) is list:
# if we get a list of arguments
@ -204,7 +204,7 @@ class sys_command():#Thread):
os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars})
except FileNotFoundError:
self.status = 'done'
self.log(f"{self.cmd[0]} does not exist.", level=LOG_LEVELS.Debug)
self.log(f"{self.cmd[0]} does not exist.", level=logging.DEBUG)
self.exit_code = 1
return False
@ -214,8 +214,8 @@ class sys_command():#Thread):
poller.register(child_fd, EPOLLIN | EPOLLHUP)
if 'events' in self.kwargs and 'debug' in self.kwargs:
self.log(f'[D] Using triggers for command: {self.cmd}', level=LOG_LEVELS.Debug)
self.log(json.dumps(self.kwargs['events']), level=LOG_LEVELS.Debug)
self.log(f'[D] Using triggers for command: {self.cmd}', level=logging.DEBUG)
self.log(json.dumps(self.kwargs['events']), level=logging.DEBUG)
alive = True
last_trigger_pos = 0
@ -230,7 +230,7 @@ class sys_command():#Thread):
break
if 'debug' in self.kwargs and self.kwargs['debug'] and len(output):
self.log(self.cmd, 'gave:', output.decode('UTF-8'), level=LOG_LEVELS.Debug)
self.log(self.cmd, 'gave:', output.decode('UTF-8'), level=logging.DEBUG)
if 'on_output' in self.kwargs:
self.kwargs['on_output'](self.kwargs['worker'], output)
@ -251,8 +251,8 @@ class sys_command():#Thread):
trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower())
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=LOG_LEVELS.Debug)
self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=LOG_LEVELS.Debug)
self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG)
self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG)
last_trigger_pos = trigger_pos
os.write(child_fd, self.kwargs['events'][trigger])
@ -266,18 +266,18 @@ class sys_command():#Thread):
## Adding a exit trigger:
if len(self.kwargs['events']) == 0:
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"Waiting for last command {self.cmd[0]} to finish.", level=LOG_LEVELS.Debug)
self.log(f"Waiting for last command {self.cmd[0]} to finish.", level=logging.DEBUG)
if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower():
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"{self.cmd[0]} has finished.", level=LOG_LEVELS.Debug)
self.log(f"{self.cmd[0]} has finished.", level=logging.DEBUG)
alive = False
break
self.status = 'done'
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"{self.cmd[0]} waiting for exit code.", level=LOG_LEVELS.Debug)
self.log(f"{self.cmd[0]} waiting for exit code.", level=logging.DEBUG)
if not self.kwargs['emulate']:
try:
@ -291,14 +291,14 @@ class sys_command():#Thread):
self.exit_code = 0
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"{self.cmd[0]} got exit code: {self.exit_code}", level=LOG_LEVELS.Debug)
self.log(f"{self.cmd[0]} got exit code: {self.exit_code}", level=logging.DEBUG)
if 'ignore_errors' in self.kwargs:
self.exit_code = 0
if self.exit_code != 0 and not self.kwargs['suppress_errors']:
#self.log(self.trace_log.decode('UTF-8'), level=LOG_LEVELS.Debug)
#self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=LOG_LEVELS.Error)
#self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
#self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=logging.ERROR)
raise SysCallError(message=f"{self.trace_log.decode('UTF-8')}\n'{self.raw_cmd}' did not exit gracefully (trace log above), exit code: {self.exit_code}", exit_code=self.exit_code)
self.ended = time.time()

View File

@ -1,4 +1,5 @@
import os, stat, time, shutil, pathlib, subprocess
import os, stat, time, shutil, pathlib
import subprocess, logging
from .exceptions import *
from .disk import *
@ -7,7 +8,7 @@ from .user_interaction import *
from .profiles import Profile
from .mirrors import *
from .systemd import Networkd
from .output import log, LOG_LEVELS
from .output import log
from .storage import storage
from .hardware import *
@ -60,7 +61,7 @@ class Installer():
storage['session'] = self
self.partitions = get_partitions_in_use(self.target)
def log(self, *args, level=LOG_LEVELS.Debug, **kwargs):
def log(self, *args, level=logging.DEBUG, **kwargs):
"""
installer.log() wraps output.log() mainly to set a default log-level for this install session.
Any manual override can be done per log() call.
@ -75,8 +76,8 @@ class Installer():
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
#self.log(self.trace_log.decode('UTF-8'), level=LOG_LEVELS.Debug)
self.log(args[1], level=LOG_LEVELS.Error, fg='red')
#self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
self.log(args[1], level=logging.ERROR, fg='red')
self.sync_log_to_install_medium()
@ -89,17 +90,17 @@ class Installer():
self.genfstab()
if not (missing_steps := self.post_install_check()):
self.log('Installation completed without any errors. You may now reboot.', bg='black', fg='green', level=LOG_LEVELS.Info)
self.log('Installation completed without any errors. You may now reboot.', bg='black', fg='green', level=logging.INFO)
self.sync_log_to_install_medium()
return True
else:
self.log('Some required steps were not successfully installed/configured before leaving the installer:', bg='black', fg='red', level=LOG_LEVELS.Warning)
self.log('Some required steps were not successfully installed/configured before leaving the installer:', bg='black', fg='red', level=logging.WARNING)
for step in missing_steps:
self.log(f' - {step}', bg='black', fg='red', level=LOG_LEVELS.Warning)
self.log(f' - {step}', bg='black', fg='red', level=logging.WARNING)
self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=LOG_LEVELS.Warning)
self.log(f"Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=LOG_LEVELS.Warning)
self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING)
self.log(f"Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING)
self.sync_log_to_install_medium()
return False
@ -129,21 +130,21 @@ class Installer():
def pacstrap(self, *packages, **kwargs):
if type(packages[0]) in (list, tuple): packages = packages[0]
self.log(f'Installing packages: {packages}', level=LOG_LEVELS.Info)
self.log(f'Installing packages: {packages}', level=logging.INFO)
if (sync_mirrors := sys_command('/usr/bin/pacman -Syy')).exit_code == 0:
if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0:
return True
else:
self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=LOG_LEVELS.Info)
self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.INFO)
else:
self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=LOG_LEVELS.Info)
self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO)
def set_mirrors(self, mirrors):
return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
def genfstab(self, flags='-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=LOG_LEVELS.Info)
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log
with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh:
@ -179,19 +180,19 @@ class Installer():
else:
self.log(
f"Time zone {zone} does not exist, continuing with system default.",
level=LOG_LEVELS.Warning,
level=logging.WARNING,
fg='red'
)
def activate_ntp(self):
self.log(f'Installing and activating NTP.', level=LOG_LEVELS.Info)
self.log(f'Installing and activating NTP.', level=logging.INFO)
if self.pacstrap('ntp'):
if self.enable_service('ntpd'):
return True
def enable_service(self, *services):
for service in services:
self.log(f'Enabling service {service}', level=LOG_LEVELS.Info)
self.log(f'Enabling service {service}', level=logging.INFO)
if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
raise ServiceException(f"Unable to start service {service}: {output}")
@ -349,7 +350,7 @@ class Installer():
# Run registered post-install hooks
for function in self.post_base_install:
self.log(f"Running post-installation hook: {function}", level=LOG_LEVELS.Info)
self.log(f"Running post-installation hook: {function}", level=logging.INFO)
function(self)
return True
@ -363,7 +364,7 @@ class Installer():
elif partition.mountpoint == self.target:
root_partition = partition
self.log(f'Adding bootloader {bootloader} to {boot_partition}', level=LOG_LEVELS.Info)
self.log(f'Adding bootloader {bootloader} to {boot_partition}', level=logging.INFO)
if bootloader == 'systemd-bootctl':
if not hasUEFI():
@ -417,10 +418,10 @@ class Installer():
if (real_device := self.detect_encryption(root_partition)):
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=LOG_LEVELS.Debug)
log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n')
else:
log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=LOG_LEVELS.Debug)
log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
entry.write(f'options root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp\n')
self.helper_flags['bootloader'] = bootloader
@ -458,17 +459,17 @@ class Installer():
if type(profile) == str:
profile = Profile(self, profile)
self.log(f'Installing network profile {profile}', level=LOG_LEVELS.Info)
self.log(f'Installing network profile {profile}', level=logging.INFO)
return profile.install()
def enable_sudo(self, entity :str, group=False):
self.log(f'Enabling sudo permissions for {entity}.', level=LOG_LEVELS.Info)
self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
def user_create(self, user :str, password=None, groups=[], sudo=False):
self.log(f'Creating user {user}', level=LOG_LEVELS.Info)
self.log(f'Creating user {user}', level=logging.INFO)
o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}'))
if password:
self.user_set_pw(user, password)
@ -481,7 +482,7 @@ class Installer():
self.helper_flags['user'] = True
def user_set_pw(self, user, password):
self.log(f'Setting password for {user}', level=LOG_LEVELS.Info)
self.log(f'Setting password for {user}', level=logging.INFO)
if user == 'root':
# This means the root account isn't locked/disabled with * in /etc/passwd
@ -491,7 +492,7 @@ class Installer():
pass
def user_set_shell(self, user, shell):
self.log(f'Setting shell for {user} to {shell}', level=LOG_LEVELS.Info)
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\""))
pass
@ -502,5 +503,5 @@ class Installer():
vconsole.write(f'KEYMAP={language}\n')
vconsole.write(f'FONT=lat9w-16\n')
else:
self.log(f'Keyboard language was not changed from default (no language specified).', fg="yellow", level=LOG_LEVELS.Info)
self.log(f'Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
return True

View File

@ -2,10 +2,11 @@ import os
import shlex
import time
import pathlib
import logging
from .exceptions import *
from .general import *
from .disk import Partition
from .output import log, LOG_LEVELS
from .output import log
from .storage import storage
class luks2():
@ -48,7 +49,7 @@ class luks2():
if not self.partition.allow_formatting:
raise DiskError(f'Could not encrypt volume {self.partition} due to it having a formatting lock.')
log(f'Encrypting {partition} (This might take a while)', level=LOG_LEVELS.Info)
log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
if not key_file:
if self.key_file:
@ -84,7 +85,7 @@ class luks2():
cmd_handle = sys_command(cryptsetup_args)
except SysCallError as err:
if err.exit_code == 256:
log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=LOG_LEVELS.Debug)
log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
# Partition was in use, unmount it and try again
partition.unmount()
@ -97,11 +98,11 @@ class luks2():
for child in children:
# Unmount the child location
if child_mountpoint := child.get('mountpoint', None):
log(f'Unmounting {child_mountpoint}', level=LOG_LEVELS.Debug)
log(f'Unmounting {child_mountpoint}', level=logging.DEBUG)
sys_command(f"umount -R {child_mountpoint}")
# And close it if possible.
log(f"Closing crypt device {child['name']}", level=LOG_LEVELS.Debug)
log(f"Closing crypt device {child['name']}", level=logging.DEBUG)
sys_command(f"cryptsetup close {child['name']}")
# Then try again to set up the crypt-device

View File

@ -1,4 +1,4 @@
import urllib.request
import urllib.request, logging
from .exceptions import *
from .general import *
@ -59,7 +59,7 @@ def insert_mirrors(mirrors, *args, **kwargs):
return True
def use_mirrors(regions :dict, destination='/etc/pacman.d/mirrorlist'):
log(f'A new package mirror-list has been created: {destination}', level=LOG_LEVELS.Info)
log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)
for region, mirrors in regions.items():
with open(destination, 'w') as mirrorlist:
for mirror in mirrors:
@ -79,7 +79,7 @@ def list_mirrors():
try:
response = urllib.request.urlopen(url)
except urllib.error.URLError as err:
log(f'Could not fetch an active mirror-list: {err}', level=LOG_LEVELS.Warning, fg="yellow")
log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="yellow")
return regions

View File

@ -17,8 +17,32 @@ class LOG_LEVELS:
class journald(dict):
@abc.abstractmethod
def log(message, level=LOG_LEVELS.Debug):
import systemd.journal
def log(message, level=logging.DEBUG):
try:
import systemd.journal
except ModuleNotFoundError:
return False
# For backwards compability, convert old style log-levels
# to logging levels (and warn about deprecated usage)
# There's some code re-usage here but that should be fine.
# TODO: Remove these in a few versions:
if level == LOG_LEVELS.Critical:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.CRITICAL
elif level == LOG_LEVELS.Error:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.ERROR
elif level == LOG_LEVELS.Warning:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.WARNING
elif level == LOG_LEVELS.Info:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.INFO
elif level == LOG_LEVELS.Debug:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.DEBUG
log_adapter = logging.getLogger('archinstall')
log_fmt = logging.Formatter("[%(levelname)s]: %(message)s")
log_ch = systemd.journal.JournalHandler()
@ -26,19 +50,7 @@ class journald(dict):
log_adapter.addHandler(log_ch)
log_adapter.setLevel(logging.DEBUG)
if level == LOG_LEVELS.Critical:
log_adapter.critical(message)
elif level == LOG_LEVELS.Error:
log_adapter.error(message)
elif level == LOG_LEVELS.Warning:
log_adapter.warning(message)
elif level == LOG_LEVELS.Info:
log_adapter.info(message)
elif level == LOG_LEVELS.Debug:
log_adapter.debug(message)
else:
# Fallback logger
log_adapter.debug(message)
log_adapter.log(level, message)
# TODO: Replace log() for session based logging.
class SessionLogging():
@ -112,17 +124,38 @@ def log(*args, **kwargs):
with open(absolute_logfile, 'a') as log_file:
log_file.write(f"{orig_string}\n")
# If we assigned a level, try to log it to systemd's journald.
# Unless the level is higher than we've decided to output interactively.
# (Remember, log files still get *ALL* the output despite level restrictions)
if 'level' in kwargs:
if kwargs['level'] > storage.get('LOG_LEVEL', LOG_LEVELS.Info):
# For backwards compability, convert old style log-levels
# to logging levels (and warn about deprecated usage)
# There's some code re-usage here but that should be fine.
# TODO: Remove these in a few versions:
if kwargs['level'] == LOG_LEVELS.Critical:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.CRITICAL
elif kwargs['level'] == LOG_LEVELS.Error:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.ERROR
elif kwargs['level'] == LOG_LEVELS.Warning:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.WARNING
elif kwargs['level'] == LOG_LEVELS.Info:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.INFO
elif kwargs['level'] == LOG_LEVELS.Debug:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.DEBUG
if kwargs['level'] > storage.get('LOG_LEVEL', logging.INFO) and not 'force' in kwargs:
# Level on log message was Debug, but output level is set to Info.
# In that case, we'll drop it.
return None
try:
journald.log(string, level=kwargs.get('level', LOG_LEVELS.Info))
journald.log(string, level=kwargs.get('level', logging.INFO))
except ModuleNotFoundError:
pass # Ignore writing to journald

View File

@ -1,10 +1,10 @@
import os, urllib.request, urllib.parse, ssl, json, re
import importlib.util, sys, glob, hashlib
import importlib.util, sys, glob, hashlib, logging
from collections import OrderedDict
from .general import multisplit, sys_command
from .exceptions import *
from .networking import *
from .output import log, LOG_LEVELS
from .output import log
from .storage import storage
def grab_url_data(path):
@ -82,7 +82,7 @@ class Script():
self.examples = None
self.namespace = os.path.splitext(os.path.basename(self.path))[0]
self.original_namespace = self.namespace
log(f"Script {self} has been loaded with namespace '{self.namespace}'", level=LOG_LEVELS.Debug)
log(f"Script {self} has been loaded with namespace '{self.namespace}'", level=logging.DEBUG)
def __enter__(self, *args, **kwargs):
self.execute()

View File

@ -1,9 +1,9 @@
import getpass, pathlib, os, shutil, re
import sys, time, signal, ipaddress
import sys, time, signal, ipaddress, logging
from .exceptions import *
from .profiles import Profile
from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout
from .output import log, LOG_LEVELS
from .output import log
from .storage import storage
from .networking import list_interfaces
from .general import sys_command
@ -26,7 +26,7 @@ def check_for_correct_username(username):
return True
log(
"The username you entered is invalid. Try again",
level=LOG_LEVELS.Warning,
level=logging.WARNING,
fg='red'
)
return False
@ -141,7 +141,7 @@ def ask_for_a_timezone():
else:
log(
f"Specified timezone {timezone} does not exist.",
level=LOG_LEVELS.Warning,
level=logging.WARNING,
fg='red'
)
@ -198,7 +198,7 @@ def ask_to_configure_network():
except ValueError:
log(
"You need to enter a valid IP in IP-config mode.",
level=LOG_LEVELS.Warning,
level=logging.WARNING,
fg='red'
)
@ -214,7 +214,7 @@ def ask_to_configure_network():
except ValueError:
log(
"You need to enter a valid gateway (router) IP address.",
level=LOG_LEVELS.Warning,
level=logging.WARNING,
fg='red'
)

View File

@ -1,4 +1,4 @@
import getpass, time, json, os
import getpass, time, json, os, logging
import archinstall
from archinstall.lib.hardware import hasUEFI
from archinstall.lib.profiles import Profile
@ -237,8 +237,8 @@ def ask_user_questions():
def perform_installation_steps():
print()
print('This is your chosen configuration:')
archinstall.log("-- Guided template chosen (with below config) --", level=archinstall.LOG_LEVELS.Debug)
archinstall.log(json.dumps(archinstall.arguments, indent=4, sort_keys=True, cls=archinstall.JSON), level=archinstall.LOG_LEVELS.Info)
archinstall.log("-- Guided template chosen (with below config) --", level=logging.DEBUG)
archinstall.log(json.dumps(archinstall.arguments, indent=4, sort_keys=True, cls=archinstall.JSON), level=logging.INFO)
print()
input('Press Enter to continue.')
@ -282,7 +282,7 @@ def perform_installation_steps():
else:
partition.format()
else:
archinstall.log(f"Did not format {partition} because .safe_to_format() returned False or .allow_formatting was False.", level=archinstall.LOG_LEVELS.Debug)
archinstall.log(f"Did not format {partition} because .safe_to_format() returned False or .allow_formatting was False.", level=logging.DEBUG)
if hasUEFI():
fs.find_partition('/boot').format('vfat')# we don't have a boot partition in bios mode
@ -313,7 +313,7 @@ def perform_installation(mountpoint):
# Certain services might be running that affects the system during installation.
# Currently, only one such service is "reflector.service" which updates /etc/pacman.d/mirrorlist
# We need to wait for it before we continue since we opted in to use a custom mirror/region.
installation.log(f'Waiting for automatic mirror selection (reflector) to complete.', level=archinstall.LOG_LEVELS.Info)
installation.log(f'Waiting for automatic mirror selection (reflector) to complete.', level=logging.INFO)
while archinstall.service_state('reflector') not in ('dead', 'failed'):
time.sleep(1)
# Set mirrors used by pacstrap (outside of installation)
@ -342,7 +342,7 @@ def perform_installation(mountpoint):
installation.enable_service('systemd-resolved')
if archinstall.arguments.get('audio', None) != None:
installation.log(f"This audio server will be used: {archinstall.arguments.get('audio', None)}", level=archinstall.LOG_LEVELS.Info)
installation.log(f"This audio server will be used: {archinstall.arguments.get('audio', None)}", level=logging.INFO)
if archinstall.arguments.get('audio', None) == 'pipewire':
print('Installing pipewire ...')
@ -351,7 +351,7 @@ def perform_installation(mountpoint):
print('Installing pulseaudio ...')
installation.add_additional_packages("pulseaudio")
else:
installation.log("No audio server will be installed.", level=archinstall.LOG_LEVELS.Info)
installation.log("No audio server will be installed.", level=logging.INFO)
if archinstall.arguments.get('packages', None) and archinstall.arguments.get('packages', None)[0] != '':
installation.add_additional_packages(archinstall.arguments.get('packages', None))