Manually fix linter issues

Related #2295
This commit is contained in:
Matthias Hagmann 2024-02-20 01:18:22 +01:00 committed by Peter F. Patel-Schneider
parent 7774569971
commit eb937fcc3a
15 changed files with 204 additions and 173 deletions

View File

@ -43,7 +43,10 @@ interactive = os.isatty(0)
prompt = "?? Input: " if interactive else ""
start_time = time.time()
strhex = lambda d: hexlify(d).decode("ascii").upper()
def strhex(d):
return hexlify(d).decode("ascii").upper()
#
#

View File

@ -44,15 +44,14 @@ logger = logging.getLogger(__name__)
#
#
_wired_device = lambda product_id, interface: {
"vendor_id": 0x046D,
"product_id": product_id,
"bus_id": 0x3,
"usb_interface": interface,
"isDevice": True,
}
_bt_device = lambda product_id: {"vendor_id": 0x046D, "product_id": product_id, "bus_id": 0x5, "isDevice": True}
def _wired_device(product_id, interface):
return {"vendor_id": 1133, "product_id": product_id, "bus_id": 3, "usb_interface": interface, "isDevice": True}
def _bt_device(product_id):
return {"vendor_id": 1133, "product_id": product_id, "bus_id": 5, "isDevice": True}
DEVICE_IDS = []
@ -243,7 +242,7 @@ def write(handle, devnumber, data, long_message=False):
except Exception as reason:
logger.error("write failed, assuming handle %r no longer available", handle)
close(handle)
raise exceptions.NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason) from reason
def read(handle, timeout=DEFAULT_TIMEOUT):
@ -292,7 +291,7 @@ def _read(handle, timeout):
except Exception as reason:
logger.warning("read failed, assuming handle %r no longer available", handle)
close(handle)
raise exceptions.NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason) from reason
if data and check_message(data): # ignore messages that fail check
report_id = ord(data[:1])
@ -324,7 +323,7 @@ def _skip_incoming(handle, ihandle, notifications_hook):
except Exception as reason:
logger.error("read failed, assuming receiver %s no longer available", handle)
close(handle)
raise exceptions.NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason) from reason
if data:
if check_message(data): # only process messages that pass check

View File

@ -35,106 +35,125 @@ from .i18n import _
# re_pairs determines whether a receiver pairs by replacing existing pairings, default to False
## currently only one receiver is so marked - should there be more?
_DRIVER = ("hid-generic", "generic-usb", "logitech-djreceiver")
DRIVER = ("hid-generic", "generic-usb", "logitech-djreceiver")
_bolt_receiver = lambda product_id: {
"vendor_id": 0x046D,
def _bolt_receiver(product_id):
return {
"vendor_id": 1133,
"product_id": product_id,
"usb_interface": 2,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("Bolt Receiver"),
"receiver_kind": "bolt",
"max_devices": 6,
"may_unpair": True,
}
}
_unifying_receiver = lambda product_id: {
"vendor_id": 0x046D,
def _unifying_receiver(product_id):
return {
"vendor_id": 1133,
"product_id": product_id,
"usb_interface": 2,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("Unifying Receiver"),
"receiver_kind": "unifying",
"may_unpair": True,
}
}
_nano_receiver = lambda product_id: {
"vendor_id": 0x046D,
def _nano_receiver(product_id):
return {
"vendor_id": 1133,
"product_id": product_id,
"usb_interface": 1,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("Nano Receiver"),
"receiver_kind": "nano",
"may_unpair": False,
"re_pairs": True,
}
}
_nano_receiver_no_unpair = lambda product_id: {
"vendor_id": 0x046D,
def _nano_receiver_no_unpair(product_id):
return {
"vendor_id": 1133,
"product_id": product_id,
"usb_interface": 1,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("Nano Receiver"),
"receiver_kind": "nano",
"may_unpair": False,
"unpair": False,
"re_pairs": True,
}
}
_nano_receiver_max2 = lambda product_id: {
"vendor_id": 0x046D,
def _nano_receiver_max2(product_id):
return {
"vendor_id": 1133,
"product_id": product_id,
"usb_interface": 1,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("Nano Receiver"),
"receiver_kind": "nano",
"max_devices": 2,
"may_unpair": False,
"re_pairs": True,
}
}
_nano_receiver_maxn = lambda product_id, max: {
"vendor_id": 0x046D,
def _nano_receiver_maxn(product_id, max):
return {
"vendor_id": 1133,
"product_id": product_id,
"usb_interface": 1,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("Nano Receiver"),
"receiver_kind": "nano",
"max_devices": max,
"may_unpair": False,
"re_pairs": True,
}
}
_lenovo_receiver = lambda product_id: {
"vendor_id": 0x17EF,
def _lenovo_receiver(product_id):
return {
"vendor_id": 6127,
"product_id": product_id,
"usb_interface": 1,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("Nano Receiver"),
"receiver_kind": "nano",
"may_unpair": False,
}
}
_lightspeed_receiver = lambda product_id: {
"vendor_id": 0x046D,
def _lightspeed_receiver(product_id):
return {
"vendor_id": 1133,
"product_id": product_id,
"usb_interface": 2,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("Lightspeed Receiver"),
"may_unpair": False,
}
}
_ex100_receiver = lambda product_id: {
"vendor_id": 0x046D,
def _ex100_receiver(product_id):
return {
"vendor_id": 1133,
"product_id": product_id,
"usb_interface": 1,
"hid_driver": _DRIVER, # noqa: F821
"hid_driver": DRIVER,
"name": _("EX100 Receiver 27 Mhz"),
"receiver_kind": "27Mhz",
"max_devices": 4,
"may_unpair": False,
"re_pairs": True,
}
}
# Receivers added here should also be listed in
# share/solaar/io.github.pwr_solaar.solaar.metainfo.xml
@ -204,5 +223,3 @@ ALL = (
LIGHTSPEED_RECEIVER_C547,
EX100_27MHZ_RECEIVER_C517,
)
del _DRIVER, _unifying_receiver, _nano_receiver, _lenovo_receiver, _lightspeed_receiver

