diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index e6ef12f8..ac125202 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -91,6 +91,7 @@ class Device: self._persister = None # persister holds settings self._led_effects = self._firmware = self._keys = self._remap_keys = self._gestures = None self._profiles = self._backlight = self._registers = self._settings = None + self.notification_flags = None self._feature_settings_checked = False self._gestures_lock = _threading.Lock() diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 5535d9ed..937c49dd 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -64,34 +64,35 @@ def _process_receiver_notification(receiver, status, n): assert n.sub_id & 0x40 == 0x40 if n.sub_id == 0x4A: # pairing lock notification - status.lock_open = bool(n.address & 0x01) - reason = _("pairing lock is open") if status.lock_open else _("pairing lock is closed") + receiver.pairing.lock_open = bool(n.address & 0x01) + reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) - status.error = None - if status.lock_open: - status.new_device = None + receiver.pairing.error = None + if receiver.pairing.lock_open: + receiver.pairing.new_device = None pair_error = ord(n.data[:1]) if pair_error: - status.error = error_string = _hidpp10_constants.PAIRING_ERRORS[pair_error] - status.new_device = None + receiver.pairing.error = error_string = _hidpp10_constants.PAIRING_ERRORS[pair_error] + receiver.pairing.new_device = None logger.warning("pairing error %d: %s", pair_error, error_string) status.changed(reason=reason) return True elif n.sub_id == _R.discovery_status_notification: # Bolt pairing with notification_lock: - status.discovering = n.address == 0x00 - reason = _("discovery lock is open") if status.discovering else _("discovery lock is closed") + receiver.pairing.discovering = n.address == 0x00 + reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) - status.error = None - if status.discovering: - status.counter = status.device_address = status.device_authentication = status.device_name = None - status.device_passkey = None + receiver.pairing.error = None + if receiver.pairing.discovering: + receiver.pairing.counter = receiver.pairing.device_address = None + receiver.pairing.device_authentication = receiver.pairing.device_name = None + receiver.pairing.device_passkey = None discover_error = ord(n.data[:1]) if discover_error: - status.error = discover_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error] + receiver.pairing.error = discover_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error] logger.warning("bolt discovering error %d: %s", discover_error, discover_string) status.changed(reason=reason) return True @@ -99,44 +100,46 @@ def _process_receiver_notification(receiver, status, n): elif n.sub_id == _R.device_discovery_notification: # Bolt pairing with notification_lock: counter = n.address + n.data[0] * 256 # notification counter - if status.counter is None: - status.counter = counter + if receiver.pairing.counter is None: + receiver.pairing.counter = counter else: - if not status.counter == counter: + if not receiver.pairing.counter == counter: return None if n.data[1] == 0: - status.device_kind = n.data[3] - status.device_address = n.data[6:12] - status.device_authentication = n.data[14] + receiver.pairing.device_kind = n.data[3] + receiver.pairing.device_address = n.data[6:12] + receiver.pairing.device_authentication = n.data[14] elif n.data[1] == 1: - status.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8") + receiver.pairing.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8") return True elif n.sub_id == _R.pairing_status_notification: # Bolt pairing with notification_lock: - status.device_passkey = None - status.lock_open = n.address == 0x00 - reason = _("pairing lock is open") if status.lock_open else _("pairing lock is closed") + receiver.pairing.device_passkey = None + receiver.pairing.lock_open = n.address == 0x00 + reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) - status.error = None - if not status.lock_open: - status.counter = status.device_address = status.device_authentication = status.device_name = None + receiver.pairing.error = None + if not receiver.pairing.lock_open: + receiver.pairing.counter = ( + receiver.pairing.device_address + ) = receiver.pairing.device_authentication = receiver.pairing.device_name = None pair_error = n.data[0] - if status.lock_open: - status.new_device = None + if receiver.pairing.lock_open: + receiver.pairing.new_device = None elif n.address == 0x02 and not pair_error: - status.new_device = receiver.register_new_device(n.data[7]) + receiver.pairing.new_device = receiver.register_new_device(n.data[7]) if pair_error: - status.error = error_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error] - status.new_device = None + receiver.pairing.error = error_string = _hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error] + receiver.pairing.new_device = None logger.warning("pairing error %d: %s", pair_error, error_string) status.changed(reason=reason) return True elif n.sub_id == _R.passkey_request_notification: # Bolt pairing with notification_lock: - status.device_passkey = n.data[0:6].decode("utf-8") + receiver.pairing.device_passkey = n.data[0:6].decode("utf-8") return True elif n.sub_id == _R.passkey_pressed_notification: # Bolt pairing diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index dd3c99b1..c2e5f375 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -18,6 +18,7 @@ import errno as _errno import logging +from dataclasses import dataclass from typing import Optional import hidapi as _hid @@ -33,6 +34,22 @@ _R = hidpp10_constants.REGISTERS _IR = hidpp10_constants.INFO_SUBREGISTERS +@dataclass +class Pairing: + """Information about the current or most recent pairing""" + + lock_open: bool = False + discovering: bool = False + counter: Optional[int] = None + device_address: Optional[bytes] = None + device_authentication: Optional[int] = None + device_kind: Optional[int] = None + device_name: Optional[str] = None + device_passkey: Optional[str] = None + new_device: Optional[Device] = None + error: Optional[any] = None + + class Receiver: """A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers" The paired devices are available through the sequence interface. @@ -57,6 +74,8 @@ class Receiver: self.name = product_info.get("name", "Receiver") self.may_unpair = product_info.get("may_unpair", False) self.re_pairs = product_info.get("re_pairs", False) + self.notification_flags = None + self.pairing = Pairing() self.initialize(product_info) def initialize(self, product_info: dict): diff --git a/lib/logitech_receiver/status.py b/lib/logitech_receiver/status.py index 11a0a920..596f18f2 100644 --- a/lib/logitech_receiver/status.py +++ b/lib/logitech_receiver/status.py @@ -45,27 +45,26 @@ def attach_to(device, changed_callback): class ReceiverStatus: - """The 'runtime' status of a receiver, mostly about the pairing process -- - is the pairing lock open or closed, any pairing errors, etc. - """ + """The 'runtime' status of a receiver, currently vestigial.""" def __init__(self, receiver, changed_callback): assert receiver self._receiver = receiver assert changed_callback self._changed_callback = changed_callback - self.notification_flags = None - self.error = None - self.lock_open = False - self.discovering = False - self.counter = None - self.device_address = None - self.device_authentication = None - self.device_kind = None - self.device_name = None - self.device_passkey = None - self.new_device = None + # self.notification_flags = None + # self.error = None + + # self.lock_open = False + # self.discovering = False + # self.counter = None + # self.device_address = None + # self.device_authentication = None + # self.device_kind = None + # self.device_name = None + # self.device_passkey = None + # self.new_device = None def to_string(self): count = len(self._receiver) @@ -96,8 +95,8 @@ class DeviceStatus: self._active = None # is the device active? self.battery = None self.link_encrypted = None - self.notification_flags = None - self.error = None + # self.notification_flags = None + self.battery_error = None def to_string(self): return self.battery.to_str() if self.battery is not None else "" @@ -118,11 +117,11 @@ class DeviceStatus: alert, reason = ALERT.NONE, None if info.ok(): - self.error = None + self.battery_error = None else: logger.warning("%s: battery %d%%, ALERT %s", self._device, info.level, info.status) - if self.error != info.status: - self.error = info.status + if self.battery_error != info.status: + self.battery_error = info.status alert = ALERT.NOTIFICATION | ALERT.ATTENTION reason = info.to_str() @@ -149,7 +148,7 @@ class DeviceStatus: # get cleared when the device is turned off (but not when the device # goes idle, and we can't tell the difference right now). if d.protocol < 2.0: - self.notification_flags = d.enable_connection_notifications() + self._device.notification_flags = d.enable_connection_notifications() # battery information may have changed so try to read it now self.read_battery() diff --git a/lib/solaar/cli/pair.py b/lib/solaar/cli/pair.py index 626f680b..445c60ff 100644 --- a/lib/solaar/cli/pair.py +++ b/lib/solaar/cli/pair.py @@ -58,10 +58,10 @@ def run(receivers, args, find_receiver, _ignore): kd, known_devices = known_devices, None # only process one connection notification if kd is not None: if n.devnumber not in kd: - receiver.status.new_device = receiver.register_new_device(n.devnumber, n) + receiver.pairing.new_device = receiver.register_new_device(n.devnumber, n) elif receiver.re_pairs: del receiver[n.devnumber] # get rid of information on device re-paired away - receiver.status.new_device = receiver.register_new_device(n.devnumber, n) + receiver.pairing.new_device = receiver.register_new_device(n.devnumber, n) timeout = 30 # seconds receiver.handle = _HandleWithNotificationHook(receiver.handle) @@ -71,17 +71,17 @@ def run(receivers, args, find_receiver, _ignore): print("Bolt Pairing: long-press the pairing key or button on your device (timing out in", timeout, "seconds).") pairing_start = _timestamp() patience = 5 # the discovering notification may come slightly later, so be patient - while receiver.status.discovering or _timestamp() - pairing_start < patience: - if receiver.status.device_address and receiver.status.device_authentication and receiver.status.device_name: + while receiver.pairing.discovering or _timestamp() - pairing_start < patience: + if receiver.pairing.device_address and receiver.pairing.device_authentication and receiver.pairing.device_name: break n = _base.read(receiver.handle) n = _base.make_notification(*n) if n else None if n: receiver.handle.notifications_hook(n) - address = receiver.status.device_address - name = receiver.status.device_name - authentication = receiver.status.device_authentication - kind = receiver.status.device_kind + address = receiver.pairing.device_address + name = receiver.pairing.device_name + authentication = receiver.pairing.device_authentication + kind = receiver.pairing.device_kind print(f"Bolt Pairing: discovered {name}") receiver.pair_device( address=address, @@ -90,21 +90,21 @@ def run(receivers, args, find_receiver, _ignore): ) pairing_start = _timestamp() patience = 5 # the discovering notification may come slightly later, so be patient - while receiver.status.lock_open or _timestamp() - pairing_start < patience: - if receiver.status.device_passkey: + while receiver.pairing.lock_open or _timestamp() - pairing_start < patience: + if receiver.pairing.device_passkey: break n = _base.read(receiver.handle) n = _base.make_notification(*n) if n else None if n: receiver.handle.notifications_hook(n) if authentication & 0x01: - print(f"Bolt Pairing: type passkey {receiver.status.device_passkey} and then press the enter key") + print(f"Bolt Pairing: type passkey {receiver.pairing.device_passkey} and then press the enter key") else: - passkey = f"{int(receiver.status.device_passkey):010b}" + passkey = f"{int(receiver.pairing.device_passkey):010b}" passkey = ", ".join(["right" if bit == "1" else "left" for bit in passkey]) print(f"Bolt Pairing: press {passkey}") print("and then press left and right buttons simultaneously") - while receiver.status.lock_open: + while receiver.pairing.lock_open: n = _base.read(receiver.handle) n = _base.make_notification(*n) if n else None if n: @@ -115,7 +115,7 @@ def run(receivers, args, find_receiver, _ignore): print("Pairing: turn your new device on (timing out in", timeout, "seconds).") pairing_start = _timestamp() patience = 5 # the lock-open notification may come slightly later, wait for it a bit - while receiver.status.lock_open or _timestamp() - pairing_start < patience: + while receiver.pairing.lock_open or _timestamp() - pairing_start < patience: n = _base.read(receiver.handle) if n: n = _base.make_notification(*n) @@ -127,11 +127,11 @@ def run(receivers, args, find_receiver, _ignore): # concurrently running Solaar app might stop working properly _hidpp10.set_notification_flags(receiver, old_notification_flags) - if receiver.status.new_device: - dev = receiver.status.new_device + if receiver.pairing.new_device: + dev = receiver.pairing.new_device print("Paired device %d: %s (%s) [%s:%s]" % (dev.number, dev.name, dev.codename, dev.wpid, dev.serial)) else: - error = receiver.status.get(_status.error) + error = receiver.pairing.error if error: raise Exception("pairing failed: %s" % error) else: diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index beff6885..f25ee0fc 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -188,12 +188,12 @@ class ReceiverListener(_listener.EventsListener): if self.receiver.read_register(_R.receiver_info, _IR.pairing_information + n.devnumber - 1) is None: return dev = self.receiver.register_new_device(n.devnumber, n) - elif self.receiver.status.lock_open and self.receiver.re_pairs and not ord(n.data[0:1]) & 0x40: + elif self.receiver.pairing.lock_open and self.receiver.re_pairs and not ord(n.data[0:1]) & 0x40: dev = self.receiver[n.devnumber] del self.receiver[n.devnumber] # get rid of information on device re-paired away self._status_changed(dev) # signal that this device has changed dev = self.receiver.register_new_device(n.devnumber, n) - self.receiver.status.new_device = self.receiver[n.devnumber] + self.receiver.pairing.new_device = self.receiver[n.devnumber] else: dev = self.receiver[n.devnumber] else: @@ -224,12 +224,13 @@ class ReceiverListener(_listener.EventsListener): else: _notifications.process(dev, n) - if self.receiver.status.lock_open and not already_known: + if self.receiver.pairing.lock_open and not already_known: # this should be the first notification after a device was paired - assert n.sub_id == 0x41, "first notification was not a connection notification" + if logger.isEnabledFor(logging.WARNING): + logger.warning("first notification was not a connection notification") if logger.isEnabledFor(logging.INFO): logger.info("%s: pairing detected new device", self.receiver) - self.receiver.status.new_device = dev + self.receiver.pairing.new_device = dev elif dev.online is None: dev.ping() diff --git a/lib/solaar/ui/pair_window.py b/lib/solaar/ui/pair_window.py index 052129c5..c90bd3c5 100644 --- a/lib/solaar/ui/pair_window.py +++ b/lib/solaar/ui/pair_window.py @@ -71,23 +71,23 @@ def _check_lock_state(assistant, receiver, count=2): logger.debug("assistant %s destroyed, bailing out", assistant) return False - if receiver.status.error: - # receiver.status.new_device = _fake_device(receiver) - _pairing_failed(assistant, receiver, receiver.status.error) - receiver.status.error = None + if receiver.pairing.error: + # receiver.pairing.new_device = _fake_device(receiver) + _pairing_failed(assistant, receiver, receiver.pairing.error) + receiver.pairing.error = None return False - if receiver.status.new_device: + if receiver.pairing.new_device: receiver.remaining_pairings(False) # Update remaining pairings - device, receiver.status.new_device = receiver.status.new_device, None + device, receiver.pairing.new_device = receiver.pairing.new_device, None _pairing_succeeded(assistant, receiver, device) return False - elif receiver.status.device_address and receiver.status.device_name and not address: - address = receiver.status.device_address - name = receiver.status.device_name - kind = receiver.status.device_kind - authentication = receiver.status.device_authentication - name = receiver.status.device_name + elif receiver.pairing.device_address and receiver.pairing.device_name and not address: + address = receiver.pairing.device_address + name = receiver.pairing.device_name + kind = receiver.pairing.device_kind + authentication = receiver.pairing.device_authentication + name = receiver.pairing.device_name entropy = 10 if kind == _hidpp10_constants.DEVICE_KIND.keyboard: entropy = 20 @@ -100,12 +100,12 @@ def _check_lock_state(assistant, receiver, count=2): else: _pairing_failed(assistant, receiver, "failed to open pairing lock") return False - elif address and receiver.status.device_passkey and not passcode: - passcode = receiver.status.device_passkey + elif address and receiver.pairing.device_passkey and not passcode: + passcode = receiver.pairing.device_passkey _show_passcode(assistant, receiver, passcode) return True - if not receiver.status.lock_open and not receiver.status.discovering: + if not receiver.pairing.lock_open and not receiver.pairing.discovering: if count > 0: # the actual device notification may arrive later so have a little patience GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver, count - 1) @@ -119,16 +119,16 @@ def _check_lock_state(assistant, receiver, count=2): def _show_passcode(assistant, receiver, passkey): if logger.isEnabledFor(logging.DEBUG): logger.debug("%s show passkey: %s", receiver, passkey) - name = receiver.status.device_name - authentication = receiver.status.device_authentication + name = receiver.pairing.device_name + authentication = receiver.pairing.device_authentication intro_text = _("%(receiver_name)s: pair new device") % {"receiver_name": receiver.name} page_text = _("Enter passcode on %(name)s.") % {"name": name} page_text += "\n" if authentication & 0x01: - page_text += _("Type %(passcode)s and then press the enter key.") % {"passcode": receiver.status.device_passkey} + page_text += _("Type %(passcode)s and then press the enter key.") % {"passcode": receiver.pairing.device_passkey} else: passcode = ", ".join( - [_("right") if bit == "1" else _("left") for bit in f"{int(receiver.status.device_passkey):010b}"] + [_("right") if bit == "1" else _("left") for bit in f"{int(receiver.pairing.device_passkey):010b}"] ) page_text += _("Press %(code)s\nand then press left and right buttons simultaneously.") % {"code": passcode} page = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, intro_text, "preferences-desktop-peripherals", page_text) @@ -144,8 +144,8 @@ def _prepare(assistant, page, receiver): if index == 0: if receiver.receiver_kind == "bolt": if receiver.discover(timeout=_PAIRING_TIMEOUT): - assert receiver.status.new_device is None - assert receiver.status.error is None + assert receiver.pairing.new_device is None + assert receiver.pairing.error is None spinner = page.get_children()[-1] spinner.start() GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver) @@ -153,8 +153,8 @@ def _prepare(assistant, page, receiver): else: GLib.idle_add(_pairing_failed, assistant, receiver, "discovery did not start") elif receiver.set_lock(False, timeout=_PAIRING_TIMEOUT): - assert receiver.status.new_device is None - assert receiver.status.error is None + assert receiver.pairing.new_device is None + assert receiver.pairing.error is None spinner = page.get_children()[-1] spinner.start() GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver) @@ -169,16 +169,16 @@ def _finish(assistant, receiver): if logger.isEnabledFor(logging.DEBUG): logger.debug("finish %s", assistant) assistant.destroy() - receiver.status.new_device = None - if receiver.status.lock_open: + receiver.pairing.new_device = None + if receiver.pairing.lock_open: if receiver.receiver_kind == "bolt": receiver.pair_device("cancel") else: receiver.set_lock() - if receiver.status.discovering: + if receiver.pairing.discovering: receiver.discover(True) - if not receiver.status.lock_open and not receiver.status.discovering: - receiver.status.error = None + if not receiver.pairing.lock_open and not receiver.pairing.discovering: + receiver.pairing.error = None def _pairing_failed(assistant, receiver, error): diff --git a/lib/solaar/ui/window.py b/lib/solaar/ui/window.py index 999439a9..4ab3f3d8 100644 --- a/lib/solaar/ui/window.py +++ b/lib/solaar/ui/window.py @@ -573,7 +573,7 @@ def _update_details(button): elif device.kind is None or device.online: yield (" %s" % _("Firmware"), "...") - flag_bits = device.status.notification_flags + flag_bits = device.notification_flags if flag_bits is not None: flag_names = ( ("(%s)" % _("none"),) if flag_bits == 0 else _hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits) @@ -648,7 +648,7 @@ def _update_receiver_panel(receiver, panel, buttons, full=False): panel._count.set_markup(paired_text) - is_pairing = receiver.status.lock_open + is_pairing = receiver.pairing.lock_open if is_pairing: panel._scanning.set_visible(True) if not panel._spinner.get_visible(): @@ -872,7 +872,7 @@ def update(device, need_popup=False, refresh=False): if is_alive and item: was_pairing = bool(_model.get_value(item, _COLUMN.STATUS_ICON)) - is_pairing = (not device.isDevice) and bool(device.status.lock_open) + is_pairing = (not device.isDevice) and bool(device.pairing.lock_open) _model.set_value(item, _COLUMN.STATUS_ICON, "network-wireless" if is_pairing else _CAN_SET_ROW_NONE) if selected_device_id == (device.path, 0):