From 4eb5a8332632c85d3fbe0f6bb681e201bc610b01 Mon Sep 17 00:00:00 2001 From: Matthias Hagmann <16444067+MattHag@users.noreply.github.com> Date: Sun, 3 Mar 2024 21:43:52 +0100 Subject: [PATCH] receiver: create subclasses of receiver for different variants Related #2350 --- lib/logitech_receiver/receiver.py | 182 +++++++++++++++++++----------- 1 file changed, 119 insertions(+), 63 deletions(-) diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index a0c97c78..b28bf887 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -46,7 +46,19 @@ class ReceiverFactory: handle = _base.open_path(device_info.path) if handle: - return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + receiver_kind = product_info.get("receiver_kind", "unknown") + if receiver_kind == "bolt": + return BoltReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + elif receiver_kind == "unifying": + return UnifyingReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + elif receiver_kind == "lightspeed": + return LightSpeedReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + elif receiver_kind == "nano": + return NanoReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + elif receiver_kind == "27Mhz": + return Ex100Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + else: + return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) except OSError as e: logger.exception("open %s", device_info) if e.errno == _errno.EACCES: @@ -64,33 +76,21 @@ class Receiver: number = 0xFF kind = None - def __init__(self, product_info, handle, path, product_id, setting_callback=None): + def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None): assert handle self.isDevice = False # some devices act as receiver so we need a property to distinguish them self.handle = handle self.path = path self.product_id = product_id self.setting_callback = setting_callback - self.receiver_kind = product_info.get("receiver_kind", "unknown") + self.receiver_kind = receiver_kind + self.serial = None + self.max_devices = None + self.may_unpair = None - # read the serial immediately, so we can find out max_devices - if self.receiver_kind == "bolt": - serial_reply = self.read_register(_R.bolt_uniqueId) - self.serial = serial_reply.hex().upper() - self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) - else: - serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information) - if serial_reply: - self.serial = serial_reply[1:5].hex().upper() - self.max_devices = ord(serial_reply[6:7]) - if self.max_devices <= 0 or self.max_devices > 6: - self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) - else: # handle receivers that don't have a serial number specially (i.e., c534 and Bolt receivers) - self.serial = None - self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) + self._firmware = None + self._devices = {} + self._remaining_pairings = None self.name = product_info.get("name", "Receiver") self.re_pairs = product_info.get("re_pairs", False) @@ -101,9 +101,21 @@ class Receiver: self.handle, ) - self._firmware = None - self._devices = {} - self._remaining_pairings = None + self.initialize(product_info) + + def initialize(self, product_info: dict): + # read the serial immediately, so we can find out max_devices + serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information) + if serial_reply: + self.serial = serial_reply[1:5].hex().upper() + self.max_devices = ord(serial_reply[6:7]) + if self.max_devices <= 0 or self.max_devices > 6: + self.max_devices = product_info.get("max_devices", 1) + self.may_unpair = product_info.get("may_unpair", False) + else: # handle receivers that don't have a serial number specially (i.e., c534 and Bolt receivers) + self.serial = None + self.max_devices = product_info.get("max_devices", 1) + self.may_unpair = product_info.get("may_unpair", False) def close(self): handle, self.handle = self.handle, None @@ -157,28 +169,13 @@ class Receiver: return flag_bits def device_codename(self, n): - if self.receiver_kind == "bolt": - codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01) - if codename: - codename = codename[3 : 3 + min(14, ord(codename[2:3]))] - return codename.decode("ascii") - else: - codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1) - if codename: - codename = codename[2 : 2 + ord(codename[1:2])] - return codename.decode("ascii") + codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1) + if codename: + codename = codename[2 : 2 + ord(codename[1:2])] + return codename.decode("ascii") def device_pairing_information(self, n: int) -> dict: """Return information from pairing registers (and elsewhere when necessary)""" - if self.receiver_kind == "bolt": - pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) - if pair_info: - wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper() - kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F] - serial = pair_info[4:8].hex().upper() - return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"} - else: - raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register") polling_rate = "" serial = None power_switch = "(unknown)" @@ -202,6 +199,7 @@ class Receiver: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") + pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1) if pair_info: power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F] @@ -272,23 +270,11 @@ class Receiver: return True logger.warning("%s: failed to %s the receiver lock", self, "close" if lock_closed else "open") - def discover(self, cancel=False, timeout=30): # Bolt device discovery - assert self.receiver_kind == "bolt" - if self.handle: - action = 0x02 if cancel else 0x01 - reply = self.write_register(_R.bolt_device_discovery, timeout, action) - if reply: - return True - logger.warning("%s: failed to %s device discovery", self, "cancel" if cancel else "start") + def discover(self, cancel=False, timeout=30): + pass - def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): # Bolt pairing - assert self.receiver_kind == "bolt" - if self.handle: - action = 0x01 if pair is True else 0x03 if pair is False else 0x02 - reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy) - if reply: - return True - logger.warning("%s: failed to %s device %s", self, "pair" if pair else "unpair", address) + def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): + pass def count(self): count = self.read_register(_R.receiver_connection) @@ -356,10 +342,7 @@ class Receiver: del self._devices[key] logger.warning("%s removed device %s", self, dev) else: - if self.receiver_kind == "bolt": - reply = self.write_register(_R.bolt_pairing, 0x03, key) - else: - reply = self.write_register(_R.receiver_pairing, 0x03, key) + reply = self._unpair_device_per_receiver(key) if reply: # invalidate the device dev.online = False @@ -372,6 +355,10 @@ class Receiver: logger.error("%s failed to unpair device %s", self, dev) raise Exception("failed to unpair device %s: %s" % (dev.name, key)) + def _unpair_device_per_receiver(self, key): + """Receiver specific unpairing.""" + return self.write_register(_R.receiver_pairing, 0x03, key) + def __len__(self): return len([d for d in self._devices.values() if d is not None]) @@ -396,3 +383,72 @@ class Receiver: __repr__ = __str__ __bool__ = __nonzero__ = lambda self: self.handle is not None + + +class BoltReceiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("bolt", product_info, handle, path, product_id, setting_callback) + + def initialize(self, product_info: dict): + serial_reply = self.read_register(_R.bolt_uniqueId) + self.serial = serial_reply.hex().upper() + self.max_devices = product_info.get("max_devices", 1) + self.may_unpair = product_info.get("may_unpair", False) + + def device_codename(self, n): + codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01) + if codename: + codename = codename[3 : 3 + min(14, ord(codename[2:3]))] + return codename.decode("ascii") + + def device_pairing_information(self, n: int) -> dict: + pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) + if pair_info: + wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper() + kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F] + serial = pair_info[4:8].hex().upper() + return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"} + else: + raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register") + + def discover(self, cancel=False, timeout=30): + """Discover Logitech Bolt devices.""" + if self.handle: + action = 0x02 if cancel else 0x01 + reply = self.write_register(_R.bolt_device_discovery, timeout, action) + if reply: + return True + logger.warning("%s: failed to %s device discovery", self, "cancel" if cancel else "start") + + def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): + """Pair a Bolt device.""" + if self.handle: + action = 0x01 if pair is True else 0x03 if pair is False else 0x02 + reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy) + if reply: + return True + logger.warning("%s: failed to %s device %s", self, "pair" if pair else "unpair", address) + + def _unpair_device_per_receiver(self, key): + """Receiver specific unpairing.""" + return self.write_register(_R.bolt_pairing, 0x03, key) + + +class UnifyingReceiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("unifying", product_info, handle, path, product_id, setting_callback) + + +class NanoReceiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("nano", product_info, handle, path, product_id, setting_callback) + + +class LightSpeedReceiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("lightspeed", product_info, handle, path, product_id, setting_callback) + + +class Ex100Receiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback)