diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index a034f797..f9835340 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -56,18 +56,18 @@ _F = hidpp20_constants.SupportedFeature notification_lock = threading.Lock() -def process(device, notification): +def process(device, notification: HIDPPNotification): assert device assert notification if not device.isDevice: - return _process_receiver_notification(device, notification) - return _process_device_notification(device, notification) + return process_receiver_notification(device, notification) + return process_device_notification(device, notification) -def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None: +def process_receiver_notification(receiver: Receiver, notification: HIDPPNotification) -> bool | None: # supposedly only 0x4x notifications arrive for the receiver - assert hidpp_notification.sub_id in [ + assert notification.sub_id in [ Notification.CONNECT_DISCONNECT, Notification.DJ_PAIRING, Notification.CONNECTED, @@ -81,15 +81,15 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP Registers.PASSKEY_REQUEST_NOTIFICATION, ] - if hidpp_notification.sub_id == Notification.PAIRING_LOCK: - receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01) + if notification.sub_id == Notification.PAIRING_LOCK: + receiver.pairing.lock_open = bool(notification.address & 0x01) reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) receiver.pairing.error = None if receiver.pairing.lock_open: receiver.pairing.new_device = None - pair_error = ord(hidpp_notification.data[:1]) + pair_error = ord(notification.data[:1]) if pair_error: receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error) receiver.pairing.new_device = None @@ -97,9 +97,9 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP receiver.changed(reason=reason) return True - elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing + elif notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing with notification_lock: - receiver.pairing.discovering = hidpp_notification.address == 0x00 + receiver.pairing.discovering = notification.address == 0x00 reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) @@ -108,33 +108,33 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP receiver.pairing.counter = receiver.pairing.device_address = None receiver.pairing.device_authentication = receiver.pairing.device_name = None receiver.pairing.device_passkey = None - discover_error = ord(hidpp_notification.data[:1]) + discover_error = ord(notification.data[:1]) if discover_error: receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error) logger.warning("bolt discovering error %d: %s", discover_error, discover_string) receiver.changed(reason=reason) return True - elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing + elif notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing with notification_lock: - counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter + counter = notification.address + notification.data[0] * 256 # notification counter if receiver.pairing.counter is None: receiver.pairing.counter = counter else: if not receiver.pairing.counter == counter: return None - if hidpp_notification.data[1] == 0: - receiver.pairing.device_kind = hidpp_notification.data[3] - receiver.pairing.device_address = hidpp_notification.data[6:12] - receiver.pairing.device_authentication = hidpp_notification.data[14] - elif hidpp_notification.data[1] == 1: - receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8") + if notification.data[1] == 0: + receiver.pairing.device_kind = notification.data[3] + receiver.pairing.device_address = notification.data[6:12] + receiver.pairing.device_authentication = notification.data[14] + elif notification.data[1] == 1: + receiver.pairing.device_name = notification.data[3 : 3 + notification.data[2]].decode("utf-8") return True - elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing + elif notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing with notification_lock: receiver.pairing.device_passkey = None - receiver.pairing.lock_open = hidpp_notification.address == 0x00 + receiver.pairing.lock_open = notification.address == 0x00 reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) @@ -144,11 +144,11 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP receiver.pairing.device_address = None receiver.pairing.device_authentication = None receiver.pairing.device_name = None - pair_error = hidpp_notification.data[0] + pair_error = notification.data[0] if receiver.pairing.lock_open: receiver.pairing.new_device = None - elif hidpp_notification.address == 0x02 and not pair_error: - receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7]) + elif notification.address == 0x02 and not pair_error: + receiver.pairing.new_device = receiver.register_new_device(notification.data[7]) if pair_error: receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error) receiver.pairing.new_device = None @@ -156,18 +156,18 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP receiver.changed(reason=reason) return True - elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing + elif notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing with notification_lock: - receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8") + receiver.pairing.device_passkey = notification.data[0:6].decode("utf-8") return True - elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing + elif notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing return True - logger.warning("%s: unhandled notification %s", receiver, hidpp_notification) + logger.warning("%s: unhandled notification %s", receiver, notification) -def _process_device_notification(device, notification): +def process_device_notification(device, notification): # incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here assert notification.sub_id & 0x80 == 0 diff --git a/tests/logitech_receiver/test_notifications.py b/tests/logitech_receiver/test_notifications.py index fd651478..fd9f578e 100644 --- a/tests/logitech_receiver/test_notifications.py +++ b/tests/logitech_receiver/test_notifications.py @@ -55,7 +55,7 @@ def test_process_receiver_notification(sub_id, notification_data, expected_error receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None) notification = HIDPPNotification(0, 0, sub_id, 0x02, notification_data) - result = notifications._process_receiver_notification(receiver, notification) + result = notifications.process_receiver_notification(receiver, notification) assert result assert receiver.pairing.error == expected_error @@ -73,7 +73,7 @@ def test_process_receiver_notification(sub_id, notification_data, expected_error def test_process_device_notification(hidpp_notification, expected): device = fake_hidpp.Device() - result = notifications._process_device_notification(device, hidpp_notification) + result = notifications.process_device_notification(device, hidpp_notification) assert result == expected