notifications: Introduce unit tests
This commit is contained in:
parent
0cd9c0c9b5
commit
0d12c6f229
|
@ -15,11 +15,16 @@
|
||||||
## with this program; if not, write to the Free Software Foundation, Inc.,
|
## with this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
|
||||||
# Handles incoming events from the receiver/devices, updating the object as appropriate.
|
"""Handles incoming events from the receiver/devices, updating the
|
||||||
|
object as appropriate.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import struct
|
import struct
|
||||||
import threading
|
import threading
|
||||||
|
import typing
|
||||||
|
|
||||||
from solaar.i18n import _
|
from solaar.i18n import _
|
||||||
|
|
||||||
|
@ -36,6 +41,11 @@ from .common import BatteryStatus
|
||||||
from .common import Notification
|
from .common import Notification
|
||||||
from .hidpp10_constants import Registers
|
from .hidpp10_constants import Registers
|
||||||
|
|
||||||
|
if typing.TYPE_CHECKING:
|
||||||
|
from .base import HIDPPNotification
|
||||||
|
from .receiver import Receiver
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
_hidpp10 = hidpp10.Hidpp10()
|
_hidpp10 = hidpp10.Hidpp10()
|
||||||
|
@ -55,19 +65,31 @@ def process(device, notification):
|
||||||
return _process_device_notification(device, notification)
|
return _process_device_notification(device, notification)
|
||||||
|
|
||||||
|
|
||||||
def _process_receiver_notification(receiver, n):
|
def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None:
|
||||||
# supposedly only 0x4x notifications arrive for the receiver
|
# supposedly only 0x4x notifications arrive for the receiver
|
||||||
assert n.sub_id & 0x40 == 0x40
|
assert hidpp_notification.sub_id in [
|
||||||
|
Notification.CONNECT_DISCONNECT,
|
||||||
|
Notification.DJ_PAIRING,
|
||||||
|
Notification.CONNECTED,
|
||||||
|
Notification.RAW_INPUT,
|
||||||
|
Notification.PAIRING_LOCK,
|
||||||
|
Notification.POWER,
|
||||||
|
Registers.DEVICE_DISCOVERY_NOTIFICATION,
|
||||||
|
Registers.DISCOVERY_STATUS_NOTIFICATION,
|
||||||
|
Registers.PAIRING_STATUS_NOTIFICATION,
|
||||||
|
Registers.PASSKEY_PRESSED_NOTIFICATION,
|
||||||
|
Registers.PASSKEY_REQUEST_NOTIFICATION,
|
||||||
|
]
|
||||||
|
|
||||||
if n.sub_id == Notification.PAIRING_LOCK:
|
if hidpp_notification.sub_id == Notification.PAIRING_LOCK:
|
||||||
receiver.pairing.lock_open = bool(n.address & 0x01)
|
receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01)
|
||||||
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
|
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
|
||||||
if logger.isEnabledFor(logging.INFO):
|
if logger.isEnabledFor(logging.INFO):
|
||||||
logger.info("%s: %s", receiver, reason)
|
logger.info("%s: %s", receiver, reason)
|
||||||
receiver.pairing.error = None
|
receiver.pairing.error = None
|
||||||
if receiver.pairing.lock_open:
|
if receiver.pairing.lock_open:
|
||||||
receiver.pairing.new_device = None
|
receiver.pairing.new_device = None
|
||||||
pair_error = ord(n.data[:1])
|
pair_error = ord(hidpp_notification.data[:1])
|
||||||
if pair_error:
|
if pair_error:
|
||||||
receiver.pairing.error = error_string = hidpp10_constants.PAIRING_ERRORS[pair_error]
|
receiver.pairing.error = error_string = hidpp10_constants.PAIRING_ERRORS[pair_error]
|
||||||
receiver.pairing.new_device = None
|
receiver.pairing.new_device = None
|
||||||
|
@ -75,9 +97,9 @@ def _process_receiver_notification(receiver, n):
|
||||||
receiver.changed(reason=reason)
|
receiver.changed(reason=reason)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
elif n.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
|
elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
|
||||||
with notification_lock:
|
with notification_lock:
|
||||||
receiver.pairing.discovering = n.address == 0x00
|
receiver.pairing.discovering = hidpp_notification.address == 0x00
|
||||||
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
|
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
|
||||||
if logger.isEnabledFor(logging.INFO):
|
if logger.isEnabledFor(logging.INFO):
|
||||||
logger.info("%s: %s", receiver, reason)
|
logger.info("%s: %s", receiver, reason)
|
||||||
|
@ -86,33 +108,33 @@ def _process_receiver_notification(receiver, n):
|
||||||
receiver.pairing.counter = receiver.pairing.device_address = None
|
receiver.pairing.counter = receiver.pairing.device_address = None
|
||||||
receiver.pairing.device_authentication = receiver.pairing.device_name = None
|
receiver.pairing.device_authentication = receiver.pairing.device_name = None
|
||||||
receiver.pairing.device_passkey = None
|
receiver.pairing.device_passkey = None
|
||||||
discover_error = ord(n.data[:1])
|
discover_error = ord(hidpp_notification.data[:1])
|
||||||
if discover_error:
|
if discover_error:
|
||||||
receiver.pairing.error = discover_string = hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
|
receiver.pairing.error = discover_string = hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
|
||||||
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
|
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
|
||||||
receiver.changed(reason=reason)
|
receiver.changed(reason=reason)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
elif n.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
|
elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
|
||||||
with notification_lock:
|
with notification_lock:
|
||||||
counter = n.address + n.data[0] * 256 # notification counter
|
counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter
|
||||||
if receiver.pairing.counter is None:
|
if receiver.pairing.counter is None:
|
||||||
receiver.pairing.counter = counter
|
receiver.pairing.counter = counter
|
||||||
else:
|
else:
|
||||||
if not receiver.pairing.counter == counter:
|
if not receiver.pairing.counter == counter:
|
||||||
return None
|
return None
|
||||||
if n.data[1] == 0:
|
if hidpp_notification.data[1] == 0:
|
||||||
receiver.pairing.device_kind = n.data[3]
|
receiver.pairing.device_kind = hidpp_notification.data[3]
|
||||||
receiver.pairing.device_address = n.data[6:12]
|
receiver.pairing.device_address = hidpp_notification.data[6:12]
|
||||||
receiver.pairing.device_authentication = n.data[14]
|
receiver.pairing.device_authentication = hidpp_notification.data[14]
|
||||||
elif n.data[1] == 1:
|
elif hidpp_notification.data[1] == 1:
|
||||||
receiver.pairing.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8")
|
receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
elif n.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
|
elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
|
||||||
with notification_lock:
|
with notification_lock:
|
||||||
receiver.pairing.device_passkey = None
|
receiver.pairing.device_passkey = None
|
||||||
receiver.pairing.lock_open = n.address == 0x00
|
receiver.pairing.lock_open = hidpp_notification.address == 0x00
|
||||||
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
|
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
|
||||||
if logger.isEnabledFor(logging.INFO):
|
if logger.isEnabledFor(logging.INFO):
|
||||||
logger.info("%s: %s", receiver, reason)
|
logger.info("%s: %s", receiver, reason)
|
||||||
|
@ -122,11 +144,11 @@ def _process_receiver_notification(receiver, n):
|
||||||
receiver.pairing.device_address = None
|
receiver.pairing.device_address = None
|
||||||
receiver.pairing.device_authentication = None
|
receiver.pairing.device_authentication = None
|
||||||
receiver.pairing.device_name = None
|
receiver.pairing.device_name = None
|
||||||
pair_error = n.data[0]
|
pair_error = hidpp_notification.data[0]
|
||||||
if receiver.pairing.lock_open:
|
if receiver.pairing.lock_open:
|
||||||
receiver.pairing.new_device = None
|
receiver.pairing.new_device = None
|
||||||
elif n.address == 0x02 and not pair_error:
|
elif hidpp_notification.address == 0x02 and not pair_error:
|
||||||
receiver.pairing.new_device = receiver.register_new_device(n.data[7])
|
receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7])
|
||||||
if pair_error:
|
if pair_error:
|
||||||
receiver.pairing.error = error_string = hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
|
receiver.pairing.error = error_string = hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
|
||||||
receiver.pairing.new_device = None
|
receiver.pairing.new_device = None
|
||||||
|
@ -134,15 +156,15 @@ def _process_receiver_notification(receiver, n):
|
||||||
receiver.changed(reason=reason)
|
receiver.changed(reason=reason)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
elif n.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
|
elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
|
||||||
with notification_lock:
|
with notification_lock:
|
||||||
receiver.pairing.device_passkey = n.data[0:6].decode("utf-8")
|
receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
elif n.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
|
elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
|
||||||
return True
|
return True
|
||||||
|
|
||||||
logger.warning("%s: unhandled notification %s", receiver, n)
|
logger.warning("%s: unhandled notification %s", receiver, hidpp_notification)
|
||||||
|
|
||||||
|
|
||||||
def _process_device_notification(device, n):
|
def _process_device_notification(device, n):
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from logitech_receiver import hidpp10_constants
|
||||||
|
from logitech_receiver import notifications
|
||||||
|
from logitech_receiver.base import HIDPPNotification
|
||||||
|
from logitech_receiver.hidpp10_constants import Registers
|
||||||
|
from logitech_receiver.receiver import Receiver
|
||||||
|
|
||||||
|
|
||||||
|
class MockLowLevelInterface:
|
||||||
|
def open_path(self, path):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def find_paired_node_wpid(self, receiver_path: str, index: int):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def ping(self, handle, number, long_message=False):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def request(self, handle, devnumber, request_id, *params, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"sub_id, notification_data, expected_error, expected_new_device",
|
||||||
|
[
|
||||||
|
(Registers.DISCOVERY_STATUS_NOTIFICATION, b"\x01", "device_timeout", None),
|
||||||
|
(Registers.PAIRING_STATUS_NOTIFICATION, b"\x02", "failed", None),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_process_receiver_notification(sub_id, notification_data, expected_error, expected_new_device):
|
||||||
|
receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
|
||||||
|
notification = HIDPPNotification(0, 0, sub_id, 0, notification_data)
|
||||||
|
|
||||||
|
result = notifications._process_receiver_notification(receiver, notification)
|
||||||
|
|
||||||
|
assert result
|
||||||
|
assert receiver.pairing.error == hidpp10_constants.BOLT_PAIRING_ERRORS[expected_error]
|
||||||
|
assert receiver.pairing.new_device is expected_new_device
|
Loading…
Reference in New Issue