receiver: create subclasses of receiver for different variants

Related #2350
This commit is contained in:
Matthias Hagmann 2024-03-03 21:43:52 +01:00 committed by Peter F. Patel-Schneider
parent a90a367609
commit 4eb5a83326
1 changed files with 119 additions and 63 deletions

View File

@ -46,6 +46,18 @@ class ReceiverFactory:
handle = _base.open_path(device_info.path)
if handle:
receiver_kind = product_info.get("receiver_kind", "unknown")
if receiver_kind == "bolt":
return BoltReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "unifying":
return UnifyingReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "lightspeed":
return LightSpeedReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "nano":
return NanoReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "27Mhz":
return Ex100Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
else:
return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
@ -64,22 +76,35 @@ class Receiver:
number = 0xFF
kind = None
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None):
assert handle
self.isDevice = False # some devices act as receiver so we need a property to distinguish them
self.handle = handle
self.path = path
self.product_id = product_id
self.setting_callback = setting_callback
self.receiver_kind = product_info.get("receiver_kind", "unknown")
self.receiver_kind = receiver_kind
self.serial = None
self.max_devices = None
self.may_unpair = None
self._firmware = None
self._devices = {}
self._remaining_pairings = None
self.name = product_info.get("name", "Receiver")
self.re_pairs = product_info.get("re_pairs", False)
self._str = "<%s(%s,%s%s)>" % (
self.name.replace(" ", ""),
self.path,
"" if isinstance(self.handle, int) else "T",
self.handle,
)
self.initialize(product_info)
def initialize(self, product_info: dict):
# read the serial immediately, so we can find out max_devices
if self.receiver_kind == "bolt":
serial_reply = self.read_register(_R.bolt_uniqueId)
self.serial = serial_reply.hex().upper()
self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)
else:
serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information)
if serial_reply:
self.serial = serial_reply[1:5].hex().upper()
@ -92,19 +117,6 @@ class Receiver:
self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)
self.name = product_info.get("name", "Receiver")
self.re_pairs = product_info.get("re_pairs", False)
self._str = "<%s(%s,%s%s)>" % (
self.name.replace(" ", ""),
self.path,
"" if isinstance(self.handle, int) else "T",
self.handle,
)
self._firmware = None
self._devices = {}
self._remaining_pairings = None
def close(self):
handle, self.handle = self.handle, None
for _n, d in self._devices.items():
@ -157,12 +169,6 @@ class Receiver:
return flag_bits
def device_codename(self, n):
if self.receiver_kind == "bolt":
codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01)
if codename:
codename = codename[3 : 3 + min(14, ord(codename[2:3]))]
return codename.decode("ascii")
else:
codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1)
if codename:
codename = codename[2 : 2 + ord(codename[1:2])]
@ -170,15 +176,6 @@ class Receiver:
def device_pairing_information(self, n: int) -> dict:
"""Return information from pairing registers (and elsewhere when necessary)"""
if self.receiver_kind == "bolt":
pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n)
if pair_info:
wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F]
serial = pair_info[4:8].hex().upper()
return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"}
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register")
polling_rate = ""
serial = None
power_switch = "(unknown)"
@ -202,6 +199,7 @@ class Receiver:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying")
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information")
pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1)
if pair_info:
power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F]
@ -272,23 +270,11 @@ class Receiver:
return True
logger.warning("%s: failed to %s the receiver lock", self, "close" if lock_closed else "open")
def discover(self, cancel=False, timeout=30): # Bolt device discovery
assert self.receiver_kind == "bolt"
if self.handle:
action = 0x02 if cancel else 0x01
reply = self.write_register(_R.bolt_device_discovery, timeout, action)
if reply:
return True
logger.warning("%s: failed to %s device discovery", self, "cancel" if cancel else "start")
def discover(self, cancel=False, timeout=30):
pass
def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): # Bolt pairing
assert self.receiver_kind == "bolt"
if self.handle:
action = 0x01 if pair is True else 0x03 if pair is False else 0x02
reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy)
if reply:
return True
logger.warning("%s: failed to %s device %s", self, "pair" if pair else "unpair", address)
def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20):
pass
def count(self):
count = self.read_register(_R.receiver_connection)
@ -356,10 +342,7 @@ class Receiver:
del self._devices[key]
logger.warning("%s removed device %s", self, dev)
else:
if self.receiver_kind == "bolt":
reply = self.write_register(_R.bolt_pairing, 0x03, key)
else:
reply = self.write_register(_R.receiver_pairing, 0x03, key)
reply = self._unpair_device_per_receiver(key)
if reply:
# invalidate the device
dev.online = False
@ -372,6 +355,10 @@ class Receiver:
logger.error("%s failed to unpair device %s", self, dev)
raise Exception("failed to unpair device %s: %s" % (dev.name, key))
def _unpair_device_per_receiver(self, key):
"""Receiver specific unpairing."""
return self.write_register(_R.receiver_pairing, 0x03, key)
def __len__(self):
return len([d for d in self._devices.values() if d is not None])
@ -396,3 +383,72 @@ class Receiver:
__repr__ = __str__
__bool__ = __nonzero__ = lambda self: self.handle is not None
class BoltReceiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("bolt", product_info, handle, path, product_id, setting_callback)
def initialize(self, product_info: dict):
serial_reply = self.read_register(_R.bolt_uniqueId)
self.serial = serial_reply.hex().upper()
self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)
def device_codename(self, n):
codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01)
if codename:
codename = codename[3 : 3 + min(14, ord(codename[2:3]))]
return codename.decode("ascii")
def device_pairing_information(self, n: int) -> dict:
pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n)
if pair_info:
wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F]
serial = pair_info[4:8].hex().upper()
return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"}
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register")
def discover(self, cancel=False, timeout=30):
"""Discover Logitech Bolt devices."""
if self.handle:
action = 0x02 if cancel else 0x01
reply = self.write_register(_R.bolt_device_discovery, timeout, action)
if reply:
return True
logger.warning("%s: failed to %s device discovery", self, "cancel" if cancel else "start")
def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20):
"""Pair a Bolt device."""
if self.handle:
action = 0x01 if pair is True else 0x03 if pair is False else 0x02
reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy)
if reply:
return True
logger.warning("%s: failed to %s device %s", self, "pair" if pair else "unpair", address)
def _unpair_device_per_receiver(self, key):
"""Receiver specific unpairing."""
return self.write_register(_R.bolt_pairing, 0x03, key)
class UnifyingReceiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("unifying", product_info, handle, path, product_id, setting_callback)
class NanoReceiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("nano", product_info, handle, path, product_id, setting_callback)
class LightSpeedReceiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("lightspeed", product_info, handle, path, product_id, setting_callback)
class Ex100Receiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback)