From 58ddb0d6cd74ebd9b77fd50e52afaecc03f0eeb0 Mon Sep 17 00:00:00 2001 From: MattHag <16444067+MattHag@users.noreply.github.com> Date: Tue, 1 Oct 2024 02:30:13 +0200 Subject: [PATCH] Introduce HIDAPI protocol Improve type hints and names. --- lib/hidapi/hidapi_impl.py | 58 ++++++++++++++++++++++------------- lib/hidapi/udev_impl.py | 27 ++++++++-------- lib/logitech_receiver/base.py | 29 +++++++++++++++++- 3 files changed, 80 insertions(+), 34 deletions(-) diff --git a/lib/hidapi/hidapi_impl.py b/lib/hidapi/hidapi_impl.py index 74e7bb33..9e04153a 100644 --- a/lib/hidapi/hidapi_impl.py +++ b/lib/hidapi/hidapi_impl.py @@ -33,6 +33,7 @@ import typing from threading import Thread from time import sleep +from typing import Any from typing import Callable from hidapi.common import DeviceInfo @@ -225,10 +226,18 @@ class _DeviceMonitor(Thread): sleep(self.polling_delay) -# The filterfn is used to determine whether this is a device of interest to Solaar. -# It is given the bus id, vendor id, and product id and returns a dictionary -# with the required hid_driver and usb_interface and whether this is a receiver or device. -def _match(action, device, filterfn): +def _match( + action: str, + device, + filter_func: Callable[[int, int, int, bool, bool], dict[str, Any]], +): + """ + The filter_func is used to determine whether this is a device of + interest to Solaar. It is given the bus id, vendor id, and product + id and returns a dictionary with the required hid_driver and + usb_interface and whether this is a receiver or device. + """ + vid = device["vendor_id"] pid = device["product_id"] @@ -246,10 +255,10 @@ def _match(action, device, filterfn): device_handle = None try: device_handle = open_path(device["path"]) - report = get_input_report(device_handle, 0x10, 32) + report = _get_input_report(device_handle, 0x10, 32) if len(report) == 1 + 6 and report[0] == 0x10: device["hidpp_short"] = True - report = get_input_report(device_handle, 0x11, 32) + report = _get_input_report(device_handle, 0x11, 32) if len(report) == 1 + 19 and report[0] == 0x11: device["hidpp_long"] = True except HIDError as e: # noqa: F841 @@ -272,10 +281,10 @@ def _match(action, device, filterfn): if not device["hidpp_short"] and not device["hidpp_long"]: return None - filter_func = filterfn(bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"]) - if not filter_func: + filtered_result = filter_func(bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"]) + if not filtered_result: return - isDevice = filter_func.get("isDevice") + is_device = filtered_result.get("isDevice") if action == ACTION_ADD: d_info = DeviceInfo( @@ -289,7 +298,7 @@ def _match(action, device, filterfn): product=device["product_string"], serial=device["serial_number"], release=device["release_number"], - isDevice=isDevice, + isDevice=is_device, hidpp_short=device["hidpp_short"], hidpp_long=device["hidpp_long"], ) @@ -307,7 +316,7 @@ def _match(action, device, filterfn): product=None, serial=None, release=None, - isDevice=isDevice, + isDevice=is_device, hidpp_short=None, hidpp_long=None, ) @@ -324,19 +333,26 @@ def find_paired_node_wpid(receiver_path: str, index: int): return None -def monitor_glib(glib: GLib, callback: Callable, filterfn: Callable): +def monitor_glib( + glib: GLib, + callback: Callable, + filter_func: Callable[[int, int, int, bool, bool], dict[str, Any]], +) -> None: """Monitor GLib. Parameters ---------- glib GLib instance. + callback + Called when device found. + filter_func + Filter devices callback. """ - def device_callback(action, device): - # print(f"device_callback({action}): {device}") + def device_callback(action: str, device): if action == ACTION_ADD: - d_info = _match(action, device, filterfn) + d_info = _match(action, device, filter_func) if d_info: glib.idle_add(callback, action, d_info) elif action == ACTION_REMOVE: @@ -347,7 +363,7 @@ def monitor_glib(glib: GLib, callback: Callable, filterfn: Callable): monitor.start() -def enumerate(filterfn): +def enumerate(filter_func) -> DeviceInfo: """Enumerate the HID Devices. List all the HID devices attached to the system, optionally filtering by @@ -356,7 +372,7 @@ def enumerate(filterfn): :returns: a list of matching ``DeviceInfo`` tuples. """ for device in _enumerate_devices(): - d_info = _match(ACTION_ADD, device, filterfn) + d_info = _match(ACTION_ADD, device, filter_func) if d_info: yield d_info @@ -377,7 +393,7 @@ def open(vendor_id, product_id, serial=None): return device_handle -def open_path(device_path): +def open_path(device_path) -> Any: """Open a HID device by its path name. :param device_path: the path of a ``DeviceInfo`` tuple returned by enumerate(). @@ -393,7 +409,7 @@ def open_path(device_path): return device_handle -def close(device_handle): +def close(device_handle) -> None: """Close a HID device. :param device_handle: a device handle returned by open() or open_path(). @@ -402,7 +418,7 @@ def close(device_handle): _hidapi.hid_close(device_handle) -def write(device_handle, data): +def write(device_handle: int, data: bytes) -> int: """Write an Output report to a HID device. :param device_handle: a device handle returned by open() or open_path(). @@ -463,7 +479,7 @@ def read(device_handle, bytes_count, timeout_ms=None): return data.raw[:bytes_read] -def get_input_report(device_handle, report_id, size): +def _get_input_report(device_handle, report_id, size): assert device_handle data = ctypes.create_string_buffer(size) data[0] = bytearray((report_id,)) diff --git a/lib/hidapi/udev_impl.py b/lib/hidapi/udev_impl.py index 8f3d412c..7f5baf2a 100644 --- a/lib/hidapi/udev_impl.py +++ b/lib/hidapi/udev_impl.py @@ -79,10 +79,13 @@ def exit(): return True -# The filterfn is used to determine whether this is a device of interest to Solaar. -# It is given the bus id, vendor id, and product id and returns a dictionary -# with the required hid_driver and usb_interface and whether this is a receiver or device. -def _match(action, device, filter_func: typing.Callable[[int, int, int, bool, bool], dict[str, typing.Any]]): +def _match(action: str, device, filter_func: typing.Callable[[int, int, int, bool, bool], dict[str, typing.Any]]): + """ + + The filter_func is used to determine whether this is a device of + interest to Solaar. It is given the bus id, vendor id, and product + id and returns a dictionary with the required hid_driver and + usb_interface and whether this is a receiver or device.""" if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Dbus event {action} {device}") hid_device = device.find_parent("hid") @@ -207,7 +210,7 @@ def find_paired_node(receiver_path: str, index: int, timeout: int): return None -def find_paired_node_wpid(receiver_path, index): +def find_paired_node_wpid(receiver_path: str, index: int): """Find the node of a device paired with a receiver, get wpid from udev""" context = pyudev.Context() receiver_phys = pyudev.Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS") @@ -228,7 +231,7 @@ def find_paired_node_wpid(receiver_path, index): return None -def monitor_glib(glib: GLib, callback: Callable, filterfn: typing.Callable): +def monitor_glib(glib: GLib, callback: Callable, filter_func: Callable): """Monitor GLib. Parameters @@ -240,14 +243,14 @@ def monitor_glib(glib: GLib, callback: Callable, filterfn: typing.Callable): m = pyudev.Monitor.from_netlink(c) m.filter_by(subsystem="hidraw") - def _process_udev_event(monitor, condition, cb, filterfn): + def _process_udev_event(monitor, condition, cb, filter_func): if condition == glib.IO_IN: event = monitor.receive_device() if event: action, device = event # print ("***", action, device) if action == ACTION_ADD: - d_info = _match(action, device, filterfn) + d_info = _match(action, device, filter_func) if d_info: glib.idle_add(cb, action, d_info) elif action == ACTION_REMOVE: @@ -257,13 +260,13 @@ def monitor_glib(glib: GLib, callback: Callable, filterfn: typing.Callable): try: # io_add_watch_full may not be available... - glib.io_add_watch_full(m, glib.PRIORITY_LOW, glib.IO_IN, _process_udev_event, callback, filterfn) + glib.io_add_watch_full(m, glib.PRIORITY_LOW, glib.IO_IN, _process_udev_event, callback, filter_func) except AttributeError: try: # and the priority parameter appeared later in the API - glib.io_add_watch(m, glib.PRIORITY_LOW, glib.IO_IN, _process_udev_event, callback, filterfn) + glib.io_add_watch(m, glib.PRIORITY_LOW, glib.IO_IN, _process_udev_event, callback, filter_func) except Exception: - glib.io_add_watch(m, glib.IO_IN, _process_udev_event, callback, filterfn) + glib.io_add_watch(m, glib.IO_IN, _process_udev_event, callback, filter_func) if logger.isEnabledFor(logging.DEBUG): logger.debug("Starting dbus monitoring") @@ -327,7 +330,7 @@ def open_path(device_path): raise -def close(device_handle): +def close(device_handle) -> None: """Close a HID device. :param device_handle: a device handle returned by open() or open_path(). diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 326a3b5a..b3319c5b 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -43,6 +43,8 @@ from .common import LOGITECH_VENDOR_ID from .common import BusID if typing.TYPE_CHECKING: + from hidapi.common import DeviceInfo + gi.require_version("Gdk", "3.0") from gi.repository import GLib # NOQA: E402 @@ -54,6 +56,30 @@ else: logger = logging.getLogger(__name__) +class HIDAPI(typing.Protocol): + def find_paired_node_wpid(self, receiver_path: str, index: int): ... + + def find_paired_node(self, receiver_path: str, index: int, timeout: int): ... + + def open(self, vendor_id, product_id, serial=None): ... + + def open_path(self, path): ... + + def enumerate(self, filter_func: Callable[[int, int, int, bool, bool], dict[str, typing.Any]]) -> DeviceInfo: ... + + def monitor_glib( + self, glib: GLib, callback: Callable, filter_func: Callable[[int, int, int, bool, bool], dict[str, typing.Any]] + ) -> None: ... + + def read(self, device_handle, bytes_count, timeout_ms): ... + + def write(self, device_handle: int, data: bytes) -> int: ... + + def close(self, device_handle) -> None: ... + + +hidapi = typing.cast(HIDAPI, hidapi) + SHORT_MESSAGE_SIZE = 7 _LONG_MESSAGE_SIZE = 20 _MEDIUM_MESSAGE_SIZE = 15 @@ -639,7 +665,8 @@ def ping(handle, devnumber, long_message: bool = False): and reply_data[1:3] == request_data[:2] ): # error response error = ord(reply_data[3:4]) - if error == hidpp10_constants.ERROR.invalid_SubID__command: # valid reply from HID++ 1.0 device + if error == hidpp10_constants.ERROR.invalid_SubID__command: + # a valid reply from a HID++ 1.0 device return 1.0 if ( error == hidpp10_constants.ERROR.resource_error