receiver: move pairing status to new dataclass attached to receiver

This commit is contained in:
Peter F. Patel-Schneider 2024-03-09 15:47:17 -05:00
parent 0d225f6cb1
commit 87285faf7f
8 changed files with 129 additions and 106 deletions

View File

@ -91,6 +91,7 @@ class Device:
self._persister = None # persister holds settings
self._led_effects = self._firmware = self._keys = self._remap_keys = self._gestures = None
self._profiles = self._backlight = self._registers = self._settings = None
self.notification_flags = None
self._feature_settings_checked = False
self._gestures_lock = _threading.Lock()

View File

@ -64,34 +64,35 @@ def _process_receiver_notification(receiver, status, n):
assert n.sub_id & 0x40 == 0x40
if n.sub_id == 0x4A: # pairing lock notification
status.lock_open = bool(n.address & 0x01)
reason = _("pairing lock is open") if status.lock_open else _("pairing lock is closed")
receiver.pairing.lock_open = bool(n.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
status.error = None
if status.lock_open:
status.new_device = None
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(n.data[:1])
if pair_error:
status.error = error_string = _hidpp10_constants.PAIRING_ERRORS[pair_error]
status.new_device = None
receiver.pairing.error = error_string = _hidpp10_constants.PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
status.changed(reason=reason)
return True
elif n.sub_id == _R.discovery_status_notification: # Bolt pairing
with notification_lock:
status.discovering = n.address == 0x00
reason = _("discovery lock is open") if status.discovering else _("discovery lock is closed")
receiver.pairing.discovering = n.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
status.error = None
if status.discovering:
status.counter = status.device_address = status.device_authentication = status.device_name = None
status.device_passkey = None
receiver.pairing.error = None
if receiver.pairing.discovering:
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(n.data[:1])
if discover_error:
status.error = discover_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
receiver.pairing.error = discover_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
status.changed(reason=reason)
return True
@ -99,44 +100,46 @@ def _process_receiver_notification(receiver, status, n):
elif n.sub_id == _R.device_discovery_notification: # Bolt pairing
with notification_lock:
counter = n.address + n.data[0] * 256 # notification counter
if status.counter is None:
status.counter = counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not status.counter == counter:
if not receiver.pairing.counter == counter:
return None
if n.data[1] == 0:
status.device_kind = n.data[3]
status.device_address = n.data[6:12]
status.device_authentication = n.data[14]
receiver.pairing.device_kind = n.data[3]
receiver.pairing.device_address = n.data[6:12]
receiver.pairing.device_authentication = n.data[14]
elif n.data[1] == 1:
status.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8")
receiver.pairing.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8")
return True
elif n.sub_id == _R.pairing_status_notification: # Bolt pairing
with notification_lock:
status.device_passkey = None
status.lock_open = n.address == 0x00
reason = _("pairing lock is open") if status.lock_open else _("pairing lock is closed")
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = n.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
status.error = None
if not status.lock_open:
status.counter = status.device_address = status.device_authentication = status.device_name = None
receiver.pairing.error = None
if not receiver.pairing.lock_open:
receiver.pairing.counter = (
receiver.pairing.device_address
) = receiver.pairing.device_authentication = receiver.pairing.device_name = None
pair_error = n.data[0]
if status.lock_open:
status.new_device = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif n.address == 0x02 and not pair_error:
status.new_device = receiver.register_new_device(n.data[7])
receiver.pairing.new_device = receiver.register_new_device(n.data[7])
if pair_error:
status.error = error_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
status.new_device = None
receiver.pairing.error = error_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
status.changed(reason=reason)
return True
elif n.sub_id == _R.passkey_request_notification: # Bolt pairing
with notification_lock:
status.device_passkey = n.data[0:6].decode("utf-8")
receiver.pairing.device_passkey = n.data[0:6].decode("utf-8")
return True
elif n.sub_id == _R.passkey_pressed_notification: # Bolt pairing

View File

@ -18,6 +18,7 @@
import errno as _errno
import logging
from dataclasses import dataclass
from typing import Optional
import hidapi as _hid
@ -33,6 +34,22 @@ _R = hidpp10_constants.REGISTERS
_IR = hidpp10_constants.INFO_SUBREGISTERS
@dataclass
class Pairing:
"""Information about the current or most recent pairing"""
lock_open: bool = False
discovering: bool = False
counter: Optional[int] = None
device_address: Optional[bytes] = None
device_authentication: Optional[int] = None
device_kind: Optional[int] = None
device_name: Optional[str] = None
device_passkey: Optional[str] = None
new_device: Optional[Device] = None
error: Optional[any] = None
class Receiver:
"""A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers"
The paired devices are available through the sequence interface.
@ -57,6 +74,8 @@ class Receiver:
self.name = product_info.get("name", "Receiver")
self.may_unpair = product_info.get("may_unpair", False)
self.re_pairs = product_info.get("re_pairs", False)
self.notification_flags = None
self.pairing = Pairing()
self.initialize(product_info)
def initialize(self, product_info: dict):

View File

@ -45,27 +45,26 @@ def attach_to(device, changed_callback):
class ReceiverStatus:
"""The 'runtime' status of a receiver, mostly about the pairing process --
is the pairing lock open or closed, any pairing errors, etc.
"""
"""The 'runtime' status of a receiver, currently vestigial."""
def __init__(self, receiver, changed_callback):
assert receiver
self._receiver = receiver
assert changed_callback
self._changed_callback = changed_callback
self.notification_flags = None
self.error = None
self.lock_open = False
self.discovering = False
self.counter = None
self.device_address = None
self.device_authentication = None
self.device_kind = None
self.device_name = None
self.device_passkey = None
self.new_device = None
# self.notification_flags = None
# self.error = None
# self.lock_open = False
# self.discovering = False
# self.counter = None
# self.device_address = None
# self.device_authentication = None
# self.device_kind = None
# self.device_name = None
# self.device_passkey = None
# self.new_device = None
def to_string(self):
count = len(self._receiver)
@ -96,8 +95,8 @@ class DeviceStatus:
self._active = None # is the device active?
self.battery = None
self.link_encrypted = None
self.notification_flags = None
self.error = None
# self.notification_flags = None
self.battery_error = None
def to_string(self):
return self.battery.to_str() if self.battery is not None else ""
@ -118,11 +117,11 @@ class DeviceStatus:
alert, reason = ALERT.NONE, None
if info.ok():
self.error = None
self.battery_error = None
else:
logger.warning("%s: battery %d%%, ALERT %s", self._device, info.level, info.status)
if self.error != info.status:
self.error = info.status
if self.battery_error != info.status:
self.battery_error = info.status
alert = ALERT.NOTIFICATION | ALERT.ATTENTION
reason = info.to_str()
@ -149,7 +148,7 @@ class DeviceStatus:
# get cleared when the device is turned off (but not when the device
# goes idle, and we can't tell the difference right now).
if d.protocol < 2.0:
self.notification_flags = d.enable_connection_notifications()
self._device.notification_flags = d.enable_connection_notifications()
# battery information may have changed so try to read it now
self.read_battery()

View File

@ -58,10 +58,10 @@ def run(receivers, args, find_receiver, _ignore):
kd, known_devices = known_devices, None # only process one connection notification
if kd is not None:
if n.devnumber not in kd:
receiver.status.new_device = receiver.register_new_device(n.devnumber, n)
receiver.pairing.new_device = receiver.register_new_device(n.devnumber, n)
elif receiver.re_pairs:
del receiver[n.devnumber] # get rid of information on device re-paired away
receiver.status.new_device = receiver.register_new_device(n.devnumber, n)
receiver.pairing.new_device = receiver.register_new_device(n.devnumber, n)
timeout = 30 # seconds
receiver.handle = _HandleWithNotificationHook(receiver.handle)
@ -71,17 +71,17 @@ def run(receivers, args, find_receiver, _ignore):
print("Bolt Pairing: long-press the pairing key or button on your device (timing out in", timeout, "seconds).")
pairing_start = _timestamp()
patience = 5 # the discovering notification may come slightly later, so be patient
while receiver.status.discovering or _timestamp() - pairing_start < patience:
if receiver.status.device_address and receiver.status.device_authentication and receiver.status.device_name:
while receiver.pairing.discovering or _timestamp() - pairing_start < patience:
if receiver.pairing.device_address and receiver.pairing.device_authentication and receiver.pairing.device_name:
break
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
if n:
receiver.handle.notifications_hook(n)
address = receiver.status.device_address
name = receiver.status.device_name
authentication = receiver.status.device_authentication
kind = receiver.status.device_kind
address = receiver.pairing.device_address
name = receiver.pairing.device_name
authentication = receiver.pairing.device_authentication
kind = receiver.pairing.device_kind
print(f"Bolt Pairing: discovered {name}")
receiver.pair_device(
address=address,
@ -90,21 +90,21 @@ def run(receivers, args, find_receiver, _ignore):
)
pairing_start = _timestamp()
patience = 5 # the discovering notification may come slightly later, so be patient
while receiver.status.lock_open or _timestamp() - pairing_start < patience:
if receiver.status.device_passkey:
while receiver.pairing.lock_open or _timestamp() - pairing_start < patience:
if receiver.pairing.device_passkey:
break
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
if n:
receiver.handle.notifications_hook(n)
if authentication & 0x01:
print(f"Bolt Pairing: type passkey {receiver.status.device_passkey} and then press the enter key")
print(f"Bolt Pairing: type passkey {receiver.pairing.device_passkey} and then press the enter key")
else:
passkey = f"{int(receiver.status.device_passkey):010b}"
passkey = f"{int(receiver.pairing.device_passkey):010b}"
passkey = ", ".join(["right" if bit == "1" else "left" for bit in passkey])
print(f"Bolt Pairing: press {passkey}")
print("and then press left and right buttons simultaneously")
while receiver.status.lock_open:
while receiver.pairing.lock_open:
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
if n:
@ -115,7 +115,7 @@ def run(receivers, args, find_receiver, _ignore):
print("Pairing: turn your new device on (timing out in", timeout, "seconds).")
pairing_start = _timestamp()
patience = 5 # the lock-open notification may come slightly later, wait for it a bit
while receiver.status.lock_open or _timestamp() - pairing_start < patience:
while receiver.pairing.lock_open or _timestamp() - pairing_start < patience:
n = _base.read(receiver.handle)
if n:
n = _base.make_notification(*n)
@ -127,11 +127,11 @@ def run(receivers, args, find_receiver, _ignore):
# concurrently running Solaar app might stop working properly
_hidpp10.set_notification_flags(receiver, old_notification_flags)
if receiver.status.new_device:
dev = receiver.status.new_device
if receiver.pairing.new_device:
dev = receiver.pairing.new_device
print("Paired device %d: %s (%s) [%s:%s]" % (dev.number, dev.name, dev.codename, dev.wpid, dev.serial))
else:
error = receiver.status.get(_status.error)
error = receiver.pairing.error
if error:
raise Exception("pairing failed: %s" % error)
else:

View File

@ -188,12 +188,12 @@ class ReceiverListener(_listener.EventsListener):
if self.receiver.read_register(_R.receiver_info, _IR.pairing_information + n.devnumber - 1) is None:
return
dev = self.receiver.register_new_device(n.devnumber, n)
elif self.receiver.status.lock_open and self.receiver.re_pairs and not ord(n.data[0:1]) & 0x40:
elif self.receiver.pairing.lock_open and self.receiver.re_pairs and not ord(n.data[0:1]) & 0x40:
dev = self.receiver[n.devnumber]
del self.receiver[n.devnumber] # get rid of information on device re-paired away
self._status_changed(dev) # signal that this device has changed
dev = self.receiver.register_new_device(n.devnumber, n)
self.receiver.status.new_device = self.receiver[n.devnumber]
self.receiver.pairing.new_device = self.receiver[n.devnumber]
else:
dev = self.receiver[n.devnumber]
else:
@ -224,12 +224,13 @@ class ReceiverListener(_listener.EventsListener):
else:
_notifications.process(dev, n)
if self.receiver.status.lock_open and not already_known:
if self.receiver.pairing.lock_open and not already_known:
# this should be the first notification after a device was paired
assert n.sub_id == 0x41, "first notification was not a connection notification"
if logger.isEnabledFor(logging.WARNING):
logger.warning("first notification was not a connection notification")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: pairing detected new device", self.receiver)
self.receiver.status.new_device = dev
self.receiver.pairing.new_device = dev
elif dev.online is None:
dev.ping()

View File

@ -71,23 +71,23 @@ def _check_lock_state(assistant, receiver, count=2):
logger.debug("assistant %s destroyed, bailing out", assistant)
return False
if receiver.status.error:
# receiver.status.new_device = _fake_device(receiver)
_pairing_failed(assistant, receiver, receiver.status.error)
receiver.status.error = None
if receiver.pairing.error:
# receiver.pairing.new_device = _fake_device(receiver)
_pairing_failed(assistant, receiver, receiver.pairing.error)
receiver.pairing.error = None
return False
if receiver.status.new_device:
if receiver.pairing.new_device:
receiver.remaining_pairings(False) # Update remaining pairings
device, receiver.status.new_device = receiver.status.new_device, None
device, receiver.pairing.new_device = receiver.pairing.new_device, None
_pairing_succeeded(assistant, receiver, device)
return False
elif receiver.status.device_address and receiver.status.device_name and not address:
address = receiver.status.device_address
name = receiver.status.device_name
kind = receiver.status.device_kind
authentication = receiver.status.device_authentication
name = receiver.status.device_name
elif receiver.pairing.device_address and receiver.pairing.device_name and not address:
address = receiver.pairing.device_address
name = receiver.pairing.device_name
kind = receiver.pairing.device_kind
authentication = receiver.pairing.device_authentication
name = receiver.pairing.device_name
entropy = 10
if kind == _hidpp10_constants.DEVICE_KIND.keyboard:
entropy = 20
@ -100,12 +100,12 @@ def _check_lock_state(assistant, receiver, count=2):
else:
_pairing_failed(assistant, receiver, "failed to open pairing lock")
return False
elif address and receiver.status.device_passkey and not passcode:
passcode = receiver.status.device_passkey
elif address and receiver.pairing.device_passkey and not passcode:
passcode = receiver.pairing.device_passkey
_show_passcode(assistant, receiver, passcode)
return True
if not receiver.status.lock_open and not receiver.status.discovering:
if not receiver.pairing.lock_open and not receiver.pairing.discovering:
if count > 0:
# the actual device notification may arrive later so have a little patience
GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver, count - 1)
@ -119,16 +119,16 @@ def _check_lock_state(assistant, receiver, count=2):
def _show_passcode(assistant, receiver, passkey):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s show passkey: %s", receiver, passkey)
name = receiver.status.device_name
authentication = receiver.status.device_authentication
name = receiver.pairing.device_name
authentication = receiver.pairing.device_authentication
intro_text = _("%(receiver_name)s: pair new device") % {"receiver_name": receiver.name}
page_text = _("Enter passcode on %(name)s.") % {"name": name}
page_text += "\n"
if authentication & 0x01:
page_text += _("Type %(passcode)s and then press the enter key.") % {"passcode": receiver.status.device_passkey}
page_text += _("Type %(passcode)s and then press the enter key.") % {"passcode": receiver.pairing.device_passkey}
else:
passcode = ", ".join(
[_("right") if bit == "1" else _("left") for bit in f"{int(receiver.status.device_passkey):010b}"]
[_("right") if bit == "1" else _("left") for bit in f"{int(receiver.pairing.device_passkey):010b}"]
)
page_text += _("Press %(code)s\nand then press left and right buttons simultaneously.") % {"code": passcode}
page = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, intro_text, "preferences-desktop-peripherals", page_text)
@ -144,8 +144,8 @@ def _prepare(assistant, page, receiver):
if index == 0:
if receiver.receiver_kind == "bolt":
if receiver.discover(timeout=_PAIRING_TIMEOUT):
assert receiver.status.new_device is None
assert receiver.status.error is None
assert receiver.pairing.new_device is None
assert receiver.pairing.error is None
spinner = page.get_children()[-1]
spinner.start()
GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver)
@ -153,8 +153,8 @@ def _prepare(assistant, page, receiver):
else:
GLib.idle_add(_pairing_failed, assistant, receiver, "discovery did not start")
elif receiver.set_lock(False, timeout=_PAIRING_TIMEOUT):
assert receiver.status.new_device is None
assert receiver.status.error is None
assert receiver.pairing.new_device is None
assert receiver.pairing.error is None
spinner = page.get_children()[-1]
spinner.start()
GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver)
@ -169,16 +169,16 @@ def _finish(assistant, receiver):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("finish %s", assistant)
assistant.destroy()
receiver.status.new_device = None
if receiver.status.lock_open:
receiver.pairing.new_device = None
if receiver.pairing.lock_open:
if receiver.receiver_kind == "bolt":
receiver.pair_device("cancel")
else:
receiver.set_lock()
if receiver.status.discovering:
if receiver.pairing.discovering:
receiver.discover(True)
if not receiver.status.lock_open and not receiver.status.discovering:
receiver.status.error = None
if not receiver.pairing.lock_open and not receiver.pairing.discovering:
receiver.pairing.error = None
def _pairing_failed(assistant, receiver, error):

View File

@ -573,7 +573,7 @@ def _update_details(button):
elif device.kind is None or device.online:
yield (" %s" % _("Firmware"), "...")
flag_bits = device.status.notification_flags
flag_bits = device.notification_flags
if flag_bits is not None:
flag_names = (
("(%s)" % _("none"),) if flag_bits == 0 else _hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits)
@ -648,7 +648,7 @@ def _update_receiver_panel(receiver, panel, buttons, full=False):
panel._count.set_markup(paired_text)
is_pairing = receiver.status.lock_open
is_pairing = receiver.pairing.lock_open
if is_pairing:
panel._scanning.set_visible(True)
if not panel._spinner.get_visible():
@ -872,7 +872,7 @@ def update(device, need_popup=False, refresh=False):
if is_alive and item:
was_pairing = bool(_model.get_value(item, _COLUMN.STATUS_ICON))
is_pairing = (not device.isDevice) and bool(device.status.lock_open)
is_pairing = (not device.isDevice) and bool(device.pairing.lock_open)
_model.set_value(item, _COLUMN.STATUS_ICON, "network-wireless" if is_pairing else _CAN_SET_ROW_NONE)
if selected_device_id == (device.path, 0):