receiver: move more method code to subclasses

This commit is contained in:
Peter F. Patel-Schneider 2024-03-05 14:16:20 -05:00
parent 4eb5a83326
commit 3916c189be
1 changed files with 120 additions and 108 deletions

View File

@ -18,6 +18,8 @@
import errno as _errno import errno as _errno
import logging import logging
from typing import Optional
import hidapi as _hid import hidapi as _hid
from . import base as _base from . import base as _base
@ -31,45 +33,8 @@ _R = hidpp10_constants.REGISTERS
_IR = hidpp10_constants.INFO_SUBREGISTERS _IR = hidpp10_constants.INFO_SUBREGISTERS
class ReceiverFactory:
@staticmethod
def create_receiver(device_info, setting_callback=None):
"""Opens a Logitech Receiver found attached to the machine, by Linux device path.
:returns: An open file handle for the found receiver, or ``None``.
"""
try:
product_info = _base.product_information(device_info.product_id)
if not product_info:
logger.warning("Unknown receiver type: %s", device_info.product_id)
product_info = {}
handle = _base.open_path(device_info.path)
if handle:
receiver_kind = product_info.get("receiver_kind", "unknown")
if receiver_kind == "bolt":
return BoltReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "unifying":
return UnifyingReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "lightspeed":
return LightSpeedReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "nano":
return NanoReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "27Mhz":
return Ex100Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
else:
return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)
class Receiver: class Receiver:
"""A Unifying Receiver instance. """A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers"
The paired devices are available through the sequence interface. The paired devices are available through the sequence interface.
""" """
@ -86,36 +51,25 @@ class Receiver:
self.receiver_kind = receiver_kind self.receiver_kind = receiver_kind
self.serial = None self.serial = None
self.max_devices = None self.max_devices = None
self.may_unpair = None
self._firmware = None self._firmware = None
self._devices = {}
self._remaining_pairings = None self._remaining_pairings = None
self._devices = {}
self.name = product_info.get("name", "Receiver") self.name = product_info.get("name", "Receiver")
self.may_unpair = product_info.get("may_unpair", False)
self.re_pairs = product_info.get("re_pairs", False) self.re_pairs = product_info.get("re_pairs", False)
self._str = "<%s(%s,%s%s)>" % (
self.name.replace(" ", ""),
self.path,
"" if isinstance(self.handle, int) else "T",
self.handle,
)
self.initialize(product_info) self.initialize(product_info)
def initialize(self, product_info: dict): def initialize(self, product_info: dict):
# read the serial immediately, so we can find out max_devices # read the receiver information subregister, so we can find out max_devices
serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information) serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information)
if serial_reply: if serial_reply:
self.serial = serial_reply[1:5].hex().upper() self.serial = serial_reply[1:5].hex().upper()
self.max_devices = ord(serial_reply[6:7]) self.max_devices = serial_reply[6]
if self.max_devices <= 0 or self.max_devices > 6: if self.max_devices <= 0 or self.max_devices > 6:
self.max_devices = product_info.get("max_devices", 1) self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False) else: # handle receivers that don't have a serial number specially (i.e., c534)
else: # handle receivers that don't have a serial number specially (i.e., c534 and Bolt receivers)
self.serial = None self.serial = None
self.max_devices = product_info.get("max_devices", 1) self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)
def close(self): def close(self):
handle, self.handle = self.handle, None handle, self.handle = self.handle, None
@ -174,63 +128,52 @@ class Receiver:
codename = codename[2 : 2 + ord(codename[1:2])] codename = codename[2 : 2 + ord(codename[1:2])]
return codename.decode("ascii") return codename.decode("ascii")
def notify_devices(self):
"""Scan all devices."""
if self.handle:
if not self.write_register(_R.receiver_connection, 0x02):
logger.warning("%s: failed to trigger device link notifications", self)
def notification_information(self, number, notification):
"""Extract information from unifying-style notification"""
assert notification.address != 0x02
online = not bool(notification.data[0] & 0x40)
encrypted = bool(notification.data[0] & 0x20) or notification.address == 0x10
kind = hidpp10_constants.DEVICE_KIND[notification.data[0] & 0x0F]
wpid = (notification.data[2:3] + notification.data[1:2]).hex().upper()
return online, encrypted, wpid, kind
def device_pairing_information(self, n: int) -> dict: def device_pairing_information(self, n: int) -> dict:
"""Return information from pairing registers (and elsewhere when necessary)""" """Return information from pairing registers (and elsewhere when necessary)"""
polling_rate = "" polling_rate = ""
serial = None serial = None
power_switch = "(unknown)" power_switch = "(unknown)"
pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1)
if pair_info: # either a Unifying receiver or a Unifying-ready receiver if pair_info: # a receiver that uses Unifying-style pairing registers
wpid = pair_info[3:5].hex().upper() wpid = pair_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F] kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F]
polling_rate = str(ord(pair_info[2:3])) + "ms" polling_rate = str(pair_info[2]) + "ms"
elif self.receiver_kind == "27Mz": # 27Mhz receiver, extract WPID from udev path
wpid = _hid.find_paired_node_wpid(self.path, n)
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", n, self)
raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device")
kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)]
elif not self.receiver_kind == "unifying": # may be an old Nano receiver elif not self.receiver_kind == "unifying": # may be an old Nano receiver
device_info = self.read_register(_R.receiver_info, 0x04) device_info = self.read_register(_R.receiver_info, 0x04) # undocumented
if device_info: if device_info:
logger.warning("using undocumented register for device wpid")
wpid = device_info[3:5].hex().upper() wpid = device_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind
else: else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying")
else: else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information")
pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1) pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1)
if pair_info: if pair_info:
power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F] power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F]
else: # some Nano receivers?
pair_info = self.read_register(0x2D5)
if pair_info:
serial = pair_info[1:5].hex().upper() serial = pair_info[1:5].hex().upper()
else: # some Nano receivers?
pair_info = self.read_register(0x2D5) # undocumented and questionable
if pair_info:
logger.warning("using undocumented register for device serial number")
serial = pair_info[1:5].hex().upper()
return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch} return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch}
def get_kind_from_index(self, index):
"""Get device kind from 27Mhz device index"""
# From drivers/hid/hid-logitech-dj.c
if index == 1: # mouse
kind = 2
elif index == 2: # mouse
kind = 2
elif index == 3: # keyboard
kind = 1
elif index == 4: # numpad
kind = 3
else: # unknown device number on 27Mhz receiver
logger.error("failed to calculate device kind for device %d of %s", index, self)
raise exceptions.NoSuchDevice(number=index, receiver=self, error="Unknown 27Mhz device number")
return kind
def notify_devices(self):
"""Scan all devices."""
if self.handle:
if not self.write_register(_R.receiver_connection, 0x02):
logger.warning("%s: failed to trigger device link notifications", self)
def register_new_device(self, number, notification=None): def register_new_device(self, number, notification=None):
if self._devices.get(number) is not None: if self._devices.get(number) is not None:
raise IndexError("%s: device number %d already registered" % (self, number)) raise IndexError("%s: device number %d already registered" % (self, number))
@ -241,14 +184,15 @@ class Receiver:
try: try:
info = self.device_pairing_information(number) info = self.device_pairing_information(number)
if notification is not None: if notification is not None:
online = not bool(ord(notification.data[0:1]) & 0x40) online, _e, nwpid, nkind = self.notification_information(number, notification)
# the rest may be redundant, but keep it around for now if info["wpid"] is None:
info["wpid"] = (notification.data[2:3] + notification.data[1:2]).hex().upper() info["wpid"] = nwpid
kind = ord(notification.data[0:1]) & 0x0F elif nwpid is not None and info["wpid"] != nwpid:
if self.receiver_kind == "27Mhz": # get 27Mhz wpid and set kind based on index logger.warning("mismatch on device WPID %s %s", info["wpid"], nwpid)
info["wpid"] = "00" + notification.data[2:3].hex().upper() if info["kind"] is None:
kind = self.get_kind_from_index(number) info["kind"] = nkind
info["kind"] = hidpp10_constants.DEVICE_KIND[kind] elif nkind is not None and info["kind"] != nkind:
logger.warning("mismatch on device kind %s %s", info["kind"], nkind)
else: else:
online = True online = True
dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback) dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback)
@ -270,19 +214,10 @@ class Receiver:
return True return True
logger.warning("%s: failed to %s the receiver lock", self, "close" if lock_closed else "open") logger.warning("%s: failed to %s the receiver lock", self, "close" if lock_closed else "open")
def discover(self, cancel=False, timeout=30):
pass
def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20):
pass
def count(self): def count(self):
count = self.read_register(_R.receiver_connection) count = self.read_register(_R.receiver_connection)
return 0 if count is None else ord(count[1:2]) return 0 if count is None else ord(count[1:2])
# def has_devices(self):
# return len(self) > 0 or self.count() > 0
def request(self, request_id, *params): def request(self, request_id, *params):
if bool(self): if bool(self):
return _base.request(self.handle, 0xFF, request_id, *params) return _base.request(self.handle, 0xFF, request_id, *params)
@ -378,7 +313,12 @@ class Receiver:
return self.path.__hash__() return self.path.__hash__()
def __str__(self): def __str__(self):
return self._str return "<%s(%s,%s%s)>" % (
self.name.replace(" ", ""),
self.path,
"" if isinstance(self.handle, int) else "T",
self.handle,
)
__repr__ = __str__ __repr__ = __str__
@ -386,6 +326,8 @@ class Receiver:
class BoltReceiver(Receiver): class BoltReceiver(Receiver):
"""Bolt receivers use a different pairing prototol and have different pairing registers"""
def __init__(self, product_info, handle, path, product_id, setting_callback=None): def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("bolt", product_info, handle, path, product_id, setting_callback) super().__init__("bolt", product_info, handle, path, product_id, setting_callback)
@ -393,7 +335,6 @@ class BoltReceiver(Receiver):
serial_reply = self.read_register(_R.bolt_uniqueId) serial_reply = self.read_register(_R.bolt_uniqueId)
self.serial = serial_reply.hex().upper() self.serial = serial_reply.hex().upper()
self.max_devices = product_info.get("max_devices", 1) self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)
def device_codename(self, n): def device_codename(self, n):
codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01) codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01)
@ -450,5 +391,76 @@ class LightSpeedReceiver(Receiver):
class Ex100Receiver(Receiver): class Ex100Receiver(Receiver):
"""A very old style receiver, somewhat different from newer receivers"""
def __init__(self, product_info, handle, path, product_id, setting_callback=None): def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback) super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback)
def initialize(self, product_info: dict):
self.serial = None
self.max_devices = product_info.get("max_devices", 1)
def notification_information(self, number, notification):
"""Extract information from 27Mz-style notification and device index"""
assert notification.address == 0x02
online = True
encrypted = bool(notification.data[0] & 0x80)
kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(number)]
wpid = "00" + notification.data[2:3].hex().upper()
return online, encrypted, wpid, kind
def device_pairing_information(self, number: int) -> dict:
wpid = _hid.find_paired_node_wpid(self.path, number) # extract WPID from udev path
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(number)]
return {"wpid": wpid, "kind": kind, "polling": "", "serial": None, "power_switch": "(unknown)"}
def get_kind_from_index(self, index):
"""Get device kind from 27Mhz device index"""
# From drivers/hid/hid-logitech-dj.c
if index == 1: # mouse
kind = 2
elif index == 2: # mouse
kind = 2
elif index == 3: # keyboard
kind = 1
elif index == 4: # numpad
kind = 3
else: # unknown device number on 27Mhz receiver
logger.error("failed to calculate device kind for device %d of %s", index, self)
raise exceptions.NoSuchDevice(number=index, receiver=self, error="Unknown 27Mhz device number")
return kind
receiver_class_mapping = {
"bolt": BoltReceiver,
"unifying": UnifyingReceiver,
"lightspeed": LightSpeedReceiver,
"nano": NanoReceiver,
"27Mhz": Ex100Receiver,
}
class ReceiverFactory:
@staticmethod
def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]:
"""Opens a Logitech Receiver found attached to the machine, by Linux device path."""
try:
handle = _base.open_path(device_info.path)
if handle:
product_info = _base.product_information(device_info.product_id)
if not product_info:
logger.warning("Unknown receiver type: %s", device_info.product_id)
product_info = {}
receiver_kind = product_info.get("receiver_kind", "unknown")
receiver_class = receiver_class_mapping.get(receiver_kind, Receiver)
return receiver_class(product_info, handle, device_info.path, device_info.product_id, setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)