From fa3a9bc5b34824591976e840cd8de333d944279a Mon Sep 17 00:00:00 2001 From: MattHag <16444067+MattHag@users.noreply.github.com> Date: Mon, 30 Dec 2024 22:47:26 +0100 Subject: [PATCH] notification: Refactor receiver event handling Split processing of receiver notification into smaller functions. Extract handler functions for every receiver notification for simple maintenence and testability. Related #2273 --- lib/logitech_receiver/notifications.py | 204 +++++++++++++------------ 1 file changed, 109 insertions(+), 95 deletions(-) diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index f9835340..59ea1d1d 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -43,9 +43,9 @@ from .hidpp10_constants import Registers if typing.TYPE_CHECKING: from .base import HIDPPNotification + from .device import Device from .receiver import Receiver - logger = logging.getLogger(__name__) _hidpp10 = hidpp10.Hidpp10() @@ -56,7 +56,8 @@ _F = hidpp20_constants.SupportedFeature notification_lock = threading.Lock() -def process(device, notification: HIDPPNotification): +def process(device: Device | Receiver, notification: HIDPPNotification): + """Handle incoming events (notification) from device or receiver.""" assert device assert notification @@ -65,106 +66,30 @@ def process(device, notification: HIDPPNotification): return process_device_notification(device, notification) -def process_receiver_notification(receiver: Receiver, notification: HIDPPNotification) -> bool | None: - # supposedly only 0x4x notifications arrive for the receiver - assert notification.sub_id in [ +def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None: + """Process event messages from receivers.""" + if hidpp_notification.sub_id == Notification.PAIRING_LOCK: + return handle_pairing_lock(receiver, hidpp_notification) + elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing + return handle_discovery_status(receiver, hidpp_notification) + elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing + return handle_device_discovery(receiver, hidpp_notification) + elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing + return handle_pairing_status(receiver, hidpp_notification) + elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing + return handle_passkey_request(receiver, hidpp_notification) + elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing + return handle_passkey_pressed(receiver, hidpp_notification) + + assert hidpp_notification.sub_id in [ Notification.CONNECT_DISCONNECT, Notification.DJ_PAIRING, Notification.CONNECTED, Notification.RAW_INPUT, - Notification.PAIRING_LOCK, Notification.POWER, - Registers.DEVICE_DISCOVERY_NOTIFICATION, - Registers.DISCOVERY_STATUS_NOTIFICATION, - Registers.PAIRING_STATUS_NOTIFICATION, - Registers.PASSKEY_PRESSED_NOTIFICATION, - Registers.PASSKEY_REQUEST_NOTIFICATION, ] - if notification.sub_id == Notification.PAIRING_LOCK: - receiver.pairing.lock_open = bool(notification.address & 0x01) - reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") - if logger.isEnabledFor(logging.INFO): - logger.info("%s: %s", receiver, reason) - receiver.pairing.error = None - if receiver.pairing.lock_open: - receiver.pairing.new_device = None - pair_error = ord(notification.data[:1]) - if pair_error: - receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error) - receiver.pairing.new_device = None - logger.warning("pairing error %d: %s", pair_error, error_string) - receiver.changed(reason=reason) - return True - - elif notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing - with notification_lock: - receiver.pairing.discovering = notification.address == 0x00 - reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed") - if logger.isEnabledFor(logging.INFO): - logger.info("%s: %s", receiver, reason) - receiver.pairing.error = None - if receiver.pairing.discovering: - receiver.pairing.counter = receiver.pairing.device_address = None - receiver.pairing.device_authentication = receiver.pairing.device_name = None - receiver.pairing.device_passkey = None - discover_error = ord(notification.data[:1]) - if discover_error: - receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error) - logger.warning("bolt discovering error %d: %s", discover_error, discover_string) - receiver.changed(reason=reason) - return True - - elif notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing - with notification_lock: - counter = notification.address + notification.data[0] * 256 # notification counter - if receiver.pairing.counter is None: - receiver.pairing.counter = counter - else: - if not receiver.pairing.counter == counter: - return None - if notification.data[1] == 0: - receiver.pairing.device_kind = notification.data[3] - receiver.pairing.device_address = notification.data[6:12] - receiver.pairing.device_authentication = notification.data[14] - elif notification.data[1] == 1: - receiver.pairing.device_name = notification.data[3 : 3 + notification.data[2]].decode("utf-8") - return True - - elif notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing - with notification_lock: - receiver.pairing.device_passkey = None - receiver.pairing.lock_open = notification.address == 0x00 - reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") - if logger.isEnabledFor(logging.INFO): - logger.info("%s: %s", receiver, reason) - receiver.pairing.error = None - if not receiver.pairing.lock_open: - receiver.pairing.counter = None - receiver.pairing.device_address = None - receiver.pairing.device_authentication = None - receiver.pairing.device_name = None - pair_error = notification.data[0] - if receiver.pairing.lock_open: - receiver.pairing.new_device = None - elif notification.address == 0x02 and not pair_error: - receiver.pairing.new_device = receiver.register_new_device(notification.data[7]) - if pair_error: - receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error) - receiver.pairing.new_device = None - logger.warning("pairing error %d: %s", pair_error, error_string) - receiver.changed(reason=reason) - return True - - elif notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing - with notification_lock: - receiver.pairing.device_passkey = notification.data[0:6].decode("utf-8") - return True - - elif notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing - return True - - logger.warning("%s: unhandled notification %s", receiver, notification) + logger.warning(f"{receiver}: unhandled notification {hidpp_notification}") def process_device_notification(device, notification): @@ -488,3 +413,92 @@ def _process_feature_notification(device, notification, feature): diversion.process_notification(device, notification, feature) return True + + +def handle_pairing_lock(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: + receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01) + reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") + if logger.isEnabledFor(logging.INFO): + logger.info("%s: %s", receiver, reason) + receiver.pairing.error = None + if receiver.pairing.lock_open: + receiver.pairing.new_device = None + pair_error = ord(hidpp_notification.data[:1]) + if pair_error: + receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error) + receiver.pairing.new_device = None + logger.warning("pairing error %d: %s", pair_error, error_string) + receiver.changed(reason=reason) + return True + + +def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: + with notification_lock: + receiver.pairing.discovering = hidpp_notification.address == 0x00 + reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed") + if logger.isEnabledFor(logging.INFO): + logger.info("%s: %s", receiver, reason) + receiver.pairing.error = None + if receiver.pairing.discovering: + receiver.pairing.counter = receiver.pairing.device_address = None + receiver.pairing.device_authentication = receiver.pairing.device_name = None + receiver.pairing.device_passkey = None + discover_error = ord(hidpp_notification.data[:1]) + if discover_error: + receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error) + logger.warning("bolt discovering error %d: %s", discover_error, discover_string) + receiver.changed(reason=reason) + return True + + +def handle_device_discovery(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: + with notification_lock: + counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter + if receiver.pairing.counter is None: + receiver.pairing.counter = counter + else: + if not receiver.pairing.counter == counter: + return None + if hidpp_notification.data[1] == 0: + receiver.pairing.device_kind = hidpp_notification.data[3] + receiver.pairing.device_address = hidpp_notification.data[6:12] + receiver.pairing.device_authentication = hidpp_notification.data[14] + elif hidpp_notification.data[1] == 1: + receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8") + return True + + +def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: + with notification_lock: + receiver.pairing.device_passkey = None + receiver.pairing.lock_open = hidpp_notification.address == 0x00 + reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") + if logger.isEnabledFor(logging.INFO): + logger.info("%s: %s", receiver, reason) + receiver.pairing.error = None + if not receiver.pairing.lock_open: + receiver.pairing.counter = None + receiver.pairing.device_address = None + receiver.pairing.device_authentication = None + receiver.pairing.device_name = None + pair_error = hidpp_notification.data[0] + if receiver.pairing.lock_open: + receiver.pairing.new_device = None + elif hidpp_notification.address == 0x02 and not pair_error: + receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7]) + if pair_error: + receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error) + receiver.pairing.new_device = None + logger.warning("pairing error %d: %s", pair_error, error_string) + receiver.changed(reason=reason) + return True + + +def handle_passkey_request(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: + with notification_lock: + receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8") + return True + + +def handle_passkey_pressed(_receiver: Receiver, _hidpp_notification: HIDPPNotification) -> bool: + return True