Fix some PEP-8 errors.

This commit is contained in:
Varun Madiath 2020-10-19 22:59:30 -04:00
parent e4f363ce7d
commit 5ded22a5d0
17 changed files with 114 additions and 76 deletions

View File

@ -11,4 +11,4 @@ from .lib.locale_helpers import *
from .lib.services import *
from .lib.packages import *
from .lib.output import *
from .lib.storage import *
from .lib.storage import *

View File

@ -30,14 +30,17 @@ def run_as_a_module():
profile = sys.argv[1]
library = find_examples()
if not f'{profile}.py' in library:
if f'{profile}.py' not in library:
raise ProfileNotFound(f'Could not locate {profile}.py among the example files.')
# Import and execute the chosen `<profile>.py`:
spec = importlib.util.spec_from_file_location(library[f'{profile}.py'], library[f'{profile}.py'])
spec = importlib.util.spec_from_file_location(
library[f"{profile}.py"],
library[f"{profile}.py"]
)
imported_path = importlib.util.module_from_spec(spec)
spec.loader.exec_module(imported_path)
sys.modules[library[f'{profile}.py']] = imported_path
if __name__ == '__main__':
run_as_a_module()
run_as_a_module()

View File

@ -1,6 +1,6 @@
import glob, re, os, json
from collections import OrderedDict
from .exceptions import *
from .exceptions import DiskError
from .general import *
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
@ -21,7 +21,7 @@ class BlockDevice():
return f"BlockDevice({self.device})"
def __getitem__(self, key, *args, **kwargs):
if not key in self.info:
if key not in self.info:
raise KeyError(f'{self} does not contain information: "{key}"')
return self.info[key]
@ -37,9 +37,9 @@ class BlockDevice():
def __dump__(self):
return {
'path' : self.path,
'info' : self.info,
'partition_cache' : self.part_cache
'path': self.path,
'info': self.info,
'partition_cache': self.part_cache
}
@property
@ -50,7 +50,8 @@ class BlockDevice():
If it's a ATA-drive it returns the /dev/X device
And if it's a crypto-device it returns the parent device
"""
if not 'type' in self.info: raise DiskError(f'Could not locate backplane info for "{self.path}"')
if "type" not in self.info:
raise DiskError(f'Could not locate backplane info for "{self.path}"')
if self.info['type'] == 'loop':
for drive in json.loads(b''.join(sys_command(f'losetup --json', hide_from_log=True)).decode('UTF_8'))['loopdevices']:
@ -60,7 +61,8 @@ class BlockDevice():
elif self.info['type'] == 'disk':
return self.path
elif self.info['type'] == 'crypt':
if not 'pkname' in self.info: raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
if 'pkname' not in self.info:
raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
# if not stat.S_ISBLK(os.stat(full_path).st_mode):
@ -97,7 +99,8 @@ class BlockDevice():
class Partition():
def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False):
if not part_id: part_id = os.path.basename(path)
if not part_id:
part_id = os.path.basename(path)
self.path = path
self.part_id = part_id
self.mountpoint = mountpoint
@ -115,7 +118,7 @@ class Partition():
log(f'Formatting {self} -> {filesystem}')
if filesystem == 'btrfs':
o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {self.path}'))
if not b'UUID' in o:
if b'UUID' not in o:
raise DiskError(f'Could not format {self.path} with {filesystem} because: {o}')
self.filesystem = 'btrfs'
elif filesystem == 'fat32':
@ -244,11 +247,11 @@ def device_state(name, *args, **kwargs):
# lsblk --json -l -n -o path
def all_disks(*args, **kwargs):
if not 'partitions' in kwargs: kwargs['partitions'] = False
kwargs.setdefault("partitions", False)
drives = OrderedDict()
#for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
for drive in json.loads(b''.join(sys_command(f'lsblk --json -l -n -o path,size,type,mountpoint,label,pkname', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']:
if not kwargs['partitions'] and drive['type'] == 'part': continue
drives[drive['path']] = BlockDevice(drive['path'], drive)
return drives
return drives

View File

@ -73,9 +73,10 @@ class sys_command():#Thread):
Stolen from archinstall_gui
"""
def __init__(self, cmd, callback=None, start_callback=None, *args, **kwargs):
if not 'worker_id' in kwargs: kwargs['worker_id'] = gen_uid()
if not 'emulate' in kwargs: kwargs['emulate'] = False
if not 'suppress_errors' in kwargs: kwargs['suppress_errors'] = False
kwargs.setdefault("worker_id", gen_uid())
kwargs.setdefault("emulate", False)
kwargs.setdefault("suppress_errors", False)
if kwargs['emulate']:
log(f"Starting command '{cmd}' in emulation mode.")
self.raw_cmd = cmd
@ -85,7 +86,8 @@ class sys_command():#Thread):
raise ValueError(f'Incorrect string to split: {cmd}\n{e}')
self.args = args
self.kwargs = kwargs
if not 'worker' in self.kwargs: self.kwargs['worker'] = None
self.kwargs.setdefault("worker", None)
self.callback = callback
self.pid = None
self.exit_code = None
@ -110,7 +112,8 @@ class sys_command():#Thread):
if not os.path.isdir(self.exec_dir):
os.makedirs(self.exec_dir)
if start_callback: start_callback(self, *args, **kwargs)
if start_callback:
start_callback(self, *args, **kwargs)
self.run()
def __iter__(self, *args, **kwargs):
@ -125,14 +128,14 @@ class sys_command():#Thread):
def dump(self):
return {
'status' : self.status,
'worker_id' : self.worker_id,
'worker_result' : self.trace_log.decode('UTF-8'),
'started' : self.started,
'ended' : self.ended,
'started_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
'ended_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
'exit_code' : self.exit_code
'status': self.status,
'worker_id': self.worker_id,
'worker_result': self.trace_log.decode('UTF-8'),
'started': self.started,
'ended': self.ended,
'started_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
'ended_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
'exit_code': self.exit_code
}
def run(self):
@ -255,4 +258,4 @@ def prerequisite_check():
return True
def reboot():
o = b''.join(sys_command(("/usr/bin/reboot")))
o = b''.join(sys_command("/usr/bin/reboot"))

