notification: Refactor process_device_notification
Simplify code and unify interfaces and type hints. Related #2273
This commit is contained in:
parent
15aaba2802
commit
8894463f64
|
@ -68,7 +68,7 @@ def process(device: Device | Receiver, notification: HIDPPNotification):
|
||||||
return process_device_notification(device, notification)
|
return process_device_notification(device, notification)
|
||||||
|
|
||||||
|
|
||||||
def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None:
|
def process_receiver_notification(receiver: Receiver, notification: HIDPPNotification) -> bool | None:
|
||||||
"""Process event messages from receivers."""
|
"""Process event messages from receivers."""
|
||||||
event_handler_mapping: dict[int, NotificationHandler] = {
|
event_handler_mapping: dict[int, NotificationHandler] = {
|
||||||
Notification.PAIRING_LOCK: handle_pairing_lock,
|
Notification.PAIRING_LOCK: handle_pairing_lock,
|
||||||
|
@ -80,12 +80,12 @@ def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPN
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
handler_func = event_handler_mapping[hidpp_notification.sub_id]
|
handler_func = event_handler_mapping[notification.sub_id]
|
||||||
return handler_func(receiver, hidpp_notification)
|
return handler_func(receiver, notification)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
assert hidpp_notification.sub_id in [
|
assert notification.sub_id in [
|
||||||
Notification.CONNECT_DISCONNECT,
|
Notification.CONNECT_DISCONNECT,
|
||||||
Notification.DJ_PAIRING,
|
Notification.DJ_PAIRING,
|
||||||
Notification.CONNECTED,
|
Notification.CONNECTED,
|
||||||
|
@ -93,10 +93,12 @@ def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPN
|
||||||
Notification.POWER,
|
Notification.POWER,
|
||||||
]
|
]
|
||||||
|
|
||||||
logger.warning(f"{receiver}: unhandled notification {hidpp_notification}")
|
logger.warning(f"{receiver}: unhandled notification {notification}")
|
||||||
|
|
||||||
|
|
||||||
def process_device_notification(device, notification):
|
def process_device_notification(device: Device, notification: HIDPPNotification):
|
||||||
|
"""Process event messages from devices."""
|
||||||
|
|
||||||
# incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here
|
# incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here
|
||||||
assert notification.sub_id & 0x80 == 0
|
assert notification.sub_id & 0x80 == 0
|
||||||
|
|
||||||
|
@ -129,16 +131,11 @@ def process_device_notification(device, notification):
|
||||||
if not device.features:
|
if not device.features:
|
||||||
logger.warning("%s: feature notification but features not set up: %02X %s", device, notification.sub_id, notification)
|
logger.warning("%s: feature notification but features not set up: %02X %s", device, notification.sub_id, notification)
|
||||||
return False
|
return False
|
||||||
try:
|
|
||||||
feature = device.features.get_feature(notification.sub_id)
|
|
||||||
except IndexError:
|
|
||||||
logger.warning("%s: notification from invalid feature index %02X: %s", device, notification.sub_id, notification)
|
|
||||||
return False
|
|
||||||
|
|
||||||
return _process_feature_notification(device, notification, feature)
|
return _process_feature_notification(device, notification)
|
||||||
|
|
||||||
|
|
||||||
def _process_dj_notification(device, notification):
|
def _process_dj_notification(device: Device, notification: HIDPPNotification):
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
logger.debug("%s (%s) DJ %s", device, device.protocol, notification)
|
logger.debug("%s (%s) DJ %s", device, device.protocol, notification)
|
||||||
|
|
||||||
|
@ -164,7 +161,7 @@ def _process_dj_notification(device, notification):
|
||||||
logger.warning("%s: unrecognized DJ %s", device, notification)
|
logger.warning("%s: unrecognized DJ %s", device, notification)
|
||||||
|
|
||||||
|
|
||||||
def _process_hidpp10_custom_notification(device, notification):
|
def _process_hidpp10_custom_notification(device: Device, notification: HIDPPNotification):
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
logger.debug("%s (%s) custom notification %s", device, device.protocol, notification)
|
logger.debug("%s (%s) custom notification %s", device, device.protocol, notification)
|
||||||
|
|
||||||
|
@ -177,7 +174,7 @@ def _process_hidpp10_custom_notification(device, notification):
|
||||||
logger.warning("%s: unrecognized %s", device, notification)
|
logger.warning("%s: unrecognized %s", device, notification)
|
||||||
|
|
||||||
|
|
||||||
def _process_hidpp10_notification(device, notification):
|
def _process_hidpp10_notification(device: Device, notification: HIDPPNotification):
|
||||||
if notification.sub_id == Notification.CONNECT_DISCONNECT: # device unpairing
|
if notification.sub_id == Notification.CONNECT_DISCONNECT: # device unpairing
|
||||||
if notification.address == 0x02:
|
if notification.address == 0x02:
|
||||||
# device un-paired
|
# device un-paired
|
||||||
|
@ -242,7 +239,13 @@ def _process_hidpp10_notification(device, notification):
|
||||||
logger.warning("%s: unrecognized %s", device, notification)
|
logger.warning("%s: unrecognized %s", device, notification)
|
||||||
|
|
||||||
|
|
||||||
def _process_feature_notification(device, notification, feature):
|
def _process_feature_notification(device: Device, notification: HIDPPNotification):
|
||||||
|
try:
|
||||||
|
feature = device.features.get_feature(notification.sub_id)
|
||||||
|
except IndexError:
|
||||||
|
logger.warning("%s: notification from invalid feature index %02X: %s", device, notification.sub_id, notification)
|
||||||
|
return False
|
||||||
|
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"%s: notification for feature %s, report %s, data %s",
|
"%s: notification for feature %s, report %s, data %s",
|
||||||
|
@ -419,15 +422,15 @@ def _process_feature_notification(device, notification, feature):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def handle_pairing_lock(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
|
def handle_pairing_lock(receiver: Receiver, notification: HIDPPNotification) -> bool:
|
||||||
receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01)
|
receiver.pairing.lock_open = bool(notification.address & 0x01)
|
||||||
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
|
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
|
||||||
if logger.isEnabledFor(logging.INFO):
|
if logger.isEnabledFor(logging.INFO):
|
||||||
logger.info("%s: %s", receiver, reason)
|
logger.info("%s: %s", receiver, reason)
|
||||||
receiver.pairing.error = None
|
receiver.pairing.error = None
|
||||||
if receiver.pairing.lock_open:
|
if receiver.pairing.lock_open:
|
||||||
receiver.pairing.new_device = None
|
receiver.pairing.new_device = None
|
||||||
pair_error = ord(hidpp_notification.data[:1])
|
pair_error = ord(notification.data[:1])
|
||||||
if pair_error:
|
if pair_error:
|
||||||
receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error)
|
receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error)
|
||||||
receiver.pairing.new_device = None
|
receiver.pairing.new_device = None
|
||||||
|
@ -436,9 +439,9 @@ def handle_pairing_lock(receiver: Receiver, hidpp_notification: HIDPPNotificatio
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
|
def handle_discovery_status(receiver: Receiver, notification: HIDPPNotification) -> bool:
|
||||||
with notification_lock:
|
with notification_lock:
|
||||||
receiver.pairing.discovering = hidpp_notification.address == 0x00
|
receiver.pairing.discovering = notification.address == 0x00
|
||||||
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
|
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
|
||||||
if logger.isEnabledFor(logging.INFO):
|
if logger.isEnabledFor(logging.INFO):
|
||||||
logger.info("%s: %s", receiver, reason)
|
logger.info("%s: %s", receiver, reason)
|
||||||
|
@ -447,7 +450,7 @@ def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotific
|
||||||
receiver.pairing.counter = receiver.pairing.device_address = None
|
receiver.pairing.counter = receiver.pairing.device_address = None
|
||||||
receiver.pairing.device_authentication = receiver.pairing.device_name = None
|
receiver.pairing.device_authentication = receiver.pairing.device_name = None
|
||||||
receiver.pairing.device_passkey = None
|
receiver.pairing.device_passkey = None
|
||||||
discover_error = ord(hidpp_notification.data[:1])
|
discover_error = ord(notification.data[:1])
|
||||||
if discover_error:
|
if discover_error:
|
||||||
receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error)
|
receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error)
|
||||||
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
|
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
|
||||||
|
@ -455,27 +458,27 @@ def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotific
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def handle_device_discovery(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
|
def handle_device_discovery(receiver: Receiver, notification: HIDPPNotification) -> bool:
|
||||||
with notification_lock:
|
with notification_lock:
|
||||||
counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter
|
counter = notification.address + notification.data[0] * 256 # notification counter
|
||||||
if receiver.pairing.counter is None:
|
if receiver.pairing.counter is None:
|
||||||
receiver.pairing.counter = counter
|
receiver.pairing.counter = counter
|
||||||
else:
|
else:
|
||||||
if not receiver.pairing.counter == counter:
|
if not receiver.pairing.counter == counter:
|
||||||
return None
|
return None
|
||||||
if hidpp_notification.data[1] == 0:
|
if notification.data[1] == 0:
|
||||||
receiver.pairing.device_kind = hidpp_notification.data[3]
|
receiver.pairing.device_kind = notification.data[3]
|
||||||
receiver.pairing.device_address = hidpp_notification.data[6:12]
|
receiver.pairing.device_address = notification.data[6:12]
|
||||||
receiver.pairing.device_authentication = hidpp_notification.data[14]
|
receiver.pairing.device_authentication = notification.data[14]
|
||||||
elif hidpp_notification.data[1] == 1:
|
elif notification.data[1] == 1:
|
||||||
receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8")
|
receiver.pairing.device_name = notification.data[3 : 3 + notification.data[2]].decode("utf-8")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
|
def handle_pairing_status(receiver: Receiver, notification: HIDPPNotification) -> bool:
|
||||||
with notification_lock:
|
with notification_lock:
|
||||||
receiver.pairing.device_passkey = None
|
receiver.pairing.device_passkey = None
|
||||||
receiver.pairing.lock_open = hidpp_notification.address == 0x00
|
receiver.pairing.lock_open = notification.address == 0x00
|
||||||
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
|
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
|
||||||
if logger.isEnabledFor(logging.INFO):
|
if logger.isEnabledFor(logging.INFO):
|
||||||
logger.info("%s: %s", receiver, reason)
|
logger.info("%s: %s", receiver, reason)
|
||||||
|
@ -485,11 +488,11 @@ def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotificat
|
||||||
receiver.pairing.device_address = None
|
receiver.pairing.device_address = None
|
||||||
receiver.pairing.device_authentication = None
|
receiver.pairing.device_authentication = None
|
||||||
receiver.pairing.device_name = None
|
receiver.pairing.device_name = None
|
||||||
pair_error = hidpp_notification.data[0]
|
pair_error = notification.data[0]
|
||||||
if receiver.pairing.lock_open:
|
if receiver.pairing.lock_open:
|
||||||
receiver.pairing.new_device = None
|
receiver.pairing.new_device = None
|
||||||
elif hidpp_notification.address == 0x02 and not pair_error:
|
elif notification.address == 0x02 and not pair_error:
|
||||||
receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7])
|
receiver.pairing.new_device = receiver.register_new_device(notification.data[7])
|
||||||
if pair_error:
|
if pair_error:
|
||||||
receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error)
|
receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error)
|
||||||
receiver.pairing.new_device = None
|
receiver.pairing.new_device = None
|
||||||
|
@ -498,9 +501,9 @@ def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotificat
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def handle_passkey_request(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
|
def handle_passkey_request(receiver: Receiver, notification: HIDPPNotification) -> bool:
|
||||||
with notification_lock:
|
with notification_lock:
|
||||||
receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8")
|
receiver.pairing.device_passkey = notification.data[0:6].decode("utf-8")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -274,7 +274,7 @@ def test_process_feature_notification(mocker, hidpp_notification, feature):
|
||||||
fake_device = fake_hidpp.Device()
|
fake_device = fake_hidpp.Device()
|
||||||
fake_device.receiver = ["rec1", "rec2"]
|
fake_device.receiver = ["rec1", "rec2"]
|
||||||
|
|
||||||
result = notifications._process_feature_notification(fake_device, hidpp_notification, feature)
|
result = notifications._process_feature_notification(fake_device, hidpp_notification)
|
||||||
|
|
||||||
assert result is True
|
assert result is True
|
||||||
|
|
||||||
|
@ -289,6 +289,23 @@ def test_process_receiver_notification_invalid(mocker):
|
||||||
notifications.process_receiver_notification(mock_receiver, notification)
|
notifications.process_receiver_notification(mock_receiver, notification)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"sub_id, notification_data, expected",
|
||||||
|
[
|
||||||
|
(Notification.NO_OPERATION, b"\x00", False),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_process_device_notification_extended(mocker, sub_id, notification_data, expected):
|
||||||
|
device = mocker.Mock()
|
||||||
|
device.handle_notification.return_value = None
|
||||||
|
device.protocol = 2.0
|
||||||
|
notification = HIDPPNotification(0, 0, sub_id, 0, notification_data)
|
||||||
|
|
||||||
|
result = notifications.process_device_notification(device, notification)
|
||||||
|
|
||||||
|
assert result == expected
|
||||||
|
|
||||||
|
|
||||||
def test_handle_device_discovery():
|
def test_handle_device_discovery():
|
||||||
receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
|
receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
|
||||||
sub_id = Registers.DISCOVERY_STATUS_NOTIFICATION
|
sub_id = Registers.DISCOVERY_STATUS_NOTIFICATION
|
||||||
|
|
Loading…
Reference in New Issue