receiver: Remove hard dependency on base

With this test all receiver tests are macOS compatible again. The low
level interface supports passing a fake API for unit tests.
This commit is contained in:
MattHag 2024-09-28 18:45:17 +02:00 committed by Peter F. Patel-Schneider
parent 4e50e605a6
commit ef6b7dec2c
4 changed files with 60 additions and 55 deletions

View File

@ -23,12 +23,10 @@ from dataclasses import dataclass
from typing import Callable from typing import Callable
from typing import Optional from typing import Optional
from typing import Protocol from typing import Protocol
from typing import cast
from solaar.i18n import _ from solaar.i18n import _
from solaar.i18n import ngettext from solaar.i18n import ngettext
from . import base
from . import exceptions from . import exceptions
from . import hidpp10 from . import hidpp10
from . import hidpp10_constants from . import hidpp10_constants
@ -60,9 +58,6 @@ class LowLevelInterface(Protocol):
... ...
low_level_interface = cast(LowLevelInterface, base)
@dataclass @dataclass
class Pairing: class Pairing:
"""Information about the current or most recent pairing""" """Information about the current or most recent pairing"""
@ -91,6 +86,7 @@ class Receiver:
def __init__( def __init__(
self, self,
low_level: LowLevelInterface,
receiver_kind, receiver_kind,
product_info, product_info,
handle, handle,
@ -99,6 +95,7 @@ class Receiver:
setting_callback=None, setting_callback=None,
): ):
assert handle assert handle
self.low_level = low_level
self.isDevice = False # some devices act as receiver so we need a property to distinguish them self.isDevice = False # some devices act as receiver so we need a property to distinguish them
self.handle = handle self.handle = handle
self.path = path self.path = path
@ -137,7 +134,7 @@ class Receiver:
if d: if d:
d.close() d.close()
self._devices.clear() self._devices.clear()
return handle and base.close(handle) return handle and self.low_level.close(handle)
def __del__(self): def __del__(self):
self.close() self.close()
@ -257,7 +254,7 @@ class Receiver:
logger.warning("mismatch on device kind %s %s", info["kind"], nkind) logger.warning("mismatch on device kind %s %s", info["kind"], nkind)
else: else:
online = True online = True
dev = Device(low_level_interface, self, number, online, pairing_info=info, setting_callback=self.setting_callback) dev = Device(self.low_level, self, number, online, pairing_info=info, setting_callback=self.setting_callback)
if logger.isEnabledFor(logging.INFO): if logger.isEnabledFor(logging.INFO):
logger.info("%s: found new device %d (%s)", self, number, dev.wpid) logger.info("%s: found new device %d (%s)", self, number, dev.wpid)
self._devices[number] = dev self._devices[number] = dev
@ -282,7 +279,7 @@ class Receiver:
def request(self, request_id, *params): def request(self, request_id, *params):
if bool(self): if bool(self):
return base.request(self.handle, 0xFF, request_id, *params) return self.low_level.request(self.handle, 0xFF, request_id, *params)
def reset_pairing(self): def reset_pairing(self):
self.pairing = Pairing() self.pairing = Pairing()
@ -481,7 +478,7 @@ class Ex100Receiver(Receiver):
def device_pairing_information(self, number: int) -> dict: def device_pairing_information(self, number: int) -> dict:
# extract WPID from udev path # extract WPID from udev path
wpid = base.find_paired_node_wpid(self.path, number) wpid = self.low_level.find_paired_node_wpid(self.path, number)
if not wpid: if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self) logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device") raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
@ -517,22 +514,23 @@ receiver_class_mapping = {
class ReceiverFactory: class ReceiverFactory:
@staticmethod @staticmethod
def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]: def create_receiver(low_level: LowLevelInterface, device_info, setting_callback=None) -> Optional[Receiver]:
"""Opens a Logitech Receiver found attached to the machine, by Linux device path.""" """Opens a Logitech Receiver found attached to the machine, by Linux device path."""
try: try:
handle = base.open_path(device_info.path) handle = low_level.open_path(device_info.path)
if handle: if handle:
usb_id = device_info.product_id usb_id = device_info.product_id
if isinstance(usb_id, str): if isinstance(usb_id, str):
usb_id = int(usb_id, 16) usb_id = int(usb_id, 16)
try: try:
product_info = base.product_information(usb_id) product_info = low_level.product_information(usb_id)
except ValueError: except ValueError:
product_info = {} product_info = {}
kind = product_info.get("receiver_kind", "unknown") kind = product_info.get("receiver_kind", "unknown")
rclass = receiver_class_mapping.get(kind, Receiver) rclass = receiver_class_mapping.get(kind, Receiver)
return rclass( return rclass(
low_level,
kind, kind,
product_info, product_info,
handle, handle,

View File

@ -106,7 +106,7 @@ def _receivers(dev_path=None):
if dev_path is not None and dev_path != dev_info.path: if dev_path is not None and dev_path != dev_info.path:
continue continue
try: try:
r = receiver.ReceiverFactory.create_receiver(dev_info) r = receiver.ReceiverFactory.create_receiver(base, dev_info)
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug("[%s] => %s", dev_info.path, r) logger.debug("[%s] => %s", dev_info.path, r)
if r: if r:
@ -124,7 +124,7 @@ def _receivers_and_devices(dev_path=None):
if dev_info.isDevice: if dev_info.isDevice:
d = device.DeviceFactory.create_device(base, dev_info) d = device.DeviceFactory.create_device(base, dev_info)
else: else:
d = receiver.ReceiverFactory.create_receiver(dev_info) d = receiver.ReceiverFactory.create_receiver(base, dev_info)
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug("[%s] => %s", dev_info.path, d) logger.debug("[%s] => %s", dev_info.path, d)

View File

@ -255,7 +255,7 @@ def _start(device_info):
assert _status_callback and _setting_callback assert _status_callback and _setting_callback
isDevice = device_info.isDevice isDevice = device_info.isDevice
if not isDevice: if not isDevice:
receiver_ = logitech_receiver.receiver.ReceiverFactory.create_receiver(device_info, _setting_callback) receiver_ = logitech_receiver.receiver.ReceiverFactory.create_receiver(base, device_info, _setting_callback)
else: else:
receiver_ = logitech_receiver.device.DeviceFactory.create_device(base, device_info, _setting_callback) receiver_ = logitech_receiver.device.DeviceFactory.create_device(base, device_info, _setting_callback)
if receiver_: if receiver_:

View File

@ -1,11 +1,10 @@
import platform
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial from functools import partial
from unittest import mock from unittest import mock
import pytest import pytest
from logitech_receiver import base
from logitech_receiver import common from logitech_receiver import common
from logitech_receiver import exceptions from logitech_receiver import exceptions
from logitech_receiver import receiver from logitech_receiver import receiver
@ -13,6 +12,31 @@ from logitech_receiver import receiver
from . import fake_hidpp from . import fake_hidpp
class LowLevelInterfaceFake:
def __init__(self, responses=None):
self.responses = responses
def open_path(self, path):
return fake_hidpp.open_path(path)
def product_information(self, usb_id: int) -> dict:
return base.product_information(usb_id)
def find_paired_node(self, receiver_path: str, index: int, timeout: int):
return None
def request(self, response, *args, **kwargs):
func = partial(fake_hidpp.request, self.responses)
return func(response, *args, **kwargs)
def ping(self, response, *args, **kwargs):
func = partial(fake_hidpp.ping, self.responses)
return func(response, *args, **kwargs)
def close(self, *args, **kwargs):
pass
@pytest.mark.parametrize( @pytest.mark.parametrize(
"index, expected_kind", "index, expected_kind",
[ [
@ -47,13 +71,6 @@ def mock_request():
yield mock_request yield mock_request
@pytest.fixture
def mock_base():
with mock.patch("logitech_receiver.base.open_path", return_value=None) as mock_open_path:
with mock.patch("logitech_receiver.base.request", return_value=None) as mock_request:
yield mock_open_path, mock_request
responses_unifying = [ responses_unifying = [
fake_hidpp.Response("000000", 0x8003, "FF"), fake_hidpp.Response("000000", 0x8003, "FF"),
fake_hidpp.Response("000300", 0x8102), fake_hidpp.Response("000300", 0x8102),
@ -99,37 +116,34 @@ mouse_info = {
c534_info = {"kind": common.NamedInt(0, "unknown"), "polling": "", "power_switch": "(unknown)", "serial": None, "wpid": "45AB"} c534_info = {"kind": common.NamedInt(0, "unknown"), "polling": "", "power_switch": "(unknown)", "serial": None, "wpid": "45AB"}
@pytest.mark.skipif(platform.system() == "Darwin", reason="Fails on macOS")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"device_info, responses, handle, serial, max_devices, ", "device_info, responses, handle, serial, max_devices, ",
[ [
(DeviceInfo(None), [], False, None, None), (DeviceInfo(path=None), [], False, None, None),
(DeviceInfo(11), [], None, None, None), (DeviceInfo(path=11), [], None, None, None),
(DeviceInfo("11"), responses_unifying, 0x11, "16CC9CB4", 6), (DeviceInfo(path="11"), responses_unifying, 0x11, "16CC9CB4", 6),
(DeviceInfo("12", product_id=0xC534), responses_c534, 0x12, "16CC9CB4", 2), (DeviceInfo(path="12", product_id=0xC534), responses_c534, 0x12, "16CC9CB4", 2),
(DeviceInfo("12", product_id=0xC539), responses_c534, 0x12, "16CC9CB4", 2), (DeviceInfo(path="12", product_id=0xC539), responses_c534, 0x12, "16CC9CB4", 2),
(DeviceInfo("13"), responses_unusual, 0x13, "26CC9CB4", 1), (DeviceInfo(path="13"), responses_unusual, 0x13, "26CC9CB4", 1),
(DeviceInfo("14"), responses_lacking, 0x14, None, 1), (DeviceInfo(path="14"), responses_lacking, 0x14, None, 1),
], ],
) )
def test_receiver_factory_create_receiver(device_info, responses, handle, serial, max_devices, mock_base): def test_receiver_factory_create_receiver(device_info, responses, handle, serial, max_devices):
mock_base[0].side_effect = fake_hidpp.open_path mock_low_level = LowLevelInterfaceFake(responses)
mock_base[1].side_effect = partial(fake_hidpp.request, responses)
if handle is False: if handle is False:
with pytest.raises(Exception): # noqa: B017 with pytest.raises(Exception): # noqa: B017
receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) receiver.ReceiverFactory.create_receiver(mock_low_level, device_info, lambda x: x)
elif handle is None: elif handle is None:
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) r = receiver.ReceiverFactory.create_receiver(mock_low_level, device_info, lambda x: x)
assert r is None assert r is None
else: else:
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) r = receiver.ReceiverFactory.create_receiver(mock_low_level, device_info, lambda x: x)
assert r.handle == handle assert r.handle == handle
assert r.serial == serial assert r.serial == serial
assert r.max_devices == max_devices assert r.max_devices == max_devices
@pytest.mark.skipif(platform.system() == "Darwin", reason="Fails on macOS")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"device_info, responses, firmware, codename, remaining_pairings, pairing_info, count", "device_info, responses, firmware, codename, remaining_pairings, pairing_info, count",
[ [
@ -138,13 +152,10 @@ def test_receiver_factory_create_receiver(device_info, responses, handle, serial
(DeviceInfo("13", product_id=0xCCCC), responses_unusual, None, None, -1, c534_info, 3), (DeviceInfo("13", product_id=0xCCCC), responses_unusual, None, None, -1, c534_info, 3),
], ],
) )
def test_receiver_factory_props( def test_receiver_factory_props(device_info, responses, firmware, codename, remaining_pairings, pairing_info, count):
device_info, responses, firmware, codename, remaining_pairings, pairing_info, count, mock_base mock_low_level = LowLevelInterfaceFake(responses)
):
mock_base[0].side_effect = fake_hidpp.open_path
mock_base[1].side_effect = partial(fake_hidpp.request, responses)
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) r = receiver.ReceiverFactory.create_receiver(mock_low_level, device_info, lambda x: x)
assert len(r.firmware) == firmware if firmware is not None else firmware is None assert len(r.firmware) == firmware if firmware is not None else firmware is None
assert r.device_codename(2) == codename assert r.device_codename(2) == codename
@ -153,7 +164,6 @@ def test_receiver_factory_props(
assert r.count() == count assert r.count() == count
@pytest.mark.skipif(platform.system() == "Darwin", reason="Fails on macOS")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"device_info, responses, status_str, strng", "device_info, responses, status_str, strng",
[ [
@ -162,17 +172,15 @@ def test_receiver_factory_props(
(DeviceInfo("13", product_id=0xCCCC), responses_unusual, "No paired devices.", "<Receiver(13,19)>"), (DeviceInfo("13", product_id=0xCCCC), responses_unusual, "No paired devices.", "<Receiver(13,19)>"),
], ],
) )
def test_receiver_factory_string(device_info, responses, status_str, strng, mock_base): def test_receiver_factory_string(device_info, responses, status_str, strng):
mock_base[0].side_effect = fake_hidpp.open_path mock_low_level = LowLevelInterfaceFake(responses)
mock_base[1].side_effect = partial(fake_hidpp.request, responses)
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) r = receiver.ReceiverFactory.create_receiver(mock_low_level, device_info, lambda x: x)
assert r.status_string() == status_str assert r.status_string() == status_str
assert str(r) == strng assert str(r) == strng
@pytest.mark.skipif(platform.system() == "Darwin", reason="Fails on macOS")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"device_info, responses", "device_info, responses",
[ [
@ -180,11 +188,10 @@ def test_receiver_factory_string(device_info, responses, status_str, strng, mock
(DeviceInfo("14", product_id="C534"), responses_lacking), (DeviceInfo("14", product_id="C534"), responses_lacking),
], ],
) )
def test_receiver_factory_nodevice(device_info, responses, mock_base): def test_receiver_factory_no_device(device_info, responses):
mock_base[0].side_effect = fake_hidpp.open_path mock_low_level = LowLevelInterfaceFake(responses)
mock_base[1].side_effect = partial(fake_hidpp.request, responses)
r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x) r = receiver.ReceiverFactory.create_receiver(mock_low_level, device_info, lambda x: x)
with pytest.raises(exceptions.NoSuchDevice): with pytest.raises(exceptions.NoSuchDevice):
r.device_pairing_information(1) r.device_pairing_information(1)