diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 7c0c25dd..56cb5335 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -453,13 +453,6 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params) if reply_data[:2] == request_data[:2]: - if request_id & 0xFE00 == 0x8200: - # long registry r/w should return a long reply - assert report_id == HIDPP_LONG_MESSAGE_ID - elif request_id & 0xFE00 == 0x8000: - # short registry r/w should return a short reply - assert report_id == HIDPP_SHORT_MESSAGE_ID - if devnumber == 0xFF: if request_id == 0x83B5 or request_id == 0x81F1: # these replies have to match the first parameter as well diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index e27541c8..ff155907 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -105,6 +105,7 @@ class Device: self._kind = _hidpp10.DEVICE_KIND[kind] else: # Not a notification, force a reading of the wpid + self.online = True self.update_pairing_information() # the wpid is necessary to properly identify wireless link on/off @@ -112,7 +113,7 @@ class Device: # device is unpaired assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver) - self.path = _hid.find_paired_node(receiver.path, number, _base.DEFAULT_TIMEOUT) + self.path = _hid.find_paired_node(receiver.path, number, 1) try: self.handle = _hid.open_path(self.path) if self.path else None except Exception: # maybe the device wasn't set up diff --git a/lib/logitech_receiver/hidpp10.py b/lib/logitech_receiver/hidpp10.py index 06ef05f2..5b845792 100644 --- a/lib/logitech_receiver/hidpp10.py +++ b/lib/logitech_receiver/hidpp10.py @@ -111,6 +111,7 @@ ERROR = _NamedInts( ) PAIRING_ERRORS = _NamedInts(device_timeout=0x01, device_not_supported=0x02, too_many_devices=0x03, sequence_timeout=0x06) +BOLT_PAIRING_ERRORS = _NamedInts(device_timeout=0x01, failed=0x02) """Known registers. Devices usually have a (small) sub-set of these. Some registers are only applicable to certain device kinds (e.g. smooth_scroll only applies to mice.""" @@ -120,6 +121,8 @@ REGISTERS = _NamedInts( receiver_pairing=0xB2, devices_activity=0x2B3, receiver_info=0x2B5, + bolt_device_discovery=0xC0, + bolt_pairing=0x2C1, # only apply to devices mouse_button_flags=0x01, @@ -134,6 +137,13 @@ REGISTERS = _NamedInts( # apply to both notifications=0x00, firmware=0xF1, + + # notifications + passkey_request_notification=0x4D, + passkey_pressed_notification=0x4E, + device_discovery_notification=0x4F, + discovery_status_notification=0x53, + pairing_status_notification=0x54, ) # Subregisters for receiver_info register INFO_SUBREGISTERS = _NamedInts( diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 03fbb1dd..4216639f 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -1,5 +1,3 @@ -# -*- python-mode -*- - ## Copyright (C) 2012-2013 Daniel Pavel ## ## This program is free software; you can redistribute it and/or modify @@ -19,6 +17,8 @@ # Handles incoming events from the receiver/devices, updating the related # status object as appropriate. +import threading as _threading + from logging import DEBUG as _DEBUG from logging import INFO as _INFO from logging import getLogger @@ -43,6 +43,8 @@ _F = _hidpp20.FEATURE # # +notification_lock = _threading.Lock() + def process(device, notification): assert device @@ -67,26 +69,72 @@ def _process_receiver_notification(receiver, status, n): # supposedly only 0x4x notifications arrive for the receiver assert n.sub_id & 0x40 == 0x40 - # pairing lock notification - if n.sub_id == 0x4A: + if n.sub_id == 0x4A: # pairing lock notification status.lock_open = bool(n.address & 0x01) reason = (_('pairing lock is open') if status.lock_open else _('pairing lock is closed')) if _log.isEnabledFor(_INFO): _log.info('%s: %s', receiver, reason) - status[_K.ERROR] = None if status.lock_open: status.new_device = None - pair_error = ord(n.data[:1]) if pair_error: status[_K.ERROR] = error_string = _hidpp10.PAIRING_ERRORS[pair_error] status.new_device = None _log.warn('pairing error %d: %s', pair_error, error_string) - status.changed(reason=reason) return True + elif n.sub_id == _R.discovery_status_notification: # Bolt pairing + with notification_lock: + status.discovering = n.address == 0x00 + status.counter = status.device_address = status.device_authentication = status.device_name = None + status.device_passkey = None + discover_error = ord(n.data[:1]) + if discover_error: + status[_K.ERROR] = discover_string = _hidpp10.BOLT_PAIRING_ERRORS[discover_error] + _log.warn('bolt discovering error %d: %s', discover_error, discover_string) + return True + + elif n.sub_id == _R.device_discovery_notification: # Bolt pairing + with notification_lock: + counter = n.address + n.data[0] * 256 # notification counter + if status.counter is None: + status.counter = counter + else: + if not status.counter == counter: + return None + if n.data[1] == 0: + status.device_kind = n.data[3] + status.device_address = n.data[6:12] + status.device_authentication = n.data[14] + elif n.data[1] == 1: + status.device_name = n.data[3:3 + n.data[2]].decode('utf-8') + return True + + elif n.sub_id == _R.pairing_status_notification: # Bolt pairing + with notification_lock: + status.device_passkey = None + status.lock_open = n.address == 0x00 + pair_error = n.data[0] + if status.lock_open: + status.new_device = None + elif n.address == 0x02 and not pair_error: + status.new_device = receiver.register_new_device(n.data[7]) + if pair_error: + status[_K.ERROR] = error_string = _hidpp10.BOLT_PAIRING_ERRORS[pair_error] + status.new_device = None + _log.warn('pairing error %d: %s', pair_error, error_string) + return True + + elif n.sub_id == _R.passkey_request_notification: # Bolt pairing + with notification_lock: + status.device_passkey = n.data[0:6].decode('utf-8') + return True + + elif n.sub_id == _R.passkey_pressed_notification: # Bolt pairing + return True + _log.warn('%s: unhandled notification %s', receiver, n) diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index 930f6130..812160ab 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -158,7 +158,7 @@ class Receiver: kind = _hidpp10.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F] return wpid, kind, 0 else: - return '0000', _hidpp10.DEVICE_KIND[0], 0 + raise _base.NoSuchDevice(number=n, receiver=self, error='read Bolt wpid') pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) polling_rate = 0 if pair_info: # may be either a Unifying receiver, or an Unifying-ready receiver @@ -253,6 +253,24 @@ class Receiver: return True _log.warn('%s: failed to %s the receiver lock', self, 'close' if lock_closed else 'open') + def discover(self, cancel=False, timeout=30): # Bolt device discovery + assert self.receiver_kind == 'bolt' + if self.handle: + action = 0x02 if cancel else 0x01 + reply = self.write_register(_R.bolt_device_discovery, timeout, action) + if reply: + return True + _log.warn('%s: failed to %s device discovery', self, 'cancel' if cancel else 'start') + + def pair_device(self, pair=True, slot=0, address=b'\0\0\0\0\0\0', authentication=None, entropy=20): # Bolt pairing + assert self.receiver_kind == 'bolt' + if self.handle: + action = 0x01 if pair else 0x03 + reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy) + if reply: + return True + _log.warn('%s: failed to %s device %s', self, 'pair' if pair else 'unpair', address) + def count(self): count = self.read_register(_R.receiver_connection) return 0 if count is None else ord(count[1:2]) @@ -313,6 +331,8 @@ class Receiver: if key in self._devices: del self._devices[key] _log.warn('%s removed device %s', self, dev) + elif self.receiver_kind == 'bolt': + reply = self.write_register(_R.bolt_pairing, 0x03, key) else: reply = self.write_register(_R.receiver_pairing, 0x03, key) if reply: diff --git a/lib/logitech_receiver/status.py b/lib/logitech_receiver/status.py index 7b8a2786..63137caa 100644 --- a/lib/logitech_receiver/status.py +++ b/lib/logitech_receiver/status.py @@ -96,6 +96,13 @@ class ReceiverStatus(dict): # self.updated = 0 self.lock_open = False + self.discovering = False + self.counter = None + self.device_address = None + self.device_authentication = None + self.device_kind = None + self.device_name = None + self.device_passkey = None self.new_device = None self[KEYS.ERROR] = None diff --git a/lib/solaar/cli/pair.py b/lib/solaar/cli/pair.py index 2c632c48..307808bf 100644 --- a/lib/solaar/cli/pair.py +++ b/lib/solaar/cli/pair.py @@ -23,6 +23,8 @@ from logitech_receiver import hidpp10 as _hidpp10 from logitech_receiver import notifications as _notifications from logitech_receiver import status as _status +_R = _hidpp10.REGISTERS + def run(receivers, args, find_receiver, _ignore): assert receivers @@ -61,22 +63,62 @@ def run(receivers, args, find_receiver, _ignore): del receiver[n.devnumber] # get rid of information on device re-paired away receiver.status.new_device = receiver.register_new_device(n.devnumber, n) - timeout = 20 # seconds + timeout = 30 # seconds receiver.handle = _HandleWithNotificationHook(receiver.handle) - receiver.set_lock(False, timeout=timeout) - print('Pairing: turn your new device on (timing out in', timeout, 'seconds).') - - # the lock-open notification may come slightly later, wait for it a bit - pairing_start = _timestamp() - patience = 5 # seconds - - while receiver.status.lock_open or _timestamp() - pairing_start < patience: - n = _base.read(receiver.handle) - if n: - n = _base.make_notification(*n) + if receiver.receiver_kind == 'bolt': # Bolt receivers require authentication to pair a device + receiver.discover(timeout=timeout) + print('Bolt Pairing: long-press the pairing key or button on your device (timing out in', timeout, 'seconds).') + pairing_start = _timestamp() + patience = 5 # the discovering notification may come slightly later, so be patient + while receiver.status.discovering or _timestamp() - pairing_start < patience: + if receiver.status.device_address and receiver.status.device_authentication and receiver.status.device_name: + break + n = _base.read(receiver.handle) + n = _base.make_notification(*n) if n else None if n: receiver.handle.notifications_hook(n) + address = receiver.status.device_address + name = receiver.status.device_name + authentication = receiver.status.device_authentication + kind = receiver.status.device_kind + print(f'Bolt Pairing: discovered {name}') + receiver.pair_device( + address=address, authentication=authentication, entropy=20 if kind == _hidpp10.DEVICE_KIND.keyboard else 10 + ) + pairing_start = _timestamp() + patience = 5 # the discovering notification may come slightly later, so be patient + while receiver.status.lock_open or _timestamp() - pairing_start < patience: + if receiver.status.device_passkey: + break + n = _base.read(receiver.handle) + n = _base.make_notification(*n) if n else None + if n: + receiver.handle.notifications_hook(n) + if authentication & 0x01: + print(f'Bolt Pairing: type passkey {receiver.status.device_passkey} and then press the enter key') + else: + passkey = f'{int(receiver.status.device_passkey):010b}' + passkey = ', '.join(['right' if bit == '1' else 'left' for bit in passkey]) + print(f'Bolt Pairing: press {passkey}') + print('and then press left and right buttons simultaneously') + while receiver.status.lock_open: + n = _base.read(receiver.handle) + n = _base.make_notification(*n) if n else None + if n: + receiver.handle.notifications_hook(n) + + else: + receiver.set_lock(False, timeout=timeout) + print('Pairing: turn your new device on (timing out in', timeout, 'seconds).') + pairing_start = _timestamp() + patience = 5 # the lock-open notification may come slightly later, wait for it a bit + while receiver.status.lock_open or _timestamp() - pairing_start < patience: + n = _base.read(receiver.handle) + if n: + n = _base.make_notification(*n) + if n: + receiver.handle.notifications_hook(n) if not (old_notification_flags & _hidpp10.NOTIFICATION_FLAG.wireless): # only clear the flags if they weren't set before, otherwise a @@ -91,4 +133,4 @@ def run(receivers, args, find_receiver, _ignore): if error: raise Exception('pairing failed: %s' % error) else: - print('Paired a device') # this is better than an error + print('Paired device') # this is better than an error