View File

@ -23,14 +23,15 @@ class luks2():
def encrypt(self, partition, password, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
log(f'Encrypting {partition}')
if not key_file: key_file = f'/tmp/{os.path.basename(self.partition.path)}.disk_pw' #TODO: Make disk-pw-file randomly unique?
if not key_file:
key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
if type(password) != bytes: password = bytes(password, 'UTF-8')
with open(key_file, 'wb') as fh:
fh.write(password)
o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition.path}'))
if not b'Command successful.' in o:
if b'Command successful.' not in o:
raise DiskError(f'Could not encrypt volume "{partition.path}": {o}')
return key_file
@ -43,7 +44,8 @@ class luks2():
:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
:type mountpoint: str
"""
if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead?
if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead?
sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
if os.path.islink(f'/dev/mapper/{mountpoint}'):
return Partition(f'/dev/mapper/{mountpoint}', encrypted=True)

View File

@ -78,16 +78,16 @@ def list_mirrors():
region = 'Unknown region'
for line in response.readlines():
if len(line.strip()) == 0: continue
if len(line.strip()) == 0:
continue
line = line.decode('UTF-8').strip('\n').strip('\r')
if line[:3] == '## ':
region = line[3:]
elif line[:10] == '#Server = ':
if not region in regions:
regions[region] = {}
regions.setdefault(region, {})
url = line[1:].lstrip('Server = ')
url = line.lstrip('#Server = ')
regions[region][url] = True
return regions

View File

@ -3,7 +3,6 @@ import socket
import struct
from collections import OrderedDict
from .exceptions import *
def getHwAddr(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -13,7 +12,8 @@ def getHwAddr(ifname):
def list_interfaces(skip_loopback=True):
interfaces = OrderedDict()
for index, iface in socket.if_nameindex():
if skip_loopback and iface == 'lo': continue
if skip_loopback and iface == "lo":
continue
mac = getHwAddr(iface).replace(':', '-')
interfaces[mac] = iface

View File

@ -11,7 +11,7 @@ def find_package(name):
"""
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode=ssl.CERT_NONE
ssl_context.verify_mode = ssl.CERT_NONE
response = urllib.request.urlopen(BASE_URL.format(package=name), context=ssl_context)
data = response.read().decode('UTF-8')
return json.loads(data)

View File

@ -106,7 +106,7 @@ class Profile():
# To avoid profiles importing the wrong 'archinstall',
# we need to ensure that this current archinstall is in sys.path
archinstall_path = os.path.abspath(f'{os.path.dirname(__file__)}/../../')
if not archinstall_path in sys.path:
if archinstall_path not in sys.path:
sys.path.insert(0, archinstall_path)
instructions = self.load_instructions()

