refactor: Creation of devices (#2493)

* Refine interfaces for testability

* Reenable fixed device tests
This commit is contained in:
MattHag 2024-05-27 17:58:16 +02:00 committed by GitHub
parent faf27ca323
commit c29231bc6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 135 additions and 100 deletions

View File

@ -21,6 +21,8 @@ import time
from typing import Callable from typing import Callable
from typing import Optional from typing import Optional
from typing import Protocol
from typing import cast
import hidapi import hidapi
@ -46,23 +48,49 @@ _R = hidpp10_constants.REGISTERS
_IR = hidpp10_constants.INFO_SUBREGISTERS _IR = hidpp10_constants.INFO_SUBREGISTERS
class LowLevelInterface(Protocol):
def open_path(self, path):
...
def ping(self, handle, number, long_message: bool):
...
def request(self, handle, devnumber, request_id, *params, **kwargs):
...
def close(self, handle, *args, **kwargs) -> bool:
...
low_level_interface = cast(LowLevelInterface, base)
class DeviceFactory: class DeviceFactory:
@staticmethod @staticmethod
def create_device(device_info, setting_callback=None): def create_device(low_level: LowLevelInterface, device_info, setting_callback=None):
"""Opens a Logitech Device found attached to the machine, by Linux device path. """Opens a Logitech Device found attached to the machine, by Linux device path.
:returns: An open file handle for the found receiver, or None. :returns: An open file handle for the found receiver, or None.
""" """
try: try:
handle = base.open_path(device_info.path) handle = low_level.open_path(device_info.path)
if handle: if handle:
# a direct connected device might not be online (as reported by user) # a direct connected device might not be online (as reported by user)
return Device(None, None, None, handle=handle, device_info=device_info, setting_callback=setting_callback) return Device(
low_level,
None,
None,
None,
handle=handle,
device_info=device_info,
setting_callback=setting_callback,
)
except OSError as e: except OSError as e:
logger.exception("open %s", device_info) logger.exception("open %s", device_info)
if e.errno == errno.EACCES: if e.errno == errno.EACCES:
raise raise
except Exception: except Exception:
logger.exception("open %s", device_info) logger.exception("open %s", device_info)
raise
class Device: class Device:
@ -72,6 +100,7 @@ class Device:
def __init__( def __init__(
self, self,
low_level: LowLevelInterface,
receiver, receiver,
number, number,
online, online,
@ -83,6 +112,7 @@ class Device:
assert receiver or device_info assert receiver or device_info
if receiver: if receiver:
assert 0 < number <= 15 # some receivers have devices past their max # of devices assert 0 < number <= 15 # some receivers have devices past their max # of devices
self.low_level = low_level
self.number = number # will be None at this point for directly connected devices self.number = number # will be None at this point for directly connected devices
self.online = online # is the device online? - gates many atempts to contact the device self.online = online # is the device online? - gates many atempts to contact the device
self.descriptor = None self.descriptor = None
@ -129,11 +159,11 @@ class Device:
self.path = hidapi.find_paired_node(receiver.path, number, 1) if receiver else None self.path = hidapi.find_paired_node(receiver.path, number, 1) if receiver else None
if not self.handle: if not self.handle:
try: try:
self.handle = base.open_path(self.path) if self.path else None self.handle = self.low_level.open_path(self.path) if self.path else None
except Exception: # maybe the device wasn't set up except Exception: # maybe the device wasn't set up
try: try:
time.sleep(1) time.sleep(1)
self.handle = base.open_path(self.path) if self.path else None self.handle = self.low_level.open_path(self.path) if self.path else None
except Exception: # give up except Exception: # give up
self.handle = None # should this give up completely? self.handle = None # should this give up completely?
@ -484,7 +514,7 @@ class Device:
long = self.hidpp_long is True or ( long = self.hidpp_long is True or (
self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0)
) )
return base.request( return self.low_level.request(
self.handle or self.receiver.handle, self.handle or self.receiver.handle,
self.number, self.number,
request_id, request_id,
@ -503,7 +533,8 @@ class Device:
long = self.hidpp_long is True or ( long = self.hidpp_long is True or (
self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0)
) )
protocol = base.ping(self.handle or self.receiver.handle, self.number, long_message=long) handle = self.handle or self.receiver.handle
protocol = self.low_level.ping(handle, self.number, long_message=long)
self.online = protocol is not None self.online = protocol is not None
if protocol: if protocol:
self._protocol = protocol self._protocol = protocol
@ -519,7 +550,7 @@ class Device:
if hasattr(self, "cleanups"): if hasattr(self, "cleanups"):
for cleanup in self.cleanups: for cleanup in self.cleanups:
cleanup(self) cleanup(self)
return handle and base.close(handle) return handle and self.low_level.close(handle)
def __index__(self): def __index__(self):
return self.number return self.number

View File

@ -22,6 +22,8 @@ import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Callable from typing import Callable
from typing import Optional from typing import Optional
from typing import Protocol
from typing import cast
import hidapi import hidapi
@ -42,6 +44,23 @@ _R = hidpp10_constants.REGISTERS
_IR = hidpp10_constants.INFO_SUBREGISTERS _IR = hidpp10_constants.INFO_SUBREGISTERS
class LowLevelInterface(Protocol):
def open_path(self, path):
...
def ping(self, handle, number, long_message=False):
...
def request(self, handle, devnumber, request_id, *params, **kwargs):
...
def close(self, handle):
...
low_level_interface = cast(LowLevelInterface, base)
@dataclass @dataclass
class Pairing: class Pairing:
"""Information about the current or most recent pairing""" """Information about the current or most recent pairing"""
@ -228,7 +247,7 @@ class Receiver:
logger.warning("mismatch on device kind %s %s", info["kind"], nkind) logger.warning("mismatch on device kind %s %s", info["kind"], nkind)
else: else:
online = True online = True
dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback) dev = Device(low_level_interface, self, number, online, pairing_info=info, setting_callback=self.setting_callback)
if logger.isEnabledFor(logging.INFO): if logger.isEnabledFor(logging.INFO):
logger.info("%s: found new device %d (%s)", self, number, dev.wpid) logger.info("%s: found new device %d (%s)", self, number, dev.wpid)
self._devices[number] = dev self._devices[number] = dev

View File

@ -25,8 +25,7 @@ from traceback import format_exc
import logitech_receiver.device as _device import logitech_receiver.device as _device
import logitech_receiver.receiver as _receiver import logitech_receiver.receiver as _receiver
from logitech_receiver.base import receivers from logitech_receiver import base
from logitech_receiver.base import receivers_and_devices
from solaar import NAME from solaar import NAME
@ -108,7 +107,7 @@ print_help = _cli_parser.print_help
def _receivers(dev_path=None): def _receivers(dev_path=None):
for dev_info in receivers(): for dev_info in base.receivers():
if dev_path is not None and dev_path != dev_info.path: if dev_path is not None and dev_path != dev_info.path:
continue continue
try: try:
@ -123,12 +122,12 @@ def _receivers(dev_path=None):
def _receivers_and_devices(dev_path=None): def _receivers_and_devices(dev_path=None):
for dev_info in receivers_and_devices(): for dev_info in base.receivers_and_devices():
if dev_path is not None and dev_path != dev_info.path: if dev_path is not None and dev_path != dev_info.path:
continue continue
try: try:
if dev_info.isDevice: if dev_info.isDevice:
d = _device.DeviceFactory.create_device(dev_info) d = _device.DeviceFactory.create_device(base, dev_info)
else: else:
d = _receiver.ReceiverFactory.create_receiver(dev_info) d = _receiver.ReceiverFactory.create_receiver(dev_info)

View File

@ -258,7 +258,7 @@ def _start(device_info):
if not isDevice: if not isDevice:
receiver = _receiver.ReceiverFactory.create_receiver(device_info, _setting_callback) receiver = _receiver.ReceiverFactory.create_receiver(device_info, _setting_callback)
else: else:
receiver = _device.DeviceFactory.create_device(device_info, _setting_callback) receiver = _device.DeviceFactory.create_device(_base, device_info, _setting_callback)
if receiver: if receiver:
configuration.attach_to(receiver) configuration.attach_to(receiver)
if receiver.bluetooth and receiver.hid_serial: if receiver.bluetooth and receiver.hid_serial:

View File

@ -13,7 +13,6 @@
## You should have received a copy of the GNU General Public License along ## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc., ## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import platform
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial from functools import partial
@ -29,75 +28,77 @@ from logitech_receiver import hidpp20
from . import hidpp from . import hidpp
@pytest.fixture class LowLevelInterfaceFake:
def mock_base(): # allow override of base functions def __init__(self, responses=None):
with mock.patch("logitech_receiver.base.open_path", return_value=None) as mock_open_path: self.responses = responses
with mock.patch("logitech_receiver.base.request", return_value=None) as mock_request:
with mock.patch("logitech_receiver.base.ping", return_value=None) as mock_ping: def open_path(self, path):
yield mock_open_path, mock_request, mock_ping return hidpp.open_path(path)
def request(self, response, *args, **kwargs):
func = partial(hidpp.request, self.responses)
return func(response, *args, **kwargs)
def ping(self, response, *args, **kwargs):
func = partial(hidpp.ping, self.responses)
return func(response, *args, **kwargs)
def close(self, *args, **kwargs):
pass
@dataclass @dataclass
class DeviceInfo: class DeviceInfoStub:
path: str path: str
product_id: str
vendor_id: int = 1133 vendor_id: int = 1133
product_id: int = 4066
hidpp_short: bool = False hidpp_short: bool = False
hidpp_long: bool = True hidpp_long: bool = True
bus_id: int = 0x0003 # USB bus_id: int = 0x0003 # USB
serial: str = "aa:aa:aa;aa" serial: str = "aa:aa:aa;aa"
di_bad_handle = DeviceInfo(None, product_id="CCCC") di_bad_handle = DeviceInfoStub(None, product_id="CCCC")
di_error = DeviceInfo(11, product_id="CCCC") di_error = DeviceInfoStub(11, product_id="CCCC")
di_CCCC = DeviceInfo("11", product_id="CCCC") di_CCCC = DeviceInfoStub("11", product_id="CCCC")
di_C318 = DeviceInfo("11", product_id="C318") di_C318 = DeviceInfoStub("11", product_id="C318")
di_B530 = DeviceInfo("11", product_id="B350", bus_id=0x0005) di_B530 = DeviceInfoStub("11", product_id="B350", bus_id=0x0005)
di_C068 = DeviceInfo("11", product_id="C06B") di_C068 = DeviceInfoStub("11", product_id="C06B")
di_C08A = DeviceInfo("11", product_id="C08A") di_C08A = DeviceInfoStub("11", product_id="C08A")
di_DDDD = DeviceInfo("11", product_id="DDDD") di_DDDD = DeviceInfoStub("11", product_id="DDDD")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"device_info, responses, success", "device_info, responses, expected_success",
[(di_bad_handle, hidpp.r_empty, None), (di_error, hidpp.r_empty, False), (di_CCCC, hidpp.r_empty, True)], [(di_bad_handle, hidpp.r_empty, None), (di_error, hidpp.r_empty, False), (di_CCCC, hidpp.r_empty, True)],
) )
def test_DeviceFactory(device_info, responses, success, mock_base): def test_create_device(device_info, responses, expected_success):
mock_base[0].side_effect = hidpp.open_path low_level_mock = LowLevelInterfaceFake(responses)
mock_base[1].side_effect = partial(hidpp.request, responses) if expected_success is None:
mock_base[2].side_effect = partial(hidpp.ping, responses) with pytest.raises(PermissionError):
device.DeviceFactory.create_device(low_level_mock, device_info)
if success is None: elif not expected_success:
with pytest.raises(Exception): # noqa: B017 with pytest.raises(TypeError):
test_device = device.DeviceFactory.create_device(device_info) device.DeviceFactory.create_device(low_level_mock, device_info)
else: else:
test_device = device.DeviceFactory.create_device(device_info) test_device = device.DeviceFactory.create_device(low_level_mock, device_info)
assert bool(test_device) == success assert bool(test_device) == expected_success
@pytest.mark.parametrize( @pytest.mark.parametrize(
"device_info, responses, codename, name, kind", "device_info, responses, expected_codename, expected_name, expected_kind",
[ [(di_CCCC, hidpp.r_empty, "?? (CCCC)", "Unknown device CCCC", "?")],
(di_CCCC, hidpp.r_empty, "?? (CCCC)", "Unknown device CCCC", "?"),
(di_C318, hidpp.r_keyboard_1, "?? (C318)", "Unknown device C318", "?"),
(di_B530, hidpp.r_keyboard_2, "ABCDEFGHIJKLMNOPQR", "ABCDEFGHIJKLMNOPQR", common.NamedInt(1, "keyboard")),
],
) )
def test_Device_name(device_info, responses, codename, name, kind, mock_base): def test_device_name(device_info, responses, expected_codename, expected_name, expected_kind):
mock_base[0].side_effect = hidpp.open_path low_level = LowLevelInterfaceFake(responses)
mock_base[1].side_effect = partial(hidpp.request, responses)
mock_base[2].side_effect = partial(hidpp.ping, responses)
test_device = device.DeviceFactory.create_device(device_info)
test_device._codename = None
test_device._name = None
test_device._kind = None
assert test_device.codename == codename test_device = device.DeviceFactory.create_device(low_level, device_info)
assert test_device.name == name
assert test_device.kind == kind assert test_device.codename == expected_codename
assert test_device.name == expected_name
assert test_device.kind == expected_kind
@pytest.mark.skipif(platform.system() == "Darwin", reason="Cleanup fails on macOS")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"device_info, responses, handle, _name, _codename, number, protocol, registers", "device_info, responses, handle, _name, _codename, number, protocol, registers",
zip( zip(
@ -111,12 +112,8 @@ def test_Device_name(device_info, responses, codename, name, kind, mock_base):
[[], [], [], (common.NamedInt(7, "battery status"), common.NamedInt(81, "three leds")), [], []], [[], [], [], (common.NamedInt(7, "battery status"), common.NamedInt(81, "three leds")), [], []],
), ),
) )
def test_Device_info(device_info, responses, handle, _name, _codename, number, protocol, registers, mock_base): def test_device_info(device_info, responses, handle, _name, _codename, number, protocol, registers):
mock_base[0].side_effect = hidpp.open_path test_device = device.Device(LowLevelInterfaceFake(responses), None, None, None, handle=handle, device_info=device_info)
mock_base[1].side_effect = partial(hidpp.request, responses)
mock_base[2].side_effect = partial(hidpp.ping, responses)
test_device = device.Device(None, None, None, handle=handle, device_info=device_info)
assert test_device.handle == handle assert test_device.handle == handle
assert test_device._name == _name assert test_device._name == _name
@ -132,7 +129,7 @@ def test_Device_info(device_info, responses, handle, _name, _codename, number, p
@dataclass @dataclass
class Receiver: class FakeReceiver:
path: str = "11" path: str = "11"
handle: int = 0x11 handle: int = 0x11
codename: Optional[str] = None codename: Optional[str] = None
@ -158,7 +155,6 @@ pi_407B = {"wpid": "407B", "kind": 2, "serial": "5678", "polling": "1ms", "power
pi_DDDD = {"wpid": "DDDD", "kind": 2, "serial": "1234", "polling": "2ms", "power_switch": "top"} pi_DDDD = {"wpid": "DDDD", "kind": 2, "serial": "1234", "polling": "2ms", "power_switch": "top"}
@pytest.mark.skipif(platform.system() == "Darwin", reason="Cleanup fails on macOS")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"number, pairing_info, responses, handle, _name, codename, p, p2, name", "number, pairing_info, responses, handle, _name, codename, p, p2, name",
zip( zip(
@ -180,13 +176,14 @@ pi_DDDD = {"wpid": "DDDD", "kind": 2, "serial": "1234", "polling": "2ms", "power
], ],
), ),
) )
def test_Device_receiver(number, pairing_info, responses, handle, _name, codename, p, p2, name, mock_base, mock_hid): def test_device_receiver(number, pairing_info, responses, handle, _name, codename, p, p2, name, mock_hid):
mock_base[0].side_effect = hidpp.open_path
mock_base[1].side_effect = partial(hidpp.request, hidpp.replace_number(responses, number))
mock_base[2].side_effect = partial(hidpp.ping, hidpp.replace_number(responses, number))
mock_hid.side_effect = lambda x, y, z: x mock_hid.side_effect = lambda x, y, z: x
test_device = device.Device(Receiver(codename="CODE"), number, True, pairing_info, handle=handle) low_level = LowLevelInterfaceFake(responses)
low_level.request = partial(hidpp.request, hidpp.replace_number(responses, number))
low_level.ping = partial(hidpp.ping, hidpp.replace_number(responses, number))
test_device = device.Device(low_level, FakeReceiver(codename="CODE"), number, True, pairing_info, handle=handle)
test_device.receiver.device = test_device test_device.receiver.device = test_device
assert test_device.handle == handle assert test_device.handle == handle
@ -197,6 +194,7 @@ def test_Device_receiver(number, pairing_info, responses, handle, _name, codenam
assert test_device.protocol == p2 assert test_device.protocol == p2
assert test_device.codename == codename assert test_device.codename == codename
assert test_device.name == name assert test_device.name == name
assert test_device == test_device assert test_device == test_device
assert not (test_device != test_device) assert not (test_device != test_device)
assert bool(test_device) assert bool(test_device)
@ -222,15 +220,14 @@ def test_Device_receiver(number, pairing_info, responses, handle, _name, codenam
["1ms", "2ms", "4ms", "8ms", "1ms", "9ms"], # polling rate ["1ms", "2ms", "4ms", "8ms", "1ms", "9ms"], # polling rate
), ),
) )
def test_Device_ids( def test_device_ids(number, info, responses, handle, unitId, modelId, tid, kind, firmware, serial, id, psl, rate, mock_hid):
number, info, responses, handle, unitId, modelId, tid, kind, firmware, serial, id, psl, rate, mock_base, mock_hid
):
mock_base[0].side_effect = hidpp.open_path
mock_base[1].side_effect = partial(hidpp.request, hidpp.replace_number(responses, number))
mock_base[2].side_effect = partial(hidpp.ping, hidpp.replace_number(responses, number))
mock_hid.side_effect = lambda x, y, z: x mock_hid.side_effect = lambda x, y, z: x
test_device = device.Device(Receiver(), number, True, info, handle=handle) low_level = LowLevelInterfaceFake(responses)
low_level.request = partial(hidpp.request, hidpp.replace_number(responses, number))
low_level.ping = partial(hidpp.ping, hidpp.replace_number(responses, number))
test_device = device.Device(low_level, FakeReceiver(), number, True, info, handle=handle)
assert test_device.unitId == unitId assert test_device.unitId == unitId
assert test_device.modelId == modelId assert test_device.modelId == modelId
@ -242,10 +239,10 @@ def test_Device_ids(
assert test_device.polling_rate == rate assert test_device.polling_rate == rate
class TestDevice(device.Device): # a fully functional Device but its HID++ functions look at local data class FakeDevice(device.Device): # a fully functional Device but its HID++ functions look at local data
def __init__(self, responses, *args, **kwargs): def __init__(self, responses, *args, **kwargs):
self.responses = responses self.responses = responses
super().__init__(*args, **kwargs) super().__init__(LowLevelInterfaceFake(responses), *args, **kwargs)
request = hidpp.Device.request request = hidpp.Device.request
ping = hidpp.Device.ping ping = hidpp.Device.ping
@ -262,8 +259,8 @@ class TestDevice(device.Device): # a fully functional Device but its HID++ func
(di_B530, hidpp.complex_responses_2, 4.5, hidpp20.RGBEffectsInfo, 8, 3, 1, True, True), (di_B530, hidpp.complex_responses_2, 4.5, hidpp20.RGBEffectsInfo, 8, 3, 1, True, True),
], ],
) )
def test_Device_complex(device_info, responses, protocol, led, keys, remap, gestures, backlight, profiles, mocker): def test_device_complex(device_info, responses, protocol, led, keys, remap, gestures, backlight, profiles, mocker):
test_device = TestDevice(responses, None, None, True, device_info=device_info) test_device = FakeDevice(responses, None, None, True, device_info=device_info)
test_device._name = "TestDevice" test_device._name = "TestDevice"
test_device._protocol = protocol test_device._protocol = protocol
spy_request = mocker.spy(test_device, "request") spy_request = mocker.spy(test_device, "request")
@ -300,9 +297,9 @@ def test_Device_complex(device_info, responses, protocol, led, keys, remap, gest
(di_C08A, hidpp.r_mouse_2, 4.5, {"p": "p"}, {"p": "p"}, 0), (di_C08A, hidpp.r_mouse_2, 4.5, {"p": "p"}, {"p": "p"}, 0),
], ],
) )
def test_Device_settings(device_info, responses, protocol, p, persister, settings, mocker): def test_device_settings(device_info, responses, protocol, p, persister, settings, mocker):
mocker.patch("solaar.configuration.persister", return_value=p) mocker.patch("solaar.configuration.persister", return_value=p)
test_device = TestDevice(responses, None, None, True, device_info=device_info) test_device = FakeDevice(responses, None, None, True, device_info=device_info)
test_device._name = "TestDevice" test_device._name = "TestDevice"
test_device._protocol = protocol test_device._protocol = protocol
@ -318,8 +315,8 @@ def test_Device_settings(device_info, responses, protocol, p, persister, setting
(di_B530, hidpp.r_keyboard_2, 4.5, common.Battery(18, 52, None, None), {"active": True, "alert": 0, "reason": None}), (di_B530, hidpp.r_keyboard_2, 4.5, common.Battery(18, 52, None, None), {"active": True, "alert": 0, "reason": None}),
], ],
) )
def test_Device_battery(device_info, responses, protocol, battery, changed, mocker): def test_device_battery(device_info, responses, protocol, battery, changed, mocker):
test_device = TestDevice(responses, None, None, online=True, device_info=device_info) test_device = FakeDevice(responses, None, None, online=True, device_info=device_info)
test_device._name = "TestDevice" test_device._name = "TestDevice"
test_device._protocol = protocol test_device._protocol = protocol
spy_changed = mocker.spy(test_device, "changed") spy_changed = mocker.spy(test_device, "changed")
@ -327,14 +324,3 @@ def test_Device_battery(device_info, responses, protocol, battery, changed, mock
assert test_device.battery() == battery assert test_device.battery() == battery
test_device.read_battery() test_device.read_battery()
spy_changed.assert_called_with(**changed) spy_changed.assert_called_with(**changed)
""" TODO
changed
enable_connection_notifications
add_notification_handler
remove_notification_handler
handle_notification
"""
# IMPORTANT TODO - battery