receiver: Refactor extraction of serial and max. devices

Related #2273
This commit is contained in:
MattHag 2024-12-01 20:22:09 +01:00 committed by Peter F. Patel-Schneider
parent 800d3498f4
commit 5e0c85a6d7
1 changed files with 74 additions and 26 deletions

View File

@ -83,6 +83,53 @@ class Pairing:
error: Optional[any] = None
def extract_serial(response: bytes) -> str:
"""Extracts serial number from receiver response."""
return response.hex().upper()
def extract_max_devices(response: bytes) -> int:
"""Extracts maximum number of supported devices from response."""
max_devices = response[6]
return int(max_devices)
def extract_remaining_pairings(response: bytes) -> int:
ps = ord(response[2:3])
remaining_pairings = ps - 5 if ps >= 5 else -1
return int(remaining_pairings)
def extract_codename(response: bytes) -> str:
codename = response[2 : 2 + ord(response[1:2])]
return codename.decode("ascii")
def extract_power_switch_location(response: bytes) -> str:
"""Extracts power switch location from response."""
index = response[9] & 0x0F
return hidpp10_constants.PowerSwitchLocation(index).name.lower()
def extract_connection_count(response: bytes) -> int:
"""Extract connection count from receiver response."""
return ord(response[1:2])
def extract_wpid(response: bytes) -> str:
"""Extract wpid from receiver response."""
return response.hex().upper()
def extract_polling_rate(response: bytes) -> int:
"""Returns polling rate in milliseconds."""
return int(response[2])
def extract_device_kind(response: int) -> str:
return hidpp10_constants.DEVICE_KIND[response]
class Receiver:
"""A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers"
The paired devices are available through the sequence interface.
@ -129,9 +176,9 @@ class Receiver:
# read the receiver information subregister, so we can find out max_devices
serial_reply = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.RECEIVER_INFORMATION)
if serial_reply:
self.serial = serial_reply[1:5].hex().upper()
self.max_devices = serial_reply[6]
if self.max_devices <= 0 or self.max_devices > 6:
self.serial = extract_serial(serial_reply[1:5])
self.max_devices = extract_max_devices(serial_reply)
if not (1 <= self.max_devices <= 6):
self.max_devices = product_info.get("max_devices", 1)
else: # handle receivers that don't have a serial number specially (i.e., c534)
self.serial = None
@ -164,8 +211,7 @@ class Receiver:
if self._remaining_pairings is None or not cache:
ps = self.read_register(Registers.RECEIVER_CONNECTION)
if ps is not None:
ps = ord(ps[2:3])
self._remaining_pairings = ps - 5 if ps >= 5 else -1
self._remaining_pairings = extract_remaining_pairings(ps)
return self._remaining_pairings
def enable_connection_notifications(self, enable=True):
@ -195,8 +241,7 @@ class Receiver:
def device_codename(self, n):
codename = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.DEVICE_NAME + n - 1)
if codename:
codename = codename[2 : 2 + ord(codename[1:2])]
return codename.decode("ascii")
return extract_codename(codename)
def notify_devices(self):
"""Scan all devices."""
@ -209,8 +254,8 @@ class Receiver:
assert notification.address != 0x02
online = not bool(notification.data[0] & 0x40)
encrypted = bool(notification.data[0] & 0x20) or notification.address == 0x10
kind = hidpp10_constants.DEVICE_KIND[notification.data[0] & 0x0F]
wpid = (notification.data[2:3] + notification.data[1:2]).hex().upper()
kind = extract_device_kind(notification.data[0] & 0x0F)
wpid = extract_wpid(notification.data[2:3] + notification.data[1:2])
return online, encrypted, wpid, kind
def device_pairing_information(self, n: int) -> dict:
@ -220,28 +265,29 @@ class Receiver:
power_switch = "(unknown)"
pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.PAIRING_INFORMATION + n - 1)
if pair_info: # a receiver that uses Unifying-style pairing registers
wpid = pair_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F]
polling_rate = str(pair_info[2]) + "ms"
wpid = extract_wpid(pair_info[3:5])
kind = extract_device_kind(pair_info[7] & 0x0F)
polling_rate_ms = extract_polling_rate(pair_info)
polling_rate = f"{polling_rate_ms}ms"
elif not self.receiver_kind == "unifying": # may be an old Nano receiver
device_info = self.read_register(Registers.RECEIVER_INFO, 0x04) # undocumented
if device_info:
logger.warning("using undocumented register for device wpid")
wpid = device_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind
wpid = extract_wpid(device_info[3:5])
kind = extract_device_kind(0x00) # unknown kind
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying")
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information")
pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.EXTENDED_PAIRING_INFORMATION + n - 1)
if pair_info:
power_switch = hidpp10_constants.PowerSwitchLocation(pair_info[9] & 0x0F)
serial = pair_info[1:5].hex().upper()
power_switch = extract_power_switch_location(pair_info)
serial = extract_serial(pair_info[1:5])
else: # some Nano receivers?
pair_info = self.read_register(0x2D5) # undocumented and questionable
if pair_info:
logger.warning("using undocumented register for device serial number")
serial = pair_info[1:5].hex().upper()
serial = extract_serial(pair_info[1:5])
return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch}
def register_new_device(self, number, notification=None):
@ -287,7 +333,9 @@ class Receiver:
def count(self):
count = self.read_register(Registers.RECEIVER_CONNECTION)
return 0 if count is None else ord(count[1:2])
if count is None:
return 0
return extract_connection_count(count)
def request(self, request_id, *params):
if bool(self):
@ -412,7 +460,7 @@ class BoltReceiver(Receiver):
def initialize(self, product_info: dict):
serial_reply = self.read_register(Registers.BOLT_UNIQUE_ID)
self.serial = serial_reply.hex().upper()
self.serial = extract_serial(serial_reply)
self.max_devices = product_info.get("max_devices", 1)
def device_codename(self, n):
@ -424,9 +472,9 @@ class BoltReceiver(Receiver):
def device_pairing_information(self, n: int) -> dict:
pair_info = self.read_register(Registers.RECEIVER_INFO, InfoSubRegisters.BOLT_PAIRING_INFORMATION + n)
if pair_info:
wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F]
serial = pair_info[4:8].hex().upper()
wpid = extract_wpid(pair_info[3:4] + pair_info[2:3])
kind = extract_device_kind(pair_info[1] & 0x0F)
serial = extract_serial(pair_info[4:8])
return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"}
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register")
@ -484,8 +532,8 @@ class Ex100Receiver(Receiver):
assert notification.address == 0x02
online = True
encrypted = bool(notification.data[0] & 0x80)
kind = hidpp10_constants.DEVICE_KIND[_get_kind_from_index(self, number)]
wpid = "00" + notification.data[2:3].hex().upper()
kind = extract_device_kind(_get_kind_from_index(self, number))
wpid = extract_wpid("00" + notification.data[2:3])
return online, encrypted, wpid, kind
def device_pairing_information(self, number: int) -> dict:
@ -494,11 +542,11 @@ class Ex100Receiver(Receiver):
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
kind = hidpp10_constants.DEVICE_KIND[_get_kind_from_index(self, number)]
kind = extract_device_kind(_get_kind_from_index(self, number))
return {"wpid": wpid, "kind": kind, "polling": "", "serial": None, "power_switch": "(unknown)"}
def _get_kind_from_index(receiver, index):
def _get_kind_from_index(receiver, index: int) -> int:
"""Get device kind from 27Mhz device index"""
# From drivers/hid/hid-logitech-dj.c
if index == 1: # mouse