View File

@ -3,11 +3,10 @@ import os
from .exceptions import *
from .general import *
def service_state(service_name :str):
def service_state(service_name: str):
if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe
service_name += '.service' # Just to be safe
state = b''.join(sys_command(f'systemctl show -p SubState --value {service_name}'))
return state.strip().decode('UTF-8')

0
archinstall/lib/tts.py Normal file
View File

View File

@ -1,17 +1,18 @@
import sys, os
import os
import re
import sys
sys.path.insert(0, os.path.abspath('..'))
import re
def process_docstring(app, what, name, obj, options, lines):
spaces_pat = re.compile(r"( {8})")
ll = []
for l in lines:
ll.append(spaces_pat.sub(" ",l))
lines[:] = ll
spaces_pat = re.compile(r"( {8})")
ll = []
for l in lines:
ll.append(spaces_pat.sub(" ", l))
lines[:] = ll
def setup(app):
app.connect('autodoc-process-docstring', process_docstring)
app.connect('autodoc-process-docstring', process_docstring)
# Configuration file for the Sphinx documentation builder.
#
@ -110,8 +111,10 @@ htmlhelp_basename = 'slimhttpdoc'
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'archinstall', u'archinstall Documentation',
[u'Anton Hvornum'], 1)
(
"index", "archinstall", u"archinstall Documentation",
[u"Anton Hvornum"], 1
)
]
# If true, show URL addresses after external links.
@ -124,6 +127,8 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'archinstall', u'archinstall Documentation',
u'Anton Hvornum', 'archinstall', 'Simple and minimal HTTP server.'),
(
"index", "archinstall", u"archinstall Documentation",
u"Anton Hvornum", "archinstall", "Simple and minimal HTTP server."
),
]

View File

@ -88,7 +88,8 @@ archinstall.storage['_guided']['harddrive'] = harddrive
# Ask for a hostname
hostname = input('Desired hostname for the installation: ')
if len(hostname) == 0: hostname = 'ArchInstall'
if len(hostname) == 0:
hostname = 'ArchInstall'
archinstall.storage['_guided']['hostname'] = hostname
# Ask for a root password (optional, but triggers requirement for super-user if skipped)

View File

@ -22,7 +22,11 @@ with archinstall.Filesystem(harddrive, archinstall.GPT) as fs:
with archinstall.luks2(harddrive.partition[1], 'luksloop', disk_password) as unlocked_device:
unlocked_device.format('btrfs')
with archinstall.Installer(unlocked_device, boot_partition=harddrive.partition[0], hostname='testmachine') as installation:
with archinstall.Installer(
unlocked_device,
boot_partition=harddrive.partition[0],
hostname="testmachine"
) as installation:
if installation.minimal_installation():
installation.add_bootloader()
@ -37,7 +41,12 @@ with archinstall.Filesystem(harddrive, archinstall.GPT) as fs:
print(f'Submitting {commit}: success')
conditions = {"project" : "archinstall", "profile" : "52-54-00-12-34-56", "status" : "success", "commit" : commit}
conditions = {
"project": "archinstall",
"profile": "52-54-00-12-34-56",
"status": "success",
"commit": commit
}
req = urllib.request.Request("https://api.archlinux.life/build/success",
data=json.dumps(conditions).encode('utf8'),
headers={'content-type': 'application/json'})

View File

@ -1,14 +1,19 @@
import archinstall
installation.add_additional_packages("awesome xorg-xrandr xterm feh slock terminus-font-otb gnu-free-fonts ttf-liberation xsel")
installation.add_additional_packages(
"awesome xorg-xrandr xterm feh slock terminus-font-otb gnu-free-fonts ttf-liberation xsel"
)
with open(f'{installation.mountpoint}/etc/X11/xinit/xinitrc', 'r') as xinitrc:
xinitrc_data = xinitrc.read()
for line in xinitrc_data.split('\n'):
if 'twm &' in line: xinitrc_data = xinitrc_data.replace(line, f"# {line}")
if 'xclock' in line: xinitrc_data = xinitrc_data.replace(line, f"# {line}")
if 'xterm' in line: xinitrc_data = xinitrc_data.replace(line, f"# {line}")
if "twm &" in line:
xinitrc_data = xinitrc_data.replace(line, f"# {line}")
if "xclock" in line:
xinitrc_data = xinitrc_data.replace(line, f"# {line}")
if "xterm" in line:
xinitrc_data = xinitrc_data.replace(line, f"# {line}")
xinitrc_data += '\n'
xinitrc_data += 'exec awesome\n'
@ -22,4 +27,4 @@ with open(f'{installation.mountpoint}/etc/xdg/awesome/rc.lua', 'r') as awesome_r
awesome = awesome.replace('xterm', 'xterm -ls -xrm \\"XTerm*selectToClipboard: true\\"')
with open(f'{installation.mountpoint}/etc/xdg/awesome/rc.lua', 'w') as awesome_rc_lua:
awesome_rc_lua.write(awesome)
awesome_rc_lua.write(awesome)

