Test notifications

Fixes #2711
This commit is contained in:
MattHag 2024-12-16 00:17:44 +01:00 committed by Peter F. Patel-Schneider
parent f28a923d15
commit 207be464a5
3 changed files with 250 additions and 2 deletions

View File

@ -1902,7 +1902,7 @@ def decipher_battery_voltage(report: bytes):
charge_sts = ChargeStatus(flags & 0x03) charge_sts = ChargeStatus(flags & 0x03)
if charge_sts is None: if charge_sts is None:
charge_sts = ErrorCode.UNKNOWN charge_sts = ErrorCode.UNKNOWN
elif ChargeStatus.FULL in charge_sts: elif isinstance(charge_sts, ChargeStatus) and ChargeStatus.FULL in charge_sts:
charge_lvl = ChargeLevel.FULL charge_lvl = ChargeLevel.FULL
status = BatteryStatus.FULL status = BatteryStatus.FULL
if flags & (1 << 3): if flags & (1 << 3):

View File

@ -388,6 +388,7 @@ class Device:
setting_callback: Any = None setting_callback: Any = None
sliding = profiles = _backlight = _keys = _remap_keys = _led_effects = _gestures = None sliding = profiles = _backlight = _keys = _remap_keys = _led_effects = _gestures = None
_gestures_lock = threading.Lock() _gestures_lock = threading.Lock()
number = "d1"
read_register = device.Device.read_register read_register = device.Device.read_register
write_register = device.Device.write_register write_register = device.Device.write_register
@ -405,6 +406,7 @@ class Device:
self.persister = configuration._DeviceEntry() self.persister = configuration._DeviceEntry()
self.features = hidpp20.FeaturesArray(self) self.features = hidpp20.FeaturesArray(self)
self.settings = [] self.settings = []
self.receiver = []
if self.feature is not None: if self.feature is not None:
self.features = hidpp20.FeaturesArray(self) self.features = hidpp20.FeaturesArray(self)
self.responses = [ self.responses = [
@ -435,6 +437,18 @@ class Device:
print("PING", self._protocol) print("PING", self._protocol)
return self._protocol return self._protocol
def handle_notification(self, handle):
pass
def changed(self, *args, **kwargs):
pass
def set_battery_info(self, *args, **kwargs):
pass
def status_string(self):
pass
def match_requests(number, responses, call_args_list): def match_requests(number, responses, call_args_list):
for i in range(0 - number, 0): for i in range(0 - number, 0):

View File

@ -6,8 +6,11 @@ from logitech_receiver.common import Notification
from logitech_receiver.hidpp10_constants import BoltPairingError from logitech_receiver.hidpp10_constants import BoltPairingError
from logitech_receiver.hidpp10_constants import PairingError from logitech_receiver.hidpp10_constants import PairingError
from logitech_receiver.hidpp10_constants import Registers from logitech_receiver.hidpp10_constants import Registers
from logitech_receiver.hidpp20_constants import SupportedFeature
from logitech_receiver.receiver import Receiver from logitech_receiver.receiver import Receiver
from . import fake_hidpp
class MockLowLevelInterface: class MockLowLevelInterface:
def open_path(self, path): def open_path(self, path):
@ -22,24 +25,255 @@ class MockLowLevelInterface:
def request(self, handle, devnumber, request_id, *params, **kwargs): def request(self, handle, devnumber, request_id, *params, **kwargs):
pass pass
def find_paired_node(self, receiver_path: str, index: int, timeout: int):
return None
def close(self, device_handle) -> None:
pass
@pytest.mark.parametrize( @pytest.mark.parametrize(
"sub_id, notification_data, expected_error, expected_new_device", "sub_id, notification_data, expected_error, expected_new_device",
[ [
(Registers.DISCOVERY_STATUS_NOTIFICATION, b"\x01", BoltPairingError.DEVICE_TIMEOUT, None), (Registers.DISCOVERY_STATUS_NOTIFICATION, b"\x01", BoltPairingError.DEVICE_TIMEOUT, None),
(
Registers.DEVICE_DISCOVERY_NOTIFICATION,
b"\x01\x01\x01\x01\x01\x01\x01\x01\x01",
None,
None,
),
(Registers.PAIRING_STATUS_NOTIFICATION, b"\x02", BoltPairingError.FAILED, None), (Registers.PAIRING_STATUS_NOTIFICATION, b"\x02", BoltPairingError.FAILED, None),
(Notification.PAIRING_LOCK, b"\x01", PairingError.DEVICE_TIMEOUT, None), (Notification.PAIRING_LOCK, b"\x01", PairingError.DEVICE_TIMEOUT, None),
(Notification.PAIRING_LOCK, b"\x02", PairingError.DEVICE_NOT_SUPPORTED, None), (Notification.PAIRING_LOCK, b"\x02", PairingError.DEVICE_NOT_SUPPORTED, None),
(Notification.PAIRING_LOCK, b"\x03", PairingError.TOO_MANY_DEVICES, None), (Notification.PAIRING_LOCK, b"\x03", PairingError.TOO_MANY_DEVICES, None),
(Notification.PAIRING_LOCK, b"\x06", PairingError.SEQUENCE_TIMEOUT, None), (Notification.PAIRING_LOCK, b"\x06", PairingError.SEQUENCE_TIMEOUT, None),
(Registers.PASSKEY_REQUEST_NOTIFICATION, b"\x06", None, None),
(Registers.PASSKEY_PRESSED_NOTIFICATION, b"\x06", None, None),
], ],
) )
def test_process_receiver_notification(sub_id, notification_data, expected_error, expected_new_device): def test_process_receiver_notification(sub_id, notification_data, expected_error, expected_new_device):
receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None) receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
notification = HIDPPNotification(0, 0, sub_id, 0, notification_data) notification = HIDPPNotification(0, 0, sub_id, 0x02, notification_data)
result = notifications._process_receiver_notification(receiver, notification) result = notifications._process_receiver_notification(receiver, notification)
assert result assert result
assert receiver.pairing.error == expected_error assert receiver.pairing.error == expected_error
assert receiver.pairing.new_device is expected_new_device assert receiver.pairing.new_device is expected_new_device
@pytest.mark.parametrize(
"hidpp_notification, expected",
[
(HIDPPNotification(0, 0, sub_id=Registers.BATTERY_STATUS, address=0, data=b"0x01"), False),
(HIDPPNotification(0, 0, sub_id=Notification.NO_OPERATION, address=0, data=b"0x01"), False),
(HIDPPNotification(0, 0, sub_id=0x40, address=0, data=b"0x01"), True),
],
)
def test_process_device_notification(hidpp_notification, expected):
device = fake_hidpp.Device()
result = notifications._process_device_notification(device, hidpp_notification)
assert result == expected
@pytest.mark.parametrize(
"hidpp_notification, expected",
[
(HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.CONNECTED, address=0, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.RAW_INPUT, address=0, data=b"0x01"), None),
],
)
def test_process_dj_notification(hidpp_notification, expected):
device = fake_hidpp.Device()
result = notifications._process_dj_notification(device, hidpp_notification)
assert result == expected
@pytest.mark.parametrize(
"hidpp_notification, expected",
[
(HIDPPNotification(0, 0, sub_id=Registers.BATTERY_STATUS, address=0, data=b"\x01\x00"), True),
(HIDPPNotification(0, 0, sub_id=Registers.BATTERY_CHARGE, address=0, data=b"0x01\x00"), True),
(HIDPPNotification(0, 0, sub_id=Notification.RAW_INPUT, address=0, data=b"0x01"), None),
],
)
def test_process_hidpp10_custom_notification(hidpp_notification, expected):
device = fake_hidpp.Device()
result = notifications._process_hidpp10_custom_notification(device, hidpp_notification)
assert result == expected
@pytest.mark.parametrize(
"hidpp_notification, expected",
[
(HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0x00, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0x02, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0x03, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0x03, data=b"0x4040"), True),
(HIDPPNotification(0, 0, sub_id=Notification.RAW_INPUT, address=0x00, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.POWER, address=0x00, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.POWER, address=0x01, data=b"0x01"), True),
(HIDPPNotification(0, 0, sub_id=Notification.PAIRING_LOCK, address=0x01, data=b"0x01"), None),
],
)
def test_process_hidpp10_notification(hidpp_notification, expected):
fake_device = fake_hidpp.Device()
fake_device.receiver = ["rec1", "rec2"]
result = notifications._process_hidpp10_notification(fake_device, hidpp_notification)
assert result == expected
@pytest.mark.parametrize(
"hidpp_notification, feature",
[
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.BATTERY_STATUS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"),
SupportedFeature.BATTERY_STATUS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.BATTERY_VOLTAGE,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x05, data=b"0x01"),
SupportedFeature.BATTERY_VOLTAGE,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.UNIFIED_BATTERY,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"),
SupportedFeature.UNIFIED_BATTERY,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.ADC_MEASUREMENT,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"),
SupportedFeature.ADC_MEASUREMENT,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"01234GOOD"),
SupportedFeature.SOLAR_DASHBOARD,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"01234GOOD"),
SupportedFeature.SOLAR_DASHBOARD,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x20, data=b"01234GOOD"),
SupportedFeature.SOLAR_DASHBOARD,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"01234GOOD"),
SupportedFeature.SOLAR_DASHBOARD,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"CHARGENOTGOOD"),
SupportedFeature.SOLAR_DASHBOARD,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"\x01\x01\x02"),
SupportedFeature.WIRELESS_DEVICE_STATUS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"),
SupportedFeature.WIRELESS_DEVICE_STATUS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.TOUCHMOUSE_RAW_POINTS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"0x01"),
SupportedFeature.TOUCHMOUSE_RAW_POINTS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x05, data=b"0x01"),
SupportedFeature.TOUCHMOUSE_RAW_POINTS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.REPROG_CONTROLS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"),
SupportedFeature.REPROG_CONTROLS,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.BACKLIGHT2,
),
(
HIDPPNotification(
0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"\x01\x01\x01\x01\x01\x01\x01\x01"
),
SupportedFeature.REPROG_CONTROLS_V4,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"0x01"),
SupportedFeature.REPROG_CONTROLS_V4,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x20, data=b"0x01"),
SupportedFeature.REPROG_CONTROLS_V4,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.HIRES_WHEEL,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"0x01"),
SupportedFeature.HIRES_WHEEL,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"),
SupportedFeature.HIRES_WHEEL,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.ONBOARD_PROFILES,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x20, data=b"0x01"),
SupportedFeature.ONBOARD_PROFILES,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"),
SupportedFeature.BRIGHTNESS_CONTROL,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"0x01"),
SupportedFeature.BRIGHTNESS_CONTROL,
),
(
HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x20, data=b"0x01"),
SupportedFeature.BRIGHTNESS_CONTROL,
),
],
)
def test_process_feature_notification(mocker, hidpp_notification, feature):
fake_device = fake_hidpp.Device()
fake_device.receiver = ["rec1", "rec2"]
result = notifications._process_feature_notification(fake_device, hidpp_notification, feature)
assert result is True