Fix warnings from automatic code inspections

Warnings found by automatic code inspection and partially tackled
- Drop distuitls inf favour of setuptools
- Replace deprecated pyudev.Device.from_device_number
- Remove unnecessary brackets
- Avoid access to private variables etc.
- Shadows built-in name
- Line length >120 characters
- Not a module level variable
- Simplify clause
and more
This commit is contained in:
MattHag 2024-09-30 21:37:05 +02:00 committed by Peter F. Patel-Schneider
parent 0f4d1aebcd
commit 46366b2430
32 changed files with 479 additions and 188 deletions

View File

@ -25,6 +25,7 @@ def init_paths():
import sys
# Python 3 might have problems converting back to UTF-8 in case of Unicode surrogates
decoded_path = None
try:
decoded_path = sys.path[0]
sys.path[0].encode(sys.getfilesystemencoding())

View File

@ -560,7 +560,8 @@ class ArrayItem(MainItem):
)
continue
if usage in self._usages and all(usage_type not in self._INCOMPATIBLE_TYPES for usage_type in usage.usage_types):
not_incompatible_type = all(usage_type not in self._INCOMPATIBLE_TYPES for usage_type in usage.usage_types)
if usage in self._usages and not_incompatible_type:
usage_values[usage] = UsageValue(self, True)
return usage_values
@ -820,14 +821,28 @@ class ReportDescriptor:
if data is None:
raise InvalidReportDescriptor("Invalid output item")
self._append_items(
offset_output, self._output, report_id, report_count, report_size, usages, data, {**glob, **local}
offset_output,
self._output,
report_id,
report_count,
report_size,
usages,
data,
{**glob, **local},
)
elif tag == TagMain.FEATURE:
if data is None:
raise InvalidReportDescriptor("Invalid feature item")
self._append_items(
offset_feature, self._feature, report_id, report_count, report_size, usages, data, {**glob, **local}
offset_feature,
self._feature,
report_id,
report_count,
report_size,
usages,
data,
{**glob, **local},
)
# clear local

View File

@ -1,18 +1,20 @@
from __future__ import annotations
import dataclasses
@dataclasses.dataclass
class DeviceInfo:
path: str
bus_id: str
bus_id: str | None
vendor_id: str
product_id: str
interface: str
driver: str
manufacturer: str
product: str
serial: str
release: str
interface: str | None
driver: str | None
manufacturer: str | None
product: str | None
serial: str | None
release: str | None
isDevice: bool
hidpp_short: str
hidpp_long: str
hidpp_short: str | None
hidpp_long: str | None

View File

@ -33,6 +33,7 @@ import typing
from threading import Thread
from time import sleep
from typing import Callable
from hidapi.common import DeviceInfo
@ -203,6 +204,7 @@ class _DeviceMonitor(Thread):
def __init__(self, device_callback, polling_delay=5.0):
self.device_callback = device_callback
self.polling_delay = polling_delay
self.prev_devices = None
# daemon threads are automatically killed when main thread exits
super().__init__(daemon=True)
@ -259,7 +261,12 @@ def _match(action, device, filterfn):
if logger.isEnabledFor(logging.INFO):
logger.info(
"Found device BID %s VID %04X PID %04X HID++ %s %s", bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"]
"Found device BID %s VID %04X PID %04X HID++ %s %s",
bus_id,
vid,
pid,
device["hidpp_short"],
device["hidpp_long"],
)
if not device["hidpp_short"] and not device["hidpp_long"]:
@ -317,7 +324,7 @@ def find_paired_node_wpid(receiver_path: str, index: int):
return None
def monitor_glib(glib: GLib, callback, filterfn):
def monitor_glib(glib: GLib, callback: Callable, filterfn: Callable):
"""Monitor GLib.
Parameters
@ -452,7 +459,6 @@ def read(device_handle, bytes_count, timeout_ms=None):
if bytes_read < 0:
raise HIDError(_hidapi.hid_error(device_handle))
return None
return data.raw[:bytes_read]

View File

@ -36,6 +36,7 @@ import warnings
from select import select
from time import sleep
from time import time
from typing import Callable
import pyudev
@ -114,7 +115,12 @@ def _match(action, device, filter_func: typing.Callable[[int, int, int, bool, bo
hidpp_short = None
hidpp_long = None
logger.info(
"Report Descriptor not processed for DEVICE %s BID %s VID %s PID %s: %s", device.device_node, bid, vid, pid, e
"Report Descriptor not processed for DEVICE %s BID %s VID %s PID %s: %s",
device.device_node,
bid,
vid,
pid,
e,
)
filtered_result = filter_func(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short, hidpp_long)
@ -222,7 +228,7 @@ def find_paired_node_wpid(receiver_path, index):
return None
def monitor_glib(glib: GLib, callback, filterfn):
def monitor_glib(glib: GLib, callback: Callable, filterfn: typing.Callable):
"""Monitor GLib.
Parameters
@ -453,7 +459,7 @@ def get_indexed_string(device_handle, index):
assert device_handle
stat = os.fstat(device_handle)
try:
dev = pyudev.Device.from_device_number(pyudev.Context(), "char", stat.st_rdev)
dev = pyudev.Devices.from_device_number(pyudev.Context(), "char", stat.st_rdev)
except (pyudev.DeviceNotFoundError, ValueError):
return None

View File

