ui: add pairing for Bolt receivers

This commit is contained in:
Peter F. Patel-Schneider 2021-11-08 21:47:36 -05:00
parent 0aa9f4a22c
commit 603fbe96e0
3 changed files with 96 additions and 15 deletions

View File

@ -88,12 +88,18 @@ def _process_receiver_notification(receiver, status, n):
elif n.sub_id == _R.discovery_status_notification: # Bolt pairing elif n.sub_id == _R.discovery_status_notification: # Bolt pairing
with notification_lock: with notification_lock:
status.discovering = n.address == 0x00 status.discovering = n.address == 0x00
status.counter = status.device_address = status.device_authentication = status.device_name = None reason = (_('discovery lock is open') if status.discovering else _('discovery lock is closed'))
if _log.isEnabledFor(_INFO):
_log.info('%s: %s', receiver, reason)
status[_K.ERROR] = None
if status.discovering:
status.counter = status.device_address = status.device_authentication = status.device_name = None
status.device_passkey = None status.device_passkey = None
discover_error = ord(n.data[:1]) discover_error = ord(n.data[:1])
if discover_error: if discover_error:
status[_K.ERROR] = discover_string = _hidpp10.BOLT_PAIRING_ERRORS[discover_error] status[_K.ERROR] = discover_string = _hidpp10.BOLT_PAIRING_ERRORS[discover_error]
_log.warn('bolt discovering error %d: %s', discover_error, discover_string) _log.warn('bolt discovering error %d: %s', discover_error, discover_string)
status.changed(reason=reason)
return True return True
elif n.sub_id == _R.device_discovery_notification: # Bolt pairing elif n.sub_id == _R.device_discovery_notification: # Bolt pairing
@ -116,6 +122,12 @@ def _process_receiver_notification(receiver, status, n):
with notification_lock: with notification_lock:
status.device_passkey = None status.device_passkey = None
status.lock_open = n.address == 0x00 status.lock_open = n.address == 0x00
reason = (_('pairing lock is open') if status.lock_open else _('pairing lock is closed'))
if _log.isEnabledFor(_INFO):
_log.info('%s: %s', receiver, reason)
status[_K.ERROR] = None
if not status.lock_open:
status.counter = status.device_address = status.device_authentication = status.device_name = None
pair_error = n.data[0] pair_error = n.data[0]
if status.lock_open: if status.lock_open:
status.new_device = None status.new_device = None
@ -125,6 +137,7 @@ def _process_receiver_notification(receiver, status, n):
status[_K.ERROR] = error_string = _hidpp10.BOLT_PAIRING_ERRORS[pair_error] status[_K.ERROR] = error_string = _hidpp10.BOLT_PAIRING_ERRORS[pair_error]
status.new_device = None status.new_device = None
_log.warn('pairing error %d: %s', pair_error, error_string) _log.warn('pairing error %d: %s', pair_error, error_string)
status.changed(reason=reason)
return True return True
elif n.sub_id == _R.passkey_request_notification: # Bolt pairing elif n.sub_id == _R.passkey_request_notification: # Bolt pairing

View File

@ -262,10 +262,10 @@ class Receiver:
return True return True
_log.warn('%s: failed to %s device discovery', self, 'cancel' if cancel else 'start') _log.warn('%s: failed to %s device discovery', self, 'cancel' if cancel else 'start')
def pair_device(self, pair=True, slot=0, address=b'\0\0\0\0\0\0', authentication=None, entropy=20): # Bolt pairing def pair_device(self, pair=True, slot=0, address=b'\0\0\0\0\0\0', authentication=0x00, entropy=20): # Bolt pairing
assert self.receiver_kind == 'bolt' assert self.receiver_kind == 'bolt'
if self.handle: if self.handle:
action = 0x01 if pair else 0x03 action = 0x01 if pair is True else 0x03 if pair is False else 0x02
reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy) reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy)
if reply: if reply:
return True return True
@ -331,10 +331,11 @@ class Receiver:
if key in self._devices: if key in self._devices:
del self._devices[key] del self._devices[key]
_log.warn('%s removed device %s', self, dev) _log.warn('%s removed device %s', self, dev)
elif self.receiver_kind == 'bolt':
reply = self.write_register(_R.bolt_pairing, 0x03, key)
else: else:
reply = self.write_register(_R.receiver_pairing, 0x03, key) if self.receiver_kind == 'bolt':
reply = self.write_register(_R.bolt_pairing, 0x03, key)
else:
reply = self.write_register(_R.receiver_pairing, 0x03, key)
if reply: if reply:
# invalidate the device # invalidate the device
dev.online = False dev.online = False

