notification: Refactor receiver event handling

Split processing of receiver notification into smaller functions.
Extract handler functions for every receiver notification for simple
maintenence and testability.

Related #2273
This commit is contained in:
MattHag 2024-12-30 22:47:26 +01:00 committed by Peter F. Patel-Schneider
parent 33c057feff
commit fa3a9bc5b3
1 changed files with 109 additions and 95 deletions

View File

@ -43,9 +43,9 @@ from .hidpp10_constants import Registers
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from .base import HIDPPNotification from .base import HIDPPNotification
from .device import Device
from .receiver import Receiver from .receiver import Receiver
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_hidpp10 = hidpp10.Hidpp10() _hidpp10 = hidpp10.Hidpp10()
@ -56,7 +56,8 @@ _F = hidpp20_constants.SupportedFeature
notification_lock = threading.Lock() notification_lock = threading.Lock()
def process(device, notification: HIDPPNotification): def process(device: Device | Receiver, notification: HIDPPNotification):
"""Handle incoming events (notification) from device or receiver."""
assert device assert device
assert notification assert notification
@ -65,106 +66,30 @@ def process(device, notification: HIDPPNotification):
return process_device_notification(device, notification) return process_device_notification(device, notification)
def process_receiver_notification(receiver: Receiver, notification: HIDPPNotification) -> bool | None: def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None:
# supposedly only 0x4x notifications arrive for the receiver """Process event messages from receivers."""
assert notification.sub_id in [ if hidpp_notification.sub_id == Notification.PAIRING_LOCK:
return handle_pairing_lock(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
return handle_discovery_status(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
return handle_device_discovery(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
return handle_pairing_status(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
return handle_passkey_request(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
return handle_passkey_pressed(receiver, hidpp_notification)
assert hidpp_notification.sub_id in [
Notification.CONNECT_DISCONNECT, Notification.CONNECT_DISCONNECT,
Notification.DJ_PAIRING, Notification.DJ_PAIRING,
Notification.CONNECTED, Notification.CONNECTED,
Notification.RAW_INPUT, Notification.RAW_INPUT,
Notification.PAIRING_LOCK,
Notification.POWER, Notification.POWER,
Registers.DEVICE_DISCOVERY_NOTIFICATION,
Registers.DISCOVERY_STATUS_NOTIFICATION,
Registers.PAIRING_STATUS_NOTIFICATION,
Registers.PASSKEY_PRESSED_NOTIFICATION,
Registers.PASSKEY_REQUEST_NOTIFICATION,
] ]
if notification.sub_id == Notification.PAIRING_LOCK: logger.warning(f"{receiver}: unhandled notification {hidpp_notification}")
receiver.pairing.lock_open = bool(notification.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(notification.data[:1])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True
elif notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.discovering = notification.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.discovering:
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(notification.data[:1])
if discover_error:
receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error)
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True
elif notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
with notification_lock:
counter = notification.address + notification.data[0] * 256 # notification counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not receiver.pairing.counter == counter:
return None
if notification.data[1] == 0:
receiver.pairing.device_kind = notification.data[3]
receiver.pairing.device_address = notification.data[6:12]
receiver.pairing.device_authentication = notification.data[14]
elif notification.data[1] == 1:
receiver.pairing.device_name = notification.data[3 : 3 + notification.data[2]].decode("utf-8")
return True
elif notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = notification.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if not receiver.pairing.lock_open:
receiver.pairing.counter = None
receiver.pairing.device_address = None
receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = notification.data[0]
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(notification.data[7])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True
elif notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = notification.data[0:6].decode("utf-8")
return True
elif notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
return True
logger.warning("%s: unhandled notification %s", receiver, notification)
def process_device_notification(device, notification): def process_device_notification(device, notification):
@ -488,3 +413,92 @@ def _process_feature_notification(device, notification, feature):
diversion.process_notification(device, notification, feature) diversion.process_notification(device, notification, feature)
return True return True
def handle_pairing_lock(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(hidpp_notification.data[:1])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True
def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
with notification_lock:
receiver.pairing.discovering = hidpp_notification.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.discovering:
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(hidpp_notification.data[:1])
if discover_error:
receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error)
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True
def handle_device_discovery(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
with notification_lock:
counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not receiver.pairing.counter == counter:
return None
if hidpp_notification.data[1] == 0:
receiver.pairing.device_kind = hidpp_notification.data[3]
receiver.pairing.device_address = hidpp_notification.data[6:12]
receiver.pairing.device_authentication = hidpp_notification.data[14]
elif hidpp_notification.data[1] == 1:
receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8")
return True
def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
with notification_lock:
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = hidpp_notification.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if not receiver.pairing.lock_open:
receiver.pairing.counter = None
receiver.pairing.device_address = None
receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = hidpp_notification.data[0]
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif hidpp_notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True
def handle_passkey_request(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
with notification_lock:
receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8")
return True
def handle_passkey_pressed(_receiver: Receiver, _hidpp_notification: HIDPPNotification) -> bool:
return True