receiver: Refactor extraction of serial and max. devices

Related #2273
This commit is contained in:
MattHag 2024-12-01 20:22:09 +01:00 committed by Peter F. Patel-Schneider
parent 800d3498f4
commit 5e0c85a6d7
1 changed files with 74 additions and 26 deletions

View File

@ -83,6 +83,53 @@ class Pairing:
error: Optional[any] = None error: Optional[any] = None
def extract_serial(response: bytes) -> str:
"""Extracts serial number from receiver response."""
return response.hex().upper()
def extract_max_devices(response: bytes) -> int:
"""Extracts maximum number of supported devices from response."""
max_devices = response[6]
return int(max_devices)
def extract_remaining_pairings(response: bytes) -> int:
ps = ord(response[2:3])
remaining_pairings = ps - 5 if ps >= 5 else -1
return int(remaining_pairings)
def extract_codename(response: bytes) -> str:
codename = response[2 : 2 + ord(response[1:2])]
return codename.decode("ascii")
def extract_power_switch_location(response: bytes) -> str:
"""Extracts power switch location from response."""
index = response[9] & 0x0F
return hidpp10_constants.PowerSwitchLocation(index).name.lower()
def extract_connection_count(response: bytes) -> int:
"""Extract connection count from receiver response."""
return ord(response[1:2])
def extract_wpid(response: bytes) -> str:
"""Extract wpid from receiver response."""
return response.hex().upper()
def extract_polling_rate(response: bytes) -> int:
"""Returns polling rate in milliseconds."""
return int(response[2])
def extract_device_kind(response: int) -> str:
return hidpp10_constants.DEVICE_KIND[response]
class Receiver: class Receiver:
"""A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers" """A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers"
The paired devices are available through the sequence interface. The paired devices are available through the sequence interface.
@ -129,9 +176,9 @@ class Receiver:
# read the receiver information subregister, so we can find out max_devices # read the receiver information subregister, so we can find out max_devices
serial_reply = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.RECEIVER_INFORMATION) serial_reply = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.RECEIVER_INFORMATION)
if serial_reply: if serial_reply:
self.serial = serial_reply[1:5].hex().upper() self.serial = extract_serial(serial_reply[1:5])
self.max_devices = serial_reply[6] self.max_devices = extract_max_devices(serial_reply)
if self.max_devices <= 0 or self.max_devices > 6: if not (1 <= self.max_devices <= 6):
self.max_devices = product_info.get("max_devices", 1) self.max_devices = product_info.get("max_devices", 1)
else: # handle receivers that don't have a serial number specially (i.e., c534) else: # handle receivers that don't have a serial number specially (i.e., c534)
self.serial = None self.serial = None
@ -164,8 +211,7 @@ class Receiver:
if self._remaining_pairings is None or not cache: if self._remaining_pairings is None or not cache:
ps = self.read_register(Registers.RECEIVER_CONNECTION) ps = self.read_register(Registers.RECEIVER_CONNECTION)
if ps is not None: if ps is not None:
ps = ord(ps[2:3]) self._remaining_pairings = extract_remaining_pairings(ps)
self._remaining_pairings = ps - 5 if ps >= 5 else -1
return self._remaining_pairings return self._remaining_pairings
def enable_connection_notifications(self, enable=True): def enable_connection_notifications(self, enable=True):
@ -195,8 +241,7 @@ class Receiver:
def device_codename(self, n): def device_codename(self, n):
codename = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.DEVICE_NAME + n - 1) codename = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.DEVICE_NAME + n - 1)
if codename: if codename:
codename = codename[2 : 2 + ord(codename[1:2])] return extract_codename(codename)
return codename.decode("ascii")
def notify_devices(self): def notify_devices(self):
"""Scan all devices.""" """Scan all devices."""
@ -209,8 +254,8 @@ class Receiver:
assert notification.address != 0x02 assert notification.address != 0x02
online = not bool(notification.data[0] & 0x40) online = not bool(notification.data[0] & 0x40)
encrypted = bool(notification.data[0] & 0x20) or notification.address == 0x10 encrypted = bool(notification.data[0] & 0x20) or notification.address == 0x10
kind = hidpp10_constants.DEVICE_KIND[notification.data[0] & 0x0F] kind = extract_device_kind(notification.data[0] & 0x0F)
wpid = (notification.data[2:3] + notification.data[1:2]).hex().upper() wpid = extract_wpid(notification.data[2:3] + notification.data[1:2])
return online, encrypted, wpid, kind return online, encrypted, wpid, kind
def device_pairing_information(self, n: int) -> dict: def device_pairing_information(self, n: int) -> dict:
@ -220,28 +265,29 @@ class Receiver:
power_switch = "(unknown)" power_switch = "(unknown)"
pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.PAIRING_INFORMATION + n - 1) pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.PAIRING_INFORMATION + n - 1)
if pair_info: # a receiver that uses Unifying-style pairing registers if pair_info: # a receiver that uses Unifying-style pairing registers
wpid = pair_info[3:5].hex().upper() wpid = extract_wpid(pair_info[3:5])
kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F] kind = extract_device_kind(pair_info[7] & 0x0F)
polling_rate = str(pair_info[2]) + "ms" polling_rate_ms = extract_polling_rate(pair_info)
polling_rate = f"{polling_rate_ms}ms"
elif not self.receiver_kind == "unifying": # may be an old Nano receiver elif not self.receiver_kind == "unifying": # may be an old Nano receiver
device_info = self.read_register(Registers.RECEIVER_INFO, 0x04) # undocumented device_info = self.read_register(Registers.RECEIVER_INFO, 0x04) # undocumented
if device_info: if device_info:
logger.warning("using undocumented register for device wpid") logger.warning("using undocumented register for device wpid")
wpid = device_info[3:5].hex().upper() wpid = extract_wpid(device_info[3:5])
kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind kind = extract_device_kind(0x00) # unknown kind
else: else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying")
else: else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information")
pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.EXTENDED_PAIRING_INFORMATION + n - 1) pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.EXTENDED_PAIRING_INFORMATION + n - 1)
if pair_info: if pair_info:
power_switch = hidpp10_constants.PowerSwitchLocation(pair_info[9] & 0x0F) power_switch = extract_power_switch_location(pair_info)
serial = pair_info[1:5].hex().upper() serial = extract_serial(pair_info[1:5])
else: # some Nano receivers? else: # some Nano receivers?
pair_info = self.read_register(0x2D5) # undocumented and questionable pair_info = self.read_register(0x2D5) # undocumented and questionable
if pair_info: if pair_info:
logger.warning("using undocumented register for device serial number") logger.warning("using undocumented register for device serial number")
serial = pair_info[1:5].hex().upper() serial = extract_serial(pair_info[1:5])
return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch} return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch}
def register_new_device(self, number, notification=None): def register_new_device(self, number, notification=None):
@ -287,7 +333,9 @@ class Receiver:
def count(self): def count(self):
count = self.read_register(Registers.RECEIVER_CONNECTION) count = self.read_register(Registers.RECEIVER_CONNECTION)
return 0 if count is None else ord(count[1:2]) if count is None:
return 0
return extract_connection_count(count)
def request(self, request_id, *params): def request(self, request_id, *params):
if bool(self): if bool(self):
@ -412,7 +460,7 @@ class BoltReceiver(Receiver):
def initialize(self, product_info: dict): def initialize(self, product_info: dict):
serial_reply = self.read_register(Registers.BOLT_UNIQUE_ID) serial_reply = self.read_register(Registers.BOLT_UNIQUE_ID)
self.serial = serial_reply.hex().upper() self.serial = extract_serial(serial_reply)
self.max_devices = product_info.get("max_devices", 1) self.max_devices = product_info.get("max_devices", 1)
def device_codename(self, n): def device_codename(self, n):
@ -424,9 +472,9 @@ class BoltReceiver(Receiver):
def device_pairing_information(self, n: int) -> dict: def device_pairing_information(self, n: int) -> dict:
pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.BOLT_PAIRING_INFORMATION + n) pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.BOLT_PAIRING_INFORMATION + n)
if pair_info: if pair_info:
wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper() wpid = extract_wpid(pair_info[3:4] + pair_info[2:3])
kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F] kind = extract_device_kind(pair_info[1] & 0x0F)
serial = pair_info[4:8].hex().upper() serial = extract_serial(pair_info[4:8])
return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"} return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"}
else: else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register") raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register")
@ -484,8 +532,8 @@ class Ex100Receiver(Receiver):
assert notification.address == 0x02 assert notification.address == 0x02
online = True online = True
encrypted = bool(notification.data[0] & 0x80) encrypted = bool(notification.data[0] & 0x80)
kind = hidpp10_constants.DEVICE_KIND[_get_kind_from_index(self, number)] kind = extract_device_kind(_get_kind_from_index(self, number))
wpid = "00" + notification.data[2:3].hex().upper() wpid = extract_wpid("00" + notification.data[2:3])
return online, encrypted, wpid, kind return online, encrypted, wpid, kind
def device_pairing_information(self, number: int) -> dict: def device_pairing_information(self, number: int) -> dict:
@ -494,11 +542,11 @@ class Ex100Receiver(Receiver):
if not wpid: if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self) logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device") raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
kind = hidpp10_constants.DEVICE_KIND[_get_kind_from_index(self, number)] kind = extract_device_kind(_get_kind_from_index(self, number))
return {"wpid": wpid, "kind": kind, "polling": "", "serial": None, "power_switch": "(unknown)"} return {"wpid": wpid, "kind": kind, "polling": "", "serial": None, "power_switch": "(unknown)"}
def _get_kind_from_index(receiver, index): def _get_kind_from_index(receiver, index: int) -> int:
"""Get device kind from 27Mhz device index""" """Get device kind from 27Mhz device index"""
# From drivers/hid/hid-logitech-dj.c # From drivers/hid/hid-logitech-dj.c
if index == 1: # mouse if index == 1: # mouse