device: move pairing information gathering to receiver

This commit is contained in:
Peter F. Patel-Schneider 2024-02-21 09:41:46 -05:00
parent 9228fa1da0
commit 4b33c119f6
3 changed files with 104 additions and 193 deletions

View File

@ -33,7 +33,6 @@ from . import hidpp10 as _hidpp10
from . import hidpp10_constants as _hidpp10_constants from . import hidpp10_constants as _hidpp10_constants
from . import hidpp20 as _hidpp20 from . import hidpp20 as _hidpp20
from . import hidpp20_constants as _hidpp20_constants from . import hidpp20_constants as _hidpp20_constants
from .common import strhex as _strhex
from .settings_templates import check_feature_settings as _check_feature_settings from .settings_templates import check_feature_settings as _check_feature_settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -43,68 +42,48 @@ _IR = _hidpp10_constants.INFO_SUBREGISTERS
KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in _hidpp20_constants.DEVICE_KIND} KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in _hidpp20_constants.DEVICE_KIND}
#
#
#
class Device: class Device:
instances = [] instances = []
read_register = _hidpp10.read_register read_register = _hidpp10.read_register
write_register = _hidpp10.write_register write_register = _hidpp10.write_register
def __init__( def __init__(self, receiver, number, online, pairing_info=None, handle=None, device_info=None, setting_callback=None):
self, assert receiver or device_info
receiver,
number,
link_notification=None,
path=None,
handle=None,
short=None,
long=None,
product_id=None,
bus_id=None,
setting_callback=None,
):
assert receiver or handle
Device.instances.append(self)
self.isDevice = True # some devices act as receiver so we need a property to distinguish them
self.may_unpair = False
self.receiver = receiver
self.path = path
self.handle = handle
self.product_id = product_id
self.hidpp_short = short
self.hidpp_long = long
self.bluetooth = bus_id == 0x0005 # Bluetooth connections need long messages
self.setting_callback = setting_callback
if receiver: if receiver:
assert number > 0 and number <= 15 # some receivers have devices past their max # of devices assert number > 0 and number <= 15 # some receivers have devices past their max # of devices
self.number = number # will be None at this point for directly connected devices self.number = number # will be None at this point for directly connected devices
self.online = self.descriptor = None self.online = online
self.descriptor = None
self.wpid = None # the Wireless PID is unique per device model self.isDevice = True # some devices act as receiver so we need a property to distinguish them
self._kind = None # mouse, keyboard, etc (see _hidpp10.DEVICE_KIND) self.may_unpair = False
self._codename = None # Unifying peripherals report a codename. self.receiver = receiver
self.handle = handle
self.path = device_info.path if device_info else None
self.product_id = device_info.product_id if device_info else None
self.hidpp_short = device_info.hidpp_short if device_info else None
self.hidpp_long = device_info.hidpp_long if device_info else None
self.bluetooth = device_info.bus_id == 0x0005 if device_info else False # Bluetooth needs long messages
self.setting_callback = setting_callback
self.wpid = pairing_info["wpid"] if pairing_info else None # the Wireless PID is unique per device model
self._kind = pairing_info["kind"] if pairing_info else None # mouse, keyboard, etc (see _hidpp10.DEVICE_KIND)
self._serial = pairing_info["serial"] if pairing_info else None # serial number (an 8-char hex string)
self._polling_rate = pairing_info["polling"] if pairing_info else None
self._power_switch = pairing_info["power_switch"] if pairing_info else None
self._name = None # the full name of the model self._name = None # the full name of the model
self._codename = None # Unifying peripherals report a codename.
self._protocol = None # HID++ protocol version, 1.0 or 2.0 self._protocol = None # HID++ protocol version, 1.0 or 2.0
self._serial = None # serial number (an 8-char hex string) self._unitId = None # unit id (distinguishes within a model - generally the same as serial)
self._unitId = None # unit id (distinguishes within a model - the same as serial)
self._modelId = None # model id (contains identifiers for the transports of the device) self._modelId = None # model id (contains identifiers for the transports of the device)
self._tid_map = None # map from transports to product identifiers self._tid_map = None # map from transports to product identifiers
self._persister = None # persister holds settings self._persister = None # persister holds settings
self._led_effects = self._firmware = self._keys = self._remap_keys = self._gestures = None
self._firmware = self._keys = self._remap_keys = self._gestures = None
self._polling_rate = self._power_switch = self._led_effects = None
self._gestures_lock = _threading.Lock()
self._profiles = self._backlight = self._registers = self._settings = None self._profiles = self._backlight = self._registers = self._settings = None
self._feature_settings_checked = False
self._settings_lock = _threading.Lock()
# See `add_notification_handler` self._feature_settings_checked = False
self._notification_handlers = {} self._gestures_lock = _threading.Lock()
self._settings_lock = _threading.Lock()
self._notification_handlers = {} # See `add_notification_handler`
if not self.path: if not self.path:
self.path = _hid.find_paired_node(receiver.path, number, 1) if receiver else None self.path = _hid.find_paired_node(receiver.path, number, 1) if receiver else None
@ -116,73 +95,42 @@ class Device:
time.sleep(1) time.sleep(1)
self.handle = _base.open_path(self.path) if self.path else None self.handle = _base.open_path(self.path) if self.path else None
except Exception: # give up except Exception: # give up
self.handle = None self.handle = None # should this give up completely?
if receiver: if receiver:
if link_notification is not None: if not self.wpid:
self.online = not bool(ord(link_notification.data[0:1]) & 0x40) raise exceptions.NoSuchDevice(number=number, receiver=receiver, error="no wpid for device connected to receiver")
self.wpid = _strhex(link_notification.data[2:3] + link_notification.data[1:2])
# assert link_notification.address == (0x04 if unifying else 0x03)
kind = ord(link_notification.data[0:1]) & 0x0F
# get 27Mhz wpid and set kind based on index
if receiver.receiver_kind == "27Mhz": # 27 Mhz receiver
self.wpid = "00" + _strhex(link_notification.data[2:3])
kind = receiver.get_kind_from_index(number)
self._kind = _hidpp10_constants.DEVICE_KIND[kind]
elif receiver.receiver_kind == "27Mhz": # 27 Mhz receiver doesn't have pairing registers
self.wpid = _hid.find_paired_node_wpid(receiver.path, number)
if not self.wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, receiver)
raise exceptions.NoSuchDevice(number=number, receiver=receiver, error="Not present 27Mhz device")
kind = receiver.get_kind_from_index(number)
self._kind = _hidpp10_constants.DEVICE_KIND[kind]
else: # get information from pairing registers
self.online = True
self.update_pairing_information()
self.update_extended_pairing_information()
if not self.wpid and not self._serial: # if neither then the device almost certainly wasn't found
raise exceptions.NoSuchDevice(number=number, receiver=receiver, error="no wpid or serial")
# the wpid is set to None on this object when the device is unpaired
assert self.wpid is not None, "failed to read wpid: device %d of %s" % (number, receiver)
self.descriptor = _descriptors.get_wpid(self.wpid) self.descriptor = _descriptors.get_wpid(self.wpid)
if self.descriptor is None: if self.descriptor is None:
# Last chance to correctly identify the device; many Nano receivers do not support this call. codename = self.receiver.device_codename(self.number) # Last chance to get a descriptor, may fail
codename = self.receiver.device_codename(self.number)
if codename: if codename:
self._codename = codename self._codename = codename
self.descriptor = _descriptors.get_codename(self._codename) self.descriptor = _descriptors.get_codename(self._codename)
else: else:
self.online = None # a direct connected device might not be online (as reported by user) self.descriptor = _descriptors.get_btid(self.product_id) if self.bluetooth else _descriptors.get_usbid(self.product_id)
self.descriptor = (
_descriptors.get_btid(self.product_id) if self.bluetooth else _descriptors.get_usbid(self.product_id)
)
if self.number is None: # for direct-connected devices get 'number' from descriptor protocol else use 0xFF if self.number is None: # for direct-connected devices get 'number' from descriptor protocol else use 0xFF
self.number = 0x00 if self.descriptor and self.descriptor.protocol and self.descriptor.protocol < 2.0 else 0xFF self.number = 0x00 if self.descriptor and self.descriptor.protocol and self.descriptor.protocol < 2.0 else 0xFF
if self.descriptor: if self.descriptor:
self._name = self.descriptor.name self._name = self.descriptor.name
if self.descriptor.protocol:
self._protocol = self.descriptor.protocol
if self._codename is None: if self._codename is None:
self._codename = self.descriptor.codename self._codename = self.descriptor.codename
if self._kind is None: if self._kind is None:
self._kind = self.descriptor.kind self._kind = self.descriptor.kind
self._protocol = self.descriptor.protocol if self.descriptor.protocol else None
if self._protocol is not None: if self._protocol is not None:
self.features = None if self._protocol < 2.0 else _hidpp20.FeaturesArray(self) self.features = None if self._protocol < 2.0 else _hidpp20.FeaturesArray(self)
else: else:
# may be a 2.0 device; if not, it will fix itself later self.features = _hidpp20.FeaturesArray(self) # may be a 2.0 device; if not, it will fix itself later
self.features = _hidpp20.FeaturesArray(self)
def find(self, serial): # find a device by serial number or unit ID Device.instances.append(self)
assert serial, "need serial number or unit ID to find a device"
result = None def find(self, id): # find a device by serial number or unit ID
assert id, "need serial number or unit ID to find a device"
for device in Device.instances: for device in Device.instances:
if device.online and (device.unitId == serial or device.serial == serial): if device.online and (device.unitId == id or device.serial == id):
result = device return device
return result
@property @property
def protocol(self): def protocol(self):
@ -244,31 +192,11 @@ class Device:
self.get_ids() self.get_ids()
return self._tid_map return self._tid_map
def update_pairing_information(self):
if self.receiver and (not self.wpid or self._kind is None or self._polling_rate is None):
wpid, kind, polling_rate = self.receiver.device_pairing_information(self.number)
if not self.wpid:
self.wpid = wpid
if not self._kind:
self._kind = kind
if not self._polling_rate:
self._polling_rate = str(polling_rate) + "ms"
def update_extended_pairing_information(self):
if self.receiver:
serial, power_switch = self.receiver.device_extended_pairing_information(self.number)
if not self._serial:
self._serial = serial
if not self._power_switch:
self._power_switch = power_switch
@property @property
def kind(self): def kind(self):
if not self._kind: if not self._kind and self.online and self.protocol >= 2.0:
self.update_pairing_information() kind = _hidpp20.get_kind(self)
if not self._kind and self.protocol >= 2.0: self._kind = KIND_MAP[kind] if kind else None
kind = _hidpp20.get_kind(self)
self._kind = KIND_MAP[kind] if kind else None
return self._kind or "?" return self._kind or "?"
@property @property
@ -282,8 +210,6 @@ class Device:
@property @property
def serial(self): def serial(self):
if not self._serial:
self.update_extended_pairing_information()
return self._serial or "" return self._serial or ""
@property @property
@ -295,15 +221,11 @@ class Device:
@property @property
def power_switch_location(self): def power_switch_location(self):
if not self._power_switch:
self.update_extended_pairing_information()
return self._power_switch return self._power_switch
@property @property
def polling_rate(self): def polling_rate(self):
if not self._polling_rate: if self.online and self.protocol >= 2.0:
self.update_pairing_information()
if self.protocol >= 2.0:
rate = _hidpp20.get_polling_rate(self) rate = _hidpp20.get_polling_rate(self)
self._polling_rate = rate if rate else self._polling_rate self._polling_rate = rate if rate else self._polling_rate
return self._polling_rate return self._polling_rate
@ -487,7 +409,6 @@ class Device:
def ping(self): def ping(self):
"""Checks if the device is online, returns True of False""" """Checks if the device is online, returns True of False"""
# long = self.bluetooth or self.hidpp_short is False or self._protocol is not None and self._protocol >= 2.0
long = self.hidpp_long is True or ( long = self.hidpp_long is True or (
self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0)
) )
@ -497,6 +418,32 @@ class Device:
self._protocol = protocol self._protocol = protocol
return self.online return self.online
def notify_devices(self): # no need to notify, as there are none
pass
@classmethod
def open(self, device_info, setting_callback=None):
"""Opens a Logitech Device found attached to the machine, by Linux device path.
:returns: An open file handle for the found receiver, or None.
"""
try:
handle = _base.open_path(device_info.path)
if handle:
# a direct connected device might not be online (as reported by user)
return Device(None, None, None, handle=handle, device_info=device_info, setting_callback=setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)
def close(self):
handle, self.handle = self.handle, None
if self in Device.instances:
Device.instances.remove(self)
return handle and _base.close(handle)
def __index__(self): def __index__(self):
return self.number return self.number
@ -525,40 +472,5 @@ class Device:
__repr__ = __str__ __repr__ = __str__
def notify_devices(self): # no need to notify, as there are none
pass
@classmethod
def open(self, device_info, setting_callback=None):
"""Opens a Logitech Device found attached to the machine, by Linux device path.
:returns: An open file handle for the found receiver, or ``None``.
"""
try:
handle = _base.open_path(device_info.path)
if handle:
return Device(
None,
None,
handle=handle,
path=device_info.path,
short=device_info.hidpp_short,
long=device_info.hidpp_long,
product_id=device_info.product_id,
bus_id=device_info.bus_id,
setting_callback=setting_callback,
)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)
def close(self):
handle, self.handle = self.handle, None
if self in Device.instances:
Device.instances.remove(self)
return handle and _base.close(handle)
def __del__(self): def __del__(self):
self.close() self.close()

View File

@ -150,37 +150,38 @@ class Receiver:
if codename: if codename:
codename = codename[3 : 3 + min(14, ord(codename[2:3]))] codename = codename[3 : 3 + min(14, ord(codename[2:3]))]
return codename.decode("ascii") return codename.decode("ascii")
return else:
codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1) codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1)
if codename: if codename:
codename = codename[2 : 2 + ord(codename[1:2])] codename = codename[2 : 2 + ord(codename[1:2])]
return codename.decode("ascii") return codename.decode("ascii")
def device_pairing_information(self, n): def device_pairing_information(self, n: int) -> dict:
"""Return information from pairing registers (and elsewhere when necessary)"""
if self.receiver_kind == "bolt": if self.receiver_kind == "bolt":
pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n)
if pair_info: if pair_info:
wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3]) wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3])
kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F] kind = _hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F]
return wpid, kind, 0 serial = _strhex(pair_info[4:8])
return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"}
else: else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read Bolt wpid") raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register")
wpid = 0 polling_rate = ""
kind = None serial = None
polling_rate = None power_switch = "(unknown)"
pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1)
if pair_info: # may be either a Unifying receiver, or an Unifying-ready receiver if pair_info: # either a Unifying receiver or a Unifying-ready receiver
wpid = _strhex(pair_info[3:5]) wpid = _strhex(pair_info[3:5])
kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[7:8]) & 0x0F] kind = _hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F]
polling_rate = str(ord(pair_info[2:3])) + "ms" polling_rate = str(ord(pair_info[2:3])) + "ms"
elif self.receiver_kind == "27Mz": # 27Mhz receiver, fill extracting WPID from udev path elif self.receiver_kind == "27Mz": # 27Mhz receiver, extract WPID from udev path
wpid = _hid.find_paired_node_wpid(self.path, n) wpid = _hid.find_paired_node_wpid(self.path, n)
if not wpid: if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", n, self) logger.error("Unable to get wpid from udev for device %d of %s", n, self)
raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device") raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device")
kind = _hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)] kind = _hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)]
elif not self.receiver_kind == "unifying": # may be an old Nano receiver
elif not self.receiver_kind == "unifying": # unifying protocol not supported, may be an old Nano receiver
device_info = self.read_register(_R.receiver_info, 0x04) device_info = self.read_register(_R.receiver_info, 0x04)
if device_info: if device_info:
wpid = _strhex(device_info[3:5]) wpid = _strhex(device_info[3:5])
@ -189,32 +190,18 @@ class Receiver:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying")
else: else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information")
return wpid, kind, polling_rate
def device_extended_pairing_information(self, n):
serial = None
power_switch = "(unknown)"
if self.receiver_kind == "bolt":
pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n)
if pair_info:
serial = _strhex(pair_info[4:8])
return serial, power_switch
else:
return "?", power_switch
pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1) pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1)
if pair_info: if pair_info:
power_switch = _hidpp10_constants.POWER_SWITCH_LOCATION[ord(pair_info[9:10]) & 0x0F] power_switch = _hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F]
else: # some Nano receivers? else: # some Nano receivers?
pair_info = self.read_register(0x2D5) pair_info = self.read_register(0x2D5)
if pair_info: if pair_info:
serial = _strhex(pair_info[1:5]) serial = _strhex(pair_info[1:5])
return serial, power_switch return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch}
def get_kind_from_index(self, index): def get_kind_from_index(self, index):
"""Get device kind from 27Mhz device index""" """Get device kind from 27Mhz device index"""
# accordingly to drivers/hid/hid-logitech-dj.c # From drivers/hid/hid-logitech-dj.c
# index 1 or 2 always mouse, index 3 always the keyboard,
# index 4 is used for an optional separate numpad
if index == 1: # mouse if index == 1: # mouse
kind = 2 kind = 2
elif index == 2: # mouse elif index == 2: # mouse
@ -242,7 +229,19 @@ class Receiver:
assert notification is None or notification.sub_id == 0x41 assert notification is None or notification.sub_id == 0x41
try: try:
dev = Device(self, number, notification, setting_callback=self.setting_callback) info = self.device_pairing_information(number)
if notification is not None:
online = not bool(ord(notification.data[0:1]) & 0x40)
# the rest may be redundant, but keep it around for now
info["wpid"] = _strhex(notification.data[2:3] + notification.data[1:2])
kind = ord(notification.data[0:1]) & 0x0F
if self.receiver_kind == "27Mhz": # get 27Mhz wpid and set kind based on index
info["wpid"] = "00" + _strhex(notification.data[2:3])
kind = self.get_kind_from_index(number)
info["kind"] = _hidpp10_constants.DEVICE_KIND[kind]
else:
online = True
dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback)
if logger.isEnabledFor(logging.INFO): if logger.isEnabledFor(logging.INFO):
logger.info("%s: found new device %d (%s)", self, number, dev.wpid) logger.info("%s: found new device %d (%s)", self, number, dev.wpid)
self._devices[number] = dev self._devices[number] = dev

View File

@ -1,5 +1,5 @@
[tool.ruff] [tool.ruff]
line-length = 127 line-length = 140
target-version = "py37" target-version = "py37"
[tool.ruff.lint] [tool.ruff.lint]