diff --git a/docs/devices/PRO X 2 LIGHTSPEED 0AF7.text b/docs/devices/PRO X 2 LIGHTSPEED 0AF7.text new file mode 100644 index 00000000..fac5a2a0 --- /dev/null +++ b/docs/devices/PRO X 2 LIGHTSPEED 0AF7.text @@ -0,0 +1,47 @@ +solaar version 1.1.19-24-g693159ee + +Centurion Receiver + Device path : /dev/hidraw4 + USB id : 046d:0AF7 + Protocol : Centurion + 1 : 0.02 + Has 1 device(s) out of a maximum of 1. + Supports 5 dongle features: + 0: ROOT {0000} + 1: FEATURE SET {0001} + 2: CENTURION DEVICE INFO {0100} + Firmware: 1 0.02 + Hardware: model 26 rev 255 product 0508 + 3: CENTPP BRIDGE {0003} + 4: CENTURION GENERIC DFU {010A} + + 1: PRO X 2 LIGHTSPEED + Device path : /dev/hidraw4 + USB id : 046d:0AF7 + Codename : PRO X 2 LIGHTSPEED + Kind : headset + Protocol : Centurion 2.6 + Serial number: + Model ID: 0508 + Unit ID: + 1: 0.02 + Supports 10 HID++ 2.0 features: + 0: ROOT {0000} V0 + 1: FEATURE SET {0001} V0 + 2: CENTURION DEVICE NAME {0101} V0 + 3: CENTURION DEVICE INFO {0100} V0 + Firmware: 1 0.02 + Serial: + Hardware: model 26 rev 255 product 0508 + 4: CENTURION BATTERY SOC {0104} V0 + Battery: 97%, BatteryStatus.DISCHARGING. + 5: CENTURION GENERIC DFU {010A} V0 + 6: CENTURION AUTO SLEEP {0108} V0 + Auto Sleep Timeout: 10 + 7: HEADSET AUDIO SIDETONE {0604} V0 + Headset Sidetone: 55 + 8: HEADSET MIC SNR {0602} V0 + Mic SNR: True + 9: HEADSET ONBOARD EQ {0636} V0 + EQ: 80Hz:+0dB, 240Hz:+0dB, 750Hz:+0dB, 2200Hz:+0dB, 6600Hz:+0dB + Battery: 97%, BatteryStatus.DISCHARGING. diff --git a/lib/hidapi/common.py b/lib/hidapi/common.py index 8a5d1b6c..6817511a 100644 --- a/lib/hidapi/common.py +++ b/lib/hidapi/common.py @@ -18,3 +18,4 @@ class DeviceInfo: isDevice: bool hidpp_short: str | None hidpp_long: str | None + centurion: bool = False diff --git a/lib/hidapi/udev_impl.py b/lib/hidapi/udev_impl.py index 8b0c1f03..955dd802 100644 --- a/lib/hidapi/udev_impl.py +++ b/lib/hidapi/udev_impl.py @@ -101,7 +101,7 @@ def _match(action: str, device, filter_func: typing.Callable[[int, int, int, boo try: # if report descriptor does not indicate HID++ capabilities then this device is not of interest to Solaar from hid_parser import ReportDescriptor - hidpp_short = hidpp_long = False + hidpp_short = hidpp_long = centurion = False devfile = "/sys" + hid_device.properties.get("DEVPATH") + "/report_descriptor" with fileopen(devfile, "rb") as fd: with warnings.catch_warnings(): @@ -111,11 +111,16 @@ def _match(action: str, device, filter_func: typing.Callable[[int, int, int, boo # and _Usage(0xFF00, 0x0001) in rd.get_input_items(0x10)[0].usages # be more permissive hidpp_long = 0x11 in rd.input_report_ids and 19 * 8 == int(rd.get_input_report_size(0x11)) # and _Usage(0xFF00, 0x0002) in rd.get_input_items(0x11)[0].usages # be more permissive - if not hidpp_short and not hidpp_long: + # Centurion transport: report ID 0x51, 63-byte reports (usage page 0xFFA0) + centurion = ( + 0x51 in rd.input_report_ids and 63 * 8 == int(rd.get_input_report_size(0x51)) and 0x51 in rd.output_report_ids + ) + if not hidpp_short and not hidpp_long and not centurion: return except Exception as e: # if can't process report descriptor fall back to old scheme hidpp_short = None hidpp_long = None + centurion = False logger.info( "Report Descriptor not processed for DEVICE %s BID %s VID %s PID %s: %s", device.device_node, @@ -125,7 +130,7 @@ def _match(action: str, device, filter_func: typing.Callable[[int, int, int, boo e, ) - filtered_result = filter_func(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short, hidpp_long) + filtered_result = filter_func(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short or centurion, hidpp_long or centurion) if not filtered_result: return interface_number = filtered_result.get("usb_interface") @@ -165,6 +170,7 @@ def _match(action: str, device, filter_func: typing.Callable[[int, int, int, boo isDevice=isDevice, hidpp_short=hidpp_short, hidpp_long=hidpp_long, + centurion=centurion if centurion else False, ) return d_info diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 69aee2ae..b66e2a1e 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -96,6 +96,20 @@ HIDPP_SHORT_MESSAGE_ID = 0x10 HIDPP_LONG_MESSAGE_ID = 0x11 DJ_MESSAGE_ID = 0x20 +# Centurion transport (used by PRO X 2 LIGHTSPEED headset and similar) +# Uses report ID 0x51 on usage page 0xFFA0, 64-byte frames. +# Wire format (CPL): [0x51, cpl_length, flags=0x00, feat_idx, func_sw, params..., pad] +# cpl_length = number of bytes from flags to end of meaningful data (includes flags byte). +# The device_index byte from standard HID++ is NOT present in Centurion framing. +CENTURION_REPORT_ID = 0x51 +CENTURION_FRAME_SIZE = 64 # 1 byte report ID + 63 bytes payload +_CENTURION_MSG_SIZE = 63 # max reconstructed message size after unwrapping (2 + 61 payload bytes) + +# Set of handles that use Centurion framing +_centurion_handles: set[int] = set() +# Raw Centurion protocol version (major, minor) by handle, from ping response +_centurion_protocol_versions: dict[int, tuple[int, int]] = {} + """Default timeout on read (in seconds).""" DEFAULT_TIMEOUT = 4 @@ -287,6 +301,8 @@ def close(handle): if handle: try: if isinstance(handle, int): + _centurion_handles.discard(handle) + _centurion_protocol_versions.pop(handle, None) hidapi.close(handle) else: handle.close() @@ -318,6 +334,17 @@ def write(handle, devnumber, data, long_message=False): wdata = struct.pack("!BB18s", HIDPP_LONG_MESSAGE_ID, devnumber, data) else: wdata = struct.pack("!BB5s", HIDPP_SHORT_MESSAGE_ID, devnumber, data) + + ihandle = int(handle) + if ihandle in _centurion_handles: + # Centurion CPL framing: [0x51, cpl_length, flags=0x00, feat_idx, func_sw, params...] + # cpl_length = len(meaningful_payload) + 1 (the +1 counts the flags byte) + # The device_index is stripped — only the HID++ payload (feat_idx + func_sw + params) remains. + payload = wdata[2:] # skip report_id and devnumber from standard frame + cpl_length = len(data) + 1 # data is the unpadded payload; +1 for flags byte + wdata = struct.pack("!BBB", CENTURION_REPORT_ID, cpl_length, 0x00) + payload + wdata = wdata + b"\x00" * (CENTURION_FRAME_SIZE - len(wdata)) + if logger.isEnabledFor(logging.DEBUG): logger.debug( "(%s) <= w[%02X %02X %s %s]", @@ -329,7 +356,33 @@ def write(handle, devnumber, data, long_message=False): ) try: - hidapi.write(int(handle), wdata) + hidapi.write(ihandle, wdata) + except Exception as reason: + logger.error("write failed, assuming handle %r no longer available", handle) + close(handle) + raise exceptions.NoReceiver(reason=reason) from reason + + +def write_centurion_cpl(handle, layer3_payload, flags=0x00): + """Send a Centurion CPL frame with the given Layer 3+ payload. + + Builds: [0x51, cpl_length, flags, layer3_payload..., zero-pad to 64 bytes] + where cpl_length = len(layer3_payload) + 1 (the +1 counts the flags byte). + + For multi-fragment sends, flags encodes fragment index and continuation: + flags = (fragment_index << 1) | (1 if more_fragments else 0) + Single-frame messages use flags=0x00 (default). + """ + ihandle = int(handle) + if ihandle not in _centurion_handles: + raise ValueError("write_centurion_cpl called on non-Centurion handle") + cpl_length = len(layer3_payload) + 1 # +1 for flags byte + wdata = struct.pack("!BBB", CENTURION_REPORT_ID, cpl_length, flags) + layer3_payload + wdata = wdata + b"\x00" * (CENTURION_FRAME_SIZE - len(wdata)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("(%s) <= centurion_cpl[%s]", handle, common.strhex(wdata[: cpl_length + 2])) + try: + hidapi.write(ihandle, wdata) except Exception as reason: logger.error("write failed, assuming handle %r no longer available", handle) close(handle) @@ -361,17 +414,17 @@ def _is_relevant_message(data: bytes) -> bool: """ assert isinstance(data, bytes), (repr(data), type(data)) - # mapping from report_id to message length + # mapping from report_id to accepted message lengths report_lengths = { - HIDPP_SHORT_MESSAGE_ID: SHORT_MESSAGE_SIZE, - HIDPP_LONG_MESSAGE_ID: _LONG_MESSAGE_SIZE, - DJ_MESSAGE_ID: _MEDIUM_MESSAGE_SIZE, - 0x21: _MAX_READ_SIZE, + HIDPP_SHORT_MESSAGE_ID: (SHORT_MESSAGE_SIZE,), + HIDPP_LONG_MESSAGE_ID: (_LONG_MESSAGE_SIZE, _CENTURION_MSG_SIZE), + DJ_MESSAGE_ID: (_MEDIUM_MESSAGE_SIZE,), + 0x21: (_MAX_READ_SIZE,), } report_id = ord(data[:1]) if report_id in report_lengths: - if report_lengths.get(report_id) == len(data): + if len(data) in report_lengths[report_id]: return True else: logger.warning(f"unexpected message size: report_id {report_id:02X} message {common.strhex(data)}") @@ -387,15 +440,35 @@ def _read(handle, timeout) -> tuple[int, int, bytes]: been physically removed from the machine, or the kernel driver has been unloaded. The handle will be closed automatically. """ + ihandle = int(handle) + is_centurion = ihandle in _centurion_handles + read_size = CENTURION_FRAME_SIZE if is_centurion else _MAX_READ_SIZE try: # convert timeout to milliseconds, the hidapi expects it timeout = int(timeout * 1000) - data = hidapi.read(int(handle), _MAX_READ_SIZE, timeout) + data = hidapi.read(ihandle, read_size, timeout) except Exception as reason: logger.warning("read failed, assuming handle %r no longer available", handle) close(handle) raise exceptions.NoReceiver(reason=reason) from reason + if data and is_centurion and ord(data[:1]) == CENTURION_REPORT_ID: + # Unwrap Centurion CPL framing: + # RX: [0x51, cpl_length, flags=0x00, feat_idx, func_sw, data...] + # cpl_length includes the flags byte, so meaningful payload starts at byte 3 + # and has (cpl_length - 1) bytes. + # Reconstruct as HID++ long message: [0x11, devnumber=0xFF, feat_idx, func_sw, data...] + cpl_length = ord(data[1:2]) + inner_payload = data[3 : 2 + cpl_length] # bytes 3..2+cpl_length-1 = cpl_length-1 bytes + data = bytes([HIDPP_LONG_MESSAGE_ID, 0xFF]) + inner_payload + # Pad to a valid message size: standard long (20) or Centurion extended (63) + if len(data) <= _LONG_MESSAGE_SIZE: + data = data + b"\x00" * (_LONG_MESSAGE_SIZE - len(data)) + elif len(data) <= _CENTURION_MSG_SIZE: + data = data + b"\x00" * (_CENTURION_MSG_SIZE - len(data)) + else: + data = data[:_CENTURION_MSG_SIZE] + if data and _is_relevant_message(data): # ignore messages that fail check report_id = ord(data[:1]) devnumber = ord(data[1:2]) @@ -564,13 +637,17 @@ def request( if reply_data[:1] == b"\xff" and reply_data[1:3] == request_data[:2]: # a HID++ 2.0 feature call returned with an error error = ord(reply_data[3:4]) + try: + error_name = Hidpp20ErrorCode(error) + except ValueError: + error_name = f"unknown:{error:02X}" logger.error( "(%s) device %d error on feature request {%04X}: %d = %s", handle, devnumber, request_id, error, - Hidpp20ErrorCode(error), + error_name, ) raise exceptions.FeatureCallError( number=devnumber, @@ -641,9 +718,15 @@ def ping(handle, devnumber, long_message: bool = False): if reply: report_id, reply_devnumber, reply_data = reply if reply_devnumber == devnumber or reply_devnumber == devnumber ^ 0xFF: # BT device returning 0x00 - if reply_data[:2] == request_data[:2] and reply_data[4:5] == request_data[-1:]: + is_centurion = int(handle) in _centurion_handles + mark_ok = is_centurion or reply_data[4:5] == request_data[-1:] + if reply_data[:2] == request_data[:2] and mark_ok: # HID++ 2.0+ device, currently connected - return ord(reply_data[2:3]) + ord(reply_data[3:4]) / 10.0 + major = ord(reply_data[2:3]) + minor = ord(reply_data[3:4]) + if is_centurion: + _centurion_protocol_versions[int(handle)] = (major, minor) + return major + minor / 10.0 if ( report_id == HIDPP_SHORT_MESSAGE_ID @@ -675,17 +758,30 @@ def _read_input_buffer(handle, ihandle, notifications_hook): Used by request() and ping() before their write. """ + is_centurion = ihandle in _centurion_handles + read_size = CENTURION_FRAME_SIZE if is_centurion else _MAX_READ_SIZE while True: try: # read whatever is already in the buffer, if any - data = hidapi.read(ihandle, _MAX_READ_SIZE, 0) + data = hidapi.read(ihandle, read_size, 0) except Exception as reason: logger.error("read failed, assuming receiver %s no longer available", handle) close(handle) raise exceptions.NoReceiver(reason=reason) from reason if data: + if is_centurion and ord(data[:1]) == CENTURION_REPORT_ID: + # Unwrap Centurion CPL framing same as in _read() + cpl_length = ord(data[1:2]) + inner_payload = data[3 : 2 + cpl_length] + data = bytes([HIDPP_LONG_MESSAGE_ID, 0xFF]) + inner_payload + if len(data) <= _LONG_MESSAGE_SIZE: + data = data + b"\x00" * (_LONG_MESSAGE_SIZE - len(data)) + elif len(data) <= _CENTURION_MSG_SIZE: + data = data + b"\x00" * (_CENTURION_MSG_SIZE - len(data)) + else: + data = data[:_CENTURION_MSG_SIZE] if _is_relevant_message(data): # only process messages that pass check # report_id = ord(data[:1]) if notifications_hook: diff --git a/lib/logitech_receiver/centurion.py b/lib/logitech_receiver/centurion.py new file mode 100644 index 00000000..f1222869 --- /dev/null +++ b/lib/logitech_receiver/centurion.py @@ -0,0 +1,511 @@ +## Copyright (C) 2012-2013 Daniel Pavel +## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/ +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License along +## with this program; if not, write to the Free Software Foundation, Inc., +## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Centurion device protocol — receiver class, factory, device info/firmware/battery queries. + +CenturionReceiver is a lightweight receiver-like container for Centurion +(PRO X 2 LIGHTSPEED and similar) dongles. Protocol functions query device +info, firmware, serial, name, and battery via Centurion-specific HID++ features. +""" + +from __future__ import annotations + +import errno +import logging +import struct + +from typing import Callable + +from solaar import configuration + +from . import base +from . import exceptions +from . import hidpp10 +from .centurion_constants import CenturionCoreFeature +from .common import Alert +from .common import Battery +from .common import BatteryStatus +from .common import FirmwareKind +from .common import _read_usb_product_string +from .hidpp20_constants import SupportedFeature + +logger = logging.getLogger(__name__) + + +# --- Centurion protocol functions (standalone, operate on any device-like object) --- + + +def get_firmware_centurion(device): + """Reads firmware info from a Centurion device via DeviceInfo (0x0100) function 1.""" + from . import common + + fw = [] + seen = set() # track response signatures to detect duplicates + for index in range(0, 8): # try up to 8 entities + try: + report = device.feature_request(SupportedFeature.CENTURION_DEVICE_INFO, 0x10, index) + except exceptions.FeatureCallError: + break + if not report or len(report) < 5: + break + # Dedup: parent device returns the same response for every entity index + sig = bytes(report[: 5 + report[4]]) + if sig in seen: + break + seen.add(sig) + fw_type = report[0] + version = struct.unpack("!H", report[2:4])[0] + name_len = report[4] + name = report[5 : 5 + name_len].decode("ascii", errors="replace").rstrip("\x00") if name_len else "" + version_str = f"{version >> 8}.{version & 0xFF:02d}" + kind = FirmwareKind(fw_type) if fw_type <= 3 else FirmwareKind.Other + fw.append(common.FirmwareInfo(kind, name, version_str, None)) + return tuple(fw) if fw else None + + +def get_serial_centurion(device): + """Reads the serial number from a Centurion device via DeviceInfo (0x0100) function 2.""" + try: + report = device.feature_request(SupportedFeature.CENTURION_DEVICE_INFO, 0x20) + except exceptions.FeatureCallError: + return None + if not report or len(report) < 2: + return None + str_len = report[0] + return report[1 : 1 + str_len].decode("ascii", errors="replace").rstrip("\x00") + + +def get_hardware_info_centurion(device): + """Reads hardware info from a Centurion device via DeviceInfo (0x0100) function 0. + + Returns (modelId, hardwareRevision, productId) or None. + """ + try: + report = device.feature_request(SupportedFeature.CENTURION_DEVICE_INFO) + except exceptions.FeatureCallError: + return None + if not report or len(report) < 4: + return None + model_id = report[0] + hw_revision = report[1] + product_id = struct.unpack("!H", report[2:4])[0] + return model_id, hw_revision, product_id + + +def _centurion_sub_device_info_request(device, function=0x00, *params): + """Send a DeviceInfo (0x0100) request to the sub-device via bridge.""" + sub_indices = getattr(device, "_centurion_sub_indices", {}) + sub_idx = sub_indices.get(SupportedFeature.CENTURION_DEVICE_INFO) + if sub_idx is None: + return None + return device.centurion_bridge_request(sub_idx, function, *params) + + +def get_firmware_centurion_sub(device): + """Reads firmware info from the Centurion sub-device (headset) via bridge.""" + from . import common + + fw = [] + seen = set() + for index in range(0, 8): + report = _centurion_sub_device_info_request(device, 0x10, index) + if not report or len(report) < 5: + break + sig = bytes(report[: 5 + report[4]]) + if sig in seen: + break + seen.add(sig) + fw_type = report[0] + version = struct.unpack("!H", report[2:4])[0] + name_len = report[4] + name = report[5 : 5 + name_len].decode("ascii", errors="replace").rstrip("\x00") if name_len else "" + version_str = f"{version >> 8}.{version & 0xFF:02d}" + kind = FirmwareKind(fw_type) if fw_type <= 3 else FirmwareKind.Other + fw.append(common.FirmwareInfo(kind, name, version_str, None)) + return tuple(fw) if fw else None + + +def get_serial_centurion_sub(device): + """Reads the serial number from the Centurion sub-device (headset) via bridge.""" + report = _centurion_sub_device_info_request(device, 0x20) + if not report or len(report) < 2: + return None + str_len = report[0] + return report[1 : 1 + str_len].decode("ascii", errors="replace").rstrip("\x00") + + +def get_hardware_info_centurion_sub(device): + """Reads hardware info from the Centurion sub-device (headset) via bridge. + + Returns (modelId, hardwareRevision, productId) or None. + """ + report = _centurion_sub_device_info_request(device) + if not report or len(report) < 4: + return None + model_id = report[0] + hw_revision = report[1] + product_id = struct.unpack("!H", report[2:4])[0] + return model_id, hw_revision, product_id + + +def get_name_centurion(device): + """Reads a Centurion device's name via DeviceName (0x0101). + + Tries two response formats: + 1. Inline: function 0 returns [name_len, name_bytes...] (like serial) + 2. Chunked: function 0 returns [name_len], function 1 returns [name_bytes...] (like standard DeviceName) + """ + try: + reply = device.feature_request(SupportedFeature.CENTURION_DEVICE_NAME) + except exceptions.FeatureCallError: + return None + if not reply: + return None + name_length = reply[0] + if name_length == 0: + return None + # If the full name is inline (length + name bytes in one response) + if len(reply) >= 1 + name_length: + return reply[1 : 1 + name_length].decode("utf-8", errors="replace").rstrip("\x00") + # Otherwise, fetch name in chunks via function 1 (like standard DEVICE_NAME) + name = b"" + while len(name) < name_length: + try: + fragment = device.feature_request(SupportedFeature.CENTURION_DEVICE_NAME, 0x10, len(name)) + except exceptions.FeatureCallError: + break + if fragment: + name += fragment[: name_length - len(name)] + else: + break + return name.decode("utf-8", errors="replace").rstrip("\x00") if name else None + + +def get_battery_centurion(device): + """Query battery via CENTURION_BATTERY_SOC.""" + try: + report = device.feature_request(SupportedFeature.CENTURION_BATTERY_SOC) + if report is not None: + return decipher_battery_centurion(report) + except exceptions.FeatureCallError: + if SupportedFeature.CENTURION_BATTERY_SOC in device.features: + return SupportedFeature.CENTURION_BATTERY_SOC + return None + + +def decipher_battery_centurion(report) -> tuple[SupportedFeature, Battery]: + """Decipher CENTURION_BATTERY_SOC (0x0104) response. + + Response format (3 bytes): + Byte 0: Battery Percentage (0-100) + Byte 1: Battery Percentage (duplicate) + Byte 2: Charging Status (0=discharging, 1=charging, 2=charging via USB, 3=charge complete) + """ + if len(report) < 1: + return SupportedFeature.CENTURION_BATTERY_SOC, Battery(None, None, BatteryStatus.DISCHARGING, None) + soc = report[0] + logger.debug("centurion battery SOC raw: %s", report[:8].hex()) + charging_status = report[2] if len(report) >= 3 else 0 + if charging_status in (1, 2): + status = BatteryStatus.RECHARGING + elif charging_status == 3: + status = BatteryStatus.FULL + else: + status = BatteryStatus.DISCHARGING + return SupportedFeature.CENTURION_BATTERY_SOC, Battery(soc, None, status, None) + + +# --- CenturionReceiver class --- + + +class CenturionReceiver: + """A lightweight receiver-like container for Centurion (PRO X 2 LIGHTSPEED) dongles. + + Provides the Receiver interface to the UI so the dongle appears as a parent + with the headset as an indented child device. NOT a subclass of Receiver — + Receiver's __init__ does HID++ 1.0 register reads and pairing setup that + don't apply to Centurion. + + All centurion communication (bridge, features, settings, battery) lives in + the child Device; this class is just a UI container + handle owner. + """ + + read_register: Callable = hidpp10.read_register + write_register: Callable = hidpp10.write_register + number = 0xFF + kind = None + isDevice = False + may_unpair = False + re_pairs = False + max_devices = 1 + + def __init__(self, low_level, handle, device_info, setting_callback=None): + assert handle + self.low_level = low_level + self.handle = handle + self.path = device_info.path + self.product_id = device_info.product_id + self.setting_callback = setting_callback + self.status_callback = None + self.notification_flags = None + self._devices = {} + self._firmware = None + self._dongle_features = None # independently probed dongle features + self.cleanups = [] + + # Receiver identity + self.serial = None + self._usb_name = getattr(device_info, "product", None) + if not self._usb_name and self.path: + self._usb_name = _read_usb_product_string(self.path) + self.name = "Centurion Receiver" + + # Dummy pairing object — lock_open stays False + from .receiver import Pairing + + self.pairing = Pairing() + + # Discover dongle features independently + self._discover_dongle_features() + + # Read serial from dongle's CENTURION_DEVICE_INFO if available + if self.serial is None: + try: + s = get_serial_centurion(self) + if s and s.strip() and s.strip().isprintable(): + self.serial = s.strip() + except Exception: + pass + + def enable_connection_notifications(self, enable=True): + return False + + def remaining_pairings(self, cache=True): + return None + + def device_codename(self, n): + return self._usb_name + + def request(self, request_id, *params, no_reply=False): + """Send an HID++ request directly to the dongle (not through bridge).""" + if self.handle: + return self.low_level.request( + self.handle, 0xFF, request_id, *params, no_reply=no_reply, long_message=True, protocol=2.0 + ) + + def feature_request(self, feature, function=0x00, *params, no_reply=False): + """Send a feature request to the dongle using discovered feature indices.""" + if self._dongle_features is None: + self._discover_dongle_features() + feature_int = int(feature) + for _feat, feat_id, index in self._dongle_features or []: + if feat_id == feature_int: + request_id = (index << 8) | (function & 0xFF) + return self.request(request_id, *params, no_reply=no_reply) + raise exceptions.FeatureNotSupported(feature) + + def _discover_dongle_features(self): + """Independently discover features on the dongle hardware.""" + self._dongle_features = [] + try: + # Query ROOT for FEATURE_SET index + response = self.request(0x0000, 0x00, 0x01) + if response is None or response[0] == 0: + return + fs_index = response[0] + # Get feature count + count_resp = self.request(fs_index << 8) + if count_resp is None: + return + feature_count = count_resp[0] + # Enumerate features via CenturionFeatureSet (func 1 = 0x10, per-index query) + for idx in range(feature_count): + resp = self.request((fs_index << 8) | 0x10, idx) + if resp is None or len(resp) < 3: + continue + feat_id = struct.unpack("!H", resp[1:3])[0] + try: + feature = SupportedFeature(feat_id) + except ValueError: + feature = f"unknown:{feat_id:04X}" + self._dongle_features.append((feature, feat_id, idx)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Centurion dongle features: %s", self._dongle_features) + except Exception: + logger.debug("Centurion dongle feature discovery failed", exc_info=True) + + @property + def dongle_features(self): + """Return list of (feature, feat_id, index) tuples for dongle features.""" + if self._dongle_features is None: + self._discover_dongle_features() + return self._dongle_features + + def count(self): + return len([d for d in self._devices.values() if d is not None]) + + @property + def firmware(self): + if self._firmware is None and self.handle: + self._firmware = get_firmware_centurion(self) + return self._firmware or () + + def notify_devices(self): + """Create child Device for the headset and trigger its initialization.""" + # Import Device locally to avoid circular import (centurion.py ↔ device.py) + from .device import Device + + # Signal receiver to UI first — tray/window need the receiver entry + # before a child device can be added under it. + self.changed(alert=Alert.NONE) + + # Create child Device with receiver=self, number=1 + pairing_info = { + "wpid": self.product_id, + "kind": None, + "serial": None, + "polling": None, + "power_switch": None, + } + dev = Device( + self.low_level, + self, + 1, + None, + pairing_info=pairing_info, + setting_callback=self.setting_callback, + ) + # Set centurion attributes on the child + dev.centurion = True + dev.product_id = self.product_id + dev.hidpp_long = True + dev._centurion_usb_name = self._usb_name + # Pre-set bridge index from dongle features so ping can probe the headset + for _feat, feat_id, idx in self._dongle_features or []: + if feat_id == CenturionCoreFeature.CENT_PP_BRIDGE: + dev._centurion_bridge_index = idx + break + + self._devices[1] = dev + configuration.attach_to(dev) + dev.status_callback = self.status_callback + + # Ping to determine online status. + # Notify UI either way — offline devices show as greyed out (matching receiver behavior). + online = dev.ping() + dev.changed(active=online) + if self.status_callback is not None: + self.status_callback(dev) + + def changed(self, alert=Alert.NOTIFICATION, reason=None): + if self.status_callback is not None: + self.status_callback(self, alert=alert, reason=reason) + + def status_string(self): + count = self.count() + if count == 0: + return "No devices." + return f"{count} device connected." + + def close(self): + handle, self.handle = self.handle, None + for _n, d in self._devices.items(): + if d: + d.close() + self._devices.clear() + for cleanup in self.cleanups: + cleanup(self) + return handle and self.low_level.close(handle) + + def __del__(self): + self.close() + + def __iter__(self): + for dev in self._devices.values(): + if dev is not None: + yield dev + + def __getitem__(self, key): + dev = self._devices.get(key) + if dev is not None: + return dev + raise IndexError(key) + + def __len__(self): + return len([d for d in self._devices.values() if d is not None]) + + def __contains__(self, dev): + if isinstance(dev, int): + return self._devices.get(dev) is not None + return self.__contains__(dev.number) + + def __bool__(self): + return self.handle is not None + + __nonzero__ = __bool__ + + def __eq__(self, other): + return other is not None and self.kind == other.kind and self.path == other.path + + def __ne__(self, other): + return other is None or self.kind != other.kind or self.path != other.path + + def __hash__(self): + return self.path.__hash__() + + def __str__(self): + return "<%s(%s,%s%s)>" % ( + self.name.replace(" ", "") if self.name else "CenturionReceiver", + self.path, + "" if isinstance(self.handle, int) else "T", + self.handle, + ) + + __repr__ = __str__ + + +def create_centurion_receiver(low_level, device_info, setting_callback=None): + """Opens a Centurion dongle and wraps it as a receiver-like container. + + Creates a CenturionReceiver, discovers its features, then checks if + CentPPBridge (0x0003) is among them. If not, this is a direct-connected + device (wired headset) — close and return None so the caller can fall + back to create_device(). + + :returns: A CenturionReceiver, or None. + """ + try: + handle = low_level.open_path(device_info.path) + if handle: + base._centurion_handles.add(int(handle)) + cr = CenturionReceiver(low_level, handle, device_info, setting_callback) + # Check if any discovered feature is CentPPBridge (0x0003) + has_bridge = any(feat_id == CenturionCoreFeature.CENT_PP_BRIDGE for _, feat_id, _ in (cr.dongle_features or [])) + if not has_bridge: + logger.info("Centurion device %s has no bridge, treating as direct device", device_info.path) + base._centurion_handles.discard(int(handle)) + cr.handle = None # prevent __del__ from double-closing + low_level.close(handle) + return None + return cr + except OSError as e: + logger.exception("open %s", device_info) + if e.errno == errno.EACCES: + raise e + except Exception as e: + logger.exception("open %s", device_info) + raise e diff --git a/lib/logitech_receiver/centurion_constants.py b/lib/logitech_receiver/centurion_constants.py new file mode 100644 index 00000000..6e0e3585 --- /dev/null +++ b/lib/logitech_receiver/centurion_constants.py @@ -0,0 +1,38 @@ +"""Centurion transport-specific constants. + +Feature IDs that collide with HID++ 2.0 core features live here +so they can coexist with SupportedFeature (which requires unique values). +""" + +from __future__ import annotations + +from enum import IntEnum + +from .hidpp20_constants import SupportedFeature + + +class CenturionCoreFeature(IntEnum): + """Centurion transport-specific features that collide with HID++ 2.0 core IDs.""" + + CENTURION_ROOT = 0x0000 + CENTURION_FEATURE_SET = 0x0001 + CENT_PP_BRIDGE = 0x0003 + MULTI_HOST_CONTROL = 0x0005 + KEEP_ALIVE = 0x0007 + + def __str__(self): + return self.name.replace("_", " ") + + +def resolve_feature(feat_id: int, centurion: bool = False): + """Resolve a feature ID to the appropriate enum, checking centurion-specific + features first when on the centurion transport.""" + if centurion: + try: + return CenturionCoreFeature(feat_id) + except ValueError: + pass + try: + return SupportedFeature(feat_id) + except ValueError: + return None diff --git a/lib/logitech_receiver/common.py b/lib/logitech_receiver/common.py index 3d00a015..3aa4127e 100644 --- a/lib/logitech_receiver/common.py +++ b/lib/logitech_receiver/common.py @@ -674,3 +674,17 @@ class Notification(IntEnum): class BusID(IntEnum): USB = 0x03 BLUETOOTH = 0x05 + + +def _read_usb_product_string(hidraw_path): + """Read the USB product string from sysfs for a hidraw device path.""" + import pathlib + + try: + # /sys/class/hidraw/hidrawN/device/../../product → USB device product string + hidraw_name = pathlib.Path(hidraw_path).name + product_path = pathlib.Path("/sys/class/hidraw") / hidraw_name / "device" / ".." / ".." / "product" + product = product_path.read_text().strip() + return product if product else None + except (OSError, ValueError): + return None diff --git a/lib/logitech_receiver/descriptors.py b/lib/logitech_receiver/descriptors.py index fc81d494..41a66670 100644 --- a/lib/logitech_receiver/descriptors.py +++ b/lib/logitech_receiver/descriptors.py @@ -465,3 +465,4 @@ _D( kind=DEVICE_KIND.headset, usbid=0x0ABA, ) +# PRO X 2 LIGHTSPEED Gaming Headset (0x0AF7) — fully probed via Centurion transport, no static descriptor needed diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index ca5de065..2f23c5d1 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -19,6 +19,7 @@ from __future__ import annotations import errno import logging +import struct import threading import time import typing @@ -29,6 +30,7 @@ from typing import Protocol from solaar import configuration +from . import base from . import descriptors from . import exceptions from . import hidpp10 @@ -38,6 +40,7 @@ from . import settings from . import settings_templates from .common import Alert from .common import Battery +from .common import _read_usb_product_string from .hidpp10_constants import NotificationFlag from .hidpp20_constants import SupportedFeature @@ -74,6 +77,8 @@ def create_device(low_level: LowLevelInterface, device_info, setting_callback=No try: handle = low_level.open_path(device_info.path) if handle: + if getattr(device_info, "centurion", False): + base._centurion_handles.add(int(handle)) # a direct connected device might not be online (as reported by user) return Device( low_level, @@ -124,6 +129,16 @@ class Device: self.product_id = device_info.product_id if device_info else None self.hidpp_short = device_info.hidpp_short if device_info else None self.hidpp_long = device_info.hidpp_long if device_info else None + self.centurion = device_info.centurion if device_info else False + self._centurion_usb_name = None + if self.centurion: + self.hidpp_long = True # Centurion devices always use long HID++ messages + # Read USB product string for device name — avoids slow bridge probe via CENTURION_DEVICE_NAME. + # device_info.product is often None (udev reads USB interface attrs, not device attrs), + # so fall back to reading from sysfs. + self._centurion_usb_name = getattr(device_info, "product", None) if device_info else None + if not self._centurion_usb_name and self.path: + self._centurion_usb_name = _read_usb_product_string(self.path) self.bluetooth = device_info.bus_id == 0x0005 if device_info else False # Bluetooth needs long messages self.hid_serial = device_info.serial if device_info else None self.setting_callback = setting_callback # for changes to settings @@ -230,10 +245,14 @@ class Device: def codename(self): if not self._codename: if self.online and self.protocol >= 2.0: - self._codename = _hidpp20.get_friendly_name(self) + if not self.centurion: + self._codename = _hidpp20.get_friendly_name(self) if not self._codename and self.name: - names = self.name.split(" ") - self._codename = names[1 if len(names) > 1 and names[0] == "Logitech" else 0] + if self.centurion: + self._codename = self.name + else: + names = self.name.split(" ") + self._codename = names[1 if len(names) > 1 and names[0] == "Logitech" else 0] if not self._codename and self.receiver: codename = self.receiver.device_codename(self.number) if codename: @@ -247,17 +266,40 @@ class Device: if not self._name: with self._simple_lock: if self._name is None: - if self.online and self.protocol >= 2.0: + if self.online and self.centurion: + self._name = _hidpp20.get_name_centurion(self) or getattr(self, "_centurion_usb_name", None) + if not self._name: + self._name = f"Unknown device {self.wpid or self.product_id}" + elif self.online and self.protocol >= 2.0: self._name = _hidpp20.get_name(self) return self._name or self._codename or f"Unknown device {self.wpid or self.product_id}" def get_ids(self): + if self.centurion: + self._get_ids_centurion() + return ids = _hidpp20.get_ids(self) if ids: self._unitId, self._modelId, self._tid_map = ids if logger.isEnabledFor(logging.INFO) and self._serial and self._serial != self._unitId: logger.info("%s: unitId %s does not match serial %s", self, self._unitId, self._serial) + def _get_ids_centurion(self): + if getattr(self, "_centurion_ids_done", False): + return + self._centurion_ids_done = True + serial = _hidpp20.get_serial_centurion(self) + if not serial or not serial.strip() or not serial.strip().isprintable(): + serial = _hidpp20.get_serial_centurion_sub(self) + if serial and serial.strip() and serial.strip().isprintable(): + self._serial = serial.strip() + self._unitId = self._serial + hw_info = _hidpp20.get_hardware_info_centurion(self) + if hw_info: + model_id, hw_revision, product_id = hw_info + self._modelId = f"{product_id:04X}" + self._tid_map = {"usbid": f"{product_id:04X}"} + @property def unitId(self): if not self._unitId and self.online and self.protocol >= 2.0: @@ -279,13 +321,32 @@ class Device: @property def kind(self): if not self._kind and self.online and self.protocol >= 2.0: - self._kind = _hidpp20.get_kind(self) + if self.centurion: + self._kind = self._infer_kind_centurion() + else: + self._kind = _hidpp20.get_kind(self) return self._kind or "?" + def _infer_kind_centurion(self): + """Infer device kind from Centurion features (sub-device or top-level).""" + # Check sub-device features (wireless via bridge) + for feature in getattr(self, "_centurion_sub_features", ()): + if isinstance(feature, int) and 0x0600 <= feature <= 0x06FF: + return hidpp10_constants.DEVICE_KIND.headset + # Check top-level features (direct USB connection, no bridge) + if self.features: + for feature, _index in self.features.enumerate(): + feat_int = int(feature) if isinstance(feature, int) else 0 + if 0x0600 <= feat_int <= 0x06FF: + return hidpp10_constants.DEVICE_KIND.headset + return None + @property def firmware(self) -> tuple[common.FirmwareInfo]: if self._firmware is None and self.online: - if self.protocol >= 2.0: + if self.centurion: + self._firmware = _hidpp20.get_firmware_centurion_sub(self) or _hidpp20.get_firmware_centurion(self) + elif self.protocol >= 2.0: self._firmware = _hidpp20.get_firmware(self) else: self._firmware = _hidpp10.get_firmware(self) @@ -293,6 +354,8 @@ class Device: @property def serial(self): + if not self._serial and self.online and self.centurion: + self.get_ids() return self._serial or "" @property @@ -472,7 +535,7 @@ class Device: else: self.set_configuration(0x11) # signal end of configuration self.read_battery() # battery information may have changed so try to read it now - elif was_active and self.receiver: # need to set configuration pending flag in receiver + elif was_active and self.receiver and not isinstance(self.receiver, CenturionReceiver): hidpp10.set_configuration_pending_flags(self.receiver, 0xFF) if logger.isEnabledFor(logging.DEBUG): logger.debug("device %d changed: active=%s %s", self.number, self._active, self.battery_info) @@ -537,9 +600,12 @@ class Device: long = self.hidpp_long is True or ( self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) ) + # Centurion child: CPL framing strips devnumber and responses always + # have devnumber=0xFF, so we must send 0xFF to match responses. + devnumber = 0xFF if (self.centurion and self.receiver and not self.handle) else self.number return self.low_level.request( self.handle or (self.receiver.handle if self.receiver else None), - self.number, + devnumber, request_id, *params, no_reply=no_reply, @@ -549,11 +615,231 @@ class Device: def feature_request(self, feature, function=0x00, *params, no_reply=False): if self.protocol >= 2.0: + if self.centurion: + # Ensure sub-device features are discovered before routing decision + if self.features is not None: + self.features._check() + if feature in getattr(self, "_centurion_sub_features", ()): + sub_idx = self.features.get(feature) + if sub_idx is not None: + return self.centurion_bridge_request(sub_idx, function, *params, no_reply=no_reply) return hidpp20.feature_request(self, feature, function, *params, no_reply=no_reply) + # Max sub-message bytes in the first bridge fragment: + # 64 - 1 (report ID) - 1 (cpl_len) - 1 (flags) - 2 (bridge prefix) - 2 (bridge hdr) = 57 + # LGHUB uses 56 for first fragment (60 byte payload - 4 bridge overhead) + _BRIDGE_FIRST_CHUNK = 56 + # Continuation fragments carry raw sub_msg data (no bridge prefix/hdr): + # 64 - 1 (report ID) - 1 (cpl_len) - 1 (flags) = 61, but LGHUB uses 60 + _BRIDGE_CONT_CHUNK = 60 + + def centurion_bridge_request(self, sub_feat_idx, sub_function=0x00, *params, no_reply=False): + """Send a request to a Centurion sub-device via CentPPBridge. + + Builds the 4-layer nested message: + Layer 1: [0x51] + Layer 2: [cpl_length, flags] + Layer 3: [bridge_idx, sendFragment_func|swid, bridge_hdr...] + Layer 4: [sub_cpl=0x00, sub_feat_idx, sub_func|swid, params...] + + For multi-fragment sends, only the first fragment includes the bridge + prefix and header. Continuation fragments carry raw sub_msg data. + The CPL flags byte encodes fragment index and continuation: + flags = (fragment_index << 1) | (1 if more_fragments else 0) + Single-frame messages use flags=0x00. + + Returns the sub-device response data (after bridge header), or None. + """ + if not getattr(self, "centurion", False): + raise ValueError("centurion_bridge_request called on non-Centurion device") + bridge_idx = getattr(self, "_centurion_bridge_index", None) + if bridge_idx is None: + raise ValueError("CentPPBridge index not discovered yet") + handle = self.handle or (self.receiver.handle if self.receiver else None) + if not handle: + return None + + sw_id = base._get_next_sw_id() + + # Build sub-device message: [sub_cpl=0x00, sub_feat_idx, sub_func|swid, params...] + # sub_function is in standard HID++ format: func_number << 4 (e.g. 0x10 for function 1) + sub_params = b"".join(struct.pack("B", p) if isinstance(p, int) else p for p in params) if params else b"" + sub_msg = struct.pack("BBB", 0x00, sub_feat_idx, (sub_function & 0xF0) | sw_id) + sub_params + + # Build bridge header: [device_id<<4 | len_hi, len_lo] + # device_id=0 for the headset, len is the total sub-message length + sub_len = len(sub_msg) + bridge_hdr = struct.pack("BB", (0x00 << 4) | ((sub_len >> 8) & 0x0F), sub_len & 0xFF) + bridge_prefix = struct.pack("BB", bridge_idx, (0x01 << 4) | sw_id) + + timeout = base.DEFAULT_TIMEOUT + with base.acquire_timeout(base.handle_lock(handle), handle, timeout): + if sub_len <= self._BRIDGE_FIRST_CHUNK: + # Single-frame path + layer3 = bridge_prefix + bridge_hdr + sub_msg + base.write_centurion_cpl(handle, layer3) + else: + # Multi-fragment send + # Fragment 0: bridge_prefix + bridge_hdr + first chunk of sub_msg + # Fragments 1+: raw sub_msg continuation data (no bridge overhead) + # CPL flags = (frag_index << 1) | (1 if more_fragments else 0) + # All fragments are sent back-to-back without waiting for + # intermediate ACKs (verified via LGHUB pcap). The device + # reassembles internally and sends a single ACK + MessageEvent + # after the last fragment. + frag_index = 0 + offset = 0 + while offset < sub_len: + if frag_index == 0: + chunk_size = self._BRIDGE_FIRST_CHUNK + chunk = sub_msg[offset : offset + chunk_size] + layer3 = bridge_prefix + bridge_hdr + chunk + else: + chunk_size = self._BRIDGE_CONT_CHUNK + chunk = sub_msg[offset : offset + chunk_size] + layer3 = chunk + has_more = (offset + chunk_size) < sub_len + flags = (frag_index << 1) | (1 if has_more else 0) + base.write_centurion_cpl(handle, layer3, flags=flags) + offset += len(chunk) + frag_index += 1 + + if no_reply: + return None + + # Read ACK + MessageEvent response + request_started = time.time() + ack_received = False + while time.time() - request_started < timeout: + reply = base._read(handle, timeout) + if not reply: + continue + _report_id, _devnumber, reply_data = reply + # ACK: short response echoing feat_idx and func|swid + if len(reply_data) >= 2 and reply_data[0] == bridge_idx: + func_sw = reply_data[1] + if (func_sw >> 4) == 0x01 and (func_sw & 0x0F) == sw_id: + ack_received = True + break + if (func_sw >> 4) == 0x01 and (func_sw & 0x0F) == 0: + # MessageEvent arrived before ACK — validate it's for our request + if self._is_bridge_response_for(reply_data, sub_feat_idx): + if logger.isEnabledFor(logging.DEBUG): + logger.debug("bridge idx=%d fn=0x%02X -> OK", sub_feat_idx, sub_function) + return self._parse_bridge_response(reply_data) + # Unsolicited notification, skip it + if not ack_received: + logger.warning("centurion_bridge_request: no ACK received") + return None + + # Read MessageEvent response (bridge function 1 with SW ID 0 = event) + while time.time() - request_started < timeout: + reply = base._read(handle, timeout) + if not reply: + continue + _report_id, _devnumber, reply_data = reply + if len(reply_data) >= 2 and reply_data[0] == bridge_idx: + func_sw = reply_data[1] + if (func_sw >> 4) == 0x01 and (func_sw & 0x0F) == 0: + if self._is_bridge_response_for(reply_data, sub_feat_idx): + if logger.isEnabledFor(logging.DEBUG): + logger.debug("bridge idx=%d fn=0x%02X -> OK", sub_feat_idx, sub_function) + return self._parse_bridge_response(reply_data) + # Unsolicited notification for a different feature, skip it + logger.warning("centurion_bridge_request: no MessageEvent received") + return None + + @staticmethod + def _wait_for_bridge_ack(handle, bridge_idx, sw_id, timeout): + """Wait for a bridge ACK response between multi-fragment sends.""" + started = time.time() + while time.time() - started < timeout: + reply = base._read(handle, timeout) + if not reply: + continue + _report_id, _devnumber, reply_data = reply + if len(reply_data) >= 2 and reply_data[0] == bridge_idx: + func_sw = reply_data[1] + if (func_sw >> 4) == 0x01 and (func_sw & 0x0F) == sw_id: + return True + return False + + @staticmethod + def _is_bridge_response_for(reply_data, expected_sub_feat_idx): + """Check if a bridge MessageEvent is a response for our specific sub-feature request. + + Accepts both normal responses (sub_feat_idx matches) and error responses + (sub_feat_idx=0xFF with original feat_idx in next byte). + Unsolicited notifications (sub_cpl=0xFF) are rejected. + """ + if len(reply_data) < 6: + return False + sub_cpl = reply_data[4] + sub_feat_idx = reply_data[5] + # Notifications have sub_cpl=0xFF; our responses have sub_cpl=0x00 + if sub_cpl != 0x00: + return False + if sub_feat_idx == expected_sub_feat_idx: + return True + # Error response: sub_feat_idx=0xFF, next byte is the original feat_idx that errored + if sub_feat_idx == 0xFF and len(reply_data) >= 7 and reply_data[6] == expected_sub_feat_idx: + return True + return False + + @staticmethod + def _parse_bridge_response(reply_data): + """Extract sub-device response from a CentPPBridge MessageEvent. + + reply_data layout (after report_id and devnumber have been stripped): + [bridge_idx, func_sw, dev_id<<4|len_hi, len_lo, sub_cpl, sub_feat_idx, sub_func_sw, data...] + Returns the sub-device data starting from sub_feat_idx onward. + + Error responses have sub_feat_idx=0xFF: [... sub_cpl, 0xFF, orig_feat_idx, orig_func_sw, error_code] + These return None. + """ + if len(reply_data) < 7: + return None + sub_feat_idx = reply_data[5] + # Error response from sub-device + if sub_feat_idx == 0xFF: + error_code = reply_data[8] if len(reply_data) > 8 else 0 + orig_feat_idx = reply_data[6] if len(reply_data) > 6 else 0 + logger.debug("bridge sub-device error: feat_idx=%d error=0x%02X", orig_feat_idx, error_code) + return None + return reply_data[7:] # response data after sub_cpl, sub_feat_idx, sub_func_sw + + def _record_ping_protocol(self, handle, protocol): + """Record a successful ping's protocol version, including raw Centurion (major, minor).""" + self._protocol = protocol + cent_ver = base._centurion_protocol_versions.get(int(handle)) + if cent_ver: + self._centurion_protocol = cent_ver + def ping(self): """Checks if the device is online and present, returns True of False. Some devices are integral with their receiver but may not be present even if the receiver responds to ping.""" + if self.centurion and self.receiver and not self.handle: + # Centurion child: first check if dongle is reachable + handle = self.receiver.handle + try: + protocol = self.low_level.ping(handle, 0xFF, long_message=True) + except exceptions.NoReceiver: + self.online = False + return False + if protocol: + self._record_ping_protocol(handle, protocol) + # Dongle responded — now check if headset is actually on by probing through bridge. + # Send ROOT.GetFeature(0x0001) to the sub-device via CentPPBridge. + bridge_idx = getattr(self, "_centurion_bridge_index", None) + if bridge_idx is not None: + try: + result = self.centurion_bridge_request(0, 0x00, 0x00, 0x01) + self.online = result is not None and self.present + except Exception: + self.online = False + else: + self.online = False + return self.online long = self.hidpp_long is True or ( self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) ) @@ -564,7 +850,7 @@ class Device: protocol = None self.online = protocol is not None and self.present if protocol: - self._protocol = protocol + self._record_ping_protocol(handle, protocol) if logger.isEnabledFor(logging.DEBUG): logger.debug("pinged %s: online %s protocol %s present %s", self.number, self.online, protocol, self.present) return self.online @@ -614,3 +900,8 @@ class Device: def __del__(self): self.close() + + +# Re-export from centurion.py — must be after Device class to avoid circular import +from .centurion import CenturionReceiver # noqa: E402,F401 +from .centurion import create_centurion_receiver # noqa: E402,F401 diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index c966981d..f5813836 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -35,10 +35,13 @@ import yaml from solaar.i18n import _ from typing_extensions import Protocol +from . import centurion as _centurion from . import common from . import exceptions from . import hidpp10_constants from . import special_keys +from .centurion_constants import CenturionCoreFeature +from .centurion_constants import resolve_feature from .common import Battery from .common import BatteryLevelApproximation from .common import BatteryStatus @@ -139,6 +142,7 @@ class FeaturesArray(dict): self.supported = True # Actually don't know whether it is supported yet self.device = device self.inverse = {} + self.sub_inverse = {} self.version = {} self.flags = {} self.count = 0 @@ -162,14 +166,105 @@ class FeaturesArray(dict): logger.warning("FEATURE_SET found, but failed to read features count") return False else: - self.count = count[0] + 1 # ROOT feature not included in count self[SupportedFeature.ROOT] = 0 self[SupportedFeature.FEATURE_SET] = fs_index + if getattr(self.device, "centurion", False): + self._check_centurion(fs_index, count) + else: + self.count = count[0] + 1 # ROOT feature not included in count return True else: self.supported = False return False + def _check_centurion(self, fs_index, count_response): + """Enumerate features on a Centurion device (parent + sub-device via CentPPBridge). + + Phase A: Enumerate parent device features via CenturionFeatureSet. + Find the CentPPBridge index (feature ID 0x0003 on Centurion = CentPPBridge). + Phase B: Route through CentPPBridge to discover sub-device features. + Use CenturionFeatureSet bulk query to get all sub-device features. + Store sub-device features keyed by SupportedFeature enum. + """ + # Phase A: Parent features + feature_count = count_response[0] # includes ROOT on Centurion + self.count = feature_count + bridge_index = None + for index in range(feature_count): + if self.inverse.get(index) is not None: + continue # already registered (ROOT=0, FEATURE_SET=fs_index) + response = self.device.request((fs_index << 8) | 0x10, index) + if response is None or len(response) < 3: + continue + # Centurion FeatureSet response: [remaining_count, feat_hi, feat_lo, type, flags] + feat_id = struct.unpack("!H", response[1:3])[0] + feature = resolve_feature(feat_id, centurion=True) + if feature is None: + feature = f"unknown:{feat_id:04X}" + self[feature] = index + self.inverse[index] = feature + if feature is CenturionCoreFeature.CENT_PP_BRIDGE: + bridge_index = index + + if bridge_index is not None: + self.device._centurion_bridge_index = bridge_index + self.device._centurion_sub_features = set() + self.device._centurion_sub_indices = {} + self._discover_sub_device_features(bridge_index) + + def _discover_sub_device_features(self, bridge_index): + """Phase B: Discover sub-device features via CentPPBridge. + + Uses CenturionFeatureSet bulk query (function 1, index 0) routed through + the bridge to get all sub-device features at once. + """ + # First, find the sub-device's FeatureSet index via CenturionRoot (sub_feat_idx=0) + # Query: CenturionRoot.GetFeature(0x0001) to find FeatureSet index on sub-device + fs_id_hi = (SupportedFeature.FEATURE_SET >> 8) & 0xFF + fs_id_lo = SupportedFeature.FEATURE_SET & 0xFF + response = self.device.centurion_bridge_request(0x00, 0x00, fs_id_hi, fs_id_lo) + if response is None or len(response) < 1: + logger.warning("Failed to find FeatureSet on Centurion sub-device") + return + sub_fs_index = response[0] + if sub_fs_index == 0: + logger.warning("Sub-device FeatureSet not found (index=0)") + return + + # Bulk enumerate: CenturionFeatureSet.GetFeatureId(func=1=0x10, start_index=0) + # Response: [count, (feat_hi, feat_lo, type, flags) × count] + response = self.device.centurion_bridge_request(sub_fs_index, 0x10, 0x00) + if response is None or len(response) < 1: + logger.warning("Failed to enumerate sub-device features") + return + + entry_count = response[0] + entries = response[1:] + sub_feat_idx = 0 # sub-device feature indices start at 0 + for i in range(entry_count): + offset = i * 4 + if offset + 2 > len(entries): + break + feat_id = struct.unpack("!H", entries[offset : offset + 2])[0] + try: + feature = SupportedFeature(feat_id) + except ValueError: + feature = f"unknown:{feat_id:04X}" + # Store sub-device index for ALL features (including parent overlaps) + # This enables querying the sub-device's copy of shared features via bridge + self.device._centurion_sub_indices[feature] = sub_feat_idx + # Only store unique sub-device features in dict (skip parent overlaps like ROOT, FEATURE_SET) + # This avoids clobbering parent inverse entries via __setitem__ + if dict.get(self, feature) is None: + dict.__setitem__(self, feature, sub_feat_idx) + self.device._centurion_sub_features.add(feature) + # Always store in sub_inverse for sub-device enumerate/display + self.sub_inverse[sub_feat_idx] = feature + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Centurion sub-device feature: %s at sub-index %d", feature, sub_feat_idx) + sub_feat_idx += 1 + self._sub_feature_count = sub_feat_idx + def get_feature(self, index: int) -> SupportedFeature | None: feature = self.inverse.get(index) if feature is not None: @@ -178,7 +273,14 @@ class FeaturesArray(dict): feature = self.inverse.get(index) if feature is not None: return feature - response = self.device.feature_request(SupportedFeature.FEATURE_SET, 0x10, index) + # On Centurion devices, all features are discovered upfront (parent + sub-device) + if getattr(self.device, "centurion", False): + return None + try: + response = self.device.feature_request(SupportedFeature.FEATURE_SET, 0x10, index) + except exceptions.FeatureCallError: + logger.warning("failed to retrieve feature at index %d", index) + return None if response: data = struct.unpack("!H", response[:2])[0] try: @@ -194,7 +296,14 @@ class FeaturesArray(dict): if self._check(): for index in range(self.count): feature = self.get_feature(index) - yield feature, index + if feature is not None: + yield feature, index + # Also yield sub-device features for Centurion devices + sub_count = getattr(self, "_sub_feature_count", 0) + for sub_idx in range(sub_count): + feature = self.sub_inverse.get(sub_idx) + if feature is not None: + yield feature, sub_idx def get_feature_version(self, feature: NamedInt) -> Optional[int]: if self[feature]: @@ -224,7 +333,10 @@ class FeaturesArray(dict): index = super().get(feature) if index is not None: return index - response = self.device.request(0x0000, struct.pack("!H", feature)) + try: + response = self.device.request(0x0000, struct.pack("!H", feature)) + except exceptions.FeatureCallError: + return None if response: index = response[0] self[feature] = index if index else False @@ -243,7 +355,7 @@ class FeaturesArray(dict): raise ValueError("Don't delete features from FeatureArray") def __len__(self) -> int: - return self.count + return self.count + getattr(self, "_sub_feature_count", 0) __bool__ = __nonzero__ = _check @@ -1575,6 +1687,27 @@ class Hidpp20: fw.append(fw_info) return tuple(fw) + def get_firmware_centurion(self, device): + return _centurion.get_firmware_centurion(device) + + def get_serial_centurion(self, device): + return _centurion.get_serial_centurion(device) + + def get_hardware_info_centurion(self, device): + return _centurion.get_hardware_info_centurion(device) + + def _centurion_sub_device_info_request(self, device, function=0x00, *params): + return _centurion._centurion_sub_device_info_request(device, function, *params) + + def get_firmware_centurion_sub(self, device): + return _centurion.get_firmware_centurion_sub(device) + + def get_serial_centurion_sub(self, device): + return _centurion.get_serial_centurion_sub(device) + + def get_hardware_info_centurion_sub(self, device): + return _centurion.get_hardware_info_centurion_sub(device) + def get_ids(self, device): """Reads a device's ids (unit and model numbers)""" ids = device.feature_request(SupportedFeature.DEVICE_FW_VERSION) @@ -1626,6 +1759,9 @@ class Hidpp20: return name.decode("utf-8") + def get_name_centurion(self, device): + return _centurion.get_name_centurion(device) + def get_friendly_name(self, device: Device): """Reads a device's friendly name. @@ -1670,6 +1806,9 @@ class Hidpp20: except exceptions.FeatureCallError: return SupportedFeature.ADC_MEASUREMENT if SupportedFeature.ADC_MEASUREMENT in device.features else None + def get_battery_centurion(self, device: Device): + return _centurion.get_battery_centurion(device) + def get_battery(self, device, feature): """Return battery information - feature, approximate level, next, charging, voltage or battery feature if there is one but it is not responding or None for no battery feature""" @@ -1901,6 +2040,7 @@ battery_functions = { SupportedFeature.BATTERY_VOLTAGE: Hidpp20.get_battery_voltage, SupportedFeature.UNIFIED_BATTERY: Hidpp20.get_battery_unified, SupportedFeature.ADC_MEASUREMENT: Hidpp20.get_adc_measurement, + SupportedFeature.CENTURION_BATTERY_SOC: Hidpp20.get_battery_centurion, } @@ -1981,6 +2121,9 @@ def decipher_battery_unified(report) -> tuple[SupportedFeature, Battery]: return SupportedFeature.UNIFIED_BATTERY, Battery(discharge if discharge else approx_level, None, status, None) +decipher_battery_centurion = _centurion.decipher_battery_centurion + + def decipher_adc_measurement(report) -> tuple[SupportedFeature, Battery]: # partial implementation - needs mapping to levels adc_voltage, flags = struct.unpack("!HB", report[:3]) @@ -2120,3 +2263,10 @@ class ForceSensingButtonArray(UserDict): def acceptable_current_key(self, index: int, value: int) -> bool: return self[index].acceptable(value) + + +# --- OnboardEQ (0x0636) — re-exported from onboard_eq.py --- +from .onboard_eq import _build_set_eq_payload # noqa: E402, F401 +from .onboard_eq import get_onboard_eq_info # noqa: E402, F401 +from .onboard_eq import get_onboard_eq_params # noqa: E402, F401 +from .onboard_eq import set_onboard_eq_params # noqa: E402, F401 diff --git a/lib/logitech_receiver/hidpp20_constants.py b/lib/logitech_receiver/hidpp20_constants.py index 9f55ce21..be0082c1 100644 --- a/lib/logitech_receiver/hidpp20_constants.py +++ b/lib/logitech_receiver/hidpp20_constants.py @@ -179,6 +179,60 @@ class SupportedFeature(IntEnum): SIDETONE = 0x8300 EQUALIZER = 0x8310 HEADSET_OUT = 0x8320 + # Centurion core + CENTURION_DEVICE_INFO = 0x0100 + CENTURION_DEVICE_NAME = 0x0101 + CENTURION_ROOT = 0x0102 + CENTURION_MEMFAULT = 0x0103 + CENTURION_BATTERY_SOC = 0x0104 + CENTURION_AUTO_SLEEP = 0x0108 + CENTURION_GENERIC_DFU = 0x010A + CENTURION_LED_BRIGHTNESS = 0x0110 + CENTURION_EU_POWER_MODE = 0x0115 + CENTURION_DEVICE_BOOL_STATE = 0x0116 + # Headsets (Centurion-era) + HEADSET_VOLUME = 0x0200 + HEADSET_EQ = 0x0201 + HEADSET_ADVANCED_PARA_EQ = 0x020D + HEADSET_MIC_TEST = 0x020E + HEADSET_EQ_STYLES = 0x0213 + BT_HOST_INFO = 0x0305 + LIGHTSPEED_PAIRING = 0x0309 + BT_GAMING_MODE = 0x030A + HEADSET_RGB_EFFECTS = 0x0600 + HEADSET_MIC_MUTE = 0x0601 + HEADSET_MIC_SNR = 0x0602 + HEADSET_AUDIO_SIDETONE = 0x0604 + HEADSET_HOST_SWITCH = 0x0607 + HEADSET_MIX = 0x0609 + HEADSET_TONES = 0x060B + HEADSET_NOISE_EXPOSURE = 0x060D + HEADSET_AI_NOISE_REDUCTION = 0x060E + HEADSET_MIC_GAIN = 0x0611 + HEADSET_USAGE_TRACKING = 0x0617 + HEADSET_BATTERY_SAVER = 0x0618 + HEADSET_RGB_HOSTMODE = 0x0620 + HEADSET_RGB_ONBOARD_EFFECTS = 0x0621 + HEADSET_RGB_SIGNATURE_EFFECTS = 0x0622 + HEADSET_DO_NOT_DISTURB = 0x0631 + CENTURION_ONBOARD_PROFILES = 0x0634 + HEADSET_RGB_STREAMING = 0x0635 + HEADSET_ONBOARD_EQ = 0x0636 + # Audio mixing / LogiVoice + MIXER_AUDIO = 0x0800 + MIXER_MIC = 0x0801 + LOGIVOICE = 0x0900 + LOGIVOICE_NOISE_REDUCTION = 0x0901 + LOGIVOICE_NOISE_GATE = 0x0902 + LOGIVOICE_COMPRESSOR = 0x0903 + LOGIVOICE_DE_ESSER = 0x0904 + LOGIVOICE_DE_POPPER = 0x0905 + LOGIVOICE_LIMITER = 0x0906 + LOGIVOICE_HIGH_PASS_FILTER = 0x0907 + LOGIVOICE_EQUALIZER = 0x0908 + LOGIVOICE_AINR = 0x0909 + METERING = 0x0B01 + MIC_GAIN_AUTO_MODE = 0x0B02 # Fake features for Solaar internal use MOUSE_GESTURE = 0xFE00 diff --git a/lib/logitech_receiver/listener.py b/lib/logitech_receiver/listener.py index 7de6bb52..4137afd4 100644 --- a/lib/logitech_receiver/listener.py +++ b/lib/logitech_receiver/listener.py @@ -52,6 +52,9 @@ class _ThreadedHandle: else: # if logger.isEnabledFor(logging.DEBUG): # logger.debug("%r opened new handle %d", self, handle) + # If original handle was centurion, register new per-thread handle too + if any(h in base._centurion_handles for h in self._handles): + base._centurion_handles.add(handle) self._local.handle = handle self._handles.append(handle) return handle @@ -145,7 +148,8 @@ class EventsListener(threading.Thread): self.receiver.close() break if n: - n = base.make_notification(*n) + report_id, devnumber, data = n + n = base.make_notification(report_id, devnumber, data) else: n = self._queued_notifications.get() # deliver any queued notifications if n: diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 1cc46248..693e3afd 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -287,6 +287,9 @@ def _process_feature_notification(device: Device, notification: HIDPPNotificatio else: logger.warning("%s: unknown ADC MEASUREMENT %s", device, notification) + elif feature == SupportedFeature.CENTURION_BATTERY_SOC: + device.set_battery_info(hidpp20.decipher_battery_centurion(notification.data)[1]) + elif feature == SupportedFeature.SOLAR_DASHBOARD: if notification.data[5:9] == b"GOOD": charge, lux, adc = struct.unpack("!BHH", notification.data[:5]) diff --git a/lib/logitech_receiver/onboard_eq.py b/lib/logitech_receiver/onboard_eq.py new file mode 100644 index 00000000..107b11f6 --- /dev/null +++ b/lib/logitech_receiver/onboard_eq.py @@ -0,0 +1,186 @@ +## Copyright (C) 2012-2013 Daniel Pavel +## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/ +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License along +## with this program; if not, write to the Free Software Foundation, Inc., +## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""OnboardEQ (0x0636) biquad coefficient math and payload builders. + +Pure computation — no device or transport dependencies beyond feature_request(). +""" + +from __future__ import annotations + +import math +import struct + +from .hidpp20_constants import SupportedFeature + +# Mystery bytes observed in every LGHUB pcap EQ write between band params +# and coefficient header. Purpose not fully understood — possibly a null-band +# terminator for DSPs that support >5 bands (advanced 10-band mode). +# First byte matches band_count; bytes 2-3 look like LE16 coeff blob size. +# Hardcoded from pcap for initial bring-up; revisit once device-tested. +_EQ_MYSTERY_BYTES = b"\x05\x5a\xe3\x00" + + +def _peaking_eq_biquad(freq_hz, gain_db, Q, sample_rate=48000.0): + """Compute peaking EQ biquad coefficients (Audio EQ Cookbook). + + Returns (b0/a0, b1/a0, b2/a0, a1/a0, a2/a0) normalised coefficients. + """ + A = 10.0 ** (gain_db / 40.0) + w0 = 2.0 * math.pi * freq_hz / sample_rate + cos_w0 = math.cos(w0) + alpha = math.sin(w0) / (2.0 * Q) + a0 = 1.0 + alpha / A + return ( + (1.0 + alpha * A) / a0, + (-2.0 * cos_w0) / a0, + (1.0 - alpha * A) / a0, + (-2.0 * cos_w0) / a0, + (1.0 - alpha / A) / a0, + ) + + +def _quantize_coeffs(b0, b1, b2, a1, a2): + """Quantize biquad coefficients to mixed Q1.31 / Q2.30 fixed-point. + + b0, b2, a2 use Q1.31 (x 2^31); b1, a1 use Q2.30 (x 2^30). + Values are truncated to 24-bit precision (low byte zeroed) matching + the device DSP's internal format. + Returns list of 10 uint16 values (5 coefficients x 2 LE words each, + high word first). + """ + scales = [2**31, 2**30, 2**31, 2**30, 2**31] # b0, b1, b2, a1, a2 + words = [] + for val, scale in zip([b0, b1, b2, a1, a2], scales): + q = int(round(val * scale)) + q = max(-(1 << 31), min((1 << 31) - 1, q)) + q = q & 0xFFFFFF00 # 24-bit precision (low byte always zero) + words.append((q >> 16) & 0xFFFF) # high word + words.append(q & 0xFFFF) # low word + return words + + +def _build_coeff_section(bands, sample_rate, section_type=1): + """Build one coefficient section for a DSP processing block. + + Returns bytes: 4-byte section header + coefficient data as LE uint16 words. + Section header: [type, 0x00, count_lo, count_hi]. + + Coefficients are normalized by a rescale factor to prevent Q1.31 overflow. + Only feedforward coefficients (b0, b1, b2) are divided by rescale; feedback + coefficients (a1, a2) are left unchanged. The DSP multiplies the output by + rescale to restore correct gain. + """ + _HEADROOM = 1.19 # 19% headroom margin (matches LGHUB) + num_bands = len(bands) + all_words = [num_bands] # first uint16 = num_bands + + # First pass: compute raw biquad coefficients for all bands + raw_coeffs = [] + for freq, gain, Q in bands: + raw_coeffs.append(_peaking_eq_biquad(freq, gain, max(Q, 0.1), sample_rate)) + + # Compute rescale: ensure max |b0| fits in Q1.31 with headroom + max_b0 = max(abs(c[0]) for c in raw_coeffs) + rescale = max(1.0, max_b0) * _HEADROOM + + # Second pass: normalize b-coefficients and quantize + for b0, b1, b2, a1, a2 in raw_coeffs: + all_words.extend(_quantize_coeffs(b0 / rescale, b1 / rescale, b2 / rescale, a1, a2)) + + # Rescale factor as Q6.26, 24-bit precision + rs = int(round(rescale * (1 << 26))) + rs = max(-(1 << 31), min((1 << 31) - 1, rs)) & 0xFFFFFF00 + all_words.append((rs >> 16) & 0xFFFF) + all_words.append(rs & 0xFFFF) + + coeff_count = num_bands * 10 + 3 # num_bands word + 10 per band + 2 rescale words + hdr = bytes([section_type, 0x00, coeff_count & 0xFF, (coeff_count >> 8) & 0xFF]) + data = struct.pack(f"<{len(all_words)}H", *all_words) + return hdr + data + + +def _build_eq_coeffs_payload(bands): + """Build the full EQCoeffs wire payload for SetEQParameters. + + Two coefficient sections: type=1 (48 kHz playback) and type=2 (16 kHz mic). + Returns bytes: 7-byte header + sections (no trailing padding). + """ + section_count = 2 + header = bytes([0x03, 0x0E, 0x00, section_count, 0x00, 0x00, 0x00]) + sections = _build_coeff_section(bands, 48000.0, section_type=1) + sections += _build_coeff_section(bands, 16000.0, section_type=2) + return header + sections + + +def _build_set_eq_payload(slot, bands): + """Build complete SetEQParameters payload: band params + biquad coefficients. + + bands: list of (freq_hz, gain_db, Q) tuples. + Returns bytes ready to send as sub-device params. + """ + params = bytes([slot, len(bands)]) + for freq, gain, Q in bands: + params += struct.pack(">H", freq) + bytes([gain & 0xFF, Q & 0xFF]) + params += _EQ_MYSTERY_BYTES + params += _build_eq_coeffs_payload(bands) + return params + + +def get_onboard_eq_info(device): + """Query HEADSET_ONBOARD_EQ GetEQInfos (function 0). + + Returns (has_hw_eq, num_bands) or None. + """ + result = device.feature_request(SupportedFeature.HEADSET_ONBOARD_EQ, 0x00) + if result is None or len(result) < 5: + return None + has_hw_eq = bool(result[0] & 0x80) + num_bands = result[4] + return (has_hw_eq, num_bands) + + +def get_onboard_eq_params(device, slot=0x00): + """Query HEADSET_ONBOARD_EQ GetEQParameters (function 0x10). + + Returns list of (freq_hz, gain_db, q) tuples, or None. + """ + result = device.feature_request(SupportedFeature.HEADSET_ONBOARD_EQ, 0x10, slot) + if result is None or len(result) < 2: + return None + band_count = result[1] + bands = [] + offset = 2 + for _i in range(band_count): + if offset + 4 > len(result): + break + freq_hz = struct.unpack(">H", result[offset : offset + 2])[0] + gain_db = struct.unpack("b", bytes([result[offset + 2]]))[0] # signed + q = result[offset + 3] + bands.append((freq_hz, gain_db, q)) + offset += 4 + return bands + + +def set_onboard_eq_params(device, bands, slot=0x00): + """Send HEADSET_ONBOARD_EQ SetEQParameters (function 0x20). + + bands: list of (freq_hz, gain_db, Q) tuples. + Returns response or None. + """ + payload = _build_set_eq_payload(slot, bands) + return device.feature_request(SupportedFeature.HEADSET_ONBOARD_EQ, 0x20, payload) diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index 25257910..a27d7b06 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -27,6 +27,7 @@ from solaar.i18n import _ from . import common from . import hidpp20_constants from . import settings_validator +from .centurion_constants import CenturionCoreFeature from .common import NamedInt logger = logging.getLogger(__name__) @@ -42,6 +43,7 @@ class Kind(IntEnum): MAP_CHOICE = 0x0A MULTIPLE_TOGGLE = 0x10 PACKED_RANGE = 0x20 + GRAPHIC_EQ = 0x21 MULTIPLE_RANGE = 0x40 HETERO = 0x80 MAP_RANGE = 0x102 @@ -627,7 +629,7 @@ class FeatureRW: read_prefix=b"", no_reply=False, ): - assert isinstance(feature, hidpp20_constants.SupportedFeature) + assert isinstance(feature, (hidpp20_constants.SupportedFeature, CenturionCoreFeature)) self.feature = feature self.read_fnid = read_fnid self.write_fnid = write_fnid @@ -664,7 +666,7 @@ class FeatureRWMap(FeatureRW): key_byte_count=default_key_byte_count, no_reply=False, ): - assert isinstance(feature, hidpp20_constants.SupportedFeature) + assert isinstance(feature, (hidpp20_constants.SupportedFeature, CenturionCoreFeature)) self.feature = feature self.read_fnid = read_fnid self.write_fnid = write_fnid diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py index a2dff87e..ddab8b0a 100644 --- a/lib/logitech_receiver/settings_templates.py +++ b/lib/logitech_receiver/settings_templates.py @@ -1584,6 +1584,190 @@ class ADCPower(settings.Setting): validator_options = {"byte_count": 1} +class HeadsetEcoMode(settings.Setting): + name = "headset-eco-mode" + label = _("Eco Mode") + description = _("Battery saver mode.") + feature = _F.HEADSET_BATTERY_SAVER + validator_class = settings_validator.BooleanValidator + + +class HeadsetDoNotDisturb(settings.Setting): + name = "headset-do-not-disturb" + label = _("Do Not Disturb") + description = _("Suppress notification sounds.") + feature = _F.HEADSET_DO_NOT_DISTURB + validator_class = settings_validator.BooleanValidator + + +class HeadsetMicMute(settings.Setting): + name = "headset-mic-mute" + label = _("Mic Mute") + description = _("Mute the microphone.") + feature = _F.HEADSET_MIC_MUTE + validator_class = settings_validator.BooleanValidator + + +class HeadsetMicSNR(settings.Setting): + name = "headset-mic-snr" + label = _("Mic SNR") + description = _("Microphone signal-to-noise ratio enhancement.") + feature = _F.HEADSET_MIC_SNR + validator_class = settings_validator.BooleanValidator + + +class HeadsetAINR(settings.Setting): + name = "headset-ai-nr" + label = _("AI Noise Reduction") + description = _("Enable AI noise reduction.") + feature = _F.HEADSET_AI_NOISE_REDUCTION + validator_class = settings_validator.BooleanValidator + + +class HeadsetAINRLevel(settings.Setting): + name = "headset-ai-nr-level" + label = _("AI Noise Reduction Level") + description = _("AI noise reduction intensity.") + feature = _F.HEADSET_AI_NOISE_REDUCTION + rw_options = {"read_fnid": 0x20, "write_fnid": 0x30} + validator_class = settings_validator.ChoicesValidator + choices_universe = common.NamedInts(Off=0, Low=1, Medium=2, High=3) + + +class HeadsetSidetone(settings.Setting): + name = "headset-sidetone" + label = _("Headset Sidetone") + description = _("Sidetone level (0 = off, 100 = max).") + feature = _F.HEADSET_AUDIO_SIDETONE + rw_options = {"read_fnid": 0x00, "write_fnid": 0x10} + validator_class = settings_validator.RangeValidator + min_value = 0 + max_value = 100 + + @classmethod + def build(cls, device): + # Version <= 1: GetSidetone returns [mic_count, mic_id, level]; SetSidetone takes [mic_id, level] + # Version > 1: GetSidetone returns [mic_count, mic_id, reserved, level]; SetSidetone takes [mic_id, 0xFF, level] + version = device.features.get_feature_version(cls.feature) or 0 + if version > 1: + skip, prefix = 3, b"\x01\xff" + else: + skip, prefix = 2, b"\x01" + rw = settings.FeatureRW(cls.feature, **cls.rw_options) + validator = cls.validator_class.build(cls, device, read_skip_byte_count=skip, write_prefix_bytes=prefix) + if validator: + return cls(device, rw, validator) + + +class HeadsetMicGain(settings.Setting): + name = "headset-mic-gain" + label = _("Mic Gain") + description = _("Microphone gain level.") + feature = _F.HEADSET_MIC_GAIN + rw_options = {"read_fnid": 0x10, "write_fnid": 0x20} + validator_class = settings_validator.RangeValidator + min_value = -128 + max_value = 127 + validator_options = {"byte_count": 1, "signed": True} + + +class HeadsetMixBalance(settings.Setting): + name = "headset-mix-balance" + label = _("Audio Mix Balance") + description = _("Balance between game and chat audio.") + feature = _F.HEADSET_MIX + validator_class = settings_validator.RangeValidator + min_value = 0 + max_value = 255 + validator_options = {"byte_count": 1} + + +class HeadsetAutoSleep(settings.Setting): + name = "headset-auto-sleep" + label = _("Auto Sleep Timeout") + description = _("Idle time in minutes before the headset enters sleep mode (0 = disabled).") + feature = _F.CENTURION_AUTO_SLEEP + rw_options = {"read_fnid": 0x00, "write_fnid": 0x10} + validator_class = settings_validator.RangeValidator + min_value = 0 + max_value = 255 + validator_options = {"byte_count": 1} + + +class HeadsetOnboardEQ(settings.RangeFieldSetting): + name = "headset-onboard-eq" + label = _("Headset Equalizer") + description = _("Set equalizer levels.") + feature = _F.HEADSET_ONBOARD_EQ + rw_options = {"read_fnid": 0x10, "write_fnid": 0x20, "read_prefix": b"\x00"} + keys_universe = [] + + class validator_class(settings_validator.PackedRangeValidator): + kind = settings.Kind.GRAPHIC_EQ + + @classmethod + def build(cls, setting_class, device): + info = hidpp20.get_onboard_eq_info(device) + if not info: + return None + _has_hw_eq, num_bands = info + bands = hidpp20.get_onboard_eq_params(device, slot=0x00) + if not bands or len(bands) != num_bands: + return None + keys = common.NamedInts() + for i, (freq, _gain, _q) in enumerate(bands): + keys[i] = str(freq) + _("Hz") + v = cls(keys, min_value=-12, max_value=12, count=num_bands, byte_count=1) + v._band_freqs = [freq for freq, _g, _q in bands] + v._band_qs = [q for _f, _g, q in bands] + return v + + def validate_read(self, reply_bytes): + if reply_bytes is None or len(reply_bytes) < 2: + return {} + band_count = reply_bytes[1] + result = {} + offset = 2 + for i in range(band_count): + if offset + 4 > len(reply_bytes): + break + freq = struct.unpack(">H", reply_bytes[offset : offset + 2])[0] + gain = struct.unpack("b", bytes([reply_bytes[offset + 2]]))[0] + q = reply_bytes[offset + 3] + result[i] = gain + # Update stored freq/Q arrays if they exist + if hasattr(self, "_band_freqs") and i < len(self._band_freqs): + self._band_freqs[i] = freq + if hasattr(self, "_band_qs") and i < len(self._band_qs): + self._band_qs[i] = q + offset += 4 + return result + + def prepare_write(self, new_values): + if not hasattr(self, "_band_freqs") or not hasattr(self, "_band_qs"): + return None + bands = [] + for i in range(self.count): + freq = self._band_freqs[i] if i < len(self._band_freqs) else 1000 + q = self._band_qs[i] if i < len(self._band_qs) else 10 + gain = new_values.get(i, 0) + bands.append((freq, gain, q)) + self._pending_bands = bands # stash for persist step + return hidpp20._build_set_eq_payload(0x00, bands) + + def write(self, map, save=True): + result = super().write(map, save) + # Also persist to device flash (slot 0x80) so EQ survives power cycle + if result is not None and hasattr(self._validator, "_pending_bands"): + bands = self._validator._pending_bands + del self._validator._pending_bands + try: + self._device.feature_request(_F.HEADSET_ONBOARD_EQ, 0x20, hidpp20._build_set_eq_payload(0x80, bands)) + except Exception: + logger.warning("HeadsetOnboardEQ: failed to persist EQ to slot 0x80") + return result + + class BrightnessControl(settings.Setting): name = "brightness_control" label = _("Brightness Control") @@ -2063,6 +2247,17 @@ SETTINGS: list[settings.Setting] = [ Sidetone, Equalizer, ADCPower, + HeadsetEcoMode, + HeadsetDoNotDisturb, + HeadsetMicMute, + HeadsetMicSNR, + HeadsetAINR, + HeadsetAINRLevel, + HeadsetSidetone, + HeadsetMicGain, + HeadsetMixBalance, + HeadsetAutoSleep, + HeadsetOnboardEQ, ] diff --git a/lib/logitech_receiver/settings_validator.py b/lib/logitech_receiver/settings_validator.py index ddc164ff..d9e3f278 100644 --- a/lib/logitech_receiver/settings_validator.py +++ b/lib/logitech_receiver/settings_validator.py @@ -531,12 +531,13 @@ class RangeValidator(Validator): kwargs["max_value"] = setting_class.max_value return cls(**kwargs) - def __init__(self, min_value=0, max_value=255, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b""): + def __init__(self, min_value=0, max_value=255, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b"", signed=False): assert max_value > min_value self.min_value = min_value self.max_value = max_value self.read_skip_byte_count = read_skip_byte_count self.write_prefix_bytes = write_prefix_bytes + self._signed = signed self.needs_current_value = True # read and check before write (needed for ADC power and probably a good idea anyway) self._byte_count = math.ceil(math.log(max_value + 1, 256)) if byte_count: @@ -545,7 +546,9 @@ class RangeValidator(Validator): assert self._byte_count < 8 def validate_read(self, reply_bytes): - reply_value = common.bytes2int(reply_bytes[self.read_skip_byte_count : self.read_skip_byte_count + self._byte_count]) + reply_value = common.bytes2int( + reply_bytes[self.read_skip_byte_count : self.read_skip_byte_count + self._byte_count], signed=self._signed + ) assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" return reply_value @@ -554,7 +557,7 @@ class RangeValidator(Validator): if new_value < self.min_value or new_value > self.max_value: raise ValueError(f"invalid choice {new_value!r}") current_value = self.validate_read(current_value) if current_value is not None else None - to_write = self.write_prefix_bytes + common.int2bytes(new_value, self._byte_count) + to_write = self.write_prefix_bytes + common.int2bytes(new_value, self._byte_count, signed=self._signed) # current value is known and same as value to be written return None to signal not to write it return None if current_value is not None and current_value == new_value else to_write diff --git a/lib/solaar/cli/__init__.py b/lib/solaar/cli/__init__.py index a6b6818c..99312a9b 100644 --- a/lib/solaar/cli/__init__.py +++ b/lib/solaar/cli/__init__.py @@ -128,7 +128,14 @@ def _receivers_and_devices(dev_path=None): continue try: if dev_info.isDevice: - d = device.create_device(base, dev_info) + if getattr(dev_info, "centurion", False): + d = device.create_centurion_receiver(base, dev_info) + if d is not None: + d.notify_devices() + else: + d = device.create_device(base, dev_info) + else: + d = device.create_device(base, dev_info) else: d = receiver.create_receiver(base, dev_info) diff --git a/lib/solaar/cli/show.py b/lib/solaar/cli/show.py index 79f9bfb9..d1a1f84c 100644 --- a/lib/solaar/cli/show.py +++ b/lib/solaar/cli/show.py @@ -25,6 +25,7 @@ from logitech_receiver import settings_templates from logitech_receiver.common import LOGITECH_VENDOR_ID from logitech_receiver.common import NamedInt from logitech_receiver.common import strhex +from logitech_receiver.device import CenturionReceiver from logitech_receiver.hidpp20_constants import SupportedFeature from solaar import NAME @@ -35,36 +36,80 @@ _hidpp20 = hidpp20.Hidpp20() def _print_receiver(receiver): + is_centurion = isinstance(receiver, CenturionReceiver) paired_count = receiver.count() print(receiver.name) print(" Device path :", receiver.path) print(f" USB id : {LOGITECH_VENDOR_ID:04x}:{receiver.product_id}") - print(" Serial :", receiver.serial) - pending = hidpp10.get_configuration_pending_flags(receiver) - if pending: - print(f" C Pending : {pending:02x}") + if is_centurion: + print(" Protocol : Centurion") + if receiver.serial: + print(" Serial :", receiver.serial) + if not is_centurion: + pending = hidpp10.get_configuration_pending_flags(receiver) + if pending: + print(f" C Pending : {pending:02x}") if receiver.firmware: for f in receiver.firmware: print(" %-11s: %s" % (f.kind, f.version)) - print(" Has", paired_count, f"paired device(s) out of a maximum of {int(receiver.max_devices)}.") - if receiver.remaining_pairings() and receiver.remaining_pairings() >= 0: - print(f" Has {int(receiver.remaining_pairings())} successful pairing(s) remaining.") + if is_centurion: + print(" Has", paired_count, f"device(s) out of a maximum of {int(receiver.max_devices)}.") + else: + print(" Has", paired_count, f"paired device(s) out of a maximum of {int(receiver.max_devices)}.") + if receiver.remaining_pairings() and receiver.remaining_pairings() >= 0: + print(f" Has {int(receiver.remaining_pairings())} successful pairing(s) remaining.") - notification_flags = _hidpp10.get_notification_flags(receiver) - if notification_flags is not None: - if notification_flags: - notification_names = hidpp10_constants.NotificationFlag.flag_names(notification_flags) - print(f" Notifications: {', '.join(notification_names)} (0x{notification_flags.value:06X})") + if is_centurion: + _print_centurion_dongle_features(receiver) + else: + notification_flags = _hidpp10.get_notification_flags(receiver) + if notification_flags is not None: + if notification_flags: + notification_names = hidpp10_constants.NotificationFlag.flag_names(notification_flags) + print(f" Notifications: {', '.join(notification_names)} (0x{notification_flags.value:06X})") + else: + print(" Notifications: (none)") + + activity = receiver.read_register(hidpp10_constants.Registers.DEVICES_ACTIVITY) + if activity: + activity = [(d, ord(activity[d - 1 : d])) for d in range(1, receiver.max_devices)] + activity_text = ", ".join(f"{int(d)}={int(a)}" for d, a in activity if a > 0) + print(" Device activity counters:", activity_text or "(empty)") + + +def _print_centurion_dongle_features(receiver): + """Print dongle-level features, probed independently on the dongle hardware.""" + features = receiver.dongle_features + if not features: + return + print(f" Supports {len(features)} dongle features:") + for feature, feat_id, index in features: + display_name = "CENTPP BRIDGE" if feat_id == 0x0003 else feature + feat_bytes = feat_id.to_bytes(2, byteorder="big") + try: + flags_resp = receiver.request(0x0000, feat_bytes[0], feat_bytes[1]) + except Exception: + flags_resp = None + if flags_resp is not None and len(flags_resp) >= 2: + flags = flags_resp[1] + flag_names = common.flag_names(hidpp20_constants.FeatureFlag, flags) + print(" %2d: %-22s {%04X} %s " % (index, display_name, feat_id, ", ".join(flag_names))) else: - print(" Notifications: (none)") - - activity = receiver.read_register(hidpp10_constants.Registers.DEVICES_ACTIVITY) - if activity: - activity = [(d, ord(activity[d - 1 : d])) for d in range(1, receiver.max_devices)] - activity_text = ", ".join(f"{int(d)}={int(a)}" for d, a in activity if a > 0) - print(" Device activity counters:", activity_text or "(empty)") + print(" %2d: %-22s {%04X}" % (index, display_name, feat_id)) + if feature == SupportedFeature.CENTURION_DEVICE_INFO: + fw_list = _hidpp20.get_firmware_centurion(receiver) + serial = _hidpp20.get_serial_centurion(receiver) + hw_info = _hidpp20.get_hardware_info_centurion(receiver) + if fw_list: + for fw in fw_list: + print(f" Firmware: {(str(fw.kind) + ' ' + fw.name).strip()} {fw.version}") + if serial and serial.strip() and serial.strip().isprintable(): + print(f" Serial: {serial}") + if hw_info: + model_id, hw_rev, product_id = hw_info + print(f" Hardware: model {model_id}" f" rev {hw_rev} product {product_id:04X}") def _battery_text(level) -> str: @@ -91,9 +136,11 @@ def _battery_line(dev): def _print_device(dev, num=None): assert dev is not None + is_centurion = getattr(dev, "centurion", False) + is_centurion_child = is_centurion and isinstance(getattr(dev, "receiver", None), CenturionReceiver) # try to ping the device to see if it actually exists and to wake it up try: - dev.ping() + online = dev.ping() except exceptions.NoSuchDevice: print(f" {num}: Device not found" or dev.number) return @@ -102,18 +149,29 @@ def _print_device(dev, num=None): print(f" {int(num or dev.number)}: {dev.name}") else: print(f"{dev.name}") - print(" Device path :", dev.path) - if dev.wpid: + + if not online: + print(" Device is offline.") + return + # Centurion child has no separate hidraw path — show receiver's path + device_path = dev.path or (dev.receiver.path if is_centurion_child else None) + print(" Device path :", device_path) + if dev.wpid and not is_centurion_child: print(f" WPID : {dev.wpid}") if dev.product_id: print(f" USB id : {LOGITECH_VENDOR_ID:04x}:{dev.product_id}") print(" Codename :", dev.codename) print(" Kind :", dev.kind) if dev.protocol: - print(f" Protocol : HID++ {dev.protocol:1.1f}") + proto_name = "Centurion" if is_centurion else "HID++" + cent_proto = getattr(dev, "_centurion_protocol", None) + if cent_proto: + print(f" Protocol : {proto_name} {cent_proto[0]}.{cent_proto[1]}") + else: + print(f" Protocol : {proto_name} {dev.protocol:1.1f}") else: print(" Protocol : unknown (device is offline)") - if dev.polling_rate: + if not is_centurion and dev.polling_rate: print(" Report Rate :", dev.polling_rate) print(" Serial number:", dev.serial) if dev.modelId: @@ -127,7 +185,8 @@ def _print_device(dev, num=None): if dev.power_switch_location: print(f" The power switch is located on the {dev.power_switch_location}.") - if dev.online: + # Skip HID++ 1.0 register reads for centurion devices — they don't support these + if dev.online and not is_centurion: notification_flags = _hidpp10.get_notification_flags(dev) if notification_flags is not None: if notification_flags: @@ -144,25 +203,56 @@ def _print_device(dev, num=None): print(" Features: (none)") if dev.online and dev.features: - print(f" Supports {len(dev.features)} HID++ 2.0 features:") + is_centurion = getattr(dev, "centurion", False) + parent_count = dev.features.count + sub_count = getattr(dev.features, "_sub_feature_count", 0) + # For centurion child devices, dongle features are shown on the receiver — + # only show sub-device (headset) features here. + if is_centurion_child and sub_count > 0: + print(f" Supports {sub_count} HID++ 2.0 features:") + elif is_centurion and sub_count > 0: + print(f" Supports {parent_count} dongle + {sub_count} headset features:") + else: + print(f" Supports {len(dev.features)} HID++ 2.0 features:") dev_settings = [] settings_templates.check_feature_settings(dev, dev_settings) + feature_num = 0 + in_sub_device = False for feature, index in dev.features.enumerate(): + if is_centurion and not in_sub_device and feature_num >= parent_count: + in_sub_device = True + if not is_centurion_child: + print(" Headset (via CentPPBridge):") + feature_num += 1 + # For centurion child, skip dongle features (already shown on the receiver) + if is_centurion_child and not in_sub_device: + continue if isinstance(feature, str): feature_bytes = bytes.fromhex(feature[-4:]) else: feature_bytes = feature.to_bytes(2, byteorder="little") feature_int = int.from_bytes(feature_bytes, byteorder="little") - try: - flags = dev.request(0x0000, feature_bytes) - except Exception: - print(" %2d: %-22s {%04X} - can't retrieve" % (index, feature, feature_int)) - continue - flags = 0 if flags is None else ord(flags[1:2]) - flags = common.flag_names(hidpp20_constants.FeatureFlag, flags) - version = dev.features.get_feature_version(feature_int) - version = version if version else 0 - print(" %2d: %-22s {%04X} V%s %s " % (index, feature, feature_int, version, ", ".join(flags))) + display_name = feature + if is_centurion_child and in_sub_device: + # Use cached version — skip slow bridge ROOT queries + version = dev.features.get_feature_version(feature_int) or 0 + print(" %2d: %-22s {%04X} V%s" % (index, display_name, feature_int, version)) + else: + try: + flags = dev.request(0x0000, feature_bytes) + except Exception: + flags = None + if flags is not None: + flags = ord(flags[1:2]) + flag_names = common.flag_names(hidpp20_constants.FeatureFlag, flags) + version = dev.features.get_feature_version(feature_int) + version = version if version else 0 + print( + " %2d: %-22s {%04X} V%s %s " + % (index, display_name, feature_int, version, ", ".join(flag_names)) + ) + else: + print(" %2d: %-22s {%04X}" % (index, display_name, feature_int)) if feature == SupportedFeature.HIRES_WHEEL: wheel = _hidpp20.get_hires_wheel(dev) if wheel: @@ -230,7 +320,25 @@ def _print_device(dev, num=None): print(f" Kind: {_hidpp20.get_kind(dev)}") elif feature == SupportedFeature.DEVICE_FRIENDLY_NAME: print(f" Friendly Name: {_hidpp20.get_friendly_name(dev)}") - elif feature == SupportedFeature.DEVICE_FW_VERSION: + elif feature == SupportedFeature.CENTURION_DEVICE_INFO: + if in_sub_device: + # Use cached device properties to avoid redundant bridge requests + fw_list = dev.firmware + serial = dev.serial + hw_info = _hidpp20.get_hardware_info_centurion_sub(dev) + else: + fw_list = _hidpp20.get_firmware_centurion(dev) + serial = _hidpp20.get_serial_centurion(dev) + hw_info = _hidpp20.get_hardware_info_centurion(dev) + if fw_list: + for fw in fw_list: + print(f" Firmware: {(str(fw.kind) + ' ' + fw.name).strip()} {fw.version}") + if serial and serial.strip() and serial.strip().isprintable(): + print(f" Serial: {serial}") + if hw_info: + model_id, hw_rev, product_id = hw_info + print(f" Hardware: model {model_id}" f" rev {hw_rev} product {product_id:04X}") + elif isinstance(feature, SupportedFeature) and feature == SupportedFeature.DEVICE_FW_VERSION: for fw in _hidpp20.get_firmware(dev): extras = strhex(fw.extras) if fw.extras else "" print(f" Firmware: {fw.kind} {fw.name} {fw.version} {extras}") @@ -251,6 +359,10 @@ def _print_device(dev, num=None): else: mode = "On-Board" print(f" Device Mode: {mode}") + elif feature == SupportedFeature.HEADSET_ONBOARD_EQ: + bands = hidpp20.get_onboard_eq_params(dev) + if bands: + print(f" EQ: {', '.join(f'{f}Hz:{g:+d}dB' for f, g, _q in bands)}") elif hidpp20.battery_functions.get(feature, None): print("", end=" ") _battery_line(dev) @@ -324,7 +436,7 @@ def run(devices, args, find_receiver, find_device): if device_name == "all": for d in devices: - if isinstance(d, receiver.Receiver): + if isinstance(d, (receiver.Receiver, CenturionReceiver)): _print_receiver(d) count = d.count() if count: @@ -336,8 +448,8 @@ def run(devices, args, find_receiver, find_device): break print("") else: - print("") _print_device(d) + print("") return dev = find_receiver(devices, device_name) diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index 74d2cd89..4aee6700 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -115,7 +115,6 @@ class SolaarListener(listener.EventsListener): reason or "", ) else: - device.ping() logger.info( "status_changed %r: %s %s (%X) %s", device, @@ -149,6 +148,12 @@ class SolaarListener(listener.EventsListener): def _notifications_handler(self, n): assert self.receiver if n.devnumber == 0xFF: + # For CenturionReceiver, intercept bridge notifications and dispatch to child device + from logitech_receiver.device import CenturionReceiver + + if isinstance(self.receiver, CenturionReceiver): + self._handle_centurion_notification(n) + return # a receiver notification notifications.process(self.receiver, n) return @@ -228,6 +233,102 @@ class SolaarListener(listener.EventsListener): elif dev.online is None: dev.ping() + def _handle_centurion_notification(self, n): + """Handle notifications from a CenturionReceiver dongle. + + Bridge events have sub_id == bridge_index. The event function number + is in bits 7-4 of n.address: + - Function 0: ConnectionStateChangedEvent — sub-device connect/disconnect + - Function 1: MessageEvent — wrapped sub-device HID++ notification + + ConnectionStateChangedEvent payload (same format as getConnectionInfo): + n.data[0]: high nibble = connection type, low nibble = len_hi + n.data[1]: len_lo + n.data[2+]: sub-device descriptors (if any) + Empty sub-device list (length=0) means disconnected. + + MessageEvent data layout: + n.data[0:2] = dev_id<<4|len_hi, len_lo + n.data[2] = sub_cpl (0x00 for both responses and notifications) + n.data[3] = sub_feat_idx + n.data[4] = sub_func_sw (sw_id=0 for unsolicited notifications) + n.data[5:] = payload + """ + child = self.receiver._devices.get(1) + if not child: + if logger.isEnabledFor(logging.DEBUG): + logger.debug("CenturionReceiver: notification ignored (no child device): %s", n) + return + + bridge_idx = getattr(child, "_centurion_bridge_index", None) + if bridge_idx is None or n.sub_id != bridge_idx: + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "CenturionReceiver: non-bridge notification sub_id=%d addr=0x%02X data=%s", + n.sub_id, + n.address, + n.data[:12].hex() if n.data else "", + ) + return + + event_func = (n.address >> 4) & 0x0F + + if event_func == 0: + # ConnectionStateChangedEvent — parse sub-device list length + if len(n.data) < 2: + return + data_len = ((n.data[0] & 0x0F) << 8) | n.data[1] + if data_len > 0 and not child.online: + if logger.isEnabledFor(logging.INFO): + logger.info("CenturionReceiver: headset connected (ConnectionStateChangedEvent, len=%d)", data_len) + child.changed(active=True) + self._status_changed(child) + elif data_len == 0 and child.online: + if logger.isEnabledFor(logging.INFO): + logger.info("CenturionReceiver: headset disconnected (ConnectionStateChangedEvent, len=0)") + child.changed(active=False) + self._status_changed(child) + return + + if event_func == 1: + # MessageEvent — unwrap sub-device notification + # n.data layout: [dev_id<<4|len_hi, len_lo, sub_cpl, sub_feat_idx, sub_func_sw, payload...] + if len(n.data) < 5: + return + # A MessageEvent from the headset proves it's online. If we missed the + # ConnectionStateChangedEvent (e.g. cold-start power-on), bring it online now. + if not child.online: + if logger.isEnabledFor(logging.INFO): + logger.info("CenturionReceiver: headset online (MessageEvent received while offline)") + child.changed(active=True) + self._status_changed(child) + sub_feat_idx = n.data[3] + sub_func_sw = n.data[4] + payload = n.data[5:] + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "CenturionReceiver: bridge MessageEvent sub_feat=%d func=0x%02X payload=%s -> child %s", + sub_feat_idx, + sub_func_sw, + payload[:8].hex() if payload else "", + child, + ) + # Create synthetic notification and dispatch directly to feature processing. + # Sub-device features use 0x100 offset in FeaturesArray.inverse. + synthetic = base.HIDPPNotification(n.report_id, child.number, sub_feat_idx + 0x100, sub_func_sw, payload) + child.online = True + if child.features: + notifications._process_feature_notification(child, synthetic) + return + + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "CenturionReceiver: unhandled bridge event func=%d addr=0x%02X data=%s", + event_func, + n.address, + n.data[:12].hex() if n.data else "", + ) + def __str__(self): return f"" @@ -260,6 +361,13 @@ def _start(device_info: DeviceInfo): if not device_info.isDevice: receiver_ = logitech_receiver.receiver.create_receiver(base, device_info, _setting_callback) + elif getattr(device_info, "centurion", False): + receiver_ = logitech_receiver.device.create_centurion_receiver(base, device_info, _setting_callback) + if receiver_ is None: + # No bridge found — treat as a direct-connected centurion device (e.g., wired headset) + receiver_ = logitech_receiver.device.create_device(base, device_info, _setting_callback) + if receiver_: + configuration.attach_to(receiver_) else: receiver_ = logitech_receiver.device.create_device(base, device_info, _setting_callback) if receiver_: diff --git a/lib/solaar/ui/config_panel.py b/lib/solaar/ui/config_panel.py index 4c6e1a51..7622dcba 100644 --- a/lib/solaar/ui/config_panel.py +++ b/lib/solaar/ui/config_panel.py @@ -545,6 +545,69 @@ class PackedRangeControl(MultipleRangeControl): self._button.set_tooltip_text(b) +class GraphicEQControl(MultipleControl): + def setup(self, setting): + self._items = [] + validator = setting._validator + row = Gtk.ListBoxRow() + hbox = Gtk.HBox(homogeneous=True, spacing=8) + for item in range(validator.count): + vbox = Gtk.VBox(homogeneous=False, spacing=2) + scale = Gtk.Scale.new_with_range(Gtk.Orientation.VERTICAL, validator.min_value, validator.max_value, 1) + scale.set_inverted(True) + scale.set_round_digits(0) + scale.set_digits(0) + scale.set_draw_value(True) + scale.connect("format-value", lambda s, v: f"{int(v)} dB") + scale.set_has_origin(True) + scale.set_size_request(-1, 150) + scale.add_mark(0, Gtk.PositionType.LEFT, "0") + scale.connect(GtkSignal.VALUE_CHANGED.value, self._changed, validator.keys[item]) + lbl = Gtk.Label(label=str(validator.keys[item])) + lbl.set_line_wrap(True) + lbl.set_justify(Gtk.Justification.CENTER) + vbox.pack_start(scale, True, True, 0) + vbox.pack_end(lbl, False, False, 0) + vbox._setting_item = validator.keys[item] + vbox.control = scale + hbox.pack_start(vbox, True, True, 0) + self._items.append(vbox) + row.add(hbox) + self.add(row) + + def _changed(self, control, item): + if control.get_sensitive(): + if hasattr(control, "_timer"): + control._timer.cancel() + control._timer = Timer(0.5, lambda: GLib.idle_add(self._write, control, item)) + control._timer.start() + + def _write(self, control, item): + control._timer.cancel() + delattr(control, "_timer") + new_state = int(control.get_value()) + if self.sbox.setting._value[int(item)] != new_state: + self.sbox.setting._value[int(item)] = new_state + _write_async(self.sbox.setting, self.sbox.setting._value[int(item)], self.sbox, key=int(item)) + + def set_value(self, value): + if value is None: + return + b = "" + n = len(self._items) + for vbox in self._items: + item = vbox._setting_item + v = value.get(int(item), None) + if v is not None: + vbox.control.set_value(v) + else: + v = self.sbox.setting._value[int(item)] + b += f"{str(item)}: ({str(v)}) " + lbl_text = ngettext("%d value", "%d values", n) % n + self._button.set_label(lbl_text) + self._button.set_tooltip_text(b) + + # control with an ID key that determines what else to show class HeteroKeyControl(Gtk.HBox, Control): def __init__(self, sbox, delegate=None): @@ -715,6 +778,8 @@ def _create_sbox(s, _device): control = MultipleRangeControl(sbox, change) elif s.kind == settings.Kind.PACKED_RANGE: control = PackedRangeControl(sbox, change) + elif s.kind == settings.Kind.GRAPHIC_EQ: + control = GraphicEQControl(sbox, change) elif s.kind == settings.Kind.HETERO: control = HeteroKeyControl(sbox, change) else: diff --git a/lib/solaar/ui/window.py b/lib/solaar/ui/window.py index a9f117e1..f2d5ed5f 100644 --- a/lib/solaar/ui/window.py +++ b/lib/solaar/ui/window.py @@ -536,7 +536,13 @@ def _update_details(button): if device.product_id: yield _("Product ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id hid_version = device.protocol - yield _("Protocol"), f"HID++ {hid_version:1.1f}" if hid_version else _("Unknown") + cent_proto = getattr(device, "_centurion_protocol", None) + if cent_proto: + yield _("Protocol"), f"Centurion {cent_proto[0]}.{cent_proto[1]}" + elif hid_version: + yield _("Protocol"), f"HID++ {hid_version:1.1f}" + else: + yield _("Protocol"), _("Unknown") if read_all and device.polling_rate: yield _("Polling rate"), device.polling_rate diff --git a/tests/logitech_receiver/fake_hidpp.py b/tests/logitech_receiver/fake_hidpp.py index 01f2d7a6..7758f2f5 100644 --- a/tests/logitech_receiver/fake_hidpp.py +++ b/tests/logitech_receiver/fake_hidpp.py @@ -386,6 +386,7 @@ class Device: version: Optional[int] = 0 wpid: Optional[str] = "0000" setting_callback: Any = None + centurion: bool = False sliding = profiles = _backlight = _keys = _remap_keys = _led_effects = _gestures = None _gestures_lock = threading.Lock() number = "d1" @@ -400,6 +401,7 @@ class Device: gestures = device.Device.gestures __hash__ = device.Device.__hash__ feature_request = device.Device.feature_request + _infer_kind_centurion = device.Device._infer_kind_centurion def __post_init__(self): self._name = self.name @@ -424,6 +426,9 @@ class Device: if self.setting_callback is None: self.setting_callback = lambda x, y, z: None self.add_notification_handler = lambda x, y: None + # Centurion bridge responses: keyed by (sub_feat_idx, sub_function) -> response bytes + if not hasattr(self, "_bridge_responses"): + self._bridge_responses = {} def request(self, id, *params, no_reply=False, long_message=False, protocol=2.0): params = b"".join(pack("B", p) if isinstance(p, int) else p for p in params) @@ -434,6 +439,24 @@ class Device: return bytes.fromhex(r.response) if isinstance(r.response, str) else r.response print("RESPONSE", self._name, None) + def centurion_bridge_request(self, sub_feat_idx, sub_function=0x00, *params, no_reply=False): + """Fake bridge request — looks up (sub_feat_idx, sub_function, params) in _bridge_responses.""" + params_bytes = b"".join(pack("B", p) if isinstance(p, int) else p for p in params) if params else b"" + key = (sub_feat_idx, sub_function, params_bytes.hex().upper()) + print("BRIDGE ", self._name, f"sub_idx={sub_feat_idx} func={sub_function} params={params_bytes.hex().upper()}") + result = self._bridge_responses.get(key) + if result is not None: + print("BRIDGE_R", self._name, result.hex().upper()) + return result + # Try without params for convenience + key_no_params = (sub_feat_idx, sub_function, "") + result = self._bridge_responses.get(key_no_params) + if result is not None: + print("BRIDGE_R", self._name, result.hex().upper()) + return result + print("BRIDGE_R", self._name, None) + return None + def ping(self, handle=None, devnumber=None, long_message=False): print("PING", self._protocol) return self._protocol @@ -451,6 +474,25 @@ class Device: pass +# Centurion headset (PRO X 2 LIGHTSPEED) parent device responses. +# Parent has 5 features: ROOT(0), FeatureSet(1), DeviceInfo(2), CentPPBridge(3), GenericDFU(4) +# Feature 0x0003 on Centurion = CentPPBridge (NOT FirmwareInfo). +r_centurion_headset = [ + Response(2.6, 0x0010), # ping (protocol 2.6) + Response("010001", 0x0000, "0001"), # FeatureSet at index 1 + Response("020001", 0x0000, "0100"), # DeviceInfo at index 2 + Response("030001", 0x0000, "0003"), # CentPPBridge at index 3 + Response("040001", 0x0000, "010A"), # GenericDFU at index 4 + Response("05", 0x0100), # feature count = 5 (includes ROOT on Centurion) + # FeatureSet.getFeatureID responses: [remaining_count, feat_hi, feat_lo, type, flags] + Response("0400000000", 0x0110, "00"), # index 0: ROOT (0x0000) + Response("0300010001", 0x0110, "01"), # index 1: FeatureSet (0x0001) + Response("0201000001", 0x0110, "02"), # index 2: DeviceInfo (0x0100) + Response("0100030001", 0x0110, "03"), # index 3: CentPPBridge (0x0003) + Response("00010A0001", 0x0110, "04"), # index 4: GenericDFU (0x010A) +] + + def match_requests(number, responses, call_args_list): for i in range(0 - number, 0): param = b"".join(pack("B", p) if isinstance(p, int) else p for p in call_args_list[i][0][1:]).hex().upper() diff --git a/tests/logitech_receiver/test_device.py b/tests/logitech_receiver/test_device.py index 0d45d69c..b291dba9 100644 --- a/tests/logitech_receiver/test_device.py +++ b/tests/logitech_receiver/test_device.py @@ -60,6 +60,7 @@ class DeviceInfoStub: hidpp_long: bool = True bus_id: int = 0x0003 # USB serial: str = "aa:aa:aa;aa" + centurion: bool = False di_bad_handle = DeviceInfoStub(None, product_id="CCCC") @@ -70,6 +71,7 @@ di_B530 = DeviceInfoStub("11", product_id="B350", bus_id=0x0005) di_C068 = DeviceInfoStub("11", product_id="C06B") di_C08A = DeviceInfoStub("11", product_id="C08A") di_DDDD = DeviceInfoStub("11", product_id="DDDD") +di_0AF7 = DeviceInfoStub("11", product_id="0AF7", centurion=True) @pytest.mark.parametrize( @@ -78,6 +80,7 @@ di_DDDD = DeviceInfoStub("11", product_id="DDDD") (di_bad_handle, fake_hidpp.r_empty, None), (di_error, fake_hidpp.r_empty, False), (di_CCCC, fake_hidpp.r_empty, True), + (di_0AF7, fake_hidpp.r_empty, True), ], ) def test_create_device(device_info, responses, expected_success): @@ -93,6 +96,22 @@ def test_create_device(device_info, responses, expected_success): assert bool(test_device) == expected_success +def test_create_centurion_device(): + """Test that a centurion device gets hidpp_long forced to True and centurion flag set.""" + from logitech_receiver import base + + low_level_mock = LowLevelInterfaceFake(fake_hidpp.r_empty) + test_device = device.create_device(low_level_mock, di_0AF7) + + assert test_device is not None + assert test_device.centurion is True + assert test_device.hidpp_long is True + assert int(test_device.handle) in base._centurion_handles + + # Clean up + base._centurion_handles.discard(int(test_device.handle)) + + @pytest.mark.parametrize( "device_info, responses, expected_codename, expected_name, expected_kind", [(di_CCCC, fake_hidpp.r_empty, "?? (CCCC)", "Unknown device CCCC", "?")], diff --git a/tests/logitech_receiver/test_hidpp20_complex.py b/tests/logitech_receiver/test_hidpp20_complex.py index dc7d4b85..1e3ef9dc 100644 --- a/tests/logitech_receiver/test_hidpp20_complex.py +++ b/tests/logitech_receiver/test_hidpp20_complex.py @@ -22,6 +22,7 @@ from logitech_receiver import exceptions from logitech_receiver import hidpp20 from logitech_receiver import hidpp20_constants from logitech_receiver import special_keys +from logitech_receiver.device import CenturionReceiver from logitech_receiver.hidpp20 import KeyFlag from logitech_receiver.hidpp20 import MappingFlag from logitech_receiver.hidpp20_constants import GestureId @@ -101,10 +102,9 @@ def test_FeaturesArray_get_feature(device, expected0, expected1, expected2, expe (hidpp20_constants.SupportedFeature.FEATURE_SET, 1), (hidpp20_constants.SupportedFeature.CONFIG_CHANGE, 2), (hidpp20_constants.SupportedFeature.DEVICE_FW_VERSION, 3), - (common.NamedInt(256, "unknown:0100"), 4), + (hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO, 4), (hidpp20_constants.SupportedFeature.REPROG_CONTROLS_V4, 5), - (None, 6), - (None, 7), + # Indices 6 and 7 have no responses — get_feature returns None, enumerate skips them (hidpp20_constants.SupportedFeature.BATTERY_STATUS, 8), ], ), @@ -922,3 +922,361 @@ def test_onboard_profiles_device(responses, name, count, buttons, gbuttons, sect yml_dump = yaml.dump(profiles) assert yaml.safe_load(yml_dump).to_bytes().hex() == profiles.to_bytes().hex() + + +# --- Centurion (PRO X 2 LIGHTSPEED headset) tests --- + +device_centurion = fake_hidpp.Device("CENTURION", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + + +def test_centurion_parent_feature_discovery(): + """Parent feature enumeration discovers CentPPBridge at index 3 and stores bridge index.""" + dev = fake_hidpp.Device("CENT_PARENT", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + featuresarray = hidpp20.FeaturesArray(dev) + dev.features = featuresarray + + result = featuresarray._check() + + assert result is True + assert featuresarray.count == 5 + # Parent features registered + assert featuresarray[hidpp20_constants.SupportedFeature.ROOT] == 0 + assert featuresarray[hidpp20_constants.SupportedFeature.FEATURE_SET] == 1 + assert featuresarray[hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO] == 2 + assert featuresarray[hidpp20_constants.SupportedFeature.CENTURION_GENERIC_DFU] == 4 + # Feature 0x0003 = CentPPBridge on Centurion (stored as DEVICE_FW_VERSION since same ID) + assert featuresarray[hidpp20_constants.SupportedFeature.DEVICE_FW_VERSION] == 3 + # Bridge index stored on device + assert dev._centurion_bridge_index == 3 + assert hasattr(dev, "_centurion_sub_features") + + +def test_centurion_sub_device_feature_discovery(): + """Sub-device feature discovery routes through bridge and populates _centurion_sub_features.""" + dev = fake_hidpp.Device("CENT_SUB", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + # Set up bridge responses for sub-device discovery: + # 1. CenturionRoot.GetFeature(0x0001) -> FeatureSet at sub-index 1 + # 2. CenturionFeatureSet.GetFeatureId(index=0) -> bulk feature list + dev._bridge_responses = { + # CenturionRoot(idx=0).GetFeature(func=0) with feature_id=0x0001 -> sub_fs_index=1 + (0x00, 0x00, "0001"): bytes([0x01, 0x00, 0x00]), + # CenturionFeatureSet(idx=1).GetFeatureId(func=0x10, start=0) -> 3 features + # Response: [count, (feat_hi, feat_lo, type, flags) × count] + (0x01, 0x10, "00"): bytes( + [ + 0x03, # 3 features + 0x06, + 0x04, + 0x00, + 0x00, # HEADSET_AUDIO_SIDETONE (0x0604) at sub-idx 0 + 0x06, + 0x01, + 0x00, + 0x00, # HEADSET_MIC_MUTE (0x0601) at sub-idx 1 + 0x06, + 0x11, + 0x00, + 0x00, # HEADSET_MIC_GAIN (0x0611) at sub-idx 2 + ] + ), + } + featuresarray = hidpp20.FeaturesArray(dev) + dev.features = featuresarray + + featuresarray._check() + + # Sub-device features should be discovered + assert hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE in dev._centurion_sub_features + assert hidpp20_constants.SupportedFeature.HEADSET_MIC_MUTE in dev._centurion_sub_features + assert hidpp20_constants.SupportedFeature.HEADSET_MIC_GAIN in dev._centurion_sub_features + # Sub-device features should be in features array with their sub-device indices + assert featuresarray[hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE] == 0 + assert featuresarray[hidpp20_constants.SupportedFeature.HEADSET_MIC_MUTE] == 1 + assert featuresarray[hidpp20_constants.SupportedFeature.HEADSET_MIC_GAIN] == 2 + # _centurion_sub_indices should map ALL sub-device features to their sub-device indices + assert dev._centurion_sub_indices[hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE] == 0 + assert dev._centurion_sub_indices[hidpp20_constants.SupportedFeature.HEADSET_MIC_MUTE] == 1 + assert dev._centurion_sub_indices[hidpp20_constants.SupportedFeature.HEADSET_MIC_GAIN] == 2 + + +def test_centurion_feature_request_routes_sub_device(): + """feature_request() routes sub-device features through centurion_bridge_request().""" + dev = fake_hidpp.Device("CENT_ROUTE", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + # Manually set up sub-device state (simulating completed discovery) + dev.features.count = 5 # mark discovery as complete so _check() short-circuits + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = {hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE} + dev.features[hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE] = 7 # sub-device index + # Set up bridge response for GetSidetoneLevel + dev._bridge_responses = { + (7, 0x00, ""): bytes([0x01, 0x00, 0x32]), # mic_id=1, mute=0, level=50 + } + + result = dev.feature_request(hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE, 0x00) + + assert result is not None + assert result == bytes([0x01, 0x00, 0x32]) + + +def test_centurion_feature_request_parent_not_routed(): + """feature_request() does NOT route parent features through bridge.""" + dev = fake_hidpp.Device("CENT_PARENT2", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = {hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE} + + # FeatureSet is a parent feature — should go through normal request(), not bridge + featuresarray = hidpp20.FeaturesArray(dev) + dev.features = featuresarray + featuresarray._check() + + # CENTURION_DEVICE_INFO is a parent feature at index 2 — requesting it should + # NOT go through bridge, it should go through the normal hidpp20.feature_request path + assert hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO not in dev._centurion_sub_features + + +def test_centurion_bridge_request_write(): + """centurion_bridge_request with no_reply=True returns None immediately.""" + dev = fake_hidpp.Device("CENT_WRITE", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = {hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE} + dev.features[hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE] = 7 + dev._bridge_responses = {} # no responses needed for no_reply + + result = dev.centurion_bridge_request(7, 0x10, 0x32, no_reply=True) + + assert result is None + + +def test_centurion_firmware_dedup(): + """get_firmware_centurion() deduplicates identical firmware entries.""" + # Simulate parent device that returns the same firmware for every entity index + fw_response = "00" + "00" + "0105" + "04" + "44303031" + "00" * 20 # type=0, ver=1.05, name="D001" + responses = fake_hidpp.r_centurion_headset + [fake_hidpp.Response(fw_response, 0x0210, f"{i:02X}") for i in range(8)] + dev = fake_hidpp.Device("CENT_DEDUP", True, 2.6, responses, centurion=True) + + fw = _hidpp20.get_firmware_centurion(dev) + + # Should only get 1 entry, not 8 + assert fw is not None + assert len(fw) == 1 + assert fw[0].name == "D001" + + +def test_centurion_sub_device_firmware(): + """get_firmware_centurion_sub() queries sub-device firmware via bridge.""" + dev = fake_hidpp.Device("CENT_SUBFW", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = set() + dev._centurion_sub_indices = {hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO: 2} + # Sub-device firmware: type=0 (firmware), ver=3.02, name="H001" + dev._bridge_responses = { + (2, 0x10, "00"): bytes([0x00, 0x00, 0x03, 0x02, 0x04]) + b"H001", + (2, 0x10, "01"): bytes([0x00, 0x00, 0x03, 0x02, 0x04]) + b"H001", # duplicate → dedup stops + } + + fw = _hidpp20.get_firmware_centurion_sub(dev) + + assert fw is not None + assert len(fw) == 1 + assert fw[0].name == "H001" + assert fw[0].version == "3.02" + + +def test_centurion_sub_device_serial(): + """get_serial_centurion_sub() queries sub-device serial via bridge.""" + dev = fake_hidpp.Device("CENT_SUBSER", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = set() + dev._centurion_sub_indices = {hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO: 2} + dev._bridge_responses = { + (2, 0x20, ""): bytes([0x0C]) + b"ABC123DEF456", + } + + serial = _hidpp20.get_serial_centurion_sub(dev) + + assert serial == "ABC123DEF456" + + +def test_centurion_sub_device_hardware_info(): + """get_hardware_info_centurion_sub() queries sub-device hardware info via bridge.""" + dev = fake_hidpp.Device("CENT_SUBHW", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = set() + dev._centurion_sub_indices = {hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO: 2} + dev._bridge_responses = { + (2, 0x00, ""): bytes([0x01, 0x03, 0x0A, 0xF7]), + } + + hw_info = _hidpp20.get_hardware_info_centurion_sub(dev) + + assert hw_info is not None + model_id, hw_rev, product_id = hw_info + assert model_id == 1 + assert hw_rev == 3 + assert product_id == 0x0AF7 + + +def test_centurion_kind_inference(): + """Centurion device with 0x06xx audio features infers kind=headset.""" + dev = fake_hidpp.Device("CENT_KIND", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_sub_features = { + hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE, + hidpp20_constants.SupportedFeature.HEADSET_MIC_MUTE, + } + + kind = dev._infer_kind_centurion() + + from logitech_receiver import hidpp10_constants + + assert kind == hidpp10_constants.DEVICE_KIND.headset + + +# --- CenturionReceiver tests --- + + +class FakeCenturionDeviceInfo: + """Minimal device_info for CenturionReceiver tests.""" + + def __init__(self, path="/dev/hidraw99", product_id="0AF0", product=None, centurion=True): + self.path = path + self.product_id = product_id + self.product = product + self.centurion = centurion + self.isDevice = True + + +class FakeLowLevel: + """Minimal low_level for CenturionReceiver tests.""" + + def __init__(self, ping_protocol=2.6): + self.ping_protocol = ping_protocol + self.opened_paths = [] + self.closed_handles = [] + + def open_path(self, path): + self.opened_paths.append(path) + return 0x99 + + def ping(self, handle, number, long_message=False): + return self.ping_protocol + + def request(self, handle, devnumber, request_id, *params, **kwargs): + return None + + def close(self, handle, *args, **kwargs): + self.closed_handles.append(handle) + return True + + def find_paired_node(self, receiver_path, index, timeout): + return None + + +def test_centurion_receiver_attributes(): + """CenturionReceiver has correct receiver-like attributes.""" + info = FakeCenturionDeviceInfo(product="PRO X 2 LIGHTSPEED") + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + assert recv.kind is None + assert recv.isDevice is False + assert recv.number == 0xFF + assert recv.max_devices == 1 + assert recv.may_unpair is False + assert recv.re_pairs is False + assert recv.handle == 0x99 + assert recv.path == "/dev/hidraw99" + assert recv.product_id == "0AF0" + assert recv.name == "Centurion Receiver" + assert recv.serial is None + assert recv.pairing is not None + assert recv.pairing.lock_open is False + assert bool(recv) is True + + +def test_centurion_receiver_container_empty(): + """Empty CenturionReceiver has correct container behavior.""" + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + assert len(recv) == 0 + assert recv.count() == 0 + assert 1 not in recv + assert list(recv) == [] + assert recv.status_string() == "No devices." + + +def test_centurion_receiver_container_with_device(): + """CenturionReceiver with a child device has correct container behavior.""" + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + # Simulate adding a child device (a simple mock) + class FakeChild: + number = 1 + + def close(self): + pass + + recv._devices[1] = FakeChild() + + assert len(recv) == 1 + assert recv.count() == 1 + assert 1 in recv + assert 2 not in recv + assert recv[1] is not None + assert recv.status_string() == "1 device connected." + with pytest.raises(IndexError): + recv[2] + + +def test_centurion_receiver_enable_connection_notifications(): + """CenturionReceiver.enable_connection_notifications() returns False.""" + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + assert recv.enable_connection_notifications() is False + assert recv.remaining_pairings() is None + + +def test_centurion_receiver_device_codename(): + """CenturionReceiver.device_codename() returns USB product name.""" + info = FakeCenturionDeviceInfo(product="PRO X 2 LIGHTSPEED") + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + assert recv.device_codename(1) == "PRO X 2 LIGHTSPEED" + + +def test_centurion_receiver_close(): + """CenturionReceiver.close() closes handle and clears devices.""" + low_level = FakeLowLevel() + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(low_level, 0x99, info) + + class FakeChild: + closed = False + + def close(self): + self.closed = True + + child = FakeChild() + recv._devices[1] = child + + recv.close() + + assert recv.handle is None + assert len(recv._devices) == 0 + assert child.closed is True + assert 0x99 in low_level.closed_handles + assert bool(recv) is False + + +def test_centurion_receiver_changed_callback(): + """CenturionReceiver.changed() invokes status_callback.""" + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + calls = [] + recv.status_callback = lambda *args, **kwargs: calls.append((args, kwargs)) + + recv.changed() + + assert len(calls) == 1 + assert calls[0][0][0] is recv diff --git a/tests/logitech_receiver/test_setting_templates.py b/tests/logitech_receiver/test_setting_templates.py index b1cff158..dbfb9b05 100644 --- a/tests/logitech_receiver/test_setting_templates.py +++ b/tests/logitech_receiver/test_setting_templates.py @@ -638,6 +638,60 @@ key_tests = [ fake_hidpp.Response("E010", 0x0430, "02E010"), fake_hidpp.Response("E018", 0x0430, "02E018"), ), + Setup( # HeadsetOnboardEQ: 2 bands, 128Hz/-2dB/Q10 and 256Hz/+3dB/Q10 + FeatureTest(settings_templates.HeadsetOnboardEQ, {0: -2, 1: 3}, {1: 5}, 2), + [-12, 12], + fake_hidpp.Response("8000000002", 0x0400), # GetEQInfos: has_hw_eq, 2 bands + fake_hidpp.Response("00020080FE0A0100030A", 0x0410, "00"), # GetEQParameters + fake_hidpp.Response( # SetEQParameters: write initial values back (slot 0x00) + "00", + 0x0420, + "00020080FE0A0100030A" + "055AE300" # mystery bytes (from pcap) + "030E00020000000100170002" + "007A6B00D69D94008C516B00C82380005DC27F0075" + "906B0022B6940002226B00B44080007EA37F00C1C3040044" + "0200170002" + "00506B00900F95006ED56A00D185800060477F00CA" + "906B00229D950052496A00A12E810085EC7E0075C40400AC", + ), + fake_hidpp.Response( # SetEQParameters: persist initial values (slot 0x80) + "00", + 0x0420, + "80020080FE0A0100030A" + "055AE300" + "030E00020000000100170002" + "007A6B00D69D94008C516B00C82380005DC27F0075" + "906B0022B6940002226B00B44080007EA37F00C1C3040044" + "0200170002" + "00506B00900F95006ED56A00D185800060477F00CA" + "906B00229D950052496A00A12E810085EC7E0075C40400AC", + ), + fake_hidpp.Response( # SetEQParameters: write updated band 1 gain=5 (slot 0x00) + "00", + 0x0420, + "00020080FE0A0100050A" + "055AE300" + "030E00020000000100170002" + "006F6B00F5A894006A466B00EB2380005DC27F0075" + "906B0022BC9400AA156B00613B80007CAD7F00C6C30400BF" + "0200170002" + "00306B00272F9500BAB56A008C85800060477F00CA" + "906B0022B1950002226A000E1F8100AB0A7F004FC604001D", + ), + fake_hidpp.Response( # SetEQParameters: persist updated values (slot 0x80) + "00", + 0x0420, + "80020080FE0A0100050A" + "055AE300" + "030E00020000000100170002" + "006F6B00F5A894006A466B00EB2380005DC27F0075" + "906B0022BC9400AA156B00613B80007CAD7F00C6C30400BF" + "0200170002" + "00306B00272F9500BAB56A008C85800060477F00CA" + "906B0022B1950002226A000E1F8100AB0A7F004FC604001D", + ), + ), Setup( FeatureTest(settings_templates.PerKeyLighting, {1: -1, 2: -1, 9: -1, 10: -1, 113: -1}, {2: 0xFF0000}, 4, 4, 0, 1), {