From 1729189981f3972f1e9dd01b7d2edbd3df9ce6c2 Mon Sep 17 00:00:00 2001 From: MattHag <16444067+MattHag@users.noreply.github.com> Date: Sat, 28 Sep 2024 14:08:56 +0200 Subject: [PATCH] base: Add find_paired_node functions Avoid direct access to hidapi and use the base module as low-level API instead. This change replaces the remaining calls to find_paired_node and find_paired_node_wpid by exposing them via base module. --- lib/logitech_receiver/device.py | 13 ++++++------ lib/logitech_receiver/receiver.py | 26 +++++++++--------------- lib/solaar/cli/__init__.py | 12 +++-------- lib/solaar/listener.py | 14 ++----------- tests/logitech_receiver/test_device.py | 24 ++++++++++------------ tests/logitech_receiver/test_receiver.py | 19 ++++++----------- 6 files changed, 38 insertions(+), 70 deletions(-) diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index 0891bd63..6f53fb63 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -46,7 +46,10 @@ _hidpp20 = hidpp20.Hidpp20() class LowLevelInterface(Protocol): - def open_path(self, path): + def open_path(self, path) -> Any: + ... + + def find_paired_node(self, receiver_path: str, index: int, timeout: int): ... def ping(self, handle, number, long_message: bool): @@ -64,9 +67,7 @@ low_level_interface = cast(LowLevelInterface, base) class DeviceFactory: @staticmethod - def create_device( - find_paired_node_func: Callable[[str, int, int], Any], low_level: LowLevelInterface, device_info, setting_callback=None - ): + def create_device(low_level: LowLevelInterface, device_info, setting_callback=None): """Opens a Logitech Device found attached to the machine, by Linux device path. :returns: An open file handle for the found receiver, or None. """ @@ -75,7 +76,6 @@ class DeviceFactory: if handle: # a direct connected device might not be online (as reported by user) return Device( - find_paired_node_func, low_level, None, None, @@ -100,7 +100,6 @@ class Device: def __init__( self, - find_paired_node_func: Callable[[str, int, int], Any], low_level: LowLevelInterface, receiver, number, @@ -157,7 +156,7 @@ class Device: self.cleanups = [] # functions to run on the device when it is closed if not self.path: - self.path = find_paired_node_func(receiver.path, number, 1) if receiver else None + self.path = self.low_level.find_paired_node(receiver.path, number, 1) if receiver else None if not self.handle: try: self.handle = self.low_level.open_path(self.path) if self.path else None diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index efd85ec3..f69a9357 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -20,7 +20,6 @@ import logging import time from dataclasses import dataclass -from typing import Any from typing import Callable from typing import Optional from typing import Protocol @@ -48,6 +47,9 @@ class LowLevelInterface(Protocol): def open_path(self, path): ... + def find_paired_node_wpid(self, receiver_path: str, index: int): + ... + def ping(self, handle, number, long_message=False): ... @@ -89,7 +91,6 @@ class Receiver: def __init__( self, - find_paired_node_wpid_func: Callable[[str, int], Any], receiver_kind, product_info, handle, @@ -98,7 +99,6 @@ class Receiver: setting_callback=None, ): assert handle - self._find_paired_node_wpid_func = find_paired_node_wpid_func self.isDevice = False # some devices act as receiver so we need a property to distinguish them self.handle = handle self.path = path @@ -398,10 +398,8 @@ class Receiver: class BoltReceiver(Receiver): """Bolt receivers use a different pairing prototol and have different pairing registers""" - def __init__( - self, find_paired_node_wpid_func, receiver_kind, product_info, handle, path, product_id, setting_callback=None - ): - super().__init__(find_paired_node_wpid_func, receiver_kind, product_info, handle, path, product_id, setting_callback) + def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None): + super().__init__(receiver_kind, product_info, handle, path, product_id, setting_callback) def initialize(self, product_info: dict): serial_reply = self.read_register(Registers.BOLT_UNIQUE_ID) @@ -465,10 +463,8 @@ class LightSpeedReceiver(Receiver): class Ex100Receiver(Receiver): """A very old style receiver, somewhat different from newer receivers""" - def __init__( - self, find_paired_node_wpid_func, receiver_kind, product_info, handle, path, product_id, setting_callback=None - ): - super().__init__(find_paired_node_wpid_func, receiver_kind, product_info, handle, path, product_id, setting_callback) + def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None): + super().__init__(receiver_kind, product_info, handle, path, product_id, setting_callback) def initialize(self, product_info: dict): self.serial = None @@ -484,7 +480,8 @@ class Ex100Receiver(Receiver): return online, encrypted, wpid, kind def device_pairing_information(self, number: int) -> dict: - wpid = self._find_paired_node_wpid_func(self.path, number) # extract WPID from udev path + # extract WPID from udev path + wpid = base.find_paired_node_wpid(self.path, number) if not wpid: logger.error("Unable to get wpid from udev for device %d of %s", number, self) raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device") @@ -520,9 +517,7 @@ receiver_class_mapping = { class ReceiverFactory: @staticmethod - def create_receiver( - find_paired_node_wpid_func: Callable[[str, int], Any], device_info, setting_callback=None - ) -> Optional[Receiver]: + def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]: """Opens a Logitech Receiver found attached to the machine, by Linux device path.""" try: @@ -538,7 +533,6 @@ class ReceiverFactory: kind = product_info.get("receiver_kind", "unknown") rclass = receiver_class_mapping.get(kind, Receiver) return rclass( - find_paired_node_wpid_func, kind, product_info, handle, diff --git a/lib/solaar/cli/__init__.py b/lib/solaar/cli/__init__.py index 1eca8544..927b871c 100644 --- a/lib/solaar/cli/__init__.py +++ b/lib/solaar/cli/__init__.py @@ -16,7 +16,6 @@ import argparse import logging -import platform import sys from importlib import import_module @@ -29,11 +28,6 @@ from logitech_receiver import receiver from solaar import NAME -if platform.system() == "Linux": - import hidapi.udev_impl as hidapi -else: - import hidapi.hidapi_impl as hidapi - logger = logging.getLogger(__name__) @@ -112,7 +106,7 @@ def _receivers(dev_path=None): if dev_path is not None and dev_path != dev_info.path: continue try: - r = receiver.ReceiverFactory.create_receiver(hidapi.find_paired_node_wpid, dev_info) + r = receiver.ReceiverFactory.create_receiver(dev_info) if logger.isEnabledFor(logging.DEBUG): logger.debug("[%s] => %s", dev_info.path, r) if r: @@ -128,9 +122,9 @@ def _receivers_and_devices(dev_path=None): continue try: if dev_info.isDevice: - d = device.DeviceFactory.create_device(hidapi.find_paired_node, base, dev_info) + d = device.DeviceFactory.create_device(base, dev_info) else: - d = receiver.ReceiverFactory.create_receiver(hidapi.find_paired_node_wpid, dev_info) + d = receiver.ReceiverFactory.create_receiver(dev_info) if logger.isEnabledFor(logging.DEBUG): logger.debug("[%s] => %s", dev_info.path, d) diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index 3b6d45e4..ef76d8a8 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -17,7 +17,6 @@ import errno import logging -import platform import subprocess import time @@ -40,11 +39,6 @@ from . import i18n gi.require_version("Gtk", "3.0") # NOQA: E402 from gi.repository import GLib # NOQA: E402 # isort:skip -if platform.system() == "Linux": - import hidapi.udev_impl as hidapi -else: - import hidapi.hidapi_impl as hidapi - logger = logging.getLogger(__name__) _GHOST_DEVICE = namedtuple("_GHOST_DEVICE", ("receiver", "number", "name", "kind", "online")) @@ -261,13 +255,9 @@ def _start(device_info): assert _status_callback and _setting_callback isDevice = device_info.isDevice if not isDevice: - receiver_ = logitech_receiver.receiver.ReceiverFactory.create_receiver( - hidapi.find_paired_node_wpid, device_info, _setting_callback - ) + receiver_ = logitech_receiver.receiver.ReceiverFactory.create_receiver(device_info, _setting_callback) else: - receiver_ = logitech_receiver.device.DeviceFactory.create_device( - hidapi.find_paired_node, base, device_info, _setting_callback - ) + receiver_ = logitech_receiver.device.DeviceFactory.create_device(base, device_info, _setting_callback) if receiver_: configuration.attach_to(receiver_) if receiver_.bluetooth and receiver_.hid_serial: diff --git a/tests/logitech_receiver/test_device.py b/tests/logitech_receiver/test_device.py index 851b93d3..fde08a68 100644 --- a/tests/logitech_receiver/test_device.py +++ b/tests/logitech_receiver/test_device.py @@ -17,7 +17,6 @@ from dataclasses import dataclass from functools import partial from typing import Optional -from unittest import mock import pytest @@ -35,6 +34,9 @@ class LowLevelInterfaceFake: def open_path(self, path): return fake_hidpp.open_path(path) + def find_paired_node(self, receiver_path: str, index: int, timeout: int): + return None + def request(self, response, *args, **kwargs): func = partial(fake_hidpp.request, self.responses) return func(response, *args, **kwargs) @@ -80,12 +82,12 @@ def test_create_device(device_info, responses, expected_success): low_level_mock = LowLevelInterfaceFake(responses) if expected_success is None: with pytest.raises(PermissionError): - device.DeviceFactory.create_device(mock.Mock(), low_level_mock, device_info) + device.DeviceFactory.create_device(low_level_mock, device_info) elif not expected_success: with pytest.raises(TypeError): - device.DeviceFactory.create_device(mock.Mock(), low_level_mock, device_info) + device.DeviceFactory.create_device(low_level_mock, device_info) else: - test_device = device.DeviceFactory.create_device(mock.Mock(), low_level_mock, device_info) + test_device = device.DeviceFactory.create_device(low_level_mock, device_info) assert bool(test_device) == expected_success @@ -96,7 +98,7 @@ def test_create_device(device_info, responses, expected_success): def test_device_name(device_info, responses, expected_codename, expected_name, expected_kind): low_level = LowLevelInterfaceFake(responses) - test_device = device.DeviceFactory.create_device(mock.Mock(), low_level, device_info) + test_device = device.DeviceFactory.create_device(low_level, device_info) assert test_device.codename == expected_codename assert test_device.name == expected_name @@ -124,9 +126,7 @@ def test_device_name(device_info, responses, expected_codename, expected_name, e ), ) def test_device_info(device_info, responses, handle, _name, _codename, number, protocol, registers): - test_device = device.Device( - mock.Mock(), LowLevelInterfaceFake(responses), None, None, None, handle=handle, device_info=device_info - ) + test_device = device.Device(LowLevelInterfaceFake(responses), None, None, None, handle=handle, device_info=device_info) assert test_device.handle == handle assert test_device._name == _name @@ -195,9 +195,7 @@ def test_device_receiver(number, pairing_info, responses, handle, _name, codenam low_level.request = partial(fake_hidpp.request, fake_hidpp.replace_number(responses, number)) low_level.ping = partial(fake_hidpp.ping, fake_hidpp.replace_number(responses, number)) - test_device = device.Device( - mock.Mock(), low_level, FakeReceiver(codename="CODE"), number, True, pairing_info, handle=handle - ) + test_device = device.Device(low_level, FakeReceiver(codename="CODE"), number, True, pairing_info, handle=handle) test_device.receiver.device = test_device assert test_device.handle == handle @@ -246,7 +244,7 @@ def test_device_ids(number, info, responses, handle, unitId, modelId, tid, kind, low_level.request = partial(fake_hidpp.request, fake_hidpp.replace_number(responses, number)) low_level.ping = partial(fake_hidpp.ping, fake_hidpp.replace_number(responses, number)) - test_device = device.Device(mock.Mock(), low_level, FakeReceiver(), number, True, info, handle=handle) + test_device = device.Device(low_level, FakeReceiver(), number, True, info, handle=handle) assert test_device.unitId == unitId assert test_device.modelId == modelId @@ -261,7 +259,7 @@ def test_device_ids(number, info, responses, handle, unitId, modelId, tid, kind, class FakeDevice(device.Device): # a fully functional Device but its HID++ functions look at local data def __init__(self, responses, *args, **kwargs): self.responses = responses - super().__init__(mock.Mock(), LowLevelInterfaceFake(responses), *args, **kwargs) + super().__init__(LowLevelInterfaceFake(responses), *args, **kwargs) request = fake_hidpp.Device.request ping = fake_hidpp.Device.ping diff --git a/tests/logitech_receiver/test_receiver.py b/tests/logitech_receiver/test_receiver.py index 01f02989..014e20fd 100644 --- a/tests/logitech_receiver/test_receiver.py +++ b/tests/logitech_receiver/test_receiver.py @@ -4,7 +4,6 @@ from dataclasses import dataclass from functools import partial from unittest import mock -import hidapi import pytest from logitech_receiver import common @@ -13,11 +12,6 @@ from logitech_receiver import receiver from . import fake_hidpp -if platform.system() == "Linux": - import hidapi.udev_impl as hidapi -else: - import hidapi.hidapi_impl as hidapi - @pytest.mark.parametrize( "index, expected_kind", @@ -121,16 +115,15 @@ c534_info = {"kind": common.NamedInt(0, "unknown"), "polling": "", "power_switch def test_receiver_factory_create_receiver(device_info, responses, handle, serial, max_devices, mock_base): mock_base[0].side_effect = fake_hidpp.open_path mock_base[1].side_effect = partial(fake_hidpp.request, responses) - find_paired_node_wpid_func = hidapi.find_paired_node_wpid if handle is False: with pytest.raises(Exception): # noqa: B017 - receiver.ReceiverFactory.create_receiver(find_paired_node_wpid_func, device_info, lambda x: x) + receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) elif handle is None: - r = receiver.ReceiverFactory.create_receiver(find_paired_node_wpid_func, device_info, lambda x: x) + r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) assert r is None else: - r = receiver.ReceiverFactory.create_receiver(find_paired_node_wpid_func, device_info, lambda x: x) + r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) assert r.handle == handle assert r.serial == serial assert r.max_devices == max_devices @@ -151,7 +144,7 @@ def test_receiver_factory_props( mock_base[0].side_effect = fake_hidpp.open_path mock_base[1].side_effect = partial(fake_hidpp.request, responses) - r = receiver.ReceiverFactory.create_receiver(mock.Mock(), device_info, lambda x: x) + r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) assert len(r.firmware) == firmware if firmware is not None else firmware is None assert r.device_codename(2) == codename @@ -173,7 +166,7 @@ def test_receiver_factory_string(device_info, responses, status_str, strng, mock mock_base[0].side_effect = fake_hidpp.open_path mock_base[1].side_effect = partial(fake_hidpp.request, responses) - r = receiver.ReceiverFactory.create_receiver(mock.Mock(), device_info, lambda x: x) + r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) assert r.status_string() == status_str assert str(r) == strng @@ -191,7 +184,7 @@ def test_receiver_factory_nodevice(device_info, responses, mock_base): mock_base[0].side_effect = fake_hidpp.open_path mock_base[1].side_effect = partial(fake_hidpp.request, responses) - r = receiver.ReceiverFactory.create_receiver(mock.Mock(), device_info, lambda x: x) + r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) with pytest.raises(exceptions.NoSuchDevice): r.device_pairing_information(1)