View File

@ -23,7 +23,10 @@ from collections import namedtuple
import yaml as _yaml
is_string = lambda d: isinstance(d, str)
def is_string(d):
return isinstance(d, str)
#
#

View File

@ -1416,22 +1416,6 @@ if True:
{"Rule": [{"Key": ["Brightness Up", "pressed"]}, {"KeyPress": "XF86_MonBrightnessUp"}]},
]
},
# {'Rule': [ # In firefox, crown emits keys that move up and down if not pressed, rotate through tabs otherwise
# {'Process': 'firefox'},
# {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_right_ratchet'}, {'KeyPress': ['Control_R', 'Tab']}]},
# {'Rule': [{'Test': 'crown_pressed'},
# {'Test': 'crown_left_ratchet'},
# {'KeyPress': ['Control_R', 'Shift_R', 'Tab']}]},
# {'Rule': [{'Test': 'crown_right_ratchet'}, {'KeyPress': 'Down'}]},
# {'Rule': [{'Test': 'crown_left_ratchet'}, {'KeyPress': 'Up'}]},
# ]},
# {'Rule': [ # Otherwise, crown movements emit keys that modify volume if not pressed, move between tracks otherwise
# {'Feature': 'CROWN'}, {'Report': 0x0},
# {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_right_ratchet'}, {'KeyPress': 'XF86_AudioNext'}]},
# {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_left_ratchet'}, {'KeyPress': 'XF86_AudioPrev'}]},
# {'Rule': [{'Test': 'crown_right_ratchet'}, {'KeyPress': 'XF86_AudioRaiseVolume'}]},
# {'Rule': [{'Test': 'crown_left_ratchet'}, {'KeyPress': 'XF86_AudioLowerVolume'}]}
# ]},
]
)

View File

