From 603fbe96e074432fa9dfc26c014cb6c1d8cdef31 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Mon, 8 Nov 2021 21:47:36 -0500 Subject: [PATCH] ui: add pairing for Bolt receivers --- lib/logitech_receiver/notifications.py | 15 ++++- lib/logitech_receiver/receiver.py | 11 ++-- lib/solaar/ui/pair_window.py | 85 +++++++++++++++++++++++--- 3 files changed, 96 insertions(+), 15 deletions(-) diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 4216639f..64961b94 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -88,12 +88,18 @@ def _process_receiver_notification(receiver, status, n): elif n.sub_id == _R.discovery_status_notification: # Bolt pairing with notification_lock: status.discovering = n.address == 0x00 - status.counter = status.device_address = status.device_authentication = status.device_name = None + reason = (_('discovery lock is open') if status.discovering else _('discovery lock is closed')) + if _log.isEnabledFor(_INFO): + _log.info('%s: %s', receiver, reason) + status[_K.ERROR] = None + if status.discovering: + status.counter = status.device_address = status.device_authentication = status.device_name = None status.device_passkey = None discover_error = ord(n.data[:1]) if discover_error: status[_K.ERROR] = discover_string = _hidpp10.BOLT_PAIRING_ERRORS[discover_error] _log.warn('bolt discovering error %d: %s', discover_error, discover_string) + status.changed(reason=reason) return True elif n.sub_id == _R.device_discovery_notification: # Bolt pairing @@ -116,6 +122,12 @@ def _process_receiver_notification(receiver, status, n): with notification_lock: status.device_passkey = None status.lock_open = n.address == 0x00 + reason = (_('pairing lock is open') if status.lock_open else _('pairing lock is closed')) + if _log.isEnabledFor(_INFO): + _log.info('%s: %s', receiver, reason) + status[_K.ERROR] = None + if not status.lock_open: + status.counter = status.device_address = status.device_authentication = status.device_name = None pair_error = n.data[0] if status.lock_open: status.new_device = None @@ -125,6 +137,7 @@ def _process_receiver_notification(receiver, status, n): status[_K.ERROR] = error_string = _hidpp10.BOLT_PAIRING_ERRORS[pair_error] status.new_device = None _log.warn('pairing error %d: %s', pair_error, error_string) + status.changed(reason=reason) return True elif n.sub_id == _R.passkey_request_notification: # Bolt pairing diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index 812160ab..963a4f81 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -262,10 +262,10 @@ class Receiver: return True _log.warn('%s: failed to %s device discovery', self, 'cancel' if cancel else 'start') - def pair_device(self, pair=True, slot=0, address=b'\0\0\0\0\0\0', authentication=None, entropy=20): # Bolt pairing + def pair_device(self, pair=True, slot=0, address=b'\0\0\0\0\0\0', authentication=0x00, entropy=20): # Bolt pairing assert self.receiver_kind == 'bolt' if self.handle: - action = 0x01 if pair else 0x03 + action = 0x01 if pair is True else 0x03 if pair is False else 0x02 reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy) if reply: return True @@ -331,10 +331,11 @@ class Receiver: if key in self._devices: del self._devices[key] _log.warn('%s removed device %s', self, dev) - elif self.receiver_kind == 'bolt': - reply = self.write_register(_R.bolt_pairing, 0x03, key) else: - reply = self.write_register(_R.receiver_pairing, 0x03, key) + if self.receiver_kind == 'bolt': + reply = self.write_register(_R.bolt_pairing, 0x03, key) + else: + reply = self.write_register(_R.receiver_pairing, 0x03, key) if reply: # invalidate the device dev.online = False diff --git a/lib/solaar/ui/pair_window.py b/lib/solaar/ui/pair_window.py index a5994948..bf2b02ba 100644 --- a/lib/solaar/ui/pair_window.py +++ b/lib/solaar/ui/pair_window.py @@ -20,6 +20,7 @@ from logging import DEBUG as _DEBUG from logging import getLogger from gi.repository import GLib, Gtk +from logitech_receiver import hidpp10 as _hidpp10 from logitech_receiver.status import KEYS as _K from solaar.i18n import _, ngettext @@ -34,6 +35,8 @@ del getLogger _PAIRING_TIMEOUT = 30 # seconds _STATUS_CHECK = 500 # milliseconds +address = kind = authentication = name = passcode = None + def _create_page(assistant, kind, header=None, icon_name=None, text=None): p = Gtk.VBox(False, 8) @@ -65,6 +68,8 @@ def _create_page(assistant, kind, header=None, icon_name=None, text=None): def _check_lock_state(assistant, receiver, count=2): + global address, kind, authentication, name, passcode + if not assistant.is_drawable(): if _log.isEnabledFor(_DEBUG): _log.debug('assistant %s destroyed, bailing out', assistant) @@ -79,11 +84,27 @@ def _check_lock_state(assistant, receiver, count=2): device, receiver.status.new_device = receiver.status.new_device, None _pairing_succeeded(assistant, receiver, device) return False + elif receiver.status.device_address and receiver.status.device_name and not address: + address = receiver.status.device_address + name = receiver.status.device_name + kind = receiver.status.device_kind + authentication = receiver.status.device_authentication + name = receiver.status.device_name + if receiver.pair_device( + address=address, authentication=authentication, entropy=20 if kind == _hidpp10.DEVICE_KIND.keyboard else 10 + ): + return True + else: + _pairing_failed(assistant, receiver, 'failed to open pairing lock') + return False + elif address and receiver.status.device_passkey and not passcode: + passcode = receiver.status.device_passkey + _show_passcode(assistant, receiver, passcode) + return True - if not receiver.status.lock_open: + if not receiver.status.lock_open and not receiver.status.discovering: if count > 0: - # the actual device notification may arrive after the lock was paired, - # so have a little patience + # the actual device notification may arrive later so have a little patience GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver, count - 1) else: _pairing_failed(assistant, receiver, 'failed to open pairing lock') @@ -92,13 +113,43 @@ def _check_lock_state(assistant, receiver, count=2): return True +def _show_passcode(assistant, receiver, passkey): + if _log.isEnabledFor(_DEBUG): + _log.debug('%s show passkey: %s', receiver, passkey) + name = receiver.status.device_name + authentication = receiver.status.device_authentication + intro_text = _('%(receiver_name)s: pair new device') % {'receiver_name': receiver.name} + page_text = _('Enter passcode on %(name)s.') % {'name': name} + page_text += '\n' + if authentication & 0x01: + page_text += _('Type %(passcode)s and then press the enter key.') % {'passcode': receiver.status.device_passkey} + else: + passcode = ', '.join([ + _('right') if bit == '1' else _('left') for bit in f'{int(receiver.status.device_passkey):010b}' + ]) + page_text += _('Press %(code)s\nand then press left and right buttons simultaneously.') % {'code': passcode} + page = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, intro_text, 'preferences-desktop-peripherals', page_text) + assistant.set_page_complete(page, True) + assistant.next_page() + + def _prepare(assistant, page, receiver): index = assistant.get_current_page() if _log.isEnabledFor(_DEBUG): _log.debug('prepare %s %d %s', assistant, index, page) if index == 0: - if receiver.set_lock(False, timeout=_PAIRING_TIMEOUT): + if receiver.receiver_kind == 'bolt': + if receiver.discover(timeout=_PAIRING_TIMEOUT): + assert receiver.status.new_device is None + assert receiver.status.get(_K.ERROR) is None + spinner = page.get_children()[-1] + spinner.start() + GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver) + assistant.set_page_complete(page, True) + else: + GLib.idle_add(_pairing_failed, assistant, receiver, 'discovery did not start') + elif receiver.set_lock(False, timeout=_PAIRING_TIMEOUT): assert receiver.status.new_device is None assert receiver.status.get(_K.ERROR) is None spinner = page.get_children()[-1] @@ -117,8 +168,13 @@ def _finish(assistant, receiver): assistant.destroy() receiver.status.new_device = None if receiver.status.lock_open: - receiver.set_lock() - else: + if receiver.receiver_kind == 'bolt': + receiver.pair_device('cancel') + else: + receiver.set_lock() + if receiver.status.discovering: + receiver.discover(True) + if not receiver.status.lock_open and not receiver.status.discovering: receiver.status[_K.ERROR] = None @@ -192,6 +248,9 @@ def create(receiver): assert receiver is not None assert receiver.kind is None + global address, kind, authentication, name, passcode + address = name = kind = authentication = passcode = None + assistant = Gtk.Assistant() assistant.set_title(_('%(receiver_name)s: pair new device') % {'receiver_name': receiver.name}) assistant.set_icon_name('list-add') @@ -200,7 +259,14 @@ def create(receiver): assistant.set_resizable(False) assistant.set_role('pair-device') - page_text = _('If the device is already turned on, turn it off and on again.') + if receiver.receiver_kind == 'bolt': + page_text = _('Press a pairing button or key until the pairing light flashes quickly.') + page_text += '\n' + page_text += _('You may have to first turn the device off and on again.') + else: + page_text = _('Turn on the device you want to pair.') + page_text += '\n' + page_text += _('If the device is already turned on, turn it off and on again.') if receiver.remaining_pairings() and receiver.remaining_pairings() >= 0: page_text += ngettext( '\n\nThis receiver has %d pairing remaining.', '\n\nThis receiver has %d pairings remaining.', @@ -208,9 +274,10 @@ def create(receiver): ) % receiver.remaining_pairings() page_text += _('\nCancelling at this point will not use up a pairing.') + intro_text = _('%(receiver_name)s: pair new device') % {'receiver_name': receiver.name} + page_intro = _create_page( - assistant, Gtk.AssistantPageType.PROGRESS, _('Turn on the device you want to pair.'), - 'preferences-desktop-peripherals', page_text + assistant, Gtk.AssistantPageType.PROGRESS, intro_text, 'preferences-desktop-peripherals', page_text ) spinner = Gtk.Spinner() spinner.set_visible(True)