cli: add pairing for Bolt receivers

This commit is contained in:
Peter F. Patel-Schneider 2021-11-07 19:42:22 -05:00
parent 886df1daaf
commit 885cefb5b6
7 changed files with 150 additions and 29 deletions

View File

@ -453,13 +453,6 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params)
if reply_data[:2] == request_data[:2]:
if request_id & 0xFE00 == 0x8200:
# long registry r/w should return a long reply
assert report_id == HIDPP_LONG_MESSAGE_ID
elif request_id & 0xFE00 == 0x8000:
# short registry r/w should return a short reply
assert report_id == HIDPP_SHORT_MESSAGE_ID
if devnumber == 0xFF:
if request_id == 0x83B5 or request_id == 0x81F1:
# these replies have to match the first parameter as well

View File

@ -105,6 +105,7 @@ class Device:
self._kind = _hidpp10.DEVICE_KIND[kind]
else:
# Not a notification, force a reading of the wpid
self.online = True
self.update_pairing_information()
# the wpid is necessary to properly identify wireless link on/off
@ -112,7 +113,7 @@ class Device:
# device is unpaired
assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver)
self.path = _hid.find_paired_node(receiver.path, number, _base.DEFAULT_TIMEOUT)
self.path = _hid.find_paired_node(receiver.path, number, 1)
try:
self.handle = _hid.open_path(self.path) if self.path else None
except Exception: # maybe the device wasn't set up

View File

@ -111,6 +111,7 @@ ERROR = _NamedInts(
)
PAIRING_ERRORS = _NamedInts(device_timeout=0x01, device_not_supported=0x02, too_many_devices=0x03, sequence_timeout=0x06)
BOLT_PAIRING_ERRORS = _NamedInts(device_timeout=0x01, failed=0x02)
"""Known registers.
Devices usually have a (small) sub-set of these. Some registers are only
applicable to certain device kinds (e.g. smooth_scroll only applies to mice."""
@ -120,6 +121,8 @@ REGISTERS = _NamedInts(
receiver_pairing=0xB2,
devices_activity=0x2B3,
receiver_info=0x2B5,
bolt_device_discovery=0xC0,
bolt_pairing=0x2C1,
# only apply to devices
mouse_button_flags=0x01,
@ -134,6 +137,13 @@ REGISTERS = _NamedInts(
# apply to both
notifications=0x00,
firmware=0xF1,
# notifications
passkey_request_notification=0x4D,
passkey_pressed_notification=0x4E,
device_discovery_notification=0x4F,
discovery_status_notification=0x53,
pairing_status_notification=0x54,
)
# Subregisters for receiver_info register
INFO_SUBREGISTERS = _NamedInts(

View File

@ -1,5 +1,3 @@
# -*- python-mode -*-
## Copyright (C) 2012-2013 Daniel Pavel
##
## This program is free software; you can redistribute it and/or modify
@ -19,6 +17,8 @@
# Handles incoming events from the receiver/devices, updating the related
# status object as appropriate.
import threading as _threading
from logging import DEBUG as _DEBUG
from logging import INFO as _INFO
from logging import getLogger
@ -43,6 +43,8 @@ _F = _hidpp20.FEATURE
#
#
notification_lock = _threading.Lock()
def process(device, notification):
assert device
@ -67,26 +69,72 @@ def _process_receiver_notification(receiver, status, n):
# supposedly only 0x4x notifications arrive for the receiver
assert n.sub_id & 0x40 == 0x40
# pairing lock notification
if n.sub_id == 0x4A:
if n.sub_id == 0x4A: # pairing lock notification
status.lock_open = bool(n.address & 0x01)
reason = (_('pairing lock is open') if status.lock_open else _('pairing lock is closed'))
if _log.isEnabledFor(_INFO):
_log.info('%s: %s', receiver, reason)
status[_K.ERROR] = None
if status.lock_open:
status.new_device = None
pair_error = ord(n.data[:1])
if pair_error:
status[_K.ERROR] = error_string = _hidpp10.PAIRING_ERRORS[pair_error]
status.new_device = None
_log.warn('pairing error %d: %s', pair_error, error_string)
status.changed(reason=reason)
return True
elif n.sub_id == _R.discovery_status_notification: # Bolt pairing
with notification_lock:
status.discovering = n.address == 0x00
status.counter = status.device_address = status.device_authentication = status.device_name = None
status.device_passkey = None
discover_error = ord(n.data[:1])
if discover_error:
status[_K.ERROR] = discover_string = _hidpp10.BOLT_PAIRING_ERRORS[discover_error]
_log.warn('bolt discovering error %d: %s', discover_error, discover_string)
return True
elif n.sub_id == _R.device_discovery_notification: # Bolt pairing
with notification_lock:
counter = n.address + n.data[0] * 256 # notification counter
if status.counter is None:
status.counter = counter
else:
if not status.counter == counter:
return None
if n.data[1] == 0:
status.device_kind = n.data[3]
status.device_address = n.data[6:12]
status.device_authentication = n.data[14]
elif n.data[1] == 1:
status.device_name = n.data[3:3 + n.data[2]].decode('utf-8')
return True
elif n.sub_id == _R.pairing_status_notification: # Bolt pairing
with notification_lock:
status.device_passkey = None
status.lock_open = n.address == 0x00
pair_error = n.data[0]
if status.lock_open:
status.new_device = None
elif n.address == 0x02 and not pair_error:
status.new_device = receiver.register_new_device(n.data[7])
if pair_error:
status[_K.ERROR] = error_string = _hidpp10.BOLT_PAIRING_ERRORS[pair_error]
status.new_device = None
_log.warn('pairing error %d: %s', pair_error, error_string)
return True
elif n.sub_id == _R.passkey_request_notification: # Bolt pairing
with notification_lock:
status.device_passkey = n.data[0:6].decode('utf-8')
return True
elif n.sub_id == _R.passkey_pressed_notification: # Bolt pairing
return True
_log.warn('%s: unhandled notification %s', receiver, n)

View File

@ -158,7 +158,7 @@ class Receiver:
kind = _hidpp10.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F]
return wpid, kind, 0
else:
return '0000', _hidpp10.DEVICE_KIND[0], 0
raise _base.NoSuchDevice(number=n, receiver=self, error='read Bolt wpid')
pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1)
polling_rate = 0
if pair_info: # may be either a Unifying receiver, or an Unifying-ready receiver
@ -253,6 +253,24 @@ class Receiver:
return True
_log.warn('%s: failed to %s the receiver lock', self, 'close' if lock_closed else 'open')
def discover(self, cancel=False, timeout=30): # Bolt device discovery
assert self.receiver_kind == 'bolt'
if self.handle:
action = 0x02 if cancel else 0x01
reply = self.write_register(_R.bolt_device_discovery, timeout, action)
if reply:
return True
_log.warn('%s: failed to %s device discovery', self, 'cancel' if cancel else 'start')
def pair_device(self, pair=True, slot=0, address=b'\0\0\0\0\0\0', authentication=None, entropy=20): # Bolt pairing
assert self.receiver_kind == 'bolt'
if self.handle:
action = 0x01 if pair else 0x03
reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy)
if reply:
return True
_log.warn('%s: failed to %s device %s', self, 'pair' if pair else 'unpair', address)
def count(self):
count = self.read_register(_R.receiver_connection)
return 0 if count is None else ord(count[1:2])
@ -313,6 +331,8 @@ class Receiver:
if key in self._devices:
del self._devices[key]
_log.warn('%s removed device %s', self, dev)
elif self.receiver_kind == 'bolt':
reply = self.write_register(_R.bolt_pairing, 0x03, key)
else:
reply = self.write_register(_R.receiver_pairing, 0x03, key)
if reply:

View File