View File

@ -2,6 +2,7 @@
import archinstall
def _prep_function(*args, **kwargs):
"""
Magic function called by the importing installer
@ -18,6 +19,7 @@ def _prep_function(*args, **kwargs):
else:
print('Deprecated (??): xorg profile has no _prep_function() anymore')
# Ensures that this code only gets executed if executed
# through importlib.util.spec_from_file_location("awesome", "/somewhere/awesome.py")
# or through conventional import awesome
@ -32,17 +34,19 @@ if __name__ == 'awesome':
# Then setup and configure the desktop environment: awesome
arguments = {
#'keyboard_layout' : 'sv-latin1',
"editor" : "nano",
"mediaplayer" : "lollypop gstreamer gst-plugins-good gnome-keyring",
"filebrowser" : "nemo gpicview-gtk3",
"webbrowser" : "chromium",
"window_manager" : "awesome",
"virtulization" : "qemu ovmf",
"utils" : "openssh sshfs git htop pkgfile scrot dhclient wget smbclient cifs-utils libu2f-host",
"audio" : "pulseaudio pulseaudio-alsa pavucontrol"
"editor": "nano",
"mediaplayer": "lollypop gstreamer gst-plugins-good gnome-keyring",
"filebrowser": "nemo gpicview-gtk3",
"webbrowser": "chromium",
"window_manager": "awesome",
"virtulization": "qemu ovmf",
"utils": "openssh sshfs git htop pkgfile scrot dhclient wget smbclient cifs-utils libu2f-host",
"audio": "pulseaudio pulseaudio-alsa pavucontrol"
}
installation.add_additional_packages("{webbrowser} {utils} {mediaplayer} {window_manager} {virtulization} {filebrowser} {editor}".format(**arguments))
installation.add_additional_packages(
"{webbrowser} {utils} {mediaplayer} {window_manager} {virtulization} {filebrowser} {editor}".format(
**arguments))
#with open(f'{installation.mountpoint}/etc/X11/xinit/xinitrc', 'a') as X11:
# X11.write('setxkbmap se\n')
@ -56,7 +60,10 @@ if __name__ == 'awesome':
awesome_lua = awesome_rc_lua.read()
# Insert slock as a shortcut on Modkey+l (window+l)
awesome_lua = awesome_lua.replace('\nglobalkeys = gears.table.join(', 'globalkeys = gears.table.join(\n awful.key({ modkey, }, \"l\", function() awful.spawn(\"slock &\") end,\n')
awesome_lua = awesome_lua.replace(
"\nglobalkeys = gears.table.join(",
"globalkeys = gears.table.join(\n awful.key({ modkey, }, \"l\", function() awful.spawn(\"slock &\") end,\n"
)
# Insert some useful applications:
#awesome = awesome.replace('{ "open terminal", terminal, ','{ "Chromium", "chromium" },\n "open terminal", terminal, ')
@ -75,5 +82,6 @@ if __name__ == 'awesome':
awesome_rc_lua.write(awesome_lua)
# Remove some interfering nemo settings
installation.arch_chroot('gsettings set org.nemo.desktop show-desktop-icons false')
installation.arch_chroot('xdg-mime default nemo.desktop inode/directory application/x-gnome-saved-search')
installation.arch_chroot("gsettings set org.nemo.desktop show-desktop-icons false")
installation.arch_chroot(
"xdg-mime default nemo.desktop inode/directory application/x-gnome-saved-search")

View File

@ -32,4 +32,4 @@ if __name__ == 'gnome':
installation.enable_service('gdm') # Gnome Display Manager
# We could also start it via xinitrc since we do have Xorg,
# but for gnome that's deprecated and wayland is preferred.
# but for gnome that's deprecated and wayland is preferred.