Introduce HIDAPI protocol

Improve type hints and names.
This commit is contained in:
MattHag 2024-10-01 02:30:13 +02:00 committed by Peter F. Patel-Schneider
parent 46366b2430
commit 58ddb0d6cd
3 changed files with 80 additions and 34 deletions

View File

@ -33,6 +33,7 @@ import typing
from threading import Thread
from time import sleep
from typing import Any
from typing import Callable
from hidapi.common import DeviceInfo
@ -225,10 +226,18 @@ class _DeviceMonitor(Thread):
sleep(self.polling_delay)
# The filterfn is used to determine whether this is a device of interest to Solaar.
# It is given the bus id, vendor id, and product id and returns a dictionary
# with the required hid_driver and usb_interface and whether this is a receiver or device.
def _match(action, device, filterfn):
def _match(
action: str,
device,
filter_func: Callable[[int, int, int, bool, bool], dict[str, Any]],
):
"""
The filter_func is used to determine whether this is a device of
interest to Solaar. It is given the bus id, vendor id, and product
id and returns a dictionary with the required hid_driver and
usb_interface and whether this is a receiver or device.
"""
vid = device["vendor_id"]
pid = device["product_id"]
@ -246,10 +255,10 @@ def _match(action, device, filterfn):
device_handle = None
try:
device_handle = open_path(device["path"])
report = get_input_report(device_handle, 0x10, 32)
report = _get_input_report(device_handle, 0x10, 32)
if len(report) == 1 + 6 and report[0] == 0x10:
device["hidpp_short"] = True
report = get_input_report(device_handle, 0x11, 32)
report = _get_input_report(device_handle, 0x11, 32)
if len(report) == 1 + 19 and report[0] == 0x11:
device["hidpp_long"] = True
except HIDError as e: # noqa: F841
@ -272,10 +281,10 @@ def _match(action, device, filterfn):
if not device["hidpp_short"] and not device["hidpp_long"]:
return None
filter_func = filterfn(bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"])
if not filter_func:
filtered_result = filter_func(bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"])
if not filtered_result:
return
isDevice = filter_func.get("isDevice")
is_device = filtered_result.get("isDevice")
if action == ACTION_ADD:
d_info = DeviceInfo(
@ -289,7 +298,7 @@ def _match(action, device, filterfn):
product=device["product_string"],
serial=device["serial_number"],
release=device["release_number"],
isDevice=isDevice,
isDevice=is_device,
hidpp_short=device["hidpp_short"],
hidpp_long=device["hidpp_long"],
)
@ -307,7 +316,7 @@ def _match(action, device, filterfn):
product=None,
serial=None,
release=None,
isDevice=isDevice,
isDevice=is_device,
hidpp_short=None,
hidpp_long=None,
)
@ -324,19 +333,26 @@ def find_paired_node_wpid(receiver_path: str, index: int):
return None
def monitor_glib(glib: GLib, callback: Callable, filterfn: Callable):
def monitor_glib(
glib: GLib,
callback: Callable,
filter_func: Callable[[int, int, int, bool, bool], dict[str, Any]],
) -> None:
"""Monitor GLib.
Parameters
----------
glib
GLib instance.
callback
Called when device found.
filter_func
Filter devices callback.
"""
def device_callback(action, device):
# print(f"device_callback({action}): {device}")
def device_callback(action: str, device):
if action == ACTION_ADD:
d_info = _match(action, device, filterfn)
d_info = _match(action, device, filter_func)
if d_info:
glib.idle_add(callback, action, d_info)
elif action == ACTION_REMOVE:
@ -347,7 +363,7 @@ def monitor_glib(glib: GLib, callback: Callable, filterfn: Callable):
monitor.start()
def enumerate(filterfn):
def enumerate(filter_func) -> DeviceInfo:
"""Enumerate the HID Devices.
List all the HID devices attached to the system, optionally filtering by
@ -356,7 +372,7 @@ def enumerate(filterfn):
:returns: a list of matching ``DeviceInfo`` tuples.
"""
for device in _enumerate_devices():
d_info = _match(ACTION_ADD, device, filterfn)
d_info = _match(ACTION_ADD, device, filter_func)
if d_info:
yield d_info
@ -377,7 +393,7 @@ def open(vendor_id, product_id, serial=None):
return device_handle
def open_path(device_path):
def open_path(device_path) -> Any:
"""Open a HID device by its path name.
:param device_path: the path of a ``DeviceInfo`` tuple returned by enumerate().
@ -393,7 +409,7 @@ def open_path(device_path):
return device_handle
def close(device_handle):
def close(device_handle) -> None:
"""Close a HID device.
:param device_handle: a device handle returned by open() or open_path().
@ -402,7 +418,7 @@ def close(device_handle):
_hidapi.hid_close(device_handle)
def write(device_handle, data):
def write(device_handle: int, data: bytes) -> int:
"""Write an Output report to a HID device.
:param device_handle: a device handle returned by open() or open_path().
@ -463,7 +479,7 @@ def read(device_handle, bytes_count, timeout_ms=None):
return data.raw[:bytes_read]
def get_input_report(device_handle, report_id, size):
def _get_input_report(device_handle, report_id, size):
assert device_handle
data = ctypes.create_string_buffer(size)
data[0] = bytearray((report_id,))

View File

@ -79,10 +79,13 @@ def exit():
return True
# The filterfn is used to determine whether this is a device of interest to Solaar.
# It is given the bus id, vendor id, and product id and returns a dictionary
# with the required hid_driver and usb_interface and whether this is a receiver or device.
def _match(action, device, filter_func: typing.Callable[[int, int, int, bool, bool], dict[str, typing.Any]]):
def _match(action: str, device, filter_func: typing.Callable[[int, int, int, bool, bool], dict[str, typing.Any]]):
"""
The filter_func is used to determine whether this is a device of
interest to Solaar. It is given the bus id, vendor id, and product
id and returns a dictionary with the required hid_driver and
usb_interface and whether this is a receiver or device."""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Dbus event {action} {device}")
hid_device = device.find_parent("hid")
@ -207,7 +210,7 @@ def find_paired_node(receiver_path: str, index: int, timeout: int):
return None
def find_paired_node_wpid(receiver_path, index):
def find_paired_node_wpid(receiver_path: str, index: int):
"""Find the node of a device paired with a receiver, get wpid from udev"""
context = pyudev.Context()
receiver_phys = pyudev.Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS")
@ -228,7 +231,7 @@ def find_paired_node_wpid(receiver_path, index):
return None
def monitor_glib(glib: GLib, callback: Callable, filterfn: typing.Callable):
def monitor_glib(glib: GLib, callback: Callable, filter_func: Callable):
"""Monitor GLib.
Parameters
@ -240,14 +243,14 @@ def monitor_glib(glib: GLib, callback: Callable, filterfn: typing.Callable):
m = pyudev.Monitor.from_netlink(c)
m.filter_by(subsystem="hidraw")
def _process_udev_event(monitor, condition, cb, filterfn):
def _process_udev_event(monitor, condition, cb, filter_func):
if condition == glib.IO_IN:
event = monitor.receive_device()
if event:
action, device = event
# print ("***", action, device)
if action == ACTION_ADD:
d_info = _match(action, device, filterfn)
d_info = _match(action, device, filter_func)
if d_info:
glib.idle_add(cb, action, d_info)
elif action == ACTION_REMOVE:
@ -257,13 +260,13 @@ def monitor_glib(glib: GLib, callback: Callable, filterfn: typing.Callable):
try:
# io_add_watch_full may not be available...
glib.io_add_watch_full(m, glib.PRIORITY_LOW, glib.IO_IN, _process_udev_event, callback, filterfn)
glib.io_add_watch_full(m, glib.PRIORITY_LOW, glib.IO_IN, _process_udev_event, callback, filter_func)
except AttributeError:
try:
# and the priority parameter appeared later in the API
glib.io_add_watch(m, glib.PRIORITY_LOW, glib.IO_IN, _process_udev_event, callback, filterfn)
glib.io_add_watch(m, glib.PRIORITY_LOW, glib.IO_IN, _process_udev_event, callback, filter_func)
except Exception:
glib.io_add_watch(m, glib.IO_IN, _process_udev_event, callback, filterfn)
glib.io_add_watch(m, glib.IO_IN, _process_udev_event, callback, filter_func)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Starting dbus monitoring")
@ -327,7 +330,7 @@ def open_path(device_path):
raise
def close(device_handle):
def close(device_handle) -> None:
"""Close a HID device.
:param device_handle: a device handle returned by open() or open_path().

View File

@ -43,6 +43,8 @@ from .common import LOGITECH_VENDOR_ID
from .common import BusID
if typing.TYPE_CHECKING:
from hidapi.common import DeviceInfo
gi.require_version("Gdk", "3.0")
from gi.repository import GLib # NOQA: E402
@ -54,6 +56,30 @@ else:
logger = logging.getLogger(__name__)
class HIDAPI(typing.Protocol):
def find_paired_node_wpid(self, receiver_path: str, index: int): ...
def find_paired_node(self, receiver_path: str, index: int, timeout: int): ...
def open(self, vendor_id, product_id, serial=None): ...
def open_path(self, path): ...
def enumerate(self, filter_func: Callable[[int, int, int, bool, bool], dict[str, typing.Any]]) -> DeviceInfo: ...
def monitor_glib(
self, glib: GLib, callback: Callable, filter_func: Callable[[int, int, int, bool, bool], dict[str, typing.Any]]
) -> None: ...
def read(self, device_handle, bytes_count, timeout_ms): ...
def write(self, device_handle: int, data: bytes) -> int: ...
def close(self, device_handle) -> None: ...
hidapi = typing.cast(HIDAPI, hidapi)
SHORT_MESSAGE_SIZE = 7
_LONG_MESSAGE_SIZE = 20
_MEDIUM_MESSAGE_SIZE = 15
@ -639,7 +665,8 @@ def ping(handle, devnumber, long_message: bool = False):
and reply_data[1:3] == request_data[:2]
): # error response
error = ord(reply_data[3:4])
if error == hidpp10_constants.ERROR.invalid_SubID__command: # valid reply from HID++ 1.0 device
if error == hidpp10_constants.ERROR.invalid_SubID__command:
# a valid reply from a HID++ 1.0 device
return 1.0
if (
error == hidpp10_constants.ERROR.resource_error