@ -96,6 +96,13 @@ class ReceiverStatus(dict):
# self.updated = 0
self.lock_open = False
self.discovering = False
self.counter = None
self.device_address = None
self.device_authentication = None
self.device_kind = None
self.device_name = None
self.device_passkey = None
self.new_device = None
self[KEYS.ERROR] = None

View File

@ -23,6 +23,8 @@ from logitech_receiver import hidpp10 as _hidpp10
from logitech_receiver import notifications as _notifications
from logitech_receiver import status as _status
_R = _hidpp10.REGISTERS
def run(receivers, args, find_receiver, _ignore):
assert receivers
@ -61,22 +63,62 @@ def run(receivers, args, find_receiver, _ignore):
del receiver[n.devnumber] # get rid of information on device re-paired away
receiver.status.new_device = receiver.register_new_device(n.devnumber, n)
timeout = 20 # seconds
timeout = 30 # seconds
receiver.handle = _HandleWithNotificationHook(receiver.handle)
receiver.set_lock(False, timeout=timeout)
print('Pairing: turn your new device on (timing out in', timeout, 'seconds).')
# the lock-open notification may come slightly later, wait for it a bit
pairing_start = _timestamp()
patience = 5 # seconds
while receiver.status.lock_open or _timestamp() - pairing_start < patience:
n = _base.read(receiver.handle)
if n:
n = _base.make_notification(*n)
if receiver.receiver_kind == 'bolt': # Bolt receivers require authentication to pair a device
receiver.discover(timeout=timeout)
print('Bolt Pairing: long-press the pairing key or button on your device (timing out in', timeout, 'seconds).')
pairing_start = _timestamp()
patience = 5 # the discovering notification may come slightly later, so be patient
while receiver.status.discovering or _timestamp() - pairing_start < patience:
if receiver.status.device_address and receiver.status.device_authentication and receiver.status.device_name:
break
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
if n:
receiver.handle.notifications_hook(n)
address = receiver.status.device_address
name = receiver.status.device_name
authentication = receiver.status.device_authentication
kind = receiver.status.device_kind
print(f'Bolt Pairing: discovered {name}')
receiver.pair_device(
address=address, authentication=authentication, entropy=20 if kind == _hidpp10.DEVICE_KIND.keyboard else 10
)
pairing_start = _timestamp()
patience = 5 # the discovering notification may come slightly later, so be patient
while receiver.status.lock_open or _timestamp() - pairing_start < patience:
if receiver.status.device_passkey:
break
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
if n:
receiver.handle.notifications_hook(n)
if authentication & 0x01:
print(f'Bolt Pairing: type passkey {receiver.status.device_passkey} and then press the enter key')
else:
passkey = f'{int(receiver.status.device_passkey):010b}'
passkey = ', '.join(['right' if bit == '1' else 'left' for bit in passkey])
print(f'Bolt Pairing: press {passkey}')
print('and then press left and right buttons simultaneously')
while receiver.status.lock_open:
n = _base.read(receiver.handle)
n = _base.make_notification(*n) if n else None
if n:
receiver.handle.notifications_hook(n)
else:
receiver.set_lock(False, timeout=timeout)
print('Pairing: turn your new device on (timing out in', timeout, 'seconds).')
pairing_start = _timestamp()
patience = 5 # the lock-open notification may come slightly later, wait for it a bit
while receiver.status.lock_open or _timestamp() - pairing_start < patience:
n = _base.read(receiver.handle)
if n:
n = _base.make_notification(*n)
if n:
receiver.handle.notifications_hook(n)
if not (old_notification_flags & _hidpp10.NOTIFICATION_FLAG.wireless):
# only clear the flags if they weren't set before, otherwise a
@ -91,4 +133,4 @@ def run(receivers, args, find_receiver, _ignore):
if error:
raise Exception('pairing failed: %s' % error)
else:
print('Paired a device') # this is better than an error
print('Paired device') # this is better than an error