base: Add find_paired_node functions

Avoid direct access to hidapi and use the base module as low-level API
instead. This change replaces the remaining calls to find_paired_node
and find_paired_node_wpid by exposing them via base module.
This commit is contained in:
MattHag 2024-09-28 14:08:56 +02:00 committed by Peter F. Patel-Schneider
parent 90c41dbe32
commit 1729189981
6 changed files with 38 additions and 70 deletions

View File

@ -46,7 +46,10 @@ _hidpp20 = hidpp20.Hidpp20()
class LowLevelInterface(Protocol):
def open_path(self, path):
def open_path(self, path) -> Any:
...
def find_paired_node(self, receiver_path: str, index: int, timeout: int):
...
def ping(self, handle, number, long_message: bool):
@ -64,9 +67,7 @@ low_level_interface = cast(LowLevelInterface, base)
class DeviceFactory:
@staticmethod
def create_device(
find_paired_node_func: Callable[[str, int, int], Any], low_level: LowLevelInterface, device_info, setting_callback=None
):
def create_device(low_level: LowLevelInterface, device_info, setting_callback=None):
"""Opens a Logitech Device found attached to the machine, by Linux device path.
:returns: An open file handle for the found receiver, or None.
"""
@ -75,7 +76,6 @@ class DeviceFactory:
if handle:
# a direct connected device might not be online (as reported by user)
return Device(
find_paired_node_func,
low_level,
None,
None,
@ -100,7 +100,6 @@ class Device:
def __init__(
self,
find_paired_node_func: Callable[[str, int, int], Any],
low_level: LowLevelInterface,
receiver,
number,
@ -157,7 +156,7 @@ class Device:
self.cleanups = [] # functions to run on the device when it is closed
if not self.path:
self.path = find_paired_node_func(receiver.path, number, 1) if receiver else None
self.path = self.low_level.find_paired_node(receiver.path, number, 1) if receiver else None
if not self.handle:
try:
self.handle = self.low_level.open_path(self.path) if self.path else None

View File

@ -20,7 +20,6 @@ import logging
import time
from dataclasses import dataclass
from typing import Any
from typing import Callable
from typing import Optional
from typing import Protocol
@ -48,6 +47,9 @@ class LowLevelInterface(Protocol):
def open_path(self, path):
...
def find_paired_node_wpid(self, receiver_path: str, index: int):
...
def ping(self, handle, number, long_message=False):
...
@ -89,7 +91,6 @@ class Receiver:
def __init__(
self,
find_paired_node_wpid_func: Callable[[str, int], Any],
receiver_kind,
product_info,
handle,
@ -98,7 +99,6 @@ class Receiver:
setting_callback=None,
):
assert handle
self._find_paired_node_wpid_func = find_paired_node_wpid_func
self.isDevice = False # some devices act as receiver so we need a property to distinguish them
self.handle = handle
self.path = path
@ -398,10 +398,8 @@ class Receiver:
class BoltReceiver(Receiver):
"""Bolt receivers use a different pairing prototol and have different pairing registers"""
def __init__(
self, find_paired_node_wpid_func, receiver_kind, product_info, handle, path, product_id, setting_callback=None
):
super().__init__(find_paired_node_wpid_func, receiver_kind, product_info, handle, path, product_id, setting_callback)
def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None):
super().__init__(receiver_kind, product_info, handle, path, product_id, setting_callback)
def initialize(self, product_info: dict):
serial_reply = self.read_register(Registers.BOLT_UNIQUE_ID)
@ -465,10 +463,8 @@ class LightSpeedReceiver(Receiver):
class Ex100Receiver(Receiver):
"""A very old style receiver, somewhat different from newer receivers"""
def __init__(
self, find_paired_node_wpid_func, receiver_kind, product_info, handle, path, product_id, setting_callback=None
):
super().__init__(find_paired_node_wpid_func, receiver_kind, product_info, handle, path, product_id, setting_callback)
def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None):
super().__init__(receiver_kind, product_info, handle, path, product_id, setting_callback)
def initialize(self, product_info: dict):
self.serial = None
@ -484,7 +480,8 @@ class Ex100Receiver(Receiver):
return online, encrypted, wpid, kind
def device_pairing_information(self, number: int) -> dict:
wpid = self._find_paired_node_wpid_func(self.path, number) # extract WPID from udev path
# extract WPID from udev path
wpid = base.find_paired_node_wpid(self.path, number)
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
@ -520,9 +517,7 @@ receiver_class_mapping = {
class ReceiverFactory:
@staticmethod
def create_receiver(
find_paired_node_wpid_func: Callable[[str, int], Any], device_info, setting_callback=None
) -> Optional[Receiver]:
def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]:
"""Opens a Logitech Receiver found attached to the machine, by Linux device path."""
try:
@ -538,7 +533,6 @@ class ReceiverFactory:
kind = product_info.get("receiver_kind", "unknown")
rclass = receiver_class_mapping.get(kind, Receiver)
return rclass(
find_paired_node_wpid_func,
kind,
product_info,
handle,

View File

@ -16,7 +16,6 @@
import argparse
import logging
import platform
import sys
from importlib import import_module
@ -29,11 +28,6 @@ from logitech_receiver import receiver
from solaar import NAME
if platform.system() == "Linux":
import hidapi.udev_impl as hidapi
else:
import hidapi.hidapi_impl as hidapi
logger = logging.getLogger(__name__)
@ -112,7 +106,7 @@ def _receivers(dev_path=None):
if dev_path is not None and dev_path != dev_info.path:
continue
try:
r = receiver.ReceiverFactory.create_receiver(hidapi.find_paired_node_wpid, dev_info)
r = receiver.ReceiverFactory.create_receiver(dev_info)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("[%s] => %s", dev_info.path, r)
if r:
@ -128,9 +122,9 @@ def _receivers_and_devices(dev_path=None):
continue
try:
if dev_info.isDevice:
d = device.DeviceFactory.create_device(hidapi.find_paired_node, base, dev_info)
d = device.DeviceFactory.create_device(base, dev_info)
else:
d = receiver.ReceiverFactory.create_receiver(hidapi.find_paired_node_wpid, dev_info)
d = receiver.ReceiverFactory.create_receiver(dev_info)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("[%s] => %s", dev_info.path, d)

View File

@ -17,7 +17,6 @@
import errno
import logging
import platform
import subprocess
import time
@ -40,11 +39,6 @@ from . import i18n
gi.require_version("Gtk", "3.0") # NOQA: E402
from gi.repository import GLib # NOQA: E402 # isort:skip
if platform.system() == "Linux":
import hidapi.udev_impl as hidapi
else:
import hidapi.hidapi_impl as hidapi
logger = logging.getLogger(__name__)
_GHOST_DEVICE = namedtuple("_GHOST_DEVICE", ("receiver", "number", "name", "kind", "online"))
@ -261,13 +255,9 @@ def _start(device_info):
assert _status_callback and _setting_callback
isDevice = device_info.isDevice
if not isDevice:
receiver_ = logitech_receiver.receiver.ReceiverFactory.create_receiver(
hidapi.find_paired_node_wpid, device_info, _setting_callback
)
receiver_ = logitech_receiver.receiver.ReceiverFactory.create_receiver(device_info, _setting_callback)
else:
receiver_ = logitech_receiver.device.DeviceFactory.create_device(
hidapi.find_paired_node, base, device_info, _setting_callback
)
receiver_ = logitech_receiver.device.DeviceFactory.create_device(base, device_info, _setting_callback)
if receiver_:
configuration.attach_to(receiver_)
if receiver_.bluetooth and receiver_.hid_serial:

View File

@ -17,7 +17,6 @@
from dataclasses import dataclass
from functools import partial
from typing import Optional
from unittest import mock
import pytest
@ -35,6 +34,9 @@ class LowLevelInterfaceFake:
def open_path(self, path):
return fake_hidpp.open_path(path)
def find_paired_node(self, receiver_path: str, index: int, timeout: int):
return None
def request(self, response, *args, **kwargs):
func = partial(fake_hidpp.request, self.responses)
return func(response, *args, **kwargs)
@ -80,12 +82,12 @@ def test_create_device(device_info, responses, expected_success):
low_level_mock = LowLevelInterfaceFake(responses)
if expected_success is None:
with pytest.raises(PermissionError):
device.DeviceFactory.create_device(mock.Mock(), low_level_mock, device_info)
device.DeviceFactory.create_device(low_level_mock, device_info)
elif not expected_success:
with pytest.raises(TypeError):
device.DeviceFactory.create_device(mock.Mock(), low_level_mock, device_info)
device.DeviceFactory.create_device(low_level_mock, device_info)
else:
test_device = device.DeviceFactory.create_device(mock.Mock(), low_level_mock, device_info)
test_device = device.DeviceFactory.create_device(low_level_mock, device_info)
assert bool(test_device) == expected_success
@ -96,7 +98,7 @@ def test_create_device(device_info, responses, expected_success):
def test_device_name(device_info, responses, expected_codename, expected_name, expected_kind):
low_level = LowLevelInterfaceFake(responses)
test_device = device.DeviceFactory.create_device(mock.Mock(), low_level, device_info)
test_device = device.DeviceFactory.create_device(low_level, device_info)
assert test_device.codename == expected_codename
assert test_device.name == expected_name
@ -124,9 +126,7 @@ def test_device_name(device_info, responses, expected_codename, expected_name, e
),
)
def test_device_info(device_info, responses, handle, _name, _codename, number, protocol, registers):
test_device = device.Device(
mock.Mock(), LowLevelInterfaceFake(responses), None, None, None, handle=handle, device_info=device_info
)
test_device = device.Device(LowLevelInterfaceFake(responses), None, None, None, handle=handle, device_info=device_info)
assert test_device.handle == handle
assert test_device._name == _name
@ -195,9 +195,7 @@ def test_device_receiver(number, pairing_info, responses, handle, _name, codenam
low_level.request = partial(fake_hidpp.request, fake_hidpp.replace_number(responses, number))
low_level.ping = partial(fake_hidpp.ping, fake_hidpp.replace_number(responses, number))
test_device = device.Device(
mock.Mock(), low_level, FakeReceiver(codename="CODE"), number, True, pairing_info, handle=handle
)
test_device = device.Device(low_level, FakeReceiver(codename="CODE"), number, True, pairing_info, handle=handle)
test_device.receiver.device = test_device
assert test_device.handle == handle
@ -246,7 +244,7 @@ def test_device_ids(number, info, responses, handle, unitId, modelId, tid, kind,
low_level.request = partial(fake_hidpp.request, fake_hidpp.replace_number(responses, number))
low_level.ping = partial(fake_hidpp.ping, fake_hidpp.replace_number(responses, number))
test_device = device.Device(mock.Mock(), low_level, FakeReceiver(), number, True, info, handle=handle)
test_device = device.Device(low_level, FakeReceiver(), number, True, info, handle=handle)
assert test_device.unitId == unitId
assert test_device.modelId == modelId
@ -261,7 +259,7 @@ def test_device_ids(number, info, responses, handle, unitId, modelId, tid, kind,
class FakeDevice(device.Device): # a fully functional Device but its HID++ functions look at local data
def __init__(self, responses, *args, **kwargs):
self.responses = responses
super().__init__(mock.Mock(), LowLevelInterfaceFake(responses), *args, **kwargs)
super().__init__(LowLevelInterfaceFake(responses), *args, **kwargs)
request = fake_hidpp.Device.request
ping = fake_hidpp.Device.ping

View File

@ -4,7 +4,6 @@ from dataclasses import dataclass
from functools import partial
from unittest import mock
import hidapi
import pytest
from logitech_receiver import common
@ -13,11 +12,6 @@ from logitech_receiver import receiver
from . import fake_hidpp
if platform.system() == "Linux":
import hidapi.udev_impl as hidapi
else:
import hidapi.hidapi_impl as hidapi
@pytest.mark.parametrize(
"index, expected_kind",
@ -121,16 +115,15 @@ c534_info = {"kind": common.NamedInt(0, "unknown"), "polling": "", "power_switch
def test_receiver_factory_create_receiver(device_info, responses, handle, serial, max_devices, mock_base):
mock_base[0].side_effect = fake_hidpp.open_path
mock_base[1].side_effect = partial(fake_hidpp.request, responses)
find_paired_node_wpid_func = hidapi.find_paired_node_wpid
if handle is False:
with pytest.raises(Exception): # noqa: B017
receiver.ReceiverFactory.create_receiver(find_paired_node_wpid_func, device_info, lambda x: x)
receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
elif handle is None:
r = receiver.ReceiverFactory.create_receiver(find_paired_node_wpid_func, device_info, lambda x: x)
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
assert r is None
else:
r = receiver.ReceiverFactory.create_receiver(find_paired_node_wpid_func, device_info, lambda x: x)
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
assert r.handle == handle
assert r.serial == serial
assert r.max_devices == max_devices
@ -151,7 +144,7 @@ def test_receiver_factory_props(
mock_base[0].side_effect = fake_hidpp.open_path
mock_base[1].side_effect = partial(fake_hidpp.request, responses)
r = receiver.ReceiverFactory.create_receiver(mock.Mock(), device_info, lambda x: x)
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
assert len(r.firmware) == firmware if firmware is not None else firmware is None
assert r.device_codename(2) == codename
@ -173,7 +166,7 @@ def test_receiver_factory_string(device_info, responses, status_str, strng, mock
mock_base[0].side_effect = fake_hidpp.open_path
mock_base[1].side_effect = partial(fake_hidpp.request, responses)
r = receiver.ReceiverFactory.create_receiver(mock.Mock(), device_info, lambda x: x)
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
assert r.status_string() == status_str
assert str(r) == strng
@ -191,7 +184,7 @@ def test_receiver_factory_nodevice(device_info, responses, mock_base):
mock_base[0].side_effect = fake_hidpp.open_path
mock_base[1].side_effect = partial(fake_hidpp.request, responses)
r = receiver.ReceiverFactory.create_receiver(mock.Mock(), device_info, lambda x: x)
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
with pytest.raises(exceptions.NoSuchDevice):
r.device_pairing_information(1)