diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index b28bf887..2dc71dc9 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -18,6 +18,8 @@ import errno as _errno import logging +from typing import Optional + import hidapi as _hid from . import base as _base @@ -31,45 +33,8 @@ _R = hidpp10_constants.REGISTERS _IR = hidpp10_constants.INFO_SUBREGISTERS -class ReceiverFactory: - @staticmethod - def create_receiver(device_info, setting_callback=None): - """Opens a Logitech Receiver found attached to the machine, by Linux device path. - - :returns: An open file handle for the found receiver, or ``None``. - """ - try: - product_info = _base.product_information(device_info.product_id) - if not product_info: - logger.warning("Unknown receiver type: %s", device_info.product_id) - product_info = {} - - handle = _base.open_path(device_info.path) - if handle: - receiver_kind = product_info.get("receiver_kind", "unknown") - if receiver_kind == "bolt": - return BoltReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - elif receiver_kind == "unifying": - return UnifyingReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - elif receiver_kind == "lightspeed": - return LightSpeedReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - elif receiver_kind == "nano": - return NanoReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - elif receiver_kind == "27Mhz": - return Ex100Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - else: - return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - except OSError as e: - logger.exception("open %s", device_info) - if e.errno == _errno.EACCES: - raise - except Exception: - logger.exception("open %s", device_info) - - class Receiver: - """A Unifying Receiver instance. - + """A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers" The paired devices are available through the sequence interface. """ @@ -86,36 +51,25 @@ class Receiver: self.receiver_kind = receiver_kind self.serial = None self.max_devices = None - self.may_unpair = None - self._firmware = None - self._devices = {} self._remaining_pairings = None - + self._devices = {} self.name = product_info.get("name", "Receiver") + self.may_unpair = product_info.get("may_unpair", False) self.re_pairs = product_info.get("re_pairs", False) - self._str = "<%s(%s,%s%s)>" % ( - self.name.replace(" ", ""), - self.path, - "" if isinstance(self.handle, int) else "T", - self.handle, - ) - self.initialize(product_info) def initialize(self, product_info: dict): - # read the serial immediately, so we can find out max_devices + # read the receiver information subregister, so we can find out max_devices serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information) if serial_reply: self.serial = serial_reply[1:5].hex().upper() - self.max_devices = ord(serial_reply[6:7]) + self.max_devices = serial_reply[6] if self.max_devices <= 0 or self.max_devices > 6: self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) - else: # handle receivers that don't have a serial number specially (i.e., c534 and Bolt receivers) + else: # handle receivers that don't have a serial number specially (i.e., c534) self.serial = None self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) def close(self): handle, self.handle = self.handle, None @@ -174,63 +128,52 @@ class Receiver: codename = codename[2 : 2 + ord(codename[1:2])] return codename.decode("ascii") + def notify_devices(self): + """Scan all devices.""" + if self.handle: + if not self.write_register(_R.receiver_connection, 0x02): + logger.warning("%s: failed to trigger device link notifications", self) + + def notification_information(self, number, notification): + """Extract information from unifying-style notification""" + assert notification.address != 0x02 + online = not bool(notification.data[0] & 0x40) + encrypted = bool(notification.data[0] & 0x20) or notification.address == 0x10 + kind = hidpp10_constants.DEVICE_KIND[notification.data[0] & 0x0F] + wpid = (notification.data[2:3] + notification.data[1:2]).hex().upper() + return online, encrypted, wpid, kind + def device_pairing_information(self, n: int) -> dict: """Return information from pairing registers (and elsewhere when necessary)""" polling_rate = "" serial = None power_switch = "(unknown)" pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) - if pair_info: # either a Unifying receiver or a Unifying-ready receiver + if pair_info: # a receiver that uses Unifying-style pairing registers wpid = pair_info[3:5].hex().upper() kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F] - polling_rate = str(ord(pair_info[2:3])) + "ms" - elif self.receiver_kind == "27Mz": # 27Mhz receiver, extract WPID from udev path - wpid = _hid.find_paired_node_wpid(self.path, n) - if not wpid: - logger.error("Unable to get wpid from udev for device %d of %s", n, self) - raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device") - kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)] + polling_rate = str(pair_info[2]) + "ms" elif not self.receiver_kind == "unifying": # may be an old Nano receiver - device_info = self.read_register(_R.receiver_info, 0x04) + device_info = self.read_register(_R.receiver_info, 0x04) # undocumented if device_info: + logger.warning("using undocumented register for device wpid") wpid = device_info[3:5].hex().upper() kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") - pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1) if pair_info: power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F] - else: # some Nano receivers? - pair_info = self.read_register(0x2D5) - if pair_info: serial = pair_info[1:5].hex().upper() + else: # some Nano receivers? + pair_info = self.read_register(0x2D5) # undocumented and questionable + if pair_info: + logger.warning("using undocumented register for device serial number") + serial = pair_info[1:5].hex().upper() return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch} - def get_kind_from_index(self, index): - """Get device kind from 27Mhz device index""" - # From drivers/hid/hid-logitech-dj.c - if index == 1: # mouse - kind = 2 - elif index == 2: # mouse - kind = 2 - elif index == 3: # keyboard - kind = 1 - elif index == 4: # numpad - kind = 3 - else: # unknown device number on 27Mhz receiver - logger.error("failed to calculate device kind for device %d of %s", index, self) - raise exceptions.NoSuchDevice(number=index, receiver=self, error="Unknown 27Mhz device number") - return kind - - def notify_devices(self): - """Scan all devices.""" - if self.handle: - if not self.write_register(_R.receiver_connection, 0x02): - logger.warning("%s: failed to trigger device link notifications", self) - def register_new_device(self, number, notification=None): if self._devices.get(number) is not None: raise IndexError("%s: device number %d already registered" % (self, number)) @@ -241,14 +184,15 @@ class Receiver: try: info = self.device_pairing_information(number) if notification is not None: - online = not bool(ord(notification.data[0:1]) & 0x40) - # the rest may be redundant, but keep it around for now - info["wpid"] = (notification.data[2:3] + notification.data[1:2]).hex().upper() - kind = ord(notification.data[0:1]) & 0x0F - if self.receiver_kind == "27Mhz": # get 27Mhz wpid and set kind based on index - info["wpid"] = "00" + notification.data[2:3].hex().upper() - kind = self.get_kind_from_index(number) - info["kind"] = hidpp10_constants.DEVICE_KIND[kind] + online, _e, nwpid, nkind = self.notification_information(number, notification) + if info["wpid"] is None: + info["wpid"] = nwpid + elif nwpid is not None and info["wpid"] != nwpid: + logger.warning("mismatch on device WPID %s %s", info["wpid"], nwpid) + if info["kind"] is None: + info["kind"] = nkind + elif nkind is not None and info["kind"] != nkind: + logger.warning("mismatch on device kind %s %s", info["kind"], nkind) else: online = True dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback) @@ -270,19 +214,10 @@ class Receiver: return True logger.warning("%s: failed to %s the receiver lock", self, "close" if lock_closed else "open") - def discover(self, cancel=False, timeout=30): - pass - - def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): - pass - def count(self): count = self.read_register(_R.receiver_connection) return 0 if count is None else ord(count[1:2]) - # def has_devices(self): - # return len(self) > 0 or self.count() > 0 - def request(self, request_id, *params): if bool(self): return _base.request(self.handle, 0xFF, request_id, *params) @@ -378,7 +313,12 @@ class Receiver: return self.path.__hash__() def __str__(self): - return self._str + return "<%s(%s,%s%s)>" % ( + self.name.replace(" ", ""), + self.path, + "" if isinstance(self.handle, int) else "T", + self.handle, + ) __repr__ = __str__ @@ -386,6 +326,8 @@ class Receiver: class BoltReceiver(Receiver): + """Bolt receivers use a different pairing prototol and have different pairing registers""" + def __init__(self, product_info, handle, path, product_id, setting_callback=None): super().__init__("bolt", product_info, handle, path, product_id, setting_callback) @@ -393,7 +335,6 @@ class BoltReceiver(Receiver): serial_reply = self.read_register(_R.bolt_uniqueId) self.serial = serial_reply.hex().upper() self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) def device_codename(self, n): codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01) @@ -450,5 +391,76 @@ class LightSpeedReceiver(Receiver): class Ex100Receiver(Receiver): + """A very old style receiver, somewhat different from newer receivers""" + def __init__(self, product_info, handle, path, product_id, setting_callback=None): super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback) + + def initialize(self, product_info: dict): + self.serial = None + self.max_devices = product_info.get("max_devices", 1) + + def notification_information(self, number, notification): + """Extract information from 27Mz-style notification and device index""" + assert notification.address == 0x02 + online = True + encrypted = bool(notification.data[0] & 0x80) + kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(number)] + wpid = "00" + notification.data[2:3].hex().upper() + return online, encrypted, wpid, kind + + def device_pairing_information(self, number: int) -> dict: + wpid = _hid.find_paired_node_wpid(self.path, number) # extract WPID from udev path + if not wpid: + logger.error("Unable to get wpid from udev for device %d of %s", number, self) + raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device") + kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(number)] + return {"wpid": wpid, "kind": kind, "polling": "", "serial": None, "power_switch": "(unknown)"} + + def get_kind_from_index(self, index): + """Get device kind from 27Mhz device index""" + # From drivers/hid/hid-logitech-dj.c + if index == 1: # mouse + kind = 2 + elif index == 2: # mouse + kind = 2 + elif index == 3: # keyboard + kind = 1 + elif index == 4: # numpad + kind = 3 + else: # unknown device number on 27Mhz receiver + logger.error("failed to calculate device kind for device %d of %s", index, self) + raise exceptions.NoSuchDevice(number=index, receiver=self, error="Unknown 27Mhz device number") + return kind + + +receiver_class_mapping = { + "bolt": BoltReceiver, + "unifying": UnifyingReceiver, + "lightspeed": LightSpeedReceiver, + "nano": NanoReceiver, + "27Mhz": Ex100Receiver, +} + + +class ReceiverFactory: + @staticmethod + def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]: + """Opens a Logitech Receiver found attached to the machine, by Linux device path.""" + + try: + handle = _base.open_path(device_info.path) + if handle: + product_info = _base.product_information(device_info.product_id) + if not product_info: + logger.warning("Unknown receiver type: %s", device_info.product_id) + product_info = {} + receiver_kind = product_info.get("receiver_kind", "unknown") + receiver_class = receiver_class_mapping.get(receiver_kind, Receiver) + return receiver_class(product_info, handle, device_info.path, device_info.product_id, setting_callback) + except OSError as e: + logger.exception("open %s", device_info) + if e.errno == _errno.EACCES: + raise + except Exception: + logger.exception("open %s", device_info)