From 4b33c119f6a502f52a216f9878f26e3023aed024 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Wed, 21 Feb 2024 09:41:46 -0500 Subject: [PATCH] device: move pairing information gathering to receiver --- lib/logitech_receiver/device.py | 226 +++++++++--------------------- lib/logitech_receiver/receiver.py | 69 +++++---- pyproject.toml | 2 +- 3 files changed, 104 insertions(+), 193 deletions(-) diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index c5ad3745..45454567 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -33,7 +33,6 @@ from . import hidpp10 as _hidpp10 from . import hidpp10_constants as _hidpp10_constants from . import hidpp20 as _hidpp20 from . import hidpp20_constants as _hidpp20_constants -from .common import strhex as _strhex from .settings_templates import check_feature_settings as _check_feature_settings logger = logging.getLogger(__name__) @@ -43,68 +42,48 @@ _IR = _hidpp10_constants.INFO_SUBREGISTERS KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in _hidpp20_constants.DEVICE_KIND} -# -# -# - class Device: instances = [] read_register = _hidpp10.read_register write_register = _hidpp10.write_register - def __init__( - self, - receiver, - number, - link_notification=None, - path=None, - handle=None, - short=None, - long=None, - product_id=None, - bus_id=None, - setting_callback=None, - ): - assert receiver or handle - Device.instances.append(self) - self.isDevice = True # some devices act as receiver so we need a property to distinguish them - self.may_unpair = False - self.receiver = receiver - self.path = path - self.handle = handle - self.product_id = product_id - self.hidpp_short = short - self.hidpp_long = long - self.bluetooth = bus_id == 0x0005 # Bluetooth connections need long messages - self.setting_callback = setting_callback - + def __init__(self, receiver, number, online, pairing_info=None, handle=None, device_info=None, setting_callback=None): + assert receiver or device_info if receiver: assert number > 0 and number <= 15 # some receivers have devices past their max # of devices self.number = number # will be None at this point for directly connected devices - self.online = self.descriptor = None - - self.wpid = None # the Wireless PID is unique per device model - self._kind = None # mouse, keyboard, etc (see _hidpp10.DEVICE_KIND) - self._codename = None # Unifying peripherals report a codename. + self.online = online + self.descriptor = None + self.isDevice = True # some devices act as receiver so we need a property to distinguish them + self.may_unpair = False + self.receiver = receiver + self.handle = handle + self.path = device_info.path if device_info else None + self.product_id = device_info.product_id if device_info else None + self.hidpp_short = device_info.hidpp_short if device_info else None + self.hidpp_long = device_info.hidpp_long if device_info else None + self.bluetooth = device_info.bus_id == 0x0005 if device_info else False # Bluetooth needs long messages + self.setting_callback = setting_callback + self.wpid = pairing_info["wpid"] if pairing_info else None # the Wireless PID is unique per device model + self._kind = pairing_info["kind"] if pairing_info else None # mouse, keyboard, etc (see _hidpp10.DEVICE_KIND) + self._serial = pairing_info["serial"] if pairing_info else None # serial number (an 8-char hex string) + self._polling_rate = pairing_info["polling"] if pairing_info else None + self._power_switch = pairing_info["power_switch"] if pairing_info else None self._name = None # the full name of the model + self._codename = None # Unifying peripherals report a codename. self._protocol = None # HID++ protocol version, 1.0 or 2.0 - self._serial = None # serial number (an 8-char hex string) - self._unitId = None # unit id (distinguishes within a model - the same as serial) + self._unitId = None # unit id (distinguishes within a model - generally the same as serial) self._modelId = None # model id (contains identifiers for the transports of the device) self._tid_map = None # map from transports to product identifiers self._persister = None # persister holds settings - - self._firmware = self._keys = self._remap_keys = self._gestures = None - self._polling_rate = self._power_switch = self._led_effects = None - - self._gestures_lock = _threading.Lock() + self._led_effects = self._firmware = self._keys = self._remap_keys = self._gestures = None self._profiles = self._backlight = self._registers = self._settings = None - self._feature_settings_checked = False - self._settings_lock = _threading.Lock() - # See `add_notification_handler` - self._notification_handlers = {} + self._feature_settings_checked = False + self._gestures_lock = _threading.Lock() + self._settings_lock = _threading.Lock() + self._notification_handlers = {} # See `add_notification_handler` if not self.path: self.path = _hid.find_paired_node(receiver.path, number, 1) if receiver else None @@ -116,73 +95,42 @@ class Device: time.sleep(1) self.handle = _base.open_path(self.path) if self.path else None except Exception: # give up - self.handle = None + self.handle = None # should this give up completely? if receiver: - if link_notification is not None: - self.online = not bool(ord(link_notification.data[0:1]) & 0x40) - self.wpid = _strhex(link_notification.data[2:3] + link_notification.data[1:2]) - # assert link_notification.address == (0x04 if unifying else 0x03) - kind = ord(link_notification.data[0:1]) & 0x0F - # get 27Mhz wpid and set kind based on index - if receiver.receiver_kind == "27Mhz": # 27 Mhz receiver - self.wpid = "00" + _strhex(link_notification.data[2:3]) - kind = receiver.get_kind_from_index(number) - self._kind = _hidpp10_constants.DEVICE_KIND[kind] - elif receiver.receiver_kind == "27Mhz": # 27 Mhz receiver doesn't have pairing registers - self.wpid = _hid.find_paired_node_wpid(receiver.path, number) - if not self.wpid: - logger.error("Unable to get wpid from udev for device %d of %s", number, receiver) - raise exceptions.NoSuchDevice(number=number, receiver=receiver, error="Not present 27Mhz device") - kind = receiver.get_kind_from_index(number) - self._kind = _hidpp10_constants.DEVICE_KIND[kind] - else: # get information from pairing registers - self.online = True - self.update_pairing_information() - self.update_extended_pairing_information() - if not self.wpid and not self._serial: # if neither then the device almost certainly wasn't found - raise exceptions.NoSuchDevice(number=number, receiver=receiver, error="no wpid or serial") - - # the wpid is set to None on this object when the device is unpaired - assert self.wpid is not None, "failed to read wpid: device %d of %s" % (number, receiver) - + if not self.wpid: + raise exceptions.NoSuchDevice(number=number, receiver=receiver, error="no wpid for device connected to receiver") self.descriptor = _descriptors.get_wpid(self.wpid) if self.descriptor is None: - # Last chance to correctly identify the device; many Nano receivers do not support this call. - codename = self.receiver.device_codename(self.number) + codename = self.receiver.device_codename(self.number) # Last chance to get a descriptor, may fail if codename: self._codename = codename self.descriptor = _descriptors.get_codename(self._codename) else: - self.online = None # a direct connected device might not be online (as reported by user) - self.descriptor = ( - _descriptors.get_btid(self.product_id) if self.bluetooth else _descriptors.get_usbid(self.product_id) - ) + self.descriptor = _descriptors.get_btid(self.product_id) if self.bluetooth else _descriptors.get_usbid(self.product_id) if self.number is None: # for direct-connected devices get 'number' from descriptor protocol else use 0xFF self.number = 0x00 if self.descriptor and self.descriptor.protocol and self.descriptor.protocol < 2.0 else 0xFF if self.descriptor: self._name = self.descriptor.name - if self.descriptor.protocol: - self._protocol = self.descriptor.protocol if self._codename is None: self._codename = self.descriptor.codename if self._kind is None: self._kind = self.descriptor.kind + self._protocol = self.descriptor.protocol if self.descriptor.protocol else None if self._protocol is not None: self.features = None if self._protocol < 2.0 else _hidpp20.FeaturesArray(self) else: - # may be a 2.0 device; if not, it will fix itself later - self.features = _hidpp20.FeaturesArray(self) + self.features = _hidpp20.FeaturesArray(self) # may be a 2.0 device; if not, it will fix itself later - def find(self, serial): # find a device by serial number or unit ID - assert serial, "need serial number or unit ID to find a device" - result = None + Device.instances.append(self) + + def find(self, id): # find a device by serial number or unit ID + assert id, "need serial number or unit ID to find a device" for device in Device.instances: - if device.online and (device.unitId == serial or device.serial == serial): - result = device - return result + if device.online and (device.unitId == id or device.serial == id): + return device @property def protocol(self): @@ -244,31 +192,11 @@ class Device: self.get_ids() return self._tid_map - def update_pairing_information(self): - if self.receiver and (not self.wpid or self._kind is None or self._polling_rate is None): - wpid, kind, polling_rate = self.receiver.device_pairing_information(self.number) - if not self.wpid: - self.wpid = wpid - if not self._kind: - self._kind = kind - if not self._polling_rate: - self._polling_rate = str(polling_rate) + "ms" - - def update_extended_pairing_information(self): - if self.receiver: - serial, power_switch = self.receiver.device_extended_pairing_information(self.number) - if not self._serial: - self._serial = serial - if not self._power_switch: - self._power_switch = power_switch - @property def kind(self): - if not self._kind: - self.update_pairing_information() - if not self._kind and self.protocol >= 2.0: - kind = _hidpp20.get_kind(self) - self._kind = KIND_MAP[kind] if kind else None + if not self._kind and self.online and self.protocol >= 2.0: + kind = _hidpp20.get_kind(self) + self._kind = KIND_MAP[kind] if kind else None return self._kind or "?" @property @@ -282,8 +210,6 @@ class Device: @property def serial(self): - if not self._serial: - self.update_extended_pairing_information() return self._serial or "" @property @@ -295,15 +221,11 @@ class Device: @property def power_switch_location(self): - if not self._power_switch: - self.update_extended_pairing_information() return self._power_switch @property def polling_rate(self): - if not self._polling_rate: - self.update_pairing_information() - if self.protocol >= 2.0: + if self.online and self.protocol >= 2.0: rate = _hidpp20.get_polling_rate(self) self._polling_rate = rate if rate else self._polling_rate return self._polling_rate @@ -487,7 +409,6 @@ class Device: def ping(self): """Checks if the device is online, returns True of False""" - # long = self.bluetooth or self.hidpp_short is False or self._protocol is not None and self._protocol >= 2.0 long = self.hidpp_long is True or ( self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) ) @@ -497,6 +418,32 @@ class Device: self._protocol = protocol return self.online + def notify_devices(self): # no need to notify, as there are none + pass + + @classmethod + def open(self, device_info, setting_callback=None): + """Opens a Logitech Device found attached to the machine, by Linux device path. + :returns: An open file handle for the found receiver, or None. + """ + try: + handle = _base.open_path(device_info.path) + if handle: + # a direct connected device might not be online (as reported by user) + return Device(None, None, None, handle=handle, device_info=device_info, setting_callback=setting_callback) + except OSError as e: + logger.exception("open %s", device_info) + if e.errno == _errno.EACCES: + raise + except Exception: + logger.exception("open %s", device_info) + + def close(self): + handle, self.handle = self.handle, None + if self in Device.instances: + Device.instances.remove(self) + return handle and _base.close(handle) + def __index__(self): return self.number @@ -525,40 +472,5 @@ class Device: __repr__ = __str__ - def notify_devices(self): # no need to notify, as there are none - pass - - @classmethod - def open(self, device_info, setting_callback=None): - """Opens a Logitech Device found attached to the machine, by Linux device path. - :returns: An open file handle for the found receiver, or ``None``. - """ - try: - handle = _base.open_path(device_info.path) - if handle: - return Device( - None, - None, - handle=handle, - path=device_info.path, - short=device_info.hidpp_short, - long=device_info.hidpp_long, - product_id=device_info.product_id, - bus_id=device_info.bus_id, - setting_callback=setting_callback, - ) - except OSError as e: - logger.exception("open %s", device_info) - if e.errno == _errno.EACCES: - raise - except Exception: - logger.exception("open %s", device_info) - - def close(self): - handle, self.handle = self.handle, None - if self in Device.instances: - Device.instances.remove(self) - return handle and _base.close(handle) - def __del__(self): self.close() diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index 9b081932..d3b5920b 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -150,37 +150,38 @@ class Receiver: if codename: codename = codename[3 : 3 + min(14, ord(codename[2:3]))] return codename.decode("ascii") - return - codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1) - if codename: - codename = codename[2 : 2 + ord(codename[1:2])] - return codename.decode("ascii") + else: + codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1) + if codename: + codename = codename[2 : 2 + ord(codename[1:2])] + return codename.decode("ascii") - def device_pairing_information(self, n): + def device_pairing_information(self, n: int) -> dict: + """Return information from pairing registers (and elsewhere when necessary)""" if self.receiver_kind == "bolt": pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) if pair_info: wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3]) - kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F] - return wpid, kind, 0 + kind = _hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F] + serial = _strhex(pair_info[4:8]) + return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"} else: - raise exceptions.NoSuchDevice(number=n, receiver=self, error="read Bolt wpid") - wpid = 0 - kind = None - polling_rate = None + raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register") + polling_rate = "" + serial = None + power_switch = "(unknown)" pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) - if pair_info: # may be either a Unifying receiver, or an Unifying-ready receiver + if pair_info: # either a Unifying receiver or a Unifying-ready receiver wpid = _strhex(pair_info[3:5]) - kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[7:8]) & 0x0F] + kind = _hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F] polling_rate = str(ord(pair_info[2:3])) + "ms" - elif self.receiver_kind == "27Mz": # 27Mhz receiver, fill extracting WPID from udev path + elif self.receiver_kind == "27Mz": # 27Mhz receiver, extract WPID from udev path wpid = _hid.find_paired_node_wpid(self.path, n) if not wpid: logger.error("Unable to get wpid from udev for device %d of %s", n, self) raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device") kind = _hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)] - - elif not self.receiver_kind == "unifying": # unifying protocol not supported, may be an old Nano receiver + elif not self.receiver_kind == "unifying": # may be an old Nano receiver device_info = self.read_register(_R.receiver_info, 0x04) if device_info: wpid = _strhex(device_info[3:5]) @@ -189,32 +190,18 @@ class Receiver: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") - return wpid, kind, polling_rate - - def device_extended_pairing_information(self, n): - serial = None - power_switch = "(unknown)" - if self.receiver_kind == "bolt": - pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) - if pair_info: - serial = _strhex(pair_info[4:8]) - return serial, power_switch - else: - return "?", power_switch pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1) if pair_info: - power_switch = _hidpp10_constants.POWER_SWITCH_LOCATION[ord(pair_info[9:10]) & 0x0F] + power_switch = _hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F] else: # some Nano receivers? pair_info = self.read_register(0x2D5) if pair_info: serial = _strhex(pair_info[1:5]) - return serial, power_switch + return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch} def get_kind_from_index(self, index): """Get device kind from 27Mhz device index""" - # accordingly to drivers/hid/hid-logitech-dj.c - # index 1 or 2 always mouse, index 3 always the keyboard, - # index 4 is used for an optional separate numpad + # From drivers/hid/hid-logitech-dj.c if index == 1: # mouse kind = 2 elif index == 2: # mouse @@ -242,7 +229,19 @@ class Receiver: assert notification is None or notification.sub_id == 0x41 try: - dev = Device(self, number, notification, setting_callback=self.setting_callback) + info = self.device_pairing_information(number) + if notification is not None: + online = not bool(ord(notification.data[0:1]) & 0x40) + # the rest may be redundant, but keep it around for now + info["wpid"] = _strhex(notification.data[2:3] + notification.data[1:2]) + kind = ord(notification.data[0:1]) & 0x0F + if self.receiver_kind == "27Mhz": # get 27Mhz wpid and set kind based on index + info["wpid"] = "00" + _strhex(notification.data[2:3]) + kind = self.get_kind_from_index(number) + info["kind"] = _hidpp10_constants.DEVICE_KIND[kind] + else: + online = True + dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback) if logger.isEnabledFor(logging.INFO): logger.info("%s: found new device %d (%s)", self, number, dev.wpid) self._devices[number] = dev diff --git a/pyproject.toml b/pyproject.toml index 285ad5ca..c4fa1ea3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [tool.ruff] -line-length = 127 +line-length = 140 target-version = "py37" [tool.ruff.lint]