View File

@ -20,6 +20,7 @@ from logging import DEBUG as _DEBUG
from logging import getLogger from logging import getLogger
from gi.repository import GLib, Gtk from gi.repository import GLib, Gtk
from logitech_receiver import hidpp10 as _hidpp10
from logitech_receiver.status import KEYS as _K from logitech_receiver.status import KEYS as _K
from solaar.i18n import _, ngettext from solaar.i18n import _, ngettext
@ -34,6 +35,8 @@ del getLogger
_PAIRING_TIMEOUT = 30 # seconds _PAIRING_TIMEOUT = 30 # seconds
_STATUS_CHECK = 500 # milliseconds _STATUS_CHECK = 500 # milliseconds
address = kind = authentication = name = passcode = None
def _create_page(assistant, kind, header=None, icon_name=None, text=None): def _create_page(assistant, kind, header=None, icon_name=None, text=None):
p = Gtk.VBox(False, 8) p = Gtk.VBox(False, 8)
@ -65,6 +68,8 @@ def _create_page(assistant, kind, header=None, icon_name=None, text=None):
def _check_lock_state(assistant, receiver, count=2): def _check_lock_state(assistant, receiver, count=2):
global address, kind, authentication, name, passcode
if not assistant.is_drawable(): if not assistant.is_drawable():
if _log.isEnabledFor(_DEBUG): if _log.isEnabledFor(_DEBUG):
_log.debug('assistant %s destroyed, bailing out', assistant) _log.debug('assistant %s destroyed, bailing out', assistant)
@ -79,11 +84,27 @@ def _check_lock_state(assistant, receiver, count=2):
device, receiver.status.new_device = receiver.status.new_device, None device, receiver.status.new_device = receiver.status.new_device, None
_pairing_succeeded(assistant, receiver, device) _pairing_succeeded(assistant, receiver, device)
return False return False
elif receiver.status.device_address and receiver.status.device_name and not address:
address = receiver.status.device_address
name = receiver.status.device_name
kind = receiver.status.device_kind
authentication = receiver.status.device_authentication
name = receiver.status.device_name
if receiver.pair_device(
address=address, authentication=authentication, entropy=20 if kind == _hidpp10.DEVICE_KIND.keyboard else 10
):
return True
else:
_pairing_failed(assistant, receiver, 'failed to open pairing lock')
return False
elif address and receiver.status.device_passkey and not passcode:
passcode = receiver.status.device_passkey
_show_passcode(assistant, receiver, passcode)
return True
if not receiver.status.lock_open: if not receiver.status.lock_open and not receiver.status.discovering:
if count > 0: if count > 0:
# the actual device notification may arrive after the lock was paired, # the actual device notification may arrive later so have a little patience
# so have a little patience
GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver, count - 1) GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver, count - 1)
else: else:
_pairing_failed(assistant, receiver, 'failed to open pairing lock') _pairing_failed(assistant, receiver, 'failed to open pairing lock')
@ -92,13 +113,43 @@ def _check_lock_state(assistant, receiver, count=2):
return True return True
def _show_passcode(assistant, receiver, passkey):
if _log.isEnabledFor(_DEBUG):
_log.debug('%s show passkey: %s', receiver, passkey)
name = receiver.status.device_name
authentication = receiver.status.device_authentication
intro_text = _('%(receiver_name)s: pair new device') % {'receiver_name': receiver.name}
page_text = _('Enter passcode on %(name)s.') % {'name': name}
page_text += '\n'
if authentication & 0x01:
page_text += _('Type %(passcode)s and then press the enter key.') % {'passcode': receiver.status.device_passkey}
else:
passcode = ', '.join([
_('right') if bit == '1' else _('left') for bit in f'{int(receiver.status.device_passkey):010b}'
])
page_text += _('Press %(code)s\nand then press left and right buttons simultaneously.') % {'code': passcode}
page = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, intro_text, 'preferences-desktop-peripherals', page_text)
assistant.set_page_complete(page, True)
assistant.next_page()
def _prepare(assistant, page, receiver): def _prepare(assistant, page, receiver):
index = assistant.get_current_page() index = assistant.get_current_page()
if _log.isEnabledFor(_DEBUG): if _log.isEnabledFor(_DEBUG):
_log.debug('prepare %s %d %s', assistant, index, page) _log.debug('prepare %s %d %s', assistant, index, page)
if index == 0: if index == 0:
if receiver.set_lock(False, timeout=_PAIRING_TIMEOUT): if receiver.receiver_kind == 'bolt':
if receiver.discover(timeout=_PAIRING_TIMEOUT):
assert receiver.status.new_device is None
assert receiver.status.get(_K.ERROR) is None
spinner = page.get_children()[-1]
spinner.start()
GLib.timeout_add(_STATUS_CHECK, _check_lock_state, assistant, receiver)
assistant.set_page_complete(page, True)
else:
GLib.idle_add(_pairing_failed, assistant, receiver, 'discovery did not start')
elif receiver.set_lock(False, timeout=_PAIRING_TIMEOUT):
assert receiver.status.new_device is None assert receiver.status.new_device is None
assert receiver.status.get(_K.ERROR) is None assert receiver.status.get(_K.ERROR) is None
spinner = page.get_children()[-1] spinner = page.get_children()[-1]
@ -117,8 +168,13 @@ def _finish(assistant, receiver):
assistant.destroy() assistant.destroy()
receiver.status.new_device = None receiver.status.new_device = None
if receiver.status.lock_open: if receiver.status.lock_open:
receiver.set_lock() if receiver.receiver_kind == 'bolt':
else: receiver.pair_device('cancel')
else:
receiver.set_lock()
if receiver.status.discovering:
receiver.discover(True)
if not receiver.status.lock_open and not receiver.status.discovering:
receiver.status[_K.ERROR] = None receiver.status[_K.ERROR] = None
@ -192,6 +248,9 @@ def create(receiver):
assert receiver is not None assert receiver is not None
assert receiver.kind is None assert receiver.kind is None
global address, kind, authentication, name, passcode
address = name = kind = authentication = passcode = None
assistant = Gtk.Assistant() assistant = Gtk.Assistant()
assistant.set_title(_('%(receiver_name)s: pair new device') % {'receiver_name': receiver.name}) assistant.set_title(_('%(receiver_name)s: pair new device') % {'receiver_name': receiver.name})
assistant.set_icon_name('list-add') assistant.set_icon_name('list-add')
@ -200,7 +259,14 @@ def create(receiver):
assistant.set_resizable(False) assistant.set_resizable(False)
assistant.set_role('pair-device') assistant.set_role('pair-device')
page_text = _('If the device is already turned on, turn it off and on again.') if receiver.receiver_kind == 'bolt':
page_text = _('Press a pairing button or key until the pairing light flashes quickly.')
page_text += '\n'
page_text += _('You may have to first turn the device off and on again.')
else:
page_text = _('Turn on the device you want to pair.')
page_text += '\n'
page_text += _('If the device is already turned on, turn it off and on again.')
if receiver.remaining_pairings() and receiver.remaining_pairings() >= 0: if receiver.remaining_pairings() and receiver.remaining_pairings() >= 0:
page_text += ngettext( page_text += ngettext(
'\n\nThis receiver has %d pairing remaining.', '\n\nThis receiver has %d pairings remaining.', '\n\nThis receiver has %d pairing remaining.', '\n\nThis receiver has %d pairings remaining.',
@ -208,9 +274,10 @@ def create(receiver):
) % receiver.remaining_pairings() ) % receiver.remaining_pairings()
page_text += _('\nCancelling at this point will not use up a pairing.') page_text += _('\nCancelling at this point will not use up a pairing.')
intro_text = _('%(receiver_name)s: pair new device') % {'receiver_name': receiver.name}
page_intro = _create_page( page_intro = _create_page(
assistant, Gtk.AssistantPageType.PROGRESS, _('Turn on the device you want to pair.'), assistant, Gtk.AssistantPageType.PROGRESS, intro_text, 'preferences-desktop-peripherals', page_text
'preferences-desktop-peripherals', page_text
) )
spinner = Gtk.Spinner() spinner = Gtk.Spinner()
spinner.set_visible(True) spinner.set_visible(True)