@ -130,10 +130,10 @@ class FeaturesArray(dict):
return index if index else False
def __setitem__(self, feature, index):
if type(super().get(feature)) == int:
if isinstance(super().get(feature), int):
self.inverse.pop(super().get(feature))
super().__setitem__(feature, index)
if type(index) == int:
if isinstance(index, int):
self.inverse[index] = feature
def __delitem__(self, feature):
@ -902,19 +902,24 @@ class LEDParam:
LEDRampChoices = _NamedInts(default=0, yes=1, no=2)
LEDFormChoices = _NamedInts(default=0, sine=1, square=2, triangle=3, sawtooth=4, sharkfin=5, exponential=6)
LEDParamSize = {LEDParam.color: 3, LEDParam.speed: 1, LEDParam.period: 2,
LEDParam.intensity: 1, LEDParam.ramp: 1, LEDParam.form: 1} # yapf: disable
LEDParamSize = {
LEDParam.color: 3,
LEDParam.speed: 1,
LEDParam.period: 2,
LEDParam.intensity: 1,
LEDParam.ramp: 1,
LEDParam.form: 1,
}
LEDEffects = { # Wave=0x04, Stars=0x05, Press=0x06, Audio=0x07, # not implemented
0x0: [_NamedInt(0x0, _('Disabled')), {}],
0x1: [_NamedInt(0x1, _('Static')), {LEDParam.color: 0, LEDParam.ramp: 3}],
0x2: [_NamedInt(0x2, _('Pulse')), {LEDParam.color: 0, LEDParam.speed: 3}],
0x3: [_NamedInt(0x3, _('Cycle')), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x8: [_NamedInt(0x8, _('Boot')), {}],
0x9: [_NamedInt(0x9, _('Demo')), {}],
0xA: [_NamedInt(0xA, _('Breathe')), {LEDParam.color: 0, LEDParam.period: 3,
LEDParam.form: 5, LEDParam.intensity: 6}],
0xB: [_NamedInt(0xB, _('Ripple')), {LEDParam.color: 0, LEDParam.period: 4}]
} # yapf: disable
0x0: [_NamedInt(0x0, _("Disabled")), {}],
0x1: [_NamedInt(0x1, _("Static")), {LEDParam.color: 0, LEDParam.ramp: 3}],
0x2: [_NamedInt(0x2, _("Pulse")), {LEDParam.color: 0, LEDParam.speed: 3}],
0x3: [_NamedInt(0x3, _("Cycle")), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x8: [_NamedInt(0x8, _("Boot")), {}],
0x9: [_NamedInt(0x9, _("Demo")), {}],
0xA: [_NamedInt(0xA, _("Breathe")), {LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6}],
0xB: [_NamedInt(0xB, _("Ripple")), {LEDParam.color: 0, LEDParam.period: 4}],
}
class LEDEffectSetting: # an effect plus its parameters

View File

@ -103,6 +103,12 @@ if available:
return n
else:
init = lambda: False
uninit = lambda: None
show = lambda dev, reason=None: None
def init():
return False
def uninit():
return None
def show(dev, reason=None):
return None

View File

@ -1209,7 +1209,7 @@ class RangeValidator(Validator):
def acceptable(self, args, current):
arg = args[0]
# None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args)
return None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args
return None if len(args) != 1 or isinstance(arg, int) or arg < self.min_value or arg > self.max_value else args
def compare(self, args, current):
if len(args) == 1:
@ -1295,7 +1295,7 @@ class PackedRangeValidator(Validator):
def acceptable(self, args, current):
if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count:
return None
return None if type(args[1]) != int or args[1] < self.min_value or args[1] > self.max_value else args
return None if not isinstance(args[1], int) or args[1] < self.min_value or args[1] > self.max_value else args
def compare(self, args, current):
logger.warning("compare not implemented for packed range settings")

View File

@ -121,21 +121,21 @@ def select_toggle(value, setting, key=None):
else:
try:
value = bool(int(value))
except Exception:
except Exception as exc:
if value.lower() in ("true", "yes", "on", "t", "y"):
value = True
elif value.lower() in ("false", "no", "off", "f", "n"):
value = False
else:
raise Exception("%s: don't know how to interpret '%s' as boolean" % (setting.name, value))
raise Exception("%s: don't know how to interpret '%s' as boolean" % (setting.name, value)) from exc
return value
def select_range(value, setting):
try:
value = int(value)
except ValueError:
raise Exception("%s: can't interpret '%s' as integer" % (setting.name, value))
except ValueError as exc:
raise Exception("%s: can't interpret '%s' as integer" % (setting.name, value)) from exc
min, max = setting.range
if value < min or value > max:
raise Exception("%s: value '%s' out of bounds" % (setting.name, value))

View File

@ -173,8 +173,8 @@ def _convert_json(json_dict):
dev[_KEY_WPID] = dev.get(_KEY_WPID) if dev.get(_KEY_WPID) else key[0]
dev[_KEY_SERIAL] = dev.get(_KEY_SERIAL) if dev.get(_KEY_SERIAL) else key[1]
for k, v in dev.items():
if type(k) == str and not k.startswith("_") and type(v) == dict: # convert string keys to ints
v = {int(dk) if type(dk) == str else dk: dv for dk, dv in v.items()}
if isinstance(k, str) and not k.startswith("_") and isinstance(v, dict): # convert string keys to ints
v = {int(dk) if isinstance(dk, str) else dk: dv for dk, dv in v.items()}
dev[k] = v
for k in ["mouse-gestures", "dpi-sliding"]:
v = dev.get(k, None)

View File

@ -344,14 +344,14 @@ def stop_all():
if logger.isEnabledFor(logging.INFO):
logger.info("stopping receiver listening threads %s", listeners)
for l in listeners:
l.stop()
for listener_thread in listeners:
listener_thread.stop()
configuration.save()
if listeners:
for l in listeners:
l.join()
for listener_thread in listeners:
listener_thread.join()
# ping all devices to find out whether they are connected
@ -360,22 +360,22 @@ def stop_all():
def ping_all(resuming=False):
if logger.isEnabledFor(logging.INFO):
logger.info("ping all devices%s", " when resuming" if resuming else "")
for l in _all_listeners.values():
if l.receiver.isDevice:
if resuming and hasattr(l.receiver, "status"):
l.receiver.status._active = None # ensure that settings are pushed
if l.receiver.ping():
l.receiver.status.changed(active=True, push=True)
l._status_changed(l.receiver)
for listener_thread in _all_listeners.values():
if listener_thread.receiver.isDevice:
if resuming and hasattr(listener_thread.receiver, "status"):
listener_thread.receiver.status._active = None # ensure that settings are pushed
if listener_thread.receiver.ping():
listener_thread.receiver.status.changed(active=True, push=True)
listener_thread._status_changed(listener_thread.receiver)
else:
count = l.receiver.count()
count = listener_thread.receiver.count()
if count:
for dev in l.receiver:
for dev in listener_thread.receiver:
if resuming and hasattr(dev, "status"):
dev.status._active = None # ensure that settings are pushed
if dev.ping():
dev.status.changed(active=True, push=True)
l._status_changed(dev)
listener_thread._status_changed(dev)
count -= 1
if not count:
break
@ -428,10 +428,10 @@ def _process_receiver_event(action, device_info):
logger.info("receiver event %s %s", action, device_info)
# whatever the action, stop any previous receivers at this path
l = _all_listeners.pop(device_info.path, None)
if l is not None:
assert isinstance(l, ReceiverListener)
l.stop()
listener_thread = _all_listeners.pop(device_info.path, None)
if listener_thread is not None:
assert isinstance(listener_thread, ReceiverListener)
listener_thread.stop()
if action == "add": # a new device was detected
_process_add(device_info, 3)

View File

@ -200,7 +200,10 @@ class ChoiceControlBig(Gtk.Entry, Control):
liststore.append((int(v), str(v)))
completion = Gtk.EntryCompletion()
completion.set_model(liststore)
norm = lambda s: s.replace("_", "").replace(" ", "").lower()
def norm(s):
return s.replace("_", "").replace(" ", "").lower()
completion.set_match_func(lambda completion, key, it: norm(key) in norm(completion.get_model()[it][1]))
completion.set_text_column(1)
self.set_completion(completion)

View File

@ -714,6 +714,10 @@ class DiversionDialog:
# return pos + length
def norm(s):
return s.replace("_", "").replace(" ", "").lower()
class CompletionEntry(Gtk.Entry):
def __init__(self, values, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -726,7 +730,6 @@ class CompletionEntry(Gtk.Entry):
liststore = Gtk.ListStore(str)
completion = Gtk.EntryCompletion()
completion.set_model(liststore)
norm = lambda s: s.replace("_", "").replace(" ", "").lower()
completion.set_match_func(lambda completion, key, it: norm(key) in norm(completion.get_model()[it][0]))
completion.set_text_column(0)
entry.set_completion(completion)

View File

@ -141,8 +141,16 @@ if available:
logger.exception("showing %s", n)
else:
init = lambda: False
uninit = lambda: None
def init():
return False
def uninit():
return None
# toggle = lambda action: False
alert = lambda reason: None
show = lambda dev, reason=None: None
def alert(reason):
return None
def show(dev, reason=None):
return None

View File

@ -158,9 +158,9 @@ try:
from gi.repository import AppIndicator3
ayatana_appindicator_found = False
except ValueError:
except ValueError as exc:
# treat unavailable versions the same as unavailable packages
raise ImportError
raise ImportError from exc
if logger.isEnabledFor(logging.DEBUG):
logger.debug("using %sAppIndicator3" % ("Ayatana " if ayatana_appindicator_found else ""))