@ -29,6 +29,7 @@ from contextlib import contextmanager
from random import getrandbits
from time import time
from typing import Any
from typing import Callable
import gi
@ -53,7 +54,7 @@ else:
logger = logging.getLogger(__name__)
_SHORT_MESSAGE_SIZE = 7
SHORT_MESSAGE_SIZE = 7
_LONG_MESSAGE_SIZE = 20
_MEDIUM_MESSAGE_SIZE = 15
_MAX_READ_SIZE = 32
@ -138,7 +139,7 @@ def _match(record: dict[str, Any], bus_id: int, vendor_id: int, product_id: int)
def filter_receivers(
bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False
bus_id: int, vendor_id: int, product_id: int, _hidpp_short: bool = False, _hidpp_long: bool = False
) -> dict[str, Any]:
"""Check that this product is a Logitech receiver.
@ -184,7 +185,7 @@ def receivers_and_devices():
yield from hidapi.enumerate(filter_products_of_interest)
def notify_on_receivers_glib(glib: GLib, callback):
def notify_on_receivers_glib(glib: GLib, callback: Callable):
"""Watch for matching devices and notifies the callback on the GLib thread.
Parameters
@ -254,7 +255,7 @@ def write(handle, devnumber, data, long_message=False):
assert data is not None
assert isinstance(data, bytes), (repr(data), type(data))
if long_message or len(data) > _SHORT_MESSAGE_SIZE - 2 or data[:1] == b"\x82":
if long_message or len(data) > SHORT_MESSAGE_SIZE - 2 or data[:1] == b"\x82":
wdata = struct.pack("!BB18s", HIDPP_LONG_MESSAGE_ID, devnumber, data)
else:
wdata = struct.pack("!BB5s", HIDPP_SHORT_MESSAGE_ID, devnumber, data)
@ -303,7 +304,7 @@ def is_relevant_message(data: bytes) -> bool:
# mapping from report_id to message length
report_lengths = {
HIDPP_SHORT_MESSAGE_ID: _SHORT_MESSAGE_SIZE,
HIDPP_SHORT_MESSAGE_ID: SHORT_MESSAGE_SIZE,
HIDPP_LONG_MESSAGE_ID: _LONG_MESSAGE_SIZE,
DJ_MESSAGE_ID: _MEDIUM_MESSAGE_SIZE,
0x21: _MAX_READ_SIZE,
@ -344,7 +345,12 @@ def _read(handle, timeout):
report_id != DJ_MESSAGE_ID or ord(data[2:3]) > 0x10
): # ignore DJ input messages
logger.debug(
"(%s) => r[%02X %02X %s %s]", handle, report_id, devnumber, common.strhex(data[2:4]), common.strhex(data[4:])
"(%s) => r[%02X %02X %s %s]",
handle,
report_id,
devnumber,
common.strhex(data[2:4]),
common.strhex(data[4:]),
)
return report_id, devnumber, data[2:]
@ -554,7 +560,12 @@ def request(
error,
hidpp20_constants.ERROR[error],
)
raise exceptions.FeatureCallError(number=devnumber, request=request_id, error=error, params=params)
raise exceptions.FeatureCallError(
number=devnumber,
request=request_id,
error=error,
params=params,
)
if reply_data[:2] == request_data[:2]:
if devnumber == 0xFF:
@ -628,7 +639,7 @@ def ping(handle, devnumber, long_message: bool = False):
and reply_data[1:3] == request_data[:2]
): # error response
error = ord(reply_data[3:4])
if error == hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
if error == hidpp10_constants.ERROR.invalid_SubID__command: # valid reply from HID++ 1.0 device
return 1.0
if (
error == hidpp10_constants.ERROR.resource_error

View File

@ -370,12 +370,12 @@ class NamedInts:
__slots__ = ("__dict__", "_values", "_indexed", "_fallback", "_is_sorted")
def __init__(self, dict=None, **kwargs):
def __init__(self, dict_=None, **kwargs):
def _readable_name(n):
return n.replace("__", "/").replace("_", " ")
# print (repr(kwargs))
elements = dict if dict else kwargs
elements = dict_ if dict_ else kwargs
values = {k: NamedInt(v, _readable_name(k)) for (k, v) in elements.items()}
self.__dict__ = values
self._is_sorted = False
@ -499,7 +499,7 @@ class NamedInts:
return NamedInts(**self.__dict__, **other.__dict__)
def __eq__(self, other):
return type(self) == type(other) and self._values == other._values
return isinstance(other, self.__class__) and self._values == other._values
class UnsortedNamedInts(NamedInts):
@ -548,7 +548,7 @@ class FirmwareInfo:
kind: str
name: str
version: str
extras: str
extras: str | None
class BatteryStatus(IntEnum):

View File

@ -225,7 +225,13 @@ _D("Craft Advanced Keyboard", codename="Craft", protocol=4.5, wpid="4066", btid=
_D("Wireless Illuminated Keyboard K800 new", codename="K800 new", protocol=4.5, wpid="406E")
_D("Wireless Keyboard K470", codename="K470", protocol=4.5, wpid="4075")
_D("MX Keys Keyboard", codename="MX Keys", protocol=4.5, wpid="408A", btid=0xB35B)
_D("G915 TKL LIGHTSPEED Wireless RGB Mechanical Gaming Keyboard", codename="G915 TKL", protocol=4.2, wpid="408E", usbid=0xC343)
_D(
"G915 TKL LIGHTSPEED Wireless RGB Mechanical Gaming Keyboard",
codename="G915 TKL",
protocol=4.2,
wpid="408E",
usbid=0xC343,
)
_D("Illuminated Keyboard", codename="Illuminated", protocol=1.0, usbid=0xC318, interface=1)
_D("G213 Prodigy Gaming Keyboard", codename="G213", usbid=0xC336, interface=1)
_D("G512 RGB Mechanical Gaming Keyboard", codename="G512", usbid=0xC33C, interface=1)
@ -253,7 +259,14 @@ _D(
wpid=("1006", "100D", "0612"),
registers=(Reg.BATTERY_CHARGE,),
)
_D("MX Air", codename="MX Air", protocol=1.0, kind=DEVICE_KIND.mouse, wpid=("1007", "100E"), registers=(Reg.BATTERY_CHARGE))
_D(
"MX Air",
codename="MX Air",
protocol=1.0,
kind=DEVICE_KIND.mouse,
wpid=("1007", "100E"),
registers=Reg.BATTERY_CHARGE,
)
_D(
"MX Revolution",
codename="MX Revolution",
@ -262,10 +275,34 @@ _D(
wpid=("1008", "100C"),
registers=(Reg.BATTERY_CHARGE,),
)
_D("MX620 Laser Cordless Mouse", codename="MX620", protocol=1.0, wpid=("100A", "1016"), registers=(Reg.BATTERY_CHARGE,))
_D("VX Nano Cordless Laser Mouse", codename="VX Nano", protocol=1.0, wpid=("100B", "100F"), registers=(Reg.BATTERY_CHARGE,))
_D("V450 Nano Cordless Laser Mouse", codename="V450 Nano", protocol=1.0, wpid="1011", registers=(Reg.BATTERY_CHARGE,))
_D("V550 Nano Cordless Laser Mouse", codename="V550 Nano", protocol=1.0, wpid="1013", registers=(Reg.BATTERY_CHARGE,))
_D(
"MX620 Laser Cordless Mouse",
codename="MX620",
protocol=1.0,
wpid=("100A", "1016"),
registers=(Reg.BATTERY_CHARGE,),
)
_D(
"VX Nano Cordless Laser Mouse",
codename="VX Nano",
protocol=1.0,
wpid=("100B", "100F"),
registers=(Reg.BATTERY_CHARGE,),
)
_D(
"V450 Nano Cordless Laser Mouse",
codename="V450 Nano",
protocol=1.0,
wpid="1011",
registers=(Reg.BATTERY_CHARGE,),
)
_D(
"V550 Nano Cordless Laser Mouse",
codename="V550 Nano",
protocol=1.0,
wpid="1013",
registers=(Reg.BATTERY_CHARGE,),
)
_D(
"MX 1100 Cordless Laser Mouse",
codename="MX 1100",
@ -282,11 +319,40 @@ _D(
wpid="101A",
registers=(Reg.BATTERY_STATUS, Reg.THREE_LEDS),
)
_D("Marathon Mouse M705 (M-R0009)", codename="M705 (M-R0009)", protocol=1.0, wpid="101B", registers=(Reg.BATTERY_CHARGE,))
_D("Wireless Mouse M350", codename="M350", protocol=1.0, wpid="101C", registers=(Reg.BATTERY_CHARGE,))
_D("Wireless Mouse M505", codename="M505/B605", protocol=1.0, wpid="101D", registers=(Reg.BATTERY_CHARGE,))
_D("Wireless Mouse M305", codename="M305", protocol=1.0, wpid="101F", registers=(Reg.BATTERY_STATUS,))
_D("Wireless Mouse M215", codename="M215", protocol=1.0, wpid="1020")
_D(
"Marathon Mouse M705 (M-R0009)",
codename="M705 (M-R0009)",
protocol=1.0,
wpid="101B",
registers=(Reg.BATTERY_CHARGE,),
)
_D(
"Wireless Mouse M350",
codename="M350",
protocol=1.0,
wpid="101C",
registers=(Reg.BATTERY_CHARGE,),
)
_D(
"Wireless Mouse M505",
codename="M505/B605",
protocol=1.0,
wpid="101D",
registers=(Reg.BATTERY_CHARGE,),
)
_D(
"Wireless Mouse M305",
codename="M305",
protocol=1.0,
wpid="101F",
registers=(Reg.BATTERY_STATUS,),
)
_D(
"Wireless Mouse M215",
codename="M215",
protocol=1.0,
wpid="1020",
)
_D(
"G700 Gaming Mouse",
codename="G700",
@ -382,5 +448,19 @@ _D("G533 Gaming Headset", codename="G533 Headset", protocol=2.0, interface=3, ki
_D("G535 Gaming Headset", codename="G535 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AC4)
_D("G935 Gaming Headset", codename="G935 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0A87)
_D("G733 Gaming Headset", codename="G733 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AB5)
_D("G733 Gaming Headset", codename="G733 Headset New", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AFE)
_D("PRO X Wireless Gaming Headset", codename="PRO Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0ABA)
_D(
"G733 Gaming Headset",
codename="G733 Headset New",
protocol=2.0,
interface=3,
kind=DEVICE_KIND.headset,
usbid=0x0AFE,
)
_D(
"PRO X Wireless Gaming Headset",
codename="PRO Headset",
protocol=2.0,
interface=3,
kind=DEVICE_KIND.headset,
usbid=0x0ABA,
)

View File

@ -176,7 +176,11 @@ class Device:
descriptors.get_btid(self.product_id) if self.bluetooth else descriptors.get_usbid(self.product_id)
)
if self.number is None: # for direct-connected devices get 'number' from descriptor protocol else use 0xFF
self.number = 0x00 if self.descriptor and self.descriptor.protocol and self.descriptor.protocol < 2.0 else 0xFF
if self.descriptor and self.descriptor.protocol and self.descriptor.protocol < 2.0:
number = 0x00
else:
number = 0xFF
self.number = number
self.ping() # determine whether a direct-connected device is online
if self.descriptor:

View File

@ -26,6 +26,7 @@ import subprocess
import sys
import time
from typing import Any
from typing import Dict
from typing import Tuple
@ -113,9 +114,17 @@ except Exception:
# Globals
xtest_available = True # Xtest might be available
xdisplay = None
Xkbdisplay = None # xkb might be available
X11Lib = None
modifier_keycodes = []
XkbUseCoreKbd = 0x100
NET_ACTIVE_WINDOW = None
NET_WM_PID = None
WM_CLASS = None
udevice = None
@ -187,7 +196,10 @@ def gnome_dbus_interface_setup():
remote_object = bus.get_object("org.gnome.Shell", "/io/github/pwr_solaar/solaar")
_dbus_interface = dbus.Interface(remote_object, "io.github.pwr_solaar.solaar")
except dbus.exceptions.DBusException:
logger.warning("Solaar Gnome extension not installed - some rule capabilities inoperable", exc_info=sys.exc_info())
logger.warning(
"Solaar Gnome extension not installed - some rule capabilities inoperable",
exc_info=sys.exc_info(),
)
_dbus_interface = False
return _dbus_interface
@ -228,7 +240,10 @@ if evdev:
for _, evcode in buttons.values():
if evcode:
key_events.append(evcode)
devicecap = {evdev.ecodes.EV_KEY: key_events, evdev.ecodes.EV_REL: [evdev.ecodes.REL_WHEEL, evdev.ecodes.REL_HWHEEL]}
devicecap = {
evdev.ecodes.EV_KEY: key_events,
evdev.ecodes.EV_REL: [evdev.ecodes.REL_WHEEL, evdev.ecodes.REL_HWHEEL],
}
else:
# Just mock these since they won't be useful without evdev anyway
buttons = {}
@ -283,9 +298,9 @@ def xy_direction(_x, _y):
y = round(_y / m)
if x < 0 and y < 0:
return "Mouse Up-left"
elif x > 0 and y < 0:
elif x > 0 > y:
return "Mouse Up-right"
elif x < 0 and y > 0:
elif x < 0 < y:
return "Mouse Down-left"
elif x > 0 and y > 0:
return "Mouse Down-right"
@ -456,7 +471,7 @@ TESTS = {
"crown_tap": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[5] == 0x01 and d[5], False],
"crown_start_press": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] == 0x01 and d[6], False],
"crown_end_press": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] == 0x05 and d[6], False],
"crown_pressed": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] >= 0x01 and d[6] <= 0x04 and d[6], False],
"crown_pressed": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and 0x01 <= d[6] <= 0x04 and d[6], False],
"thumb_wheel_up": [thumb_wheel_up, True],
"thumb_wheel_down": [thumb_wheel_down, True],
"lowres_wheel_up": [
@ -488,7 +503,7 @@ MOUSE_GESTURE_TESTS = {
"mouse-noop": [],
}
COMPONENTS = {}
# COMPONENTS = {}
class RuleComponent:
@ -503,6 +518,17 @@ class RuleComponent:
return Condition()
def _evaluate(components, feature, notification, device, result) -> Any:
res = True
for component in components:
res = component.evaluate(feature, notification, device, result)
if not isinstance(component, Action) and res is None:
return None
if isinstance(component, Condition) and not res:
return res
return res
class Rule(RuleComponent):
def __init__(self, args, source=None, warn=True):
self.components = [self.compile(a) for a in args]
@ -515,14 +541,7 @@ class Rule(RuleComponent):
def evaluate(self, feature, notification, device, last_result):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("evaluate rule: %s", self)
result = True
for component in self.components:
result = component.evaluate(feature, notification, device, result)
if not isinstance(component, Action) and result is None:
return None
if isinstance(component, Condition) and not result:
return result
return result
return _evaluate(self.components, feature, notification, device, True)
def once(self, feature, notification, device, last_result):
self.evaluate(feature, notification, device, last_result)
@ -598,14 +617,7 @@ class And(Condition):
def evaluate(self, feature, notification, device, last_result):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("evaluate condition: %s", self)
result = True
for component in self.components:
result = component.evaluate(feature, notification, device, last_result)
if not isinstance(component, Action) and result is None:
return None
if isinstance(component, Condition) and not result:
return result
return result
return _evaluate(self.components, feature, notification, device, last_result)
def data(self):
return {"And": [c.data() for c in self.components]}
@ -663,7 +675,8 @@ class Process(Condition):
if (not wayland and not x11_setup()) or (wayland and not gnome_dbus_interface_setup()):
if warn:
logger.warning(
"rules can only access active process in X11 or in Wayland under GNOME with Solaar Gnome extension - %s",
"rules can only access active process in X11 or in Wayland under GNOME with Solaar Gnome "
"extension - %s",
self,
)
if not isinstance(process, str):
@ -1218,7 +1231,13 @@ class KeyPress(Action):
if gkeymap:
current = gkeymap.get_modifier_state()
if logger.isEnabledFor(logging.INFO):
logger.info("KeyPress action: %s %s, group %s, modifiers %s", self.key_names, self.action, kbdgroup(), current)
logger.info(
"KeyPress action: %s %s, group %s, modifiers %s",
self.key_names,
self.action,
kbdgroup(),
current,
)
if self.action != RELEASE:
self.keyDown(self.key_symbols, current)
if self.action != DEPRESS:
@ -1287,7 +1306,10 @@ class MouseClick(Action):
if count in [CLICK, DEPRESS, RELEASE]:
self.count = count
elif warn:
logger.warning("rule MouseClick action: argument %s should be an integer or CLICK, PRESS, or RELEASE", count)
logger.warning(
"rule MouseClick action: argument %s should be an integer or CLICK, PRESS, or RELEASE",
count,
)
self.count = 1
def __str__(self):
@ -1332,7 +1354,12 @@ class Set(Action):
return None
args = setting.acceptable(self.args[2:], setting.read())
if args is None:
logger.warning("Set Action: invalid args %s for setting %s of %s", self.args[2:], self.args[1], self.args[0])
logger.warning(
"Set Action: invalid args %s for setting %s of %s",
self.args[2:],
self.args[1],
self.args[0],
)
return None
if len(args) > 1:
setting.write_key_value(args[0], args[1])
@ -1432,18 +1459,17 @@ COMPONENTS = {
"Later": Later,
}
built_in_rules = Rule([])
if True:
built_in_rules = Rule(
[
{
"Rule": [ # Implement problematic keys for Craft and MX Master
{"Rule": [{"Key": ["Brightness Down", "pressed"]}, {"KeyPress": "XF86_MonBrightnessDown"}]},
{"Rule": [{"Key": ["Brightness Up", "pressed"]}, {"KeyPress": "XF86_MonBrightnessUp"}]},
]
},
]
)
built_in_rules = Rule(
[
{
"Rule": [ # Implement problematic keys for Craft and MX Master
{"Rule": [{"Key": ["Brightness Down", "pressed"]}, {"KeyPress": "XF86_MonBrightnessDown"}]},
{"Rule": [{"Key": ["Brightness Up", "pressed"]}, {"KeyPress": "XF86_MonBrightnessUp"}]},
]
},
]
)
def key_is_down(key):

View File

@ -21,6 +21,7 @@ import struct
import threading
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
@ -55,10 +56,7 @@ KIND_MAP = {kind: hidpp10_constants.DEVICE_KIND[str(kind)] for kind in DEVICE_KI
class Device(Protocol):
def feature_request(self, feature: FEATURE) -> Any:
...
def request(self) -> Any:
def feature_request(self, feature, function=0x00, *params, no_reply=False) -> Any:
...
@property
@ -253,7 +251,7 @@ class ReprogrammableKeyV4(ReprogrammableKey):
def remappable_to(self) -> common.NamedInts:
self._device.keys._ensure_all_keys_queried()
ret = common.UnsortedNamedInts()
if self.group_mask != []: # only keys with a non-zero gmask are remappable
if self.group_mask: # only keys with a non-zero gmask are remappable
ret[self.default_task] = self.default_task # it should always be possible to map the key to itself
for g in self.group_mask:
g = special_keys.CID_GROUP[str(g)]
@ -291,7 +289,11 @@ class ReprogrammableKeyV4(ReprogrammableKey):
def _getCidReporting(self):
try:
mapped_data = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(struct.pack("!H", self._cid)))
mapped_data = self._device.feature_request(
FEATURE.REPROG_CONTROLS_V4,
0x20,
*tuple(struct.pack("!H", self._cid)),
)
if mapped_data:
cid, mapping_flags_1, mapped_to = struct.unpack("!HBH", mapped_data[:5])
if cid != self._cid and logger.isEnabledFor(logging.WARNING):
@ -316,11 +318,17 @@ class ReprogrammableKeyV4(ReprogrammableKey):
self._mapping_flags = 0
self._mapped_to = self._cid
def _setCidReporting(self, flags=None, remap=0):
"""Sends a `setCidReporting` request with the given parameters. Raises an exception if the parameters are invalid.
Parameters:
- flags {Dict[NamedInt,bool]} -- a dictionary of which mapping flags to set/unset
- remap {int} -- which control ID to remap to; or 0 to keep current mapping
def _setCidReporting(self, flags: Dict[NamedInt, bool] = None, remap: int = 0):
"""Sends a `setCidReporting` request with the given parameters.
Raises an exception if the parameters are invalid.
Parameters
----------
flags
A dictionary of which mapping flags to set/unset.
remap
Which control ID to remap to; or 0 to keep current mapping.
"""
flags = flags if flags else {} # See flake8 B006
@ -555,7 +563,13 @@ class KeysArrayPersistent(KeysArray):
keydata = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x20, index, 0xFF)
if keydata:
key = struct.unpack("!H", keydata[:2])[0]
mapped_data = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x30, key >> 8, key & 0xFF, 0xFF)
mapped_data = self.device.feature_request(
FEATURE.PERSISTENT_REMAPPABLE_ACTION,
0x30,
key >> 8,
key & 0xFF,
0xFF,
)
if mapped_data:
_ignore, _ignore, actionId, remapped, modifiers, status = struct.unpack("!HBBHBB", mapped_data[:8])
else:
@ -571,7 +585,15 @@ class KeysArrayPersistent(KeysArray):
remapped = special_keys.HID_CONSUMERCODES[remapped]
elif actionId == special_keys.ACTIONID.Empty: # purge data from empty value
remapped = modifiers = 0
self.keys[index] = PersistentRemappableAction(self.device, index, key, actionId, remapped, modifiers, status)
self.keys[index] = PersistentRemappableAction(
self.device,
index,
key,
actionId,
remapped,
modifiers,
status,
)
elif logger.isEnabledFor(logging.WARNING):
logger.warning(f"Key with index {index} was expected to exist but device doesn't report it.")
@ -673,15 +695,15 @@ class Gesture:
if index is not None:
offset = index >> 3 # 8 gestures per byte
mask = 0x1 << (index % 8)
return (offset, mask)
return offset, mask
else:
return (None, None)
return None, None
def enable_offset_mask(gesture):
return gesture._offset_mask(gesture.index)
def enable_offset_mask(self):
return self._offset_mask(self.index)
def diversion_offset_mask(gesture):
return gesture._offset_mask(gesture.diversion_index)
def diversion_offset_mask(self):
return self._offset_mask(self.diversion_index)
def enabled(self): # is the gesture enabled?
if self._enabled is None and self.index is not None:
@ -710,7 +732,14 @@ class Gesture:
return None
if self.diversion_index is not None:
offset, mask = self.diversion_offset_mask()
reply = self._device.feature_request(FEATURE.GESTURE_2, 0x40, offset, 0x01, mask, mask if diverted else 0x00)
reply = self._device.feature_request(
FEATURE.GESTURE_2,
0x40,
offset,
0x01,
mask,
mask if diverted else 0x00,
)
return reply
def as_int(self):
@ -919,7 +948,8 @@ LEDParamSize = {
LEDParam.saturation: 1,
}
# not implemented from x8070 Wave=4, Stars=5, Press=6, Audio=7
# not implemented from x8071 Custom=12, Kitt=13, HSVPulsing=20, WaveC=22, RippleC=23, SignatureActive=24, SignaturePassive=25
# not implemented from x8071 Custom=12, Kitt=13, HSVPulsing=20,
# WaveC=22, RippleC=23, SignatureActive=24, SignaturePassive=25
LEDEffects = {
0x00: [NamedInt(0x00, _("Disabled")), {}],
0x01: [NamedInt(0x01, _("Static")), {LEDParam.color: 0, LEDParam.ramp: 3}],
@ -927,7 +957,10 @@ LEDEffects = {
0x03: [NamedInt(0x03, _("Cycle")), {LEDParam.period: 5, LEDParam.intensity: 7}],
0x08: [NamedInt(0x08, _("Boot")), {}],
0x09: [NamedInt(0x09, _("Demo")), {}],
0x0A: [NamedInt(0x0A, _("Breathe")), {LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6}],
0x0A: [
NamedInt(0x0A, _("Breathe")),
{LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6},
],
0x0B: [NamedInt(0x0B, _("Ripple")), {LEDParam.color: 0, LEDParam.period: 4}],
0x0E: [NamedInt(0x0E, _("Decomposition")), {LEDParam.period: 6, LEDParam.intensity: 8}],
0x0F: [NamedInt(0x0F, _("Signature1")), {LEDParam.period: 5, LEDParam.intensity: 7}],
@ -976,7 +1009,7 @@ class LEDEffectSetting: # an effect plus its parameters
return dumper.represent_mapping("!LEDEffectSetting", data.__dict__, flow_style=True)
def __eq__(self, other):
return type(self) == type(other) and self.to_bytes() == other.to_bytes()
return isinstance(other, self.__class__) and self.to_bytes() == other.to_bytes()
def __str__(self):
return yaml.dump(self, width=float("inf")).rstrip("\n")
@ -1150,7 +1183,8 @@ class Button:
elif self.type == ButtonMappingTypes.No_Action:
bytes += b"\xff\xff"
elif self.behavior == ButtonBehaviors.Function:
bytes += common.int2bytes(self.value, 1) + b"\xff" + (common.int2bytes(self.data, 1) if self.data else b"\x00")
data = common.int2bytes(self.data, 1) if self.data else b"\x00"
bytes += common.int2bytes(self.value, 1) + b"\xff" + data
else:
bytes = self.bytes if self.bytes else b"\xff\xff\xff\xff"
return bytes
@ -1318,7 +1352,9 @@ class OnboardProfiles:
def to_bytes(self):
bytes = b""
for i in range(1, len(self.profiles) + 1):
bytes += common.int2bytes(self.profiles[i].sector, 2) + common.int2bytes(self.profiles[i].enabled, 1) + b"\x00"
profiles_sector = common.int2bytes(self.profiles[i].sector, 2)
profiles_enabled = common.int2bytes(self.profiles[i].enabled, 1)
bytes += profiles_sector + profiles_enabled + b"\x00"
bytes += b"\xff\xff\x00\x00" # marker after last profile
while len(bytes) < self.size - 2: # leave room for CRC
bytes += b"\xff"
@ -1333,7 +1369,14 @@ class OnboardProfiles:
chunk = dev.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, sector >> 8, sector & 0xFF, o >> 8, o & 0xFF)
bytes += chunk
o += 16
chunk = dev.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, sector >> 8, sector & 0xFF, (s - 16) >> 8, (s - 16) & 0xFF)
chunk = dev.feature_request(
FEATURE.ONBOARD_PROFILES,
0x50,
sector >> 8,
sector & 0xFF,
(s - 16) >> 8,
(s - 16) & 0xFF,
)
bytes += chunk[16 + o - s :] # the last chunk has to be read in an awkward way
return bytes
@ -1443,7 +1486,7 @@ class Hidpp20:
if transport_bits & flag:
tid_map[transport] = modelId[offset : offset + 2].hex().upper()
offset = offset + 2
return (unitId.hex().upper(), modelId.hex().upper(), tid_map)
return unitId.hex().upper(), modelId.hex().upper(), tid_map
def get_kind(self, device: Device):
"""Reads a device's type.
@ -1831,6 +1874,7 @@ def decipher_battery_unified(report):
def decipher_adc_measurement(report):
# partial implementation - needs mapping to levels
charge_level = None
adc, flags = struct.unpack("!HB", report[:3])
for level in battery_voltage_remaining:
if level[0] < adc:

View File

@ -150,7 +150,14 @@ FEATURE._fallback = lambda x: f"unknown:{x:04X}"
FEATURE_FLAG = NamedInts(internal=0x20, hidden=0x40, obsolete=0x80)
DEVICE_KIND = NamedInts(
keyboard=0x00, remote_control=0x01, numpad=0x02, mouse=0x03, touchpad=0x04, trackball=0x05, presenter=0x06, receiver=0x07
keyboard=0x00,
remote_control=0x01,
numpad=0x02,
mouse=0x03,
touchpad=0x04,
trackball=0x05,
presenter=0x06,
receiver=0x07,
)
FIRMWARE_KIND = NamedInts(Firmware=0x00, Bootloader=0x01, Hardware=0x02, Other=0x03)

View File

@ -111,7 +111,7 @@ class Setting:
assert hasattr(self, "_device")
if self._validator.kind == KIND.range:
return (self._validator.min_value, self._validator.max_value)
return self._validator.min_value, self._validator.max_value
def _pre_read(self, cached, key=None):
if self.persist and self._value is None and getattr(self._device, "persister", None):
@ -1208,7 +1208,7 @@ class RangeValidator(Validator):
if len(args) == 1:
return args[0] == current
elif len(args) == 2:
return args[0] <= current and current <= args[1]
return args[0] <= current <= args[1]
else:
return False

View File

@ -22,7 +22,17 @@ NAME = "Solaar"
try:
__version__ = (
subprocess.check_output(["git", "describe", "--always"], cwd=sys.path[0], stderr=subprocess.DEVNULL).strip().decode()
subprocess.check_output(
[
"git",
"describe",
"--always",
],
cwd=sys.path[0],
stderr=subprocess.DEVNULL,
)
.strip()
.decode()
)
except Exception:
try:

View File

@ -33,7 +33,9 @@ logger = logging.getLogger(__name__)
def _create_parser():
parser = argparse.ArgumentParser(
prog=NAME.lower(), add_help=False, epilog=f"For details on individual actions, run `{NAME.lower()} <action> --help`."
prog=NAME.lower(),
add_help=False,
epilog=f"For details on individual actions, run `{NAME.lower()} <action> --help`.",
)
subparsers = parser.add_subparsers(title="actions", help="command-line action to perform")
@ -53,7 +55,11 @@ def _create_parser():
)
sp.set_defaults(action="probe")
sp = subparsers.add_parser("profiles", help="read or write onboard profiles", epilog="Only works on active devices.")
sp = subparsers.add_parser(
"profiles",
help="read or write onboard profiles",
epilog="Only works on active devices.",
)
sp.add_argument(
"device",
help="device to read or write profiles of; may be a device number (1..6), a serial number, "
@ -69,7 +75,10 @@ def _create_parser():
)
sp.add_argument(
"device",
help="device to configure; may be a device number (1..6), a serial number, " "or a substring of a device's name",
help=(
"device to configure; may be a device number (1..6), a serial number, ",
"or a substring of a device's name",
),
)
sp.add_argument("setting", nargs="?", help="device-specific setting; leave empty to list available settings")
sp.add_argument("value_key", nargs="?", help="new value for the setting or key for keyed settings")
@ -206,7 +215,7 @@ def run(cli_args=None, hidraw_path=None):
c = list(_receivers(hidraw_path))
if not c:
raise Exception(
'No supported device found. Use "lsusb" and "bluetoothctl devices Connected" to list connected devices.'
'No supported device found. Use "lsusb" and "bluetoothctl devices Connected" to list connected devices.'
)
m = import_module("." + action, package=__name__)
m.run(c, args, _find_receiver, _find_device)

View File

@ -22,6 +22,8 @@ from logitech_receiver.common import NamedInts
from solaar import configuration
APP_ID = "io.github.pwr_solaar.solaar"
def _print_setting(s, verbose=True):
print("#", s.label)
@ -92,7 +94,7 @@ def select_choice(value, choices, setting, key):
break
if val is not None:
value = val
elif ivalue is not None and ivalue >= 1 and ivalue <= len(choices):
elif ivalue is not None and 1 <= ivalue <= len(choices):
value = choices[ivalue - 1]
elif lvalue in ("higher", "lower"):
old_value = setting.read() if key is None else setting.read_key(key)
@ -134,13 +136,13 @@ def select_range(value, setting):
value = int(value)
except ValueError as exc:
raise Exception(f"{setting.name}: can't interpret '{value}' as integer") from exc
min, max = setting.range
if value < min or value > max:
minimum, maximum = setting.range
if value < minimum or value > maximum:
raise Exception(f"{setting.name}: value '{value}' out of bounds")
return value
def run(receivers, args, find_receiver, find_device):
def run(receivers, args, _find_receiver, find_device):
assert receivers
assert args.device
@ -190,7 +192,6 @@ def run(receivers, args, find_receiver, find_device):
from gi.repository import Gtk
if Gtk.init_check()[0]: # can Gtk be initialized?
APP_ID = "io.github.pwr_solaar.solaar"
application = Gtk.Application.new(APP_ID, Gio.ApplicationFlags.HANDLES_COMMAND_LINE)
application.register()
remote = application.get_is_remote()
@ -236,7 +237,7 @@ def set(dev, setting, args, save):
elif setting.kind == settings.KIND.map_choice:
if args.extra_subkey is None:
_print_setting_keyed(setting, args.value_key)
return (None, None, None)
return None, None, None
key = args.value_key
ikey = to_int(key)
k = next((k for k in setting.choices.keys() if key == k), None)
@ -254,7 +255,7 @@ def set(dev, setting, args, save):
elif setting.kind == settings.KIND.multiple_toggle:
if args.extra_subkey is None:
_print_setting_keyed(setting, args.value_key)
return (None, None, None)
return None, None, None
key = args.value_key
all_keys = getattr(setting, "choices_universe", None)
ikey = all_keys[int(key) if key.isdigit() else key] if isinstance(all_keys, NamedInts) else to_int(key)

View File

@ -51,7 +51,7 @@ def run(receivers, args, find_receiver, _ignore):
assert n
if n.devnumber == 0xFF:
notifications.process(receiver, n)
elif n.sub_id == 0x41 and len(n.data) == base._SHORT_MESSAGE_SIZE - 4:
elif n.sub_id == 0x41 and len(n.data) == base.SHORT_MESSAGE_SIZE - 4:
kd, known_devices = known_devices, None # only process one connection notification
if kd is not None:
if n.devnumber not in kd:

View File

@ -50,6 +50,7 @@ def _require(module, os_package, gi=None, gi_package=None, gi_version=None):
battery_icons_style = "regular"
tray_icon_size = None
temp = tempfile.NamedTemporaryFile(prefix="Solaar_", mode="w", delete=True)
@ -72,9 +73,16 @@ def _parse_arguments():
metavar="PATH",
help="unifying receiver to use; the first detected receiver if unspecified. Example: /dev/hidraw2",
)
arg_parser.add_argument("--restart-on-wake-up", action="store_true", help="restart Solaar on sleep wake-up (experimental)")
arg_parser.add_argument(
"-w", "--window", choices=("show", "hide", "only"), help="start with window showing / hidden / only (no tray icon)"
"--restart-on-wake-up",
action="store_true",
help="restart Solaar on sleep wake-up (experimental)",
)
arg_parser.add_argument(
"-w",
"--window",
choices=("show", "hide", "only"),
help="start with window showing / hidden / only (no tray icon)",
)
arg_parser.add_argument(
"-b",

View File

@ -49,7 +49,13 @@ _GHOST_DEVICE.__nonzero__ = _GHOST_DEVICE.__bool__
def _ghost(device):
return _GHOST_DEVICE(receiver=device.receiver, number=device.number, name=device.name, kind=device.kind, online=False)
return _GHOST_DEVICE(
receiver=device.receiver,
number=device.number,
name=device.name,
kind=device.kind,
online=False,
)
class SolaarListener(listener.EventsListener):
@ -230,7 +236,9 @@ class SolaarListener(listener.EventsListener):
def _process_bluez_dbus(device, path, dictionary, signature):
"""Process bluez dbus property changed signals for device status changes to discover disconnections and connections"""
"""Process bluez dbus property changed signals for device status
changes to discover disconnections and connections.
"""
if device:
if dictionary.get("Connected") is not None:
connected = dictionary.get("Connected")

View File

@ -84,7 +84,7 @@ def _command_line(app, command_line):
return 0
def _shutdown(app, shutdown_hook):
def _shutdown(_app, shutdown_hook):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("shutdown")
shutdown_hook()

View File

@ -31,7 +31,7 @@ class AboutViewProtocol(Protocol):
def update_description(self, comments: str) -> None:
...
def update_copyright(self, copyright):
def update_copyright(self, copyright_text: str) -> None:
...
def update_authors(self, authors: list[str]) -> None:
@ -68,8 +68,8 @@ class Presenter:
self.view.update_description(comments)
def update_copyright(self) -> None:
copyright = self.model.get_copyright()
self.view.update_copyright(copyright)
copyright_text = self.model.get_copyright()
self.view.update_copyright(copyright_text)
def update_authors(self) -> None:
authors = self.model.get_authors()

View File

@ -48,7 +48,10 @@ def _create_error_text(reason: str, object_) -> Tuple[str, str]:
elif reason == "unpair":
title = _("Unpairing failed")
text = (
_("Failed to unpair %{device} from %{receiver}.").format(device=object_.name, receiver=object_.receiver.name)
_("Failed to unpair %{device} from %{receiver}.").format(
device=object_.name,
receiver=object_.receiver.name,
)
+ "\n\n"
+ _("The receiver returned an error, with no further details.")
)

View File

@ -52,7 +52,7 @@ def _read_async(setting, force_read, sbox, device_is_online, sensitive):
def _write_async(setting, value, sbox, sensitive=True, key=None):
def _do_write(s, v, sb, key):
def _do_write(_s, v, sb, key):
try:
if key is None:
v = setting.write(v)
@ -87,8 +87,9 @@ class Scale(Gtk.Scale):
class Control:
def __init__(**kwargs):
pass
def __init__(self, **kwargs):
self.sbox = None
self.delegate = None
def init(self, sbox, delegate):
self.sbox = sbox
@ -227,12 +228,12 @@ class ChoiceControlBig(Gtk.Entry, Control):
tooltip = _("Incomplete") if self.value is None else _("Complete - ENTER to change")
self.set_icon_tooltip_text(Gtk.EntryIconPosition.SECONDARY, tooltip)
def activate(self, *args):
def activate(self, *_args):
if self.value is not None and self.get_sensitive():
self.set_icon_from_icon_name(Gtk.EntryIconPosition.SECONDARY, "")
self.delegate.update()
def select(self, completion, model, iter):
def select(self, _completion, model, iter):
self.set_value(model.get(iter, 0)[0])
if self.value and self.get_sensitive():
self.set_icon_from_icon_name(Gtk.EntryIconPosition.SECONDARY, "")
@ -278,7 +279,7 @@ class MapChoiceControl(Gtk.HBox, Control):
if current is not None:
self.valueBox.set_value(current)
def map_value_notify_key(self, *args):
def map_value_notify_key(self, *_args):
key_choice = int(self.keyBox.get_active_id())
if self.keyBox.get_sensitive():
self.map_populate_value_box(key_choice)
@ -321,7 +322,7 @@ class MultipleControl(Gtk.ListBox, Control):
sbox._button = self._button
return True
def toggle_display(self, *args):
def toggle_display(self, *_args):
self._showing = not self._showing
if not self._showing:
for c in self.get_children():
@ -355,7 +356,7 @@ class MultipleToggleControl(MultipleControl):
self.add(h)
self._label_control_pairs.append((lbl, control))
def toggle_notify(self, switch, active):
def toggle_notify(self, switch, _active):
if switch.get_sensitive():
key = switch._setting_key
new_state = switch.get_state()
@ -410,7 +411,12 @@ class MultipleRangeControl(MultipleControl):
h.pack_start(sub_item_lbl, False, False, 0)
sub_item_lbl.set_margin_start(30)
if sub_item.widget == "Scale":
control = Gtk.Scale.new_with_range(Gtk.Orientation.HORIZONTAL, sub_item.minimum, sub_item.maximum, 1)
control = Gtk.Scale.new_with_range(
Gtk.Orientation.HORIZONTAL,
sub_item.minimum,
sub_item.maximum,
1,
)
control.set_round_digits(0)
control.set_digits(0)
h.pack_end(control, True, True, 0)
@ -474,7 +480,6 @@ class MultipleRangeControl(MultipleControl):
class PackedRangeControl(MultipleRangeControl):
def setup(self, setting):
validator = setting._validator
self._items = []
for item in range(validator.count):
h = Gtk.HBox(homogeneous=False, spacing=0)
lbl = Gtk.Label(label=str(validator.keys[item]))
@ -536,8 +541,9 @@ class HeteroKeyControl(Gtk.HBox, Control):
item_lblbox.set_visible(False)
else:
item_lblbox = None
item_box = ComboBoxText()
if item["kind"] == settings.KIND.choice:
item_box = ComboBoxText()
for entry in item["choices"]:
item_box.append(str(int(entry)), str(entry))
item_box.set_active(0)
@ -572,8 +578,8 @@ class HeteroKeyControl(Gtk.HBox, Control):
self.sbox._failed.set_visible(True)
self.setup_visibles(value.ID if value is not None else 0)
def setup_visibles(self, ID):
fields = self.sbox.setting.fields_map[ID][1] if ID in self.sbox.setting.fields_map else {}
def setup_visibles(self, id_):
fields = self.sbox.setting.fields_map[id_][1] if id_ in self.sbox.setting.fields_map else {}
for name, (lblbox, box) in self._items.items():
visible = name in fields or name == "ID"
if lblbox:
@ -635,7 +641,7 @@ def _change_icon(allowed, icon):
icon.set_tooltip_text(_allowables_tooltips[allowed])
def _create_sbox(s, device):
def _create_sbox(s, _device):
sbox = Gtk.HBox(homogeneous=False, spacing=6)
sbox.setting = s
sbox.kind = s.kind
@ -689,10 +695,10 @@ def _create_sbox(s, device):
return sbox
def _update_setting_item(sbox, value, is_online=True, sensitive=True, nullOK=False):
def _update_setting_item(sbox, value, is_online=True, sensitive=True, null_okay=False):
sbox._spinner.stop()
sensitive = sbox._change_icon._allowed if sensitive is None else sensitive
if value is None and not nullOK:
if value is None and not null_okay:
sbox._control.set_sensitive(sensitive is True)
_change_icon(sensitive, sbox._change_icon)
sbox._failed.set_visible(is_online)
@ -807,7 +813,11 @@ def _record_setting(device, setting_class, values):
logger.debug("on %s changing setting %s to %s", device, setting_class.name, values)
setting = next((s for s in device.settings if s.name == setting_class.name), None)
if setting is None and logger.isEnabledFor(logging.DEBUG):
logger.debug("No setting for %s found on %s when trying to record a change made elsewhere", setting_class.name, device)
logger.debug(
"No setting for %s found on %s when trying to record a change made elsewhere",
setting_class.name,
device,
)
if setting:
assert device == setting._device
if len(values) > 1:

View File

@ -616,7 +616,8 @@ class DiversionDialog:
if len(parent_c.components) == 0: # placeholder
_populate_model(m, parent_it, None, level=wrapped.level)
m.remove(it)
self.view.get_selection().select_iter(m.iter_nth_child(parent_it, max(0, min(idx, len(parent_c.components) - 1))))
select = max(0, min(idx, len(parent_c.components) - 1))
self.view.get_selection().select_iter(m.iter_nth_child(parent_it, select))
self.on_update()
return c
@ -1408,7 +1409,6 @@ class _SettingWithValueUI:
def create_widgets(self):
self.widgets = {}
self.label = Gtk.Label(valign=Gtk.Align.CENTER, hexpand=True)
self.label.set_text(self.label_text)
self.widgets[self.label] = (0, 0, 5, 1)
@ -1432,7 +1432,13 @@ class _SettingWithValueUI:
self.device_field.connect("changed", self._on_update)
self.widgets[self.device_field] = (1, 1, 1, 1)
lbl = Gtk.Label(label=_("Setting"), halign=Gtk.Align.CENTER, valign=Gtk.Align.CENTER, hexpand=True, vexpand=False)
lbl = Gtk.Label(
label=_("Setting"),
halign=Gtk.Align.CENTER,
valign=Gtk.Align.CENTER,
hexpand=True,
vexpand=False,
)
self.widgets[lbl] = (0, 2, 1, 1)
self.setting_field = SmartComboBox([(s[0].name, s[0].label) for s in self.ALL_SETTINGS.values()])
self.setting_field.set_valign(Gtk.Align.CENTER)
@ -1569,7 +1575,11 @@ class _SettingWithValueUI:
def item(k):
lbl = labels.get(k, None)
return (k, lbl[0] if lbl and isinstance(lbl, tuple) and lbl[0] else str(k))
if lbl and isinstance(lbl, tuple) and lbl[0]:
label = lbl[0]
else:
label = str(k)
return k, label
with self.ignore_changes():
self.key_field.set_all_values(sorted(map(item, keys), key=lambda k: k[1]))
@ -1688,6 +1698,7 @@ class _SettingWithValueUI:
setting, val_class, kind, keys = cls._setting_attributes(setting_name, device)
device_setting = (device.settings if device else {}).get(setting_name, None)
disp = [setting.label or setting.name if setting else setting_name]
key = None
if kind in cls.MULTIPLE:
key = next(a, None)
key = _from_named_ints(key, keys) if keys else key

View File

@ -166,13 +166,21 @@ def _show_passcode(assistant, receiver, passkey):
page_text = _("Enter passcode on %(name)s.") % {"name": name}
page_text += "\n"
if authentication & 0x01:
page_text += _("Type %(passcode)s and then press the enter key.") % {"passcode": receiver.pairing.device_passkey}
page_text += _("Type %(passcode)s and then press the enter key.") % {
"passcode": receiver.pairing.device_passkey,
}
else:
passcode = ", ".join(
[_("right") if bit == "1" else _("left") for bit in f"{int(receiver.pairing.device_passkey):010b}"]
)
page_text += _("Press %(code)s\nand then press left and right buttons simultaneously.") % {"code": passcode}
page = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, intro_text, "preferences-desktop-peripherals", page_text)
page = _create_page(
assistant,
Gtk.AssistantPageType.PROGRESS,
intro_text,
"preferences-desktop-peripherals",
page_text,
)
assistant.set_page_complete(page, True)
assistant.next_page()
@ -185,7 +193,13 @@ def _create_assistant(receiver, ok, finish, title, text):
assistant.set_resizable(False)
assistant.set_role("pair-device")
if ok:
page_intro = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, title, "preferences-desktop-peripherals", text)
page_intro = _create_page(
assistant,
Gtk.AssistantPageType.PROGRESS,
title,
"preferences-desktop-peripherals",
text,
)
spinner = Gtk.Spinner()
spinner.set_visible(True)
spinner.start()
@ -227,7 +241,7 @@ def _create_success_page(assistant, device):
assistant.commit()
def _create_failure_page(assistant, error):
def _create_failure_page(assistant, error) -> None:
header = _("Pairing failed") + ": " + _(str(error)) + "."
if "timeout" in str(error):
text = _("Make sure your device is within range, and has a decent battery charge.")
@ -242,7 +256,7 @@ def _create_failure_page(assistant, error):
assistant.commit()
def _create_page(assistant, kind, header=None, icon_name=None, text=None):
def _create_page(assistant, kind, header=None, icon_name=None, text=None) -> Gtk.VBox:
p = Gtk.VBox(homogeneous=False, spacing=8)
assistant.append_page(p)
assistant.set_page_type(p, kind)

View File

@ -15,6 +15,7 @@
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from contextlib import contextmanager as contextlib_contextmanager
from typing import Callable
from gi.repository import Gtk
from logitech_receiver import diversion
@ -49,7 +50,7 @@ class CompletionEntry(Gtk.Entry):
class RuleComponentUI:
CLASS = diversion.RuleComponent
def __init__(self, panel, on_update=None):
def __init__(self, panel, on_update: Callable = None):
self.panel = panel
self.widgets = {} # widget -> coord. in grid
self.component = None

View File

@ -72,7 +72,8 @@ def _scroll(tray_icon, event, direction=None):
# ignore all other directions
return
if sum(map(lambda i: i[1] is not None, _devices_info)) < 2: # don't bother even trying to scroll if less than two devices
# don't bother even trying to scroll if less than two devices
if sum(map(lambda i: i[1] is not None, _devices_info)) < 2:
return
# scroll events come way too fast (at least 5-6 at once) so take a little break between them
@ -215,7 +216,10 @@ except ImportError:
icon.set_tooltip_text(NAME)
icon.connect("activate", window.toggle)
icon.connect("scroll-event", _scroll)
icon.connect("popup-menu", lambda icon, button, time: menu.popup(None, None, icon.position_menu, icon, button, time))
icon.connect(
"popup-menu",
lambda icon, button, time: menu.popup(None, None, icon.position_menu, icon, button, time),
)
return icon

View File

@ -501,47 +501,47 @@ def _update_details(button):
# If read_all is False, only return stuff that is ~100% already
# cached, and involves no HID++ calls.
yield (_("Path"), device.path)
yield _("Path"), device.path
if device.kind is None:
yield (_("USB ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id)
yield _("USB ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id
if read_all:
yield (_("Serial"), device.serial)
yield _("Serial"), device.serial
else:
yield (_("Serial"), "...")
yield _("Serial"), "..."
else:
# yield ('Codename', device.codename)
yield (_("Index"), device.number)
yield _("Index"), device.number
if device.wpid:
yield (_("Wireless PID"), device.wpid)
yield _("Wireless PID"), device.wpid
if device.product_id:
yield (_("Product ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id)
yield _("Product ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id
hid_version = device.protocol
yield (_("Protocol"), f"HID++ {hid_version:1.1f}" if hid_version else _("Unknown"))
yield _("Protocol"), f"HID++ {hid_version:1.1f}" if hid_version else _("Unknown")
if read_all and device.polling_rate:
yield (_("Polling rate"), device.polling_rate)
yield _("Polling rate"), device.polling_rate
if read_all or not device.online:
yield (_("Serial"), device.serial)
yield _("Serial"), device.serial
else:
yield (_("Serial"), "...")
yield _("Serial"), "..."
if read_all and device.unitId and device.unitId != device.serial:
yield (_("Unit ID"), device.unitId)
yield _("Unit ID"), device.unitId
if read_all:
if device.firmware:
for fw in list(device.firmware):
yield (" " + _(str(fw.kind)), (fw.name + " " + fw.version).strip())
yield " " + _(str(fw.kind)), (fw.name + " " + fw.version).strip()
elif device.kind is None or device.online:
yield (f" {_('Firmware')}", "...")
yield f" {_('Firmware')}", "..."
flag_bits = device.notification_flags
if flag_bits is not None:
flag_names = (
(f"({_('none')})",) if flag_bits == 0 else hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits)
)
yield (_("Notifications"), (f"\n{' ':15}").join(flag_names))
yield _("Notifications"), f"\n{' ':15}".join(flag_names)
def _set_details(text):
_details._text.set_markup(text)

View File

@ -6,11 +6,7 @@ from os.path import dirname
from pathlib import Path
from setuptools import find_packages
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
from setuptools import setup
NAME = "Solaar"
version = Path("lib/solaar/version").read_text().strip()

View File

@ -16,6 +16,7 @@
"""HID++ data and functions common to several logitech_receiver test files"""
from __future__ import annotations
import errno
import threading
@ -43,7 +44,17 @@ def ping(responses, handle, devnumber, long_message=False):
return r.response
def request(responses, handle, devnumber, id, *params, no_reply=False, return_error=False, long_message=False, protocol=1.0):
def request(
responses,
handle,
devnumber,
id,
*params,
no_reply=False,
return_error=False,
long_message=False,
protocol=1.0,
):
params = b"".join(pack("B", p) if isinstance(p, int) else p for p in params)
print("REQUEST ", hex(handle), hex(devnumber), hex(id), params.hex())
for r in responses:
@ -54,7 +65,7 @@ def request(responses, handle, devnumber, id, *params, no_reply=False, return_er
@dataclass
class Response:
response: Optional[str]
response: str | float
id: int
params: str = ""
handle: int = 0x11

View File

@ -225,7 +225,7 @@ def test_set_3leds(device, level, charging, warning, p1, p2, mocker):
spy_request.assert_called_once_with(0x8000 | Registers.THREE_LEDS, p1, p2)
@pytest.mark.parametrize("device", [(device_offline), (device_features)])
@pytest.mark.parametrize("device", [device_offline, device_features])
def test_set_3leds_missing(device, mocker):
spy_request = mocker.spy(device, "request")

View File

@ -288,9 +288,12 @@ def test_ReprogrammableKeyV4_set(responses, index, diverted, persistently_divert
(fake_hidpp.responses_key, 3, 0x0053, 0x02, 0x0001, 0x00, 1, "Mouse Button: 1", "", "02000100", "7FFFFFFF"),
],
)
def test_RemappableAction(r, index, cid, actionId, remapped, mask, status, action, modifiers, byts, remap, mocker):
def test_remappable_action(r, index, cid, actionId, remapped, mask, status, action, modifiers, byts, remap, mocker):
if int(remap, 16) == special_keys.KEYS_Default:
responses = r + [fake_hidpp.Response("040000", 0x0000, "1C00"), fake_hidpp.Response("00", 0x450, f"{cid:04X}" + "FF")]
responses = r + [
fake_hidpp.Response("040000", 0x0000, "1C00"),
fake_hidpp.Response("00", 0x450, f"{cid:04X}" + "FF"),
]
else:
responses = r + [
fake_hidpp.Response("040000", 0x0000, "1C00"),