From 12aabf029b4e43b117ad7da0a1bca3193406d611 Mon Sep 17 00:00:00 2001 From: Ken Sanislo Date: Tue, 14 Apr 2026 08:43:23 -0700 Subject: [PATCH] centurion: support PRO X 2 LIGHTSPEED headphones Centurion features (#3150) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add Centurion transport and PRO X 2 LIGHTSPEED headset support Adds support for the Logitech PRO X 2 LIGHTSPEED Gaming Headset (PID 0x0AF7) which uses the Centurion transport protocol (report ID 0x51 on USB usage page 0xFFA0) instead of standard HID++ report IDs. Changes: - HID enumeration: detect Centurion devices via report descriptor parsing (usage page 0xFFA0, report ID 0x51, 63-byte frames) - Centurion transport: wrap/unwrap HID++ 2.0 frames in Centurion framing for write, read, and ping operations - Feature discovery: enumerate features individually on Centurion devices (different response format: [remaining_count, feat_hi, feat_lo]) - Device descriptor for PRO X 2 LIGHTSPEED Gaming Headset - New feature enum entries for Centurion-era headset features (0x06xx) - CenturionRawRW class for write-only headset settings controlled via raw Centurion commands reverse-engineered from HeadsetControl - HeadsetSidetone setting (0-100 range, persisted locally) Known limitations: - Only sidetone control is implemented; other features need RE work - Settings are write-only (no read-back from device) - Headset features (0x06xx) not discoverable via IRoot; registered manually * Remove static PRO X 2 descriptor; fully probe Centurion devices at runtime Replace the hardcoded descriptor entry with dynamic discovery of all device properties via the Centurion protocol. The headset name, kind, serial, firmware, and battery are now probed at runtime — matching how the device actually presents itself rather than relying on static data. Key changes: - Discover sub-device features via CentPPBridge and route requests through the bridge automatically - Infer device kind from feature IDs (0x06xx = headset) for both wireless and direct USB connections - Read device name from USB product string with protocol probe fallback - Parse bridge error responses (sub_feat_idx=0xFF) instead of timing out - Handle unknown HID++ error codes gracefully in base.py - Fix firmware deduplication for Centurion parent devices - Prefer sub-device serial/firmware over parent (non-printable) values - Add Centurion-aware display in solaar show with parent/sub-device sections - Support both wireless (0AF7 dongle) and direct USB (0AF8) connections * Display Centurion dongle as receiver with headset as child device - Add CenturionReceiver class that provides the Receiver UI interface so the dongle appears as a parent with the headset indented underneath, matching how Lightspeed/Unifying receivers display - Independently probe dongle features via feature_request() on the CenturionReceiver, separate from headset features via bridge - Fix bridge notification dispatch: remove incorrect sub_cpl=0xFF filter that was silently dropping all battery and other notifications - Fix battery status decoding: charging status is at byte 2 (not byte 1) of the CENTURION_BATTERY_SOC response - Detect wired vs wireless by checking for CentPPBridge in discovered features; wired headsets fall back to standalone Device - Name the dongle "Centurion Receiver" to distinguish from the headset - Filter unprintable dongle serial (control characters 0x14-0x1F) - Update CLI show output with proper receiver/child hierarchy and spacing * Fix headset setting validators and code formatting - Add signed int8 support to RangeValidator for HeadsetMicGain (0x0611) - Make HeadsetSidetone version-aware: v1 uses 2-byte skip, v2+ uses 3-byte skip with 0xFF separator per protocol spec - Fix ruff formatting in device.py, listener.py, udev_impl.py - Update CenturionReceiver test for renamed receiver * Use ConnectionStateChangedEvent for headset online/offline detection Replace ad-hoc heuristics with proper bridge event function dispatch: - Function 0 (ConnectionStateChangedEvent): parse sub-device list length to determine connect (len>0) vs disconnect (len=0) - Function 1 (MessageEvent): fallback online detection if headset sends a message while marked offline (handles cold-start power-on) Remove CPL sub_id>=0x80 fallback in listener that misidentified HID++ error replies as disconnect events. Skip HID++ 1.0 set_configuration_pending_flags for CenturionReceiver (not supported). Also adds OnboardEQ (0x0636) support, bridge multi-fragment sends, bridge-based headset ping probe, and CLI offline display. * Update PRO X 2 LIGHTSPEED device doc with current solaar show output * Fix Centurion protocol version display (1.16 not 2.6) The HID++ ping math (major + minor/10.0) produced a bogus "2.6" for Centurion devices whose ProtocolCapabilities returns major=1, minor=0x10. Store the raw (major, minor) bytes from the ping response and display them correctly as "Centurion 1.16" in both CLI and GUI. * Add OnboardEQ (0x0636) support for Centurion headsets Implement host-computed biquad EQ coefficient generation and multi-fragment bridge writes for the PRO X 2 LIGHTSPEED headset's 5-band parametric EQ. The coefficient algorithm uses standard Audio EQ Cookbook peaking EQ formulas with a simplified rescale normalization (max_b0 × 1.19 headroom). This is our own implementation — not an exact replica of LGHUB's ~350-line per-band cascade normalization — but it produces functionally correct results. The DSP compensates via the rescale factor, and the EQ changes are audible and working on real hardware. Wire format verified against 38 LGHUB pcap writes: - 4-byte LE section headers, LE uint16 coefficient words - Mixed Q1.31/Q2.30 fixed-point with 24-bit precision - Only b-coefficients divided by rescale; a-coefficients unchanged - Two sections: 48kHz playback + 16kHz mic - No trailing padding, no extra words between sections Changes: - base.py: Add flags parameter to write_centurion_cpl() for multi-fragment CPL - device.py: Rewrite multi-fragment bridge send — proper CPL fragmentation with fragment 0 carrying bridge prefix/hdr and continuations carrying raw sub_msg, all fragments sent back-to-back without intermediate ACKs - hidpp20.py: Replace placeholder coefficient code with full biquad math, mixed Q-format quantization, rescale normalization, and dual-section output - settings_templates.py: Persist EQ to slot 0x80 after writing to slot 0x00 so settings survive power cycle - tests: Update expected SetEQParameters payloads for new coefficient format * Extract Centurion protocol into separate modules Move CenturionReceiver class, factory function, and Centurion protocol queries (firmware, serial, hardware info, battery, name) from device.py and hidpp20.py into new centurion.py module. Move OnboardEQ biquad math and payload builders from hidpp20.py into new onboard_eq.py module. Move _read_usb_product_string() to common.py to avoid circular imports. Re-exports preserve backward compatibility for all existing callers. * Add vertical graphic EQ slider widget for headset equalizer Replace horizontal slider rows with a traditional graphic EQ layout using vertical sliders side-by-side, with dB value display and frequency labels per band. * Fix device online state clobbered by debug ping in _status_changed The INFO-level logging guard in _status_changed() called device.ping() before logging, purely to show accurate online status. But ping() has side effects — it sets device.online based on the result. When a ConnectionStateChangedEvent correctly marked a device online, the subsequent _status_changed() callback would re-ping. If the device wasn't ready yet (e.g. Centurion headset still booting), the ping timed out and set online back to False, requiring 2-3 power cycles to sync state. Remove the unnecessary ping — the log message already reads device.online which reflects the state set by the event handler. * Sort feature constants by ID and add PROFILE_MANAGEMENT Move RPM_INDICATOR/RPM_LED_PATTERN (0x807A-B) before PER_KEY_LIGHTING (0x8080-81), sort five Centurion-era headset entries into their correct positions by feature ID, and add missing PROFILE_MANAGEMENT = 0x8101. * Add CenturionCoreFeature enum for colliding feature IDs Centurion transport reuses HID++ 2.0 feature IDs 0x0000, 0x0001, 0x0003, 0x0005, 0x0007 with different meanings. Since SupportedFeature (IntEnum) requires unique values, create a separate CenturionCoreFeature enum and resolve_feature() helper for transport-aware lookup. Also replace the +0x100 offset hack in FeaturesArray.inverse with a dedicated sub_inverse dict for sub-device feature indexing. * Fix ruff I001 import sorting in centurion.py and hidpp20.py * Add 9 missing centurion/headset feature names Add feature constants split out from the HID++ 2.0 names PR (#3153): CENTURION_LED_BRIGHTNESS (0x0110), CENTURION_EU_POWER_MODE (0x0115), CENTURION_DEVICE_BOOL_STATE (0x0116), HEADSET_ADVANCED_PARA_EQ (0x020D), HEADSET_MIC_TEST (0x020E), HEADSET_EQ_STYLES (0x0213), BT_HOST_INFO (0x0305), LIGHTSPEED_PAIRING (0x0309), BT_GAMING_MODE (0x030A). * Extract _record_ping_protocol helper so all ping paths capture Centurion version The raw Centurion (major, minor) pickup was only in the Centurion-child dongle branch of Device.ping(). Wired Centurion variants (e.g. PRO X 2 LIGHTSPEED 046d:0AF8) go through the generic fallback branch and never recorded the raw version, so they displayed "Centurion 2.6" instead of "Centurion 1.16". Extract the protocol + centurion version recording into a helper and call it from both branches. --------- Co-authored-by: Peter F. Patel-Schneider --- docs/devices/PRO X 2 LIGHTSPEED 0AF7.text | 47 ++ lib/hidapi/common.py | 1 + lib/hidapi/udev_impl.py | 12 +- lib/logitech_receiver/base.py | 120 +++- lib/logitech_receiver/centurion.py | 511 ++++++++++++++++++ lib/logitech_receiver/centurion_constants.py | 38 ++ lib/logitech_receiver/common.py | 14 + lib/logitech_receiver/descriptors.py | 1 + lib/logitech_receiver/device.py | 309 ++++++++++- lib/logitech_receiver/hidpp20.py | 160 +++++- lib/logitech_receiver/hidpp20_constants.py | 54 ++ lib/logitech_receiver/listener.py | 6 +- lib/logitech_receiver/notifications.py | 3 + lib/logitech_receiver/onboard_eq.py | 186 +++++++ lib/logitech_receiver/settings.py | 6 +- lib/logitech_receiver/settings_templates.py | 195 +++++++ lib/logitech_receiver/settings_validator.py | 9 +- lib/solaar/cli/__init__.py | 9 +- lib/solaar/cli/show.py | 190 +++++-- lib/solaar/listener.py | 110 +++- lib/solaar/ui/config_panel.py | 65 +++ lib/solaar/ui/window.py | 8 +- tests/logitech_receiver/fake_hidpp.py | 42 ++ tests/logitech_receiver/test_device.py | 19 + .../logitech_receiver/test_hidpp20_complex.py | 364 ++++++++++++- .../test_setting_templates.py | 54 ++ 26 files changed, 2453 insertions(+), 80 deletions(-) create mode 100644 docs/devices/PRO X 2 LIGHTSPEED 0AF7.text create mode 100644 lib/logitech_receiver/centurion.py create mode 100644 lib/logitech_receiver/centurion_constants.py create mode 100644 lib/logitech_receiver/onboard_eq.py diff --git a/docs/devices/PRO X 2 LIGHTSPEED 0AF7.text b/docs/devices/PRO X 2 LIGHTSPEED 0AF7.text new file mode 100644 index 00000000..fac5a2a0 --- /dev/null +++ b/docs/devices/PRO X 2 LIGHTSPEED 0AF7.text @@ -0,0 +1,47 @@ +solaar version 1.1.19-24-g693159ee + +Centurion Receiver + Device path : /dev/hidraw4 + USB id : 046d:0AF7 + Protocol : Centurion + 1 : 0.02 + Has 1 device(s) out of a maximum of 1. + Supports 5 dongle features: + 0: ROOT {0000} + 1: FEATURE SET {0001} + 2: CENTURION DEVICE INFO {0100} + Firmware: 1 0.02 + Hardware: model 26 rev 255 product 0508 + 3: CENTPP BRIDGE {0003} + 4: CENTURION GENERIC DFU {010A} + + 1: PRO X 2 LIGHTSPEED + Device path : /dev/hidraw4 + USB id : 046d:0AF7 + Codename : PRO X 2 LIGHTSPEED + Kind : headset + Protocol : Centurion 2.6 + Serial number: + Model ID: 0508 + Unit ID: + 1: 0.02 + Supports 10 HID++ 2.0 features: + 0: ROOT {0000} V0 + 1: FEATURE SET {0001} V0 + 2: CENTURION DEVICE NAME {0101} V0 + 3: CENTURION DEVICE INFO {0100} V0 + Firmware: 1 0.02 + Serial: + Hardware: model 26 rev 255 product 0508 + 4: CENTURION BATTERY SOC {0104} V0 + Battery: 97%, BatteryStatus.DISCHARGING. + 5: CENTURION GENERIC DFU {010A} V0 + 6: CENTURION AUTO SLEEP {0108} V0 + Auto Sleep Timeout: 10 + 7: HEADSET AUDIO SIDETONE {0604} V0 + Headset Sidetone: 55 + 8: HEADSET MIC SNR {0602} V0 + Mic SNR: True + 9: HEADSET ONBOARD EQ {0636} V0 + EQ: 80Hz:+0dB, 240Hz:+0dB, 750Hz:+0dB, 2200Hz:+0dB, 6600Hz:+0dB + Battery: 97%, BatteryStatus.DISCHARGING. diff --git a/lib/hidapi/common.py b/lib/hidapi/common.py index 8a5d1b6c..6817511a 100644 --- a/lib/hidapi/common.py +++ b/lib/hidapi/common.py @@ -18,3 +18,4 @@ class DeviceInfo: isDevice: bool hidpp_short: str | None hidpp_long: str | None + centurion: bool = False diff --git a/lib/hidapi/udev_impl.py b/lib/hidapi/udev_impl.py index 8b0c1f03..955dd802 100644 --- a/lib/hidapi/udev_impl.py +++ b/lib/hidapi/udev_impl.py @@ -101,7 +101,7 @@ def _match(action: str, device, filter_func: typing.Callable[[int, int, int, boo try: # if report descriptor does not indicate HID++ capabilities then this device is not of interest to Solaar from hid_parser import ReportDescriptor - hidpp_short = hidpp_long = False + hidpp_short = hidpp_long = centurion = False devfile = "/sys" + hid_device.properties.get("DEVPATH") + "/report_descriptor" with fileopen(devfile, "rb") as fd: with warnings.catch_warnings(): @@ -111,11 +111,16 @@ def _match(action: str, device, filter_func: typing.Callable[[int, int, int, boo # and _Usage(0xFF00, 0x0001) in rd.get_input_items(0x10)[0].usages # be more permissive hidpp_long = 0x11 in rd.input_report_ids and 19 * 8 == int(rd.get_input_report_size(0x11)) # and _Usage(0xFF00, 0x0002) in rd.get_input_items(0x11)[0].usages # be more permissive - if not hidpp_short and not hidpp_long: + # Centurion transport: report ID 0x51, 63-byte reports (usage page 0xFFA0) + centurion = ( + 0x51 in rd.input_report_ids and 63 * 8 == int(rd.get_input_report_size(0x51)) and 0x51 in rd.output_report_ids + ) + if not hidpp_short and not hidpp_long and not centurion: return except Exception as e: # if can't process report descriptor fall back to old scheme hidpp_short = None hidpp_long = None + centurion = False logger.info( "Report Descriptor not processed for DEVICE %s BID %s VID %s PID %s: %s", device.device_node, @@ -125,7 +130,7 @@ def _match(action: str, device, filter_func: typing.Callable[[int, int, int, boo e, ) - filtered_result = filter_func(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short, hidpp_long) + filtered_result = filter_func(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short or centurion, hidpp_long or centurion) if not filtered_result: return interface_number = filtered_result.get("usb_interface") @@ -165,6 +170,7 @@ def _match(action: str, device, filter_func: typing.Callable[[int, int, int, boo isDevice=isDevice, hidpp_short=hidpp_short, hidpp_long=hidpp_long, + centurion=centurion if centurion else False, ) return d_info diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 69aee2ae..b66e2a1e 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -96,6 +96,20 @@ HIDPP_SHORT_MESSAGE_ID = 0x10 HIDPP_LONG_MESSAGE_ID = 0x11 DJ_MESSAGE_ID = 0x20 +# Centurion transport (used by PRO X 2 LIGHTSPEED headset and similar) +# Uses report ID 0x51 on usage page 0xFFA0, 64-byte frames. +# Wire format (CPL): [0x51, cpl_length, flags=0x00, feat_idx, func_sw, params..., pad] +# cpl_length = number of bytes from flags to end of meaningful data (includes flags byte). +# The device_index byte from standard HID++ is NOT present in Centurion framing. +CENTURION_REPORT_ID = 0x51 +CENTURION_FRAME_SIZE = 64 # 1 byte report ID + 63 bytes payload +_CENTURION_MSG_SIZE = 63 # max reconstructed message size after unwrapping (2 + 61 payload bytes) + +# Set of handles that use Centurion framing +_centurion_handles: set[int] = set() +# Raw Centurion protocol version (major, minor) by handle, from ping response +_centurion_protocol_versions: dict[int, tuple[int, int]] = {} + """Default timeout on read (in seconds).""" DEFAULT_TIMEOUT = 4 @@ -287,6 +301,8 @@ def close(handle): if handle: try: if isinstance(handle, int): + _centurion_handles.discard(handle) + _centurion_protocol_versions.pop(handle, None) hidapi.close(handle) else: handle.close() @@ -318,6 +334,17 @@ def write(handle, devnumber, data, long_message=False): wdata = struct.pack("!BB18s", HIDPP_LONG_MESSAGE_ID, devnumber, data) else: wdata = struct.pack("!BB5s", HIDPP_SHORT_MESSAGE_ID, devnumber, data) + + ihandle = int(handle) + if ihandle in _centurion_handles: + # Centurion CPL framing: [0x51, cpl_length, flags=0x00, feat_idx, func_sw, params...] + # cpl_length = len(meaningful_payload) + 1 (the +1 counts the flags byte) + # The device_index is stripped — only the HID++ payload (feat_idx + func_sw + params) remains. + payload = wdata[2:] # skip report_id and devnumber from standard frame + cpl_length = len(data) + 1 # data is the unpadded payload; +1 for flags byte + wdata = struct.pack("!BBB", CENTURION_REPORT_ID, cpl_length, 0x00) + payload + wdata = wdata + b"\x00" * (CENTURION_FRAME_SIZE - len(wdata)) + if logger.isEnabledFor(logging.DEBUG): logger.debug( "(%s) <= w[%02X %02X %s %s]", @@ -329,7 +356,33 @@ def write(handle, devnumber, data, long_message=False): ) try: - hidapi.write(int(handle), wdata) + hidapi.write(ihandle, wdata) + except Exception as reason: + logger.error("write failed, assuming handle %r no longer available", handle) + close(handle) + raise exceptions.NoReceiver(reason=reason) from reason + + +def write_centurion_cpl(handle, layer3_payload, flags=0x00): + """Send a Centurion CPL frame with the given Layer 3+ payload. + + Builds: [0x51, cpl_length, flags, layer3_payload..., zero-pad to 64 bytes] + where cpl_length = len(layer3_payload) + 1 (the +1 counts the flags byte). + + For multi-fragment sends, flags encodes fragment index and continuation: + flags = (fragment_index << 1) | (1 if more_fragments else 0) + Single-frame messages use flags=0x00 (default). + """ + ihandle = int(handle) + if ihandle not in _centurion_handles: + raise ValueError("write_centurion_cpl called on non-Centurion handle") + cpl_length = len(layer3_payload) + 1 # +1 for flags byte + wdata = struct.pack("!BBB", CENTURION_REPORT_ID, cpl_length, flags) + layer3_payload + wdata = wdata + b"\x00" * (CENTURION_FRAME_SIZE - len(wdata)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("(%s) <= centurion_cpl[%s]", handle, common.strhex(wdata[: cpl_length + 2])) + try: + hidapi.write(ihandle, wdata) except Exception as reason: logger.error("write failed, assuming handle %r no longer available", handle) close(handle) @@ -361,17 +414,17 @@ def _is_relevant_message(data: bytes) -> bool: """ assert isinstance(data, bytes), (repr(data), type(data)) - # mapping from report_id to message length + # mapping from report_id to accepted message lengths report_lengths = { - HIDPP_SHORT_MESSAGE_ID: SHORT_MESSAGE_SIZE, - HIDPP_LONG_MESSAGE_ID: _LONG_MESSAGE_SIZE, - DJ_MESSAGE_ID: _MEDIUM_MESSAGE_SIZE, - 0x21: _MAX_READ_SIZE, + HIDPP_SHORT_MESSAGE_ID: (SHORT_MESSAGE_SIZE,), + HIDPP_LONG_MESSAGE_ID: (_LONG_MESSAGE_SIZE, _CENTURION_MSG_SIZE), + DJ_MESSAGE_ID: (_MEDIUM_MESSAGE_SIZE,), + 0x21: (_MAX_READ_SIZE,), } report_id = ord(data[:1]) if report_id in report_lengths: - if report_lengths.get(report_id) == len(data): + if len(data) in report_lengths[report_id]: return True else: logger.warning(f"unexpected message size: report_id {report_id:02X} message {common.strhex(data)}") @@ -387,15 +440,35 @@ def _read(handle, timeout) -> tuple[int, int, bytes]: been physically removed from the machine, or the kernel driver has been unloaded. The handle will be closed automatically. """ + ihandle = int(handle) + is_centurion = ihandle in _centurion_handles + read_size = CENTURION_FRAME_SIZE if is_centurion else _MAX_READ_SIZE try: # convert timeout to milliseconds, the hidapi expects it timeout = int(timeout * 1000) - data = hidapi.read(int(handle), _MAX_READ_SIZE, timeout) + data = hidapi.read(ihandle, read_size, timeout) except Exception as reason: logger.warning("read failed, assuming handle %r no longer available", handle) close(handle) raise exceptions.NoReceiver(reason=reason) from reason + if data and is_centurion and ord(data[:1]) == CENTURION_REPORT_ID: + # Unwrap Centurion CPL framing: + # RX: [0x51, cpl_length, flags=0x00, feat_idx, func_sw, data...] + # cpl_length includes the flags byte, so meaningful payload starts at byte 3 + # and has (cpl_length - 1) bytes. + # Reconstruct as HID++ long message: [0x11, devnumber=0xFF, feat_idx, func_sw, data...] + cpl_length = ord(data[1:2]) + inner_payload = data[3 : 2 + cpl_length] # bytes 3..2+cpl_length-1 = cpl_length-1 bytes + data = bytes([HIDPP_LONG_MESSAGE_ID, 0xFF]) + inner_payload + # Pad to a valid message size: standard long (20) or Centurion extended (63) + if len(data) <= _LONG_MESSAGE_SIZE: + data = data + b"\x00" * (_LONG_MESSAGE_SIZE - len(data)) + elif len(data) <= _CENTURION_MSG_SIZE: + data = data + b"\x00" * (_CENTURION_MSG_SIZE - len(data)) + else: + data = data[:_CENTURION_MSG_SIZE] + if data and _is_relevant_message(data): # ignore messages that fail check report_id = ord(data[:1]) devnumber = ord(data[1:2]) @@ -564,13 +637,17 @@ def request( if reply_data[:1] == b"\xff" and reply_data[1:3] == request_data[:2]: # a HID++ 2.0 feature call returned with an error error = ord(reply_data[3:4]) + try: + error_name = Hidpp20ErrorCode(error) + except ValueError: + error_name = f"unknown:{error:02X}" logger.error( "(%s) device %d error on feature request {%04X}: %d = %s", handle, devnumber, request_id, error, - Hidpp20ErrorCode(error), + error_name, ) raise exceptions.FeatureCallError( number=devnumber, @@ -641,9 +718,15 @@ def ping(handle, devnumber, long_message: bool = False): if reply: report_id, reply_devnumber, reply_data = reply if reply_devnumber == devnumber or reply_devnumber == devnumber ^ 0xFF: # BT device returning 0x00 - if reply_data[:2] == request_data[:2] and reply_data[4:5] == request_data[-1:]: + is_centurion = int(handle) in _centurion_handles + mark_ok = is_centurion or reply_data[4:5] == request_data[-1:] + if reply_data[:2] == request_data[:2] and mark_ok: # HID++ 2.0+ device, currently connected - return ord(reply_data[2:3]) + ord(reply_data[3:4]) / 10.0 + major = ord(reply_data[2:3]) + minor = ord(reply_data[3:4]) + if is_centurion: + _centurion_protocol_versions[int(handle)] = (major, minor) + return major + minor / 10.0 if ( report_id == HIDPP_SHORT_MESSAGE_ID @@ -675,17 +758,30 @@ def _read_input_buffer(handle, ihandle, notifications_hook): Used by request() and ping() before their write. """ + is_centurion = ihandle in _centurion_handles + read_size = CENTURION_FRAME_SIZE if is_centurion else _MAX_READ_SIZE while True: try: # read whatever is already in the buffer, if any - data = hidapi.read(ihandle, _MAX_READ_SIZE, 0) + data = hidapi.read(ihandle, read_size, 0) except Exception as reason: logger.error("read failed, assuming receiver %s no longer available", handle) close(handle) raise exceptions.NoReceiver(reason=reason) from reason if data: + if is_centurion and ord(data[:1]) == CENTURION_REPORT_ID: + # Unwrap Centurion CPL framing same as in _read() + cpl_length = ord(data[1:2]) + inner_payload = data[3 : 2 + cpl_length] + data = bytes([HIDPP_LONG_MESSAGE_ID, 0xFF]) + inner_payload + if len(data) <= _LONG_MESSAGE_SIZE: + data = data + b"\x00" * (_LONG_MESSAGE_SIZE - len(data)) + elif len(data) <= _CENTURION_MSG_SIZE: + data = data + b"\x00" * (_CENTURION_MSG_SIZE - len(data)) + else: + data = data[:_CENTURION_MSG_SIZE] if _is_relevant_message(data): # only process messages that pass check # report_id = ord(data[:1]) if notifications_hook: diff --git a/lib/logitech_receiver/centurion.py b/lib/logitech_receiver/centurion.py new file mode 100644 index 00000000..f1222869 --- /dev/null +++ b/lib/logitech_receiver/centurion.py @@ -0,0 +1,511 @@ +## Copyright (C) 2012-2013 Daniel Pavel +## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/ +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License along +## with this program; if not, write to the Free Software Foundation, Inc., +## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Centurion device protocol — receiver class, factory, device info/firmware/battery queries. + +CenturionReceiver is a lightweight receiver-like container for Centurion +(PRO X 2 LIGHTSPEED and similar) dongles. Protocol functions query device +info, firmware, serial, name, and battery via Centurion-specific HID++ features. +""" + +from __future__ import annotations + +import errno +import logging +import struct + +from typing import Callable + +from solaar import configuration + +from . import base +from . import exceptions +from . import hidpp10 +from .centurion_constants import CenturionCoreFeature +from .common import Alert +from .common import Battery +from .common import BatteryStatus +from .common import FirmwareKind +from .common import _read_usb_product_string +from .hidpp20_constants import SupportedFeature + +logger = logging.getLogger(__name__) + + +# --- Centurion protocol functions (standalone, operate on any device-like object) --- + + +def get_firmware_centurion(device): + """Reads firmware info from a Centurion device via DeviceInfo (0x0100) function 1.""" + from . import common + + fw = [] + seen = set() # track response signatures to detect duplicates + for index in range(0, 8): # try up to 8 entities + try: + report = device.feature_request(SupportedFeature.CENTURION_DEVICE_INFO, 0x10, index) + except exceptions.FeatureCallError: + break + if not report or len(report) < 5: + break + # Dedup: parent device returns the same response for every entity index + sig = bytes(report[: 5 + report[4]]) + if sig in seen: + break + seen.add(sig) + fw_type = report[0] + version = struct.unpack("!H", report[2:4])[0] + name_len = report[4] + name = report[5 : 5 + name_len].decode("ascii", errors="replace").rstrip("\x00") if name_len else "" + version_str = f"{version >> 8}.{version & 0xFF:02d}" + kind = FirmwareKind(fw_type) if fw_type <= 3 else FirmwareKind.Other + fw.append(common.FirmwareInfo(kind, name, version_str, None)) + return tuple(fw) if fw else None + + +def get_serial_centurion(device): + """Reads the serial number from a Centurion device via DeviceInfo (0x0100) function 2.""" + try: + report = device.feature_request(SupportedFeature.CENTURION_DEVICE_INFO, 0x20) + except exceptions.FeatureCallError: + return None + if not report or len(report) < 2: + return None + str_len = report[0] + return report[1 : 1 + str_len].decode("ascii", errors="replace").rstrip("\x00") + + +def get_hardware_info_centurion(device): + """Reads hardware info from a Centurion device via DeviceInfo (0x0100) function 0. + + Returns (modelId, hardwareRevision, productId) or None. + """ + try: + report = device.feature_request(SupportedFeature.CENTURION_DEVICE_INFO) + except exceptions.FeatureCallError: + return None + if not report or len(report) < 4: + return None + model_id = report[0] + hw_revision = report[1] + product_id = struct.unpack("!H", report[2:4])[0] + return model_id, hw_revision, product_id + + +def _centurion_sub_device_info_request(device, function=0x00, *params): + """Send a DeviceInfo (0x0100) request to the sub-device via bridge.""" + sub_indices = getattr(device, "_centurion_sub_indices", {}) + sub_idx = sub_indices.get(SupportedFeature.CENTURION_DEVICE_INFO) + if sub_idx is None: + return None + return device.centurion_bridge_request(sub_idx, function, *params) + + +def get_firmware_centurion_sub(device): + """Reads firmware info from the Centurion sub-device (headset) via bridge.""" + from . import common + + fw = [] + seen = set() + for index in range(0, 8): + report = _centurion_sub_device_info_request(device, 0x10, index) + if not report or len(report) < 5: + break + sig = bytes(report[: 5 + report[4]]) + if sig in seen: + break + seen.add(sig) + fw_type = report[0] + version = struct.unpack("!H", report[2:4])[0] + name_len = report[4] + name = report[5 : 5 + name_len].decode("ascii", errors="replace").rstrip("\x00") if name_len else "" + version_str = f"{version >> 8}.{version & 0xFF:02d}" + kind = FirmwareKind(fw_type) if fw_type <= 3 else FirmwareKind.Other + fw.append(common.FirmwareInfo(kind, name, version_str, None)) + return tuple(fw) if fw else None + + +def get_serial_centurion_sub(device): + """Reads the serial number from the Centurion sub-device (headset) via bridge.""" + report = _centurion_sub_device_info_request(device, 0x20) + if not report or len(report) < 2: + return None + str_len = report[0] + return report[1 : 1 + str_len].decode("ascii", errors="replace").rstrip("\x00") + + +def get_hardware_info_centurion_sub(device): + """Reads hardware info from the Centurion sub-device (headset) via bridge. + + Returns (modelId, hardwareRevision, productId) or None. + """ + report = _centurion_sub_device_info_request(device) + if not report or len(report) < 4: + return None + model_id = report[0] + hw_revision = report[1] + product_id = struct.unpack("!H", report[2:4])[0] + return model_id, hw_revision, product_id + + +def get_name_centurion(device): + """Reads a Centurion device's name via DeviceName (0x0101). + + Tries two response formats: + 1. Inline: function 0 returns [name_len, name_bytes...] (like serial) + 2. Chunked: function 0 returns [name_len], function 1 returns [name_bytes...] (like standard DeviceName) + """ + try: + reply = device.feature_request(SupportedFeature.CENTURION_DEVICE_NAME) + except exceptions.FeatureCallError: + return None + if not reply: + return None + name_length = reply[0] + if name_length == 0: + return None + # If the full name is inline (length + name bytes in one response) + if len(reply) >= 1 + name_length: + return reply[1 : 1 + name_length].decode("utf-8", errors="replace").rstrip("\x00") + # Otherwise, fetch name in chunks via function 1 (like standard DEVICE_NAME) + name = b"" + while len(name) < name_length: + try: + fragment = device.feature_request(SupportedFeature.CENTURION_DEVICE_NAME, 0x10, len(name)) + except exceptions.FeatureCallError: + break + if fragment: + name += fragment[: name_length - len(name)] + else: + break + return name.decode("utf-8", errors="replace").rstrip("\x00") if name else None + + +def get_battery_centurion(device): + """Query battery via CENTURION_BATTERY_SOC.""" + try: + report = device.feature_request(SupportedFeature.CENTURION_BATTERY_SOC) + if report is not None: + return decipher_battery_centurion(report) + except exceptions.FeatureCallError: + if SupportedFeature.CENTURION_BATTERY_SOC in device.features: + return SupportedFeature.CENTURION_BATTERY_SOC + return None + + +def decipher_battery_centurion(report) -> tuple[SupportedFeature, Battery]: + """Decipher CENTURION_BATTERY_SOC (0x0104) response. + + Response format (3 bytes): + Byte 0: Battery Percentage (0-100) + Byte 1: Battery Percentage (duplicate) + Byte 2: Charging Status (0=discharging, 1=charging, 2=charging via USB, 3=charge complete) + """ + if len(report) < 1: + return SupportedFeature.CENTURION_BATTERY_SOC, Battery(None, None, BatteryStatus.DISCHARGING, None) + soc = report[0] + logger.debug("centurion battery SOC raw: %s", report[:8].hex()) + charging_status = report[2] if len(report) >= 3 else 0 + if charging_status in (1, 2): + status = BatteryStatus.RECHARGING + elif charging_status == 3: + status = BatteryStatus.FULL + else: + status = BatteryStatus.DISCHARGING + return SupportedFeature.CENTURION_BATTERY_SOC, Battery(soc, None, status, None) + + +# --- CenturionReceiver class --- + + +class CenturionReceiver: + """A lightweight receiver-like container for Centurion (PRO X 2 LIGHTSPEED) dongles. + + Provides the Receiver interface to the UI so the dongle appears as a parent + with the headset as an indented child device. NOT a subclass of Receiver — + Receiver's __init__ does HID++ 1.0 register reads and pairing setup that + don't apply to Centurion. + + All centurion communication (bridge, features, settings, battery) lives in + the child Device; this class is just a UI container + handle owner. + """ + + read_register: Callable = hidpp10.read_register + write_register: Callable = hidpp10.write_register + number = 0xFF + kind = None + isDevice = False + may_unpair = False + re_pairs = False + max_devices = 1 + + def __init__(self, low_level, handle, device_info, setting_callback=None): + assert handle + self.low_level = low_level + self.handle = handle + self.path = device_info.path + self.product_id = device_info.product_id + self.setting_callback = setting_callback + self.status_callback = None + self.notification_flags = None + self._devices = {} + self._firmware = None + self._dongle_features = None # independently probed dongle features + self.cleanups = [] + + # Receiver identity + self.serial = None + self._usb_name = getattr(device_info, "product", None) + if not self._usb_name and self.path: + self._usb_name = _read_usb_product_string(self.path) + self.name = "Centurion Receiver" + + # Dummy pairing object — lock_open stays False + from .receiver import Pairing + + self.pairing = Pairing() + + # Discover dongle features independently + self._discover_dongle_features() + + # Read serial from dongle's CENTURION_DEVICE_INFO if available + if self.serial is None: + try: + s = get_serial_centurion(self) + if s and s.strip() and s.strip().isprintable(): + self.serial = s.strip() + except Exception: + pass + + def enable_connection_notifications(self, enable=True): + return False + + def remaining_pairings(self, cache=True): + return None + + def device_codename(self, n): + return self._usb_name + + def request(self, request_id, *params, no_reply=False): + """Send an HID++ request directly to the dongle (not through bridge).""" + if self.handle: + return self.low_level.request( + self.handle, 0xFF, request_id, *params, no_reply=no_reply, long_message=True, protocol=2.0 + ) + + def feature_request(self, feature, function=0x00, *params, no_reply=False): + """Send a feature request to the dongle using discovered feature indices.""" + if self._dongle_features is None: + self._discover_dongle_features() + feature_int = int(feature) + for _feat, feat_id, index in self._dongle_features or []: + if feat_id == feature_int: + request_id = (index << 8) | (function & 0xFF) + return self.request(request_id, *params, no_reply=no_reply) + raise exceptions.FeatureNotSupported(feature) + + def _discover_dongle_features(self): + """Independently discover features on the dongle hardware.""" + self._dongle_features = [] + try: + # Query ROOT for FEATURE_SET index + response = self.request(0x0000, 0x00, 0x01) + if response is None or response[0] == 0: + return + fs_index = response[0] + # Get feature count + count_resp = self.request(fs_index << 8) + if count_resp is None: + return + feature_count = count_resp[0] + # Enumerate features via CenturionFeatureSet (func 1 = 0x10, per-index query) + for idx in range(feature_count): + resp = self.request((fs_index << 8) | 0x10, idx) + if resp is None or len(resp) < 3: + continue + feat_id = struct.unpack("!H", resp[1:3])[0] + try: + feature = SupportedFeature(feat_id) + except ValueError: + feature = f"unknown:{feat_id:04X}" + self._dongle_features.append((feature, feat_id, idx)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Centurion dongle features: %s", self._dongle_features) + except Exception: + logger.debug("Centurion dongle feature discovery failed", exc_info=True) + + @property + def dongle_features(self): + """Return list of (feature, feat_id, index) tuples for dongle features.""" + if self._dongle_features is None: + self._discover_dongle_features() + return self._dongle_features + + def count(self): + return len([d for d in self._devices.values() if d is not None]) + + @property + def firmware(self): + if self._firmware is None and self.handle: + self._firmware = get_firmware_centurion(self) + return self._firmware or () + + def notify_devices(self): + """Create child Device for the headset and trigger its initialization.""" + # Import Device locally to avoid circular import (centurion.py ↔ device.py) + from .device import Device + + # Signal receiver to UI first — tray/window need the receiver entry + # before a child device can be added under it. + self.changed(alert=Alert.NONE) + + # Create child Device with receiver=self, number=1 + pairing_info = { + "wpid": self.product_id, + "kind": None, + "serial": None, + "polling": None, + "power_switch": None, + } + dev = Device( + self.low_level, + self, + 1, + None, + pairing_info=pairing_info, + setting_callback=self.setting_callback, + ) + # Set centurion attributes on the child + dev.centurion = True + dev.product_id = self.product_id + dev.hidpp_long = True + dev._centurion_usb_name = self._usb_name + # Pre-set bridge index from dongle features so ping can probe the headset + for _feat, feat_id, idx in self._dongle_features or []: + if feat_id == CenturionCoreFeature.CENT_PP_BRIDGE: + dev._centurion_bridge_index = idx + break + + self._devices[1] = dev + configuration.attach_to(dev) + dev.status_callback = self.status_callback + + # Ping to determine online status. + # Notify UI either way — offline devices show as greyed out (matching receiver behavior). + online = dev.ping() + dev.changed(active=online) + if self.status_callback is not None: + self.status_callback(dev) + + def changed(self, alert=Alert.NOTIFICATION, reason=None): + if self.status_callback is not None: + self.status_callback(self, alert=alert, reason=reason) + + def status_string(self): + count = self.count() + if count == 0: + return "No devices." + return f"{count} device connected." + + def close(self): + handle, self.handle = self.handle, None + for _n, d in self._devices.items(): + if d: + d.close() + self._devices.clear() + for cleanup in self.cleanups: + cleanup(self) + return handle and self.low_level.close(handle) + + def __del__(self): + self.close() + + def __iter__(self): + for dev in self._devices.values(): + if dev is not None: + yield dev + + def __getitem__(self, key): + dev = self._devices.get(key) + if dev is not None: + return dev + raise IndexError(key) + + def __len__(self): + return len([d for d in self._devices.values() if d is not None]) + + def __contains__(self, dev): + if isinstance(dev, int): + return self._devices.get(dev) is not None + return self.__contains__(dev.number) + + def __bool__(self): + return self.handle is not None + + __nonzero__ = __bool__ + + def __eq__(self, other): + return other is not None and self.kind == other.kind and self.path == other.path + + def __ne__(self, other): + return other is None or self.kind != other.kind or self.path != other.path + + def __hash__(self): + return self.path.__hash__() + + def __str__(self): + return "<%s(%s,%s%s)>" % ( + self.name.replace(" ", "") if self.name else "CenturionReceiver", + self.path, + "" if isinstance(self.handle, int) else "T", + self.handle, + ) + + __repr__ = __str__ + + +def create_centurion_receiver(low_level, device_info, setting_callback=None): + """Opens a Centurion dongle and wraps it as a receiver-like container. + + Creates a CenturionReceiver, discovers its features, then checks if + CentPPBridge (0x0003) is among them. If not, this is a direct-connected + device (wired headset) — close and return None so the caller can fall + back to create_device(). + + :returns: A CenturionReceiver, or None. + """ + try: + handle = low_level.open_path(device_info.path) + if handle: + base._centurion_handles.add(int(handle)) + cr = CenturionReceiver(low_level, handle, device_info, setting_callback) + # Check if any discovered feature is CentPPBridge (0x0003) + has_bridge = any(feat_id == CenturionCoreFeature.CENT_PP_BRIDGE for _, feat_id, _ in (cr.dongle_features or [])) + if not has_bridge: + logger.info("Centurion device %s has no bridge, treating as direct device", device_info.path) + base._centurion_handles.discard(int(handle)) + cr.handle = None # prevent __del__ from double-closing + low_level.close(handle) + return None + return cr + except OSError as e: + logger.exception("open %s", device_info) + if e.errno == errno.EACCES: + raise e + except Exception as e: + logger.exception("open %s", device_info) + raise e diff --git a/lib/logitech_receiver/centurion_constants.py b/lib/logitech_receiver/centurion_constants.py new file mode 100644 index 00000000..6e0e3585 --- /dev/null +++ b/lib/logitech_receiver/centurion_constants.py @@ -0,0 +1,38 @@ +"""Centurion transport-specific constants. + +Feature IDs that collide with HID++ 2.0 core features live here +so they can coexist with SupportedFeature (which requires unique values). +""" + +from __future__ import annotations + +from enum import IntEnum + +from .hidpp20_constants import SupportedFeature + + +class CenturionCoreFeature(IntEnum): + """Centurion transport-specific features that collide with HID++ 2.0 core IDs.""" + + CENTURION_ROOT = 0x0000 + CENTURION_FEATURE_SET = 0x0001 + CENT_PP_BRIDGE = 0x0003 + MULTI_HOST_CONTROL = 0x0005 + KEEP_ALIVE = 0x0007 + + def __str__(self): + return self.name.replace("_", " ") + + +def resolve_feature(feat_id: int, centurion: bool = False): + """Resolve a feature ID to the appropriate enum, checking centurion-specific + features first when on the centurion transport.""" + if centurion: + try: + return CenturionCoreFeature(feat_id) + except ValueError: + pass + try: + return SupportedFeature(feat_id) + except ValueError: + return None diff --git a/lib/logitech_receiver/common.py b/lib/logitech_receiver/common.py index 3d00a015..3aa4127e 100644 --- a/lib/logitech_receiver/common.py +++ b/lib/logitech_receiver/common.py @@ -674,3 +674,17 @@ class Notification(IntEnum): class BusID(IntEnum): USB = 0x03 BLUETOOTH = 0x05 + + +def _read_usb_product_string(hidraw_path): + """Read the USB product string from sysfs for a hidraw device path.""" + import pathlib + + try: + # /sys/class/hidraw/hidrawN/device/../../product → USB device product string + hidraw_name = pathlib.Path(hidraw_path).name + product_path = pathlib.Path("/sys/class/hidraw") / hidraw_name / "device" / ".." / ".." / "product" + product = product_path.read_text().strip() + return product if product else None + except (OSError, ValueError): + return None diff --git a/lib/logitech_receiver/descriptors.py b/lib/logitech_receiver/descriptors.py index fc81d494..41a66670 100644 --- a/lib/logitech_receiver/descriptors.py +++ b/lib/logitech_receiver/descriptors.py @@ -465,3 +465,4 @@ _D( kind=DEVICE_KIND.headset, usbid=0x0ABA, ) +# PRO X 2 LIGHTSPEED Gaming Headset (0x0AF7) — fully probed via Centurion transport, no static descriptor needed diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index ca5de065..2f23c5d1 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -19,6 +19,7 @@ from __future__ import annotations import errno import logging +import struct import threading import time import typing @@ -29,6 +30,7 @@ from typing import Protocol from solaar import configuration +from . import base from . import descriptors from . import exceptions from . import hidpp10 @@ -38,6 +40,7 @@ from . import settings from . import settings_templates from .common import Alert from .common import Battery +from .common import _read_usb_product_string from .hidpp10_constants import NotificationFlag from .hidpp20_constants import SupportedFeature @@ -74,6 +77,8 @@ def create_device(low_level: LowLevelInterface, device_info, setting_callback=No try: handle = low_level.open_path(device_info.path) if handle: + if getattr(device_info, "centurion", False): + base._centurion_handles.add(int(handle)) # a direct connected device might not be online (as reported by user) return Device( low_level, @@ -124,6 +129,16 @@ class Device: self.product_id = device_info.product_id if device_info else None self.hidpp_short = device_info.hidpp_short if device_info else None self.hidpp_long = device_info.hidpp_long if device_info else None + self.centurion = device_info.centurion if device_info else False + self._centurion_usb_name = None + if self.centurion: + self.hidpp_long = True # Centurion devices always use long HID++ messages + # Read USB product string for device name — avoids slow bridge probe via CENTURION_DEVICE_NAME. + # device_info.product is often None (udev reads USB interface attrs, not device attrs), + # so fall back to reading from sysfs. + self._centurion_usb_name = getattr(device_info, "product", None) if device_info else None + if not self._centurion_usb_name and self.path: + self._centurion_usb_name = _read_usb_product_string(self.path) self.bluetooth = device_info.bus_id == 0x0005 if device_info else False # Bluetooth needs long messages self.hid_serial = device_info.serial if device_info else None self.setting_callback = setting_callback # for changes to settings @@ -230,10 +245,14 @@ class Device: def codename(self): if not self._codename: if self.online and self.protocol >= 2.0: - self._codename = _hidpp20.get_friendly_name(self) + if not self.centurion: + self._codename = _hidpp20.get_friendly_name(self) if not self._codename and self.name: - names = self.name.split(" ") - self._codename = names[1 if len(names) > 1 and names[0] == "Logitech" else 0] + if self.centurion: + self._codename = self.name + else: + names = self.name.split(" ") + self._codename = names[1 if len(names) > 1 and names[0] == "Logitech" else 0] if not self._codename and self.receiver: codename = self.receiver.device_codename(self.number) if codename: @@ -247,17 +266,40 @@ class Device: if not self._name: with self._simple_lock: if self._name is None: - if self.online and self.protocol >= 2.0: + if self.online and self.centurion: + self._name = _hidpp20.get_name_centurion(self) or getattr(self, "_centurion_usb_name", None) + if not self._name: + self._name = f"Unknown device {self.wpid or self.product_id}" + elif self.online and self.protocol >= 2.0: self._name = _hidpp20.get_name(self) return self._name or self._codename or f"Unknown device {self.wpid or self.product_id}" def get_ids(self): + if self.centurion: + self._get_ids_centurion() + return ids = _hidpp20.get_ids(self) if ids: self._unitId, self._modelId, self._tid_map = ids if logger.isEnabledFor(logging.INFO) and self._serial and self._serial != self._unitId: logger.info("%s: unitId %s does not match serial %s", self, self._unitId, self._serial) + def _get_ids_centurion(self): + if getattr(self, "_centurion_ids_done", False): + return + self._centurion_ids_done = True + serial = _hidpp20.get_serial_centurion(self) + if not serial or not serial.strip() or not serial.strip().isprintable(): + serial = _hidpp20.get_serial_centurion_sub(self) + if serial and serial.strip() and serial.strip().isprintable(): + self._serial = serial.strip() + self._unitId = self._serial + hw_info = _hidpp20.get_hardware_info_centurion(self) + if hw_info: + model_id, hw_revision, product_id = hw_info + self._modelId = f"{product_id:04X}" + self._tid_map = {"usbid": f"{product_id:04X}"} + @property def unitId(self): if not self._unitId and self.online and self.protocol >= 2.0: @@ -279,13 +321,32 @@ class Device: @property def kind(self): if not self._kind and self.online and self.protocol >= 2.0: - self._kind = _hidpp20.get_kind(self) + if self.centurion: + self._kind = self._infer_kind_centurion() + else: + self._kind = _hidpp20.get_kind(self) return self._kind or "?" + def _infer_kind_centurion(self): + """Infer device kind from Centurion features (sub-device or top-level).""" + # Check sub-device features (wireless via bridge) + for feature in getattr(self, "_centurion_sub_features", ()): + if isinstance(feature, int) and 0x0600 <= feature <= 0x06FF: + return hidpp10_constants.DEVICE_KIND.headset + # Check top-level features (direct USB connection, no bridge) + if self.features: + for feature, _index in self.features.enumerate(): + feat_int = int(feature) if isinstance(feature, int) else 0 + if 0x0600 <= feat_int <= 0x06FF: + return hidpp10_constants.DEVICE_KIND.headset + return None + @property def firmware(self) -> tuple[common.FirmwareInfo]: if self._firmware is None and self.online: - if self.protocol >= 2.0: + if self.centurion: + self._firmware = _hidpp20.get_firmware_centurion_sub(self) or _hidpp20.get_firmware_centurion(self) + elif self.protocol >= 2.0: self._firmware = _hidpp20.get_firmware(self) else: self._firmware = _hidpp10.get_firmware(self) @@ -293,6 +354,8 @@ class Device: @property def serial(self): + if not self._serial and self.online and self.centurion: + self.get_ids() return self._serial or "" @property @@ -472,7 +535,7 @@ class Device: else: self.set_configuration(0x11) # signal end of configuration self.read_battery() # battery information may have changed so try to read it now - elif was_active and self.receiver: # need to set configuration pending flag in receiver + elif was_active and self.receiver and not isinstance(self.receiver, CenturionReceiver): hidpp10.set_configuration_pending_flags(self.receiver, 0xFF) if logger.isEnabledFor(logging.DEBUG): logger.debug("device %d changed: active=%s %s", self.number, self._active, self.battery_info) @@ -537,9 +600,12 @@ class Device: long = self.hidpp_long is True or ( self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) ) + # Centurion child: CPL framing strips devnumber and responses always + # have devnumber=0xFF, so we must send 0xFF to match responses. + devnumber = 0xFF if (self.centurion and self.receiver and not self.handle) else self.number return self.low_level.request( self.handle or (self.receiver.handle if self.receiver else None), - self.number, + devnumber, request_id, *params, no_reply=no_reply, @@ -549,11 +615,231 @@ class Device: def feature_request(self, feature, function=0x00, *params, no_reply=False): if self.protocol >= 2.0: + if self.centurion: + # Ensure sub-device features are discovered before routing decision + if self.features is not None: + self.features._check() + if feature in getattr(self, "_centurion_sub_features", ()): + sub_idx = self.features.get(feature) + if sub_idx is not None: + return self.centurion_bridge_request(sub_idx, function, *params, no_reply=no_reply) return hidpp20.feature_request(self, feature, function, *params, no_reply=no_reply) + # Max sub-message bytes in the first bridge fragment: + # 64 - 1 (report ID) - 1 (cpl_len) - 1 (flags) - 2 (bridge prefix) - 2 (bridge hdr) = 57 + # LGHUB uses 56 for first fragment (60 byte payload - 4 bridge overhead) + _BRIDGE_FIRST_CHUNK = 56 + # Continuation fragments carry raw sub_msg data (no bridge prefix/hdr): + # 64 - 1 (report ID) - 1 (cpl_len) - 1 (flags) = 61, but LGHUB uses 60 + _BRIDGE_CONT_CHUNK = 60 + + def centurion_bridge_request(self, sub_feat_idx, sub_function=0x00, *params, no_reply=False): + """Send a request to a Centurion sub-device via CentPPBridge. + + Builds the 4-layer nested message: + Layer 1: [0x51] + Layer 2: [cpl_length, flags] + Layer 3: [bridge_idx, sendFragment_func|swid, bridge_hdr...] + Layer 4: [sub_cpl=0x00, sub_feat_idx, sub_func|swid, params...] + + For multi-fragment sends, only the first fragment includes the bridge + prefix and header. Continuation fragments carry raw sub_msg data. + The CPL flags byte encodes fragment index and continuation: + flags = (fragment_index << 1) | (1 if more_fragments else 0) + Single-frame messages use flags=0x00. + + Returns the sub-device response data (after bridge header), or None. + """ + if not getattr(self, "centurion", False): + raise ValueError("centurion_bridge_request called on non-Centurion device") + bridge_idx = getattr(self, "_centurion_bridge_index", None) + if bridge_idx is None: + raise ValueError("CentPPBridge index not discovered yet") + handle = self.handle or (self.receiver.handle if self.receiver else None) + if not handle: + return None + + sw_id = base._get_next_sw_id() + + # Build sub-device message: [sub_cpl=0x00, sub_feat_idx, sub_func|swid, params...] + # sub_function is in standard HID++ format: func_number << 4 (e.g. 0x10 for function 1) + sub_params = b"".join(struct.pack("B", p) if isinstance(p, int) else p for p in params) if params else b"" + sub_msg = struct.pack("BBB", 0x00, sub_feat_idx, (sub_function & 0xF0) | sw_id) + sub_params + + # Build bridge header: [device_id<<4 | len_hi, len_lo] + # device_id=0 for the headset, len is the total sub-message length + sub_len = len(sub_msg) + bridge_hdr = struct.pack("BB", (0x00 << 4) | ((sub_len >> 8) & 0x0F), sub_len & 0xFF) + bridge_prefix = struct.pack("BB", bridge_idx, (0x01 << 4) | sw_id) + + timeout = base.DEFAULT_TIMEOUT + with base.acquire_timeout(base.handle_lock(handle), handle, timeout): + if sub_len <= self._BRIDGE_FIRST_CHUNK: + # Single-frame path + layer3 = bridge_prefix + bridge_hdr + sub_msg + base.write_centurion_cpl(handle, layer3) + else: + # Multi-fragment send + # Fragment 0: bridge_prefix + bridge_hdr + first chunk of sub_msg + # Fragments 1+: raw sub_msg continuation data (no bridge overhead) + # CPL flags = (frag_index << 1) | (1 if more_fragments else 0) + # All fragments are sent back-to-back without waiting for + # intermediate ACKs (verified via LGHUB pcap). The device + # reassembles internally and sends a single ACK + MessageEvent + # after the last fragment. + frag_index = 0 + offset = 0 + while offset < sub_len: + if frag_index == 0: + chunk_size = self._BRIDGE_FIRST_CHUNK + chunk = sub_msg[offset : offset + chunk_size] + layer3 = bridge_prefix + bridge_hdr + chunk + else: + chunk_size = self._BRIDGE_CONT_CHUNK + chunk = sub_msg[offset : offset + chunk_size] + layer3 = chunk + has_more = (offset + chunk_size) < sub_len + flags = (frag_index << 1) | (1 if has_more else 0) + base.write_centurion_cpl(handle, layer3, flags=flags) + offset += len(chunk) + frag_index += 1 + + if no_reply: + return None + + # Read ACK + MessageEvent response + request_started = time.time() + ack_received = False + while time.time() - request_started < timeout: + reply = base._read(handle, timeout) + if not reply: + continue + _report_id, _devnumber, reply_data = reply + # ACK: short response echoing feat_idx and func|swid + if len(reply_data) >= 2 and reply_data[0] == bridge_idx: + func_sw = reply_data[1] + if (func_sw >> 4) == 0x01 and (func_sw & 0x0F) == sw_id: + ack_received = True + break + if (func_sw >> 4) == 0x01 and (func_sw & 0x0F) == 0: + # MessageEvent arrived before ACK — validate it's for our request + if self._is_bridge_response_for(reply_data, sub_feat_idx): + if logger.isEnabledFor(logging.DEBUG): + logger.debug("bridge idx=%d fn=0x%02X -> OK", sub_feat_idx, sub_function) + return self._parse_bridge_response(reply_data) + # Unsolicited notification, skip it + if not ack_received: + logger.warning("centurion_bridge_request: no ACK received") + return None + + # Read MessageEvent response (bridge function 1 with SW ID 0 = event) + while time.time() - request_started < timeout: + reply = base._read(handle, timeout) + if not reply: + continue + _report_id, _devnumber, reply_data = reply + if len(reply_data) >= 2 and reply_data[0] == bridge_idx: + func_sw = reply_data[1] + if (func_sw >> 4) == 0x01 and (func_sw & 0x0F) == 0: + if self._is_bridge_response_for(reply_data, sub_feat_idx): + if logger.isEnabledFor(logging.DEBUG): + logger.debug("bridge idx=%d fn=0x%02X -> OK", sub_feat_idx, sub_function) + return self._parse_bridge_response(reply_data) + # Unsolicited notification for a different feature, skip it + logger.warning("centurion_bridge_request: no MessageEvent received") + return None + + @staticmethod + def _wait_for_bridge_ack(handle, bridge_idx, sw_id, timeout): + """Wait for a bridge ACK response between multi-fragment sends.""" + started = time.time() + while time.time() - started < timeout: + reply = base._read(handle, timeout) + if not reply: + continue + _report_id, _devnumber, reply_data = reply + if len(reply_data) >= 2 and reply_data[0] == bridge_idx: + func_sw = reply_data[1] + if (func_sw >> 4) == 0x01 and (func_sw & 0x0F) == sw_id: + return True + return False + + @staticmethod + def _is_bridge_response_for(reply_data, expected_sub_feat_idx): + """Check if a bridge MessageEvent is a response for our specific sub-feature request. + + Accepts both normal responses (sub_feat_idx matches) and error responses + (sub_feat_idx=0xFF with original feat_idx in next byte). + Unsolicited notifications (sub_cpl=0xFF) are rejected. + """ + if len(reply_data) < 6: + return False + sub_cpl = reply_data[4] + sub_feat_idx = reply_data[5] + # Notifications have sub_cpl=0xFF; our responses have sub_cpl=0x00 + if sub_cpl != 0x00: + return False + if sub_feat_idx == expected_sub_feat_idx: + return True + # Error response: sub_feat_idx=0xFF, next byte is the original feat_idx that errored + if sub_feat_idx == 0xFF and len(reply_data) >= 7 and reply_data[6] == expected_sub_feat_idx: + return True + return False + + @staticmethod + def _parse_bridge_response(reply_data): + """Extract sub-device response from a CentPPBridge MessageEvent. + + reply_data layout (after report_id and devnumber have been stripped): + [bridge_idx, func_sw, dev_id<<4|len_hi, len_lo, sub_cpl, sub_feat_idx, sub_func_sw, data...] + Returns the sub-device data starting from sub_feat_idx onward. + + Error responses have sub_feat_idx=0xFF: [... sub_cpl, 0xFF, orig_feat_idx, orig_func_sw, error_code] + These return None. + """ + if len(reply_data) < 7: + return None + sub_feat_idx = reply_data[5] + # Error response from sub-device + if sub_feat_idx == 0xFF: + error_code = reply_data[8] if len(reply_data) > 8 else 0 + orig_feat_idx = reply_data[6] if len(reply_data) > 6 else 0 + logger.debug("bridge sub-device error: feat_idx=%d error=0x%02X", orig_feat_idx, error_code) + return None + return reply_data[7:] # response data after sub_cpl, sub_feat_idx, sub_func_sw + + def _record_ping_protocol(self, handle, protocol): + """Record a successful ping's protocol version, including raw Centurion (major, minor).""" + self._protocol = protocol + cent_ver = base._centurion_protocol_versions.get(int(handle)) + if cent_ver: + self._centurion_protocol = cent_ver + def ping(self): """Checks if the device is online and present, returns True of False. Some devices are integral with their receiver but may not be present even if the receiver responds to ping.""" + if self.centurion and self.receiver and not self.handle: + # Centurion child: first check if dongle is reachable + handle = self.receiver.handle + try: + protocol = self.low_level.ping(handle, 0xFF, long_message=True) + except exceptions.NoReceiver: + self.online = False + return False + if protocol: + self._record_ping_protocol(handle, protocol) + # Dongle responded — now check if headset is actually on by probing through bridge. + # Send ROOT.GetFeature(0x0001) to the sub-device via CentPPBridge. + bridge_idx = getattr(self, "_centurion_bridge_index", None) + if bridge_idx is not None: + try: + result = self.centurion_bridge_request(0, 0x00, 0x00, 0x01) + self.online = result is not None and self.present + except Exception: + self.online = False + else: + self.online = False + return self.online long = self.hidpp_long is True or ( self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0) ) @@ -564,7 +850,7 @@ class Device: protocol = None self.online = protocol is not None and self.present if protocol: - self._protocol = protocol + self._record_ping_protocol(handle, protocol) if logger.isEnabledFor(logging.DEBUG): logger.debug("pinged %s: online %s protocol %s present %s", self.number, self.online, protocol, self.present) return self.online @@ -614,3 +900,8 @@ class Device: def __del__(self): self.close() + + +# Re-export from centurion.py — must be after Device class to avoid circular import +from .centurion import CenturionReceiver # noqa: E402,F401 +from .centurion import create_centurion_receiver # noqa: E402,F401 diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index c966981d..f5813836 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -35,10 +35,13 @@ import yaml from solaar.i18n import _ from typing_extensions import Protocol +from . import centurion as _centurion from . import common from . import exceptions from . import hidpp10_constants from . import special_keys +from .centurion_constants import CenturionCoreFeature +from .centurion_constants import resolve_feature from .common import Battery from .common import BatteryLevelApproximation from .common import BatteryStatus @@ -139,6 +142,7 @@ class FeaturesArray(dict): self.supported = True # Actually don't know whether it is supported yet self.device = device self.inverse = {} + self.sub_inverse = {} self.version = {} self.flags = {} self.count = 0 @@ -162,14 +166,105 @@ class FeaturesArray(dict): logger.warning("FEATURE_SET found, but failed to read features count") return False else: - self.count = count[0] + 1 # ROOT feature not included in count self[SupportedFeature.ROOT] = 0 self[SupportedFeature.FEATURE_SET] = fs_index + if getattr(self.device, "centurion", False): + self._check_centurion(fs_index, count) + else: + self.count = count[0] + 1 # ROOT feature not included in count return True else: self.supported = False return False + def _check_centurion(self, fs_index, count_response): + """Enumerate features on a Centurion device (parent + sub-device via CentPPBridge). + + Phase A: Enumerate parent device features via CenturionFeatureSet. + Find the CentPPBridge index (feature ID 0x0003 on Centurion = CentPPBridge). + Phase B: Route through CentPPBridge to discover sub-device features. + Use CenturionFeatureSet bulk query to get all sub-device features. + Store sub-device features keyed by SupportedFeature enum. + """ + # Phase A: Parent features + feature_count = count_response[0] # includes ROOT on Centurion + self.count = feature_count + bridge_index = None + for index in range(feature_count): + if self.inverse.get(index) is not None: + continue # already registered (ROOT=0, FEATURE_SET=fs_index) + response = self.device.request((fs_index << 8) | 0x10, index) + if response is None or len(response) < 3: + continue + # Centurion FeatureSet response: [remaining_count, feat_hi, feat_lo, type, flags] + feat_id = struct.unpack("!H", response[1:3])[0] + feature = resolve_feature(feat_id, centurion=True) + if feature is None: + feature = f"unknown:{feat_id:04X}" + self[feature] = index + self.inverse[index] = feature + if feature is CenturionCoreFeature.CENT_PP_BRIDGE: + bridge_index = index + + if bridge_index is not None: + self.device._centurion_bridge_index = bridge_index + self.device._centurion_sub_features = set() + self.device._centurion_sub_indices = {} + self._discover_sub_device_features(bridge_index) + + def _discover_sub_device_features(self, bridge_index): + """Phase B: Discover sub-device features via CentPPBridge. + + Uses CenturionFeatureSet bulk query (function 1, index 0) routed through + the bridge to get all sub-device features at once. + """ + # First, find the sub-device's FeatureSet index via CenturionRoot (sub_feat_idx=0) + # Query: CenturionRoot.GetFeature(0x0001) to find FeatureSet index on sub-device + fs_id_hi = (SupportedFeature.FEATURE_SET >> 8) & 0xFF + fs_id_lo = SupportedFeature.FEATURE_SET & 0xFF + response = self.device.centurion_bridge_request(0x00, 0x00, fs_id_hi, fs_id_lo) + if response is None or len(response) < 1: + logger.warning("Failed to find FeatureSet on Centurion sub-device") + return + sub_fs_index = response[0] + if sub_fs_index == 0: + logger.warning("Sub-device FeatureSet not found (index=0)") + return + + # Bulk enumerate: CenturionFeatureSet.GetFeatureId(func=1=0x10, start_index=0) + # Response: [count, (feat_hi, feat_lo, type, flags) × count] + response = self.device.centurion_bridge_request(sub_fs_index, 0x10, 0x00) + if response is None or len(response) < 1: + logger.warning("Failed to enumerate sub-device features") + return + + entry_count = response[0] + entries = response[1:] + sub_feat_idx = 0 # sub-device feature indices start at 0 + for i in range(entry_count): + offset = i * 4 + if offset + 2 > len(entries): + break + feat_id = struct.unpack("!H", entries[offset : offset + 2])[0] + try: + feature = SupportedFeature(feat_id) + except ValueError: + feature = f"unknown:{feat_id:04X}" + # Store sub-device index for ALL features (including parent overlaps) + # This enables querying the sub-device's copy of shared features via bridge + self.device._centurion_sub_indices[feature] = sub_feat_idx + # Only store unique sub-device features in dict (skip parent overlaps like ROOT, FEATURE_SET) + # This avoids clobbering parent inverse entries via __setitem__ + if dict.get(self, feature) is None: + dict.__setitem__(self, feature, sub_feat_idx) + self.device._centurion_sub_features.add(feature) + # Always store in sub_inverse for sub-device enumerate/display + self.sub_inverse[sub_feat_idx] = feature + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Centurion sub-device feature: %s at sub-index %d", feature, sub_feat_idx) + sub_feat_idx += 1 + self._sub_feature_count = sub_feat_idx + def get_feature(self, index: int) -> SupportedFeature | None: feature = self.inverse.get(index) if feature is not None: @@ -178,7 +273,14 @@ class FeaturesArray(dict): feature = self.inverse.get(index) if feature is not None: return feature - response = self.device.feature_request(SupportedFeature.FEATURE_SET, 0x10, index) + # On Centurion devices, all features are discovered upfront (parent + sub-device) + if getattr(self.device, "centurion", False): + return None + try: + response = self.device.feature_request(SupportedFeature.FEATURE_SET, 0x10, index) + except exceptions.FeatureCallError: + logger.warning("failed to retrieve feature at index %d", index) + return None if response: data = struct.unpack("!H", response[:2])[0] try: @@ -194,7 +296,14 @@ class FeaturesArray(dict): if self._check(): for index in range(self.count): feature = self.get_feature(index) - yield feature, index + if feature is not None: + yield feature, index + # Also yield sub-device features for Centurion devices + sub_count = getattr(self, "_sub_feature_count", 0) + for sub_idx in range(sub_count): + feature = self.sub_inverse.get(sub_idx) + if feature is not None: + yield feature, sub_idx def get_feature_version(self, feature: NamedInt) -> Optional[int]: if self[feature]: @@ -224,7 +333,10 @@ class FeaturesArray(dict): index = super().get(feature) if index is not None: return index - response = self.device.request(0x0000, struct.pack("!H", feature)) + try: + response = self.device.request(0x0000, struct.pack("!H", feature)) + except exceptions.FeatureCallError: + return None if response: index = response[0] self[feature] = index if index else False @@ -243,7 +355,7 @@ class FeaturesArray(dict): raise ValueError("Don't delete features from FeatureArray") def __len__(self) -> int: - return self.count + return self.count + getattr(self, "_sub_feature_count", 0) __bool__ = __nonzero__ = _check @@ -1575,6 +1687,27 @@ class Hidpp20: fw.append(fw_info) return tuple(fw) + def get_firmware_centurion(self, device): + return _centurion.get_firmware_centurion(device) + + def get_serial_centurion(self, device): + return _centurion.get_serial_centurion(device) + + def get_hardware_info_centurion(self, device): + return _centurion.get_hardware_info_centurion(device) + + def _centurion_sub_device_info_request(self, device, function=0x00, *params): + return _centurion._centurion_sub_device_info_request(device, function, *params) + + def get_firmware_centurion_sub(self, device): + return _centurion.get_firmware_centurion_sub(device) + + def get_serial_centurion_sub(self, device): + return _centurion.get_serial_centurion_sub(device) + + def get_hardware_info_centurion_sub(self, device): + return _centurion.get_hardware_info_centurion_sub(device) + def get_ids(self, device): """Reads a device's ids (unit and model numbers)""" ids = device.feature_request(SupportedFeature.DEVICE_FW_VERSION) @@ -1626,6 +1759,9 @@ class Hidpp20: return name.decode("utf-8") + def get_name_centurion(self, device): + return _centurion.get_name_centurion(device) + def get_friendly_name(self, device: Device): """Reads a device's friendly name. @@ -1670,6 +1806,9 @@ class Hidpp20: except exceptions.FeatureCallError: return SupportedFeature.ADC_MEASUREMENT if SupportedFeature.ADC_MEASUREMENT in device.features else None + def get_battery_centurion(self, device: Device): + return _centurion.get_battery_centurion(device) + def get_battery(self, device, feature): """Return battery information - feature, approximate level, next, charging, voltage or battery feature if there is one but it is not responding or None for no battery feature""" @@ -1901,6 +2040,7 @@ battery_functions = { SupportedFeature.BATTERY_VOLTAGE: Hidpp20.get_battery_voltage, SupportedFeature.UNIFIED_BATTERY: Hidpp20.get_battery_unified, SupportedFeature.ADC_MEASUREMENT: Hidpp20.get_adc_measurement, + SupportedFeature.CENTURION_BATTERY_SOC: Hidpp20.get_battery_centurion, } @@ -1981,6 +2121,9 @@ def decipher_battery_unified(report) -> tuple[SupportedFeature, Battery]: return SupportedFeature.UNIFIED_BATTERY, Battery(discharge if discharge else approx_level, None, status, None) +decipher_battery_centurion = _centurion.decipher_battery_centurion + + def decipher_adc_measurement(report) -> tuple[SupportedFeature, Battery]: # partial implementation - needs mapping to levels adc_voltage, flags = struct.unpack("!HB", report[:3]) @@ -2120,3 +2263,10 @@ class ForceSensingButtonArray(UserDict): def acceptable_current_key(self, index: int, value: int) -> bool: return self[index].acceptable(value) + + +# --- OnboardEQ (0x0636) — re-exported from onboard_eq.py --- +from .onboard_eq import _build_set_eq_payload # noqa: E402, F401 +from .onboard_eq import get_onboard_eq_info # noqa: E402, F401 +from .onboard_eq import get_onboard_eq_params # noqa: E402, F401 +from .onboard_eq import set_onboard_eq_params # noqa: E402, F401 diff --git a/lib/logitech_receiver/hidpp20_constants.py b/lib/logitech_receiver/hidpp20_constants.py index 9f55ce21..be0082c1 100644 --- a/lib/logitech_receiver/hidpp20_constants.py +++ b/lib/logitech_receiver/hidpp20_constants.py @@ -179,6 +179,60 @@ class SupportedFeature(IntEnum): SIDETONE = 0x8300 EQUALIZER = 0x8310 HEADSET_OUT = 0x8320 + # Centurion core + CENTURION_DEVICE_INFO = 0x0100 + CENTURION_DEVICE_NAME = 0x0101 + CENTURION_ROOT = 0x0102 + CENTURION_MEMFAULT = 0x0103 + CENTURION_BATTERY_SOC = 0x0104 + CENTURION_AUTO_SLEEP = 0x0108 + CENTURION_GENERIC_DFU = 0x010A + CENTURION_LED_BRIGHTNESS = 0x0110 + CENTURION_EU_POWER_MODE = 0x0115 + CENTURION_DEVICE_BOOL_STATE = 0x0116 + # Headsets (Centurion-era) + HEADSET_VOLUME = 0x0200 + HEADSET_EQ = 0x0201 + HEADSET_ADVANCED_PARA_EQ = 0x020D + HEADSET_MIC_TEST = 0x020E + HEADSET_EQ_STYLES = 0x0213 + BT_HOST_INFO = 0x0305 + LIGHTSPEED_PAIRING = 0x0309 + BT_GAMING_MODE = 0x030A + HEADSET_RGB_EFFECTS = 0x0600 + HEADSET_MIC_MUTE = 0x0601 + HEADSET_MIC_SNR = 0x0602 + HEADSET_AUDIO_SIDETONE = 0x0604 + HEADSET_HOST_SWITCH = 0x0607 + HEADSET_MIX = 0x0609 + HEADSET_TONES = 0x060B + HEADSET_NOISE_EXPOSURE = 0x060D + HEADSET_AI_NOISE_REDUCTION = 0x060E + HEADSET_MIC_GAIN = 0x0611 + HEADSET_USAGE_TRACKING = 0x0617 + HEADSET_BATTERY_SAVER = 0x0618 + HEADSET_RGB_HOSTMODE = 0x0620 + HEADSET_RGB_ONBOARD_EFFECTS = 0x0621 + HEADSET_RGB_SIGNATURE_EFFECTS = 0x0622 + HEADSET_DO_NOT_DISTURB = 0x0631 + CENTURION_ONBOARD_PROFILES = 0x0634 + HEADSET_RGB_STREAMING = 0x0635 + HEADSET_ONBOARD_EQ = 0x0636 + # Audio mixing / LogiVoice + MIXER_AUDIO = 0x0800 + MIXER_MIC = 0x0801 + LOGIVOICE = 0x0900 + LOGIVOICE_NOISE_REDUCTION = 0x0901 + LOGIVOICE_NOISE_GATE = 0x0902 + LOGIVOICE_COMPRESSOR = 0x0903 + LOGIVOICE_DE_ESSER = 0x0904 + LOGIVOICE_DE_POPPER = 0x0905 + LOGIVOICE_LIMITER = 0x0906 + LOGIVOICE_HIGH_PASS_FILTER = 0x0907 + LOGIVOICE_EQUALIZER = 0x0908 + LOGIVOICE_AINR = 0x0909 + METERING = 0x0B01 + MIC_GAIN_AUTO_MODE = 0x0B02 # Fake features for Solaar internal use MOUSE_GESTURE = 0xFE00 diff --git a/lib/logitech_receiver/listener.py b/lib/logitech_receiver/listener.py index 7de6bb52..4137afd4 100644 --- a/lib/logitech_receiver/listener.py +++ b/lib/logitech_receiver/listener.py @@ -52,6 +52,9 @@ class _ThreadedHandle: else: # if logger.isEnabledFor(logging.DEBUG): # logger.debug("%r opened new handle %d", self, handle) + # If original handle was centurion, register new per-thread handle too + if any(h in base._centurion_handles for h in self._handles): + base._centurion_handles.add(handle) self._local.handle = handle self._handles.append(handle) return handle @@ -145,7 +148,8 @@ class EventsListener(threading.Thread): self.receiver.close() break if n: - n = base.make_notification(*n) + report_id, devnumber, data = n + n = base.make_notification(report_id, devnumber, data) else: n = self._queued_notifications.get() # deliver any queued notifications if n: diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 1cc46248..693e3afd 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -287,6 +287,9 @@ def _process_feature_notification(device: Device, notification: HIDPPNotificatio else: logger.warning("%s: unknown ADC MEASUREMENT %s", device, notification) + elif feature == SupportedFeature.CENTURION_BATTERY_SOC: + device.set_battery_info(hidpp20.decipher_battery_centurion(notification.data)[1]) + elif feature == SupportedFeature.SOLAR_DASHBOARD: if notification.data[5:9] == b"GOOD": charge, lux, adc = struct.unpack("!BHH", notification.data[:5]) diff --git a/lib/logitech_receiver/onboard_eq.py b/lib/logitech_receiver/onboard_eq.py new file mode 100644 index 00000000..107b11f6 --- /dev/null +++ b/lib/logitech_receiver/onboard_eq.py @@ -0,0 +1,186 @@ +## Copyright (C) 2012-2013 Daniel Pavel +## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/ +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License along +## with this program; if not, write to the Free Software Foundation, Inc., +## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""OnboardEQ (0x0636) biquad coefficient math and payload builders. + +Pure computation — no device or transport dependencies beyond feature_request(). +""" + +from __future__ import annotations + +import math +import struct + +from .hidpp20_constants import SupportedFeature + +# Mystery bytes observed in every LGHUB pcap EQ write between band params +# and coefficient header. Purpose not fully understood — possibly a null-band +# terminator for DSPs that support >5 bands (advanced 10-band mode). +# First byte matches band_count; bytes 2-3 look like LE16 coeff blob size. +# Hardcoded from pcap for initial bring-up; revisit once device-tested. +_EQ_MYSTERY_BYTES = b"\x05\x5a\xe3\x00" + + +def _peaking_eq_biquad(freq_hz, gain_db, Q, sample_rate=48000.0): + """Compute peaking EQ biquad coefficients (Audio EQ Cookbook). + + Returns (b0/a0, b1/a0, b2/a0, a1/a0, a2/a0) normalised coefficients. + """ + A = 10.0 ** (gain_db / 40.0) + w0 = 2.0 * math.pi * freq_hz / sample_rate + cos_w0 = math.cos(w0) + alpha = math.sin(w0) / (2.0 * Q) + a0 = 1.0 + alpha / A + return ( + (1.0 + alpha * A) / a0, + (-2.0 * cos_w0) / a0, + (1.0 - alpha * A) / a0, + (-2.0 * cos_w0) / a0, + (1.0 - alpha / A) / a0, + ) + + +def _quantize_coeffs(b0, b1, b2, a1, a2): + """Quantize biquad coefficients to mixed Q1.31 / Q2.30 fixed-point. + + b0, b2, a2 use Q1.31 (x 2^31); b1, a1 use Q2.30 (x 2^30). + Values are truncated to 24-bit precision (low byte zeroed) matching + the device DSP's internal format. + Returns list of 10 uint16 values (5 coefficients x 2 LE words each, + high word first). + """ + scales = [2**31, 2**30, 2**31, 2**30, 2**31] # b0, b1, b2, a1, a2 + words = [] + for val, scale in zip([b0, b1, b2, a1, a2], scales): + q = int(round(val * scale)) + q = max(-(1 << 31), min((1 << 31) - 1, q)) + q = q & 0xFFFFFF00 # 24-bit precision (low byte always zero) + words.append((q >> 16) & 0xFFFF) # high word + words.append(q & 0xFFFF) # low word + return words + + +def _build_coeff_section(bands, sample_rate, section_type=1): + """Build one coefficient section for a DSP processing block. + + Returns bytes: 4-byte section header + coefficient data as LE uint16 words. + Section header: [type, 0x00, count_lo, count_hi]. + + Coefficients are normalized by a rescale factor to prevent Q1.31 overflow. + Only feedforward coefficients (b0, b1, b2) are divided by rescale; feedback + coefficients (a1, a2) are left unchanged. The DSP multiplies the output by + rescale to restore correct gain. + """ + _HEADROOM = 1.19 # 19% headroom margin (matches LGHUB) + num_bands = len(bands) + all_words = [num_bands] # first uint16 = num_bands + + # First pass: compute raw biquad coefficients for all bands + raw_coeffs = [] + for freq, gain, Q in bands: + raw_coeffs.append(_peaking_eq_biquad(freq, gain, max(Q, 0.1), sample_rate)) + + # Compute rescale: ensure max |b0| fits in Q1.31 with headroom + max_b0 = max(abs(c[0]) for c in raw_coeffs) + rescale = max(1.0, max_b0) * _HEADROOM + + # Second pass: normalize b-coefficients and quantize + for b0, b1, b2, a1, a2 in raw_coeffs: + all_words.extend(_quantize_coeffs(b0 / rescale, b1 / rescale, b2 / rescale, a1, a2)) + + # Rescale factor as Q6.26, 24-bit precision + rs = int(round(rescale * (1 << 26))) + rs = max(-(1 << 31), min((1 << 31) - 1, rs)) & 0xFFFFFF00 + all_words.append((rs >> 16) & 0xFFFF) + all_words.append(rs & 0xFFFF) + + coeff_count = num_bands * 10 + 3 # num_bands word + 10 per band + 2 rescale words + hdr = bytes([section_type, 0x00, coeff_count & 0xFF, (coeff_count >> 8) & 0xFF]) + data = struct.pack(f"<{len(all_words)}H", *all_words) + return hdr + data + + +def _build_eq_coeffs_payload(bands): + """Build the full EQCoeffs wire payload for SetEQParameters. + + Two coefficient sections: type=1 (48 kHz playback) and type=2 (16 kHz mic). + Returns bytes: 7-byte header + sections (no trailing padding). + """ + section_count = 2 + header = bytes([0x03, 0x0E, 0x00, section_count, 0x00, 0x00, 0x00]) + sections = _build_coeff_section(bands, 48000.0, section_type=1) + sections += _build_coeff_section(bands, 16000.0, section_type=2) + return header + sections + + +def _build_set_eq_payload(slot, bands): + """Build complete SetEQParameters payload: band params + biquad coefficients. + + bands: list of (freq_hz, gain_db, Q) tuples. + Returns bytes ready to send as sub-device params. + """ + params = bytes([slot, len(bands)]) + for freq, gain, Q in bands: + params += struct.pack(">H", freq) + bytes([gain & 0xFF, Q & 0xFF]) + params += _EQ_MYSTERY_BYTES + params += _build_eq_coeffs_payload(bands) + return params + + +def get_onboard_eq_info(device): + """Query HEADSET_ONBOARD_EQ GetEQInfos (function 0). + + Returns (has_hw_eq, num_bands) or None. + """ + result = device.feature_request(SupportedFeature.HEADSET_ONBOARD_EQ, 0x00) + if result is None or len(result) < 5: + return None + has_hw_eq = bool(result[0] & 0x80) + num_bands = result[4] + return (has_hw_eq, num_bands) + + +def get_onboard_eq_params(device, slot=0x00): + """Query HEADSET_ONBOARD_EQ GetEQParameters (function 0x10). + + Returns list of (freq_hz, gain_db, q) tuples, or None. + """ + result = device.feature_request(SupportedFeature.HEADSET_ONBOARD_EQ, 0x10, slot) + if result is None or len(result) < 2: + return None + band_count = result[1] + bands = [] + offset = 2 + for _i in range(band_count): + if offset + 4 > len(result): + break + freq_hz = struct.unpack(">H", result[offset : offset + 2])[0] + gain_db = struct.unpack("b", bytes([result[offset + 2]]))[0] # signed + q = result[offset + 3] + bands.append((freq_hz, gain_db, q)) + offset += 4 + return bands + + +def set_onboard_eq_params(device, bands, slot=0x00): + """Send HEADSET_ONBOARD_EQ SetEQParameters (function 0x20). + + bands: list of (freq_hz, gain_db, Q) tuples. + Returns response or None. + """ + payload = _build_set_eq_payload(slot, bands) + return device.feature_request(SupportedFeature.HEADSET_ONBOARD_EQ, 0x20, payload) diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index 25257910..a27d7b06 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -27,6 +27,7 @@ from solaar.i18n import _ from . import common from . import hidpp20_constants from . import settings_validator +from .centurion_constants import CenturionCoreFeature from .common import NamedInt logger = logging.getLogger(__name__) @@ -42,6 +43,7 @@ class Kind(IntEnum): MAP_CHOICE = 0x0A MULTIPLE_TOGGLE = 0x10 PACKED_RANGE = 0x20 + GRAPHIC_EQ = 0x21 MULTIPLE_RANGE = 0x40 HETERO = 0x80 MAP_RANGE = 0x102 @@ -627,7 +629,7 @@ class FeatureRW: read_prefix=b"", no_reply=False, ): - assert isinstance(feature, hidpp20_constants.SupportedFeature) + assert isinstance(feature, (hidpp20_constants.SupportedFeature, CenturionCoreFeature)) self.feature = feature self.read_fnid = read_fnid self.write_fnid = write_fnid @@ -664,7 +666,7 @@ class FeatureRWMap(FeatureRW): key_byte_count=default_key_byte_count, no_reply=False, ): - assert isinstance(feature, hidpp20_constants.SupportedFeature) + assert isinstance(feature, (hidpp20_constants.SupportedFeature, CenturionCoreFeature)) self.feature = feature self.read_fnid = read_fnid self.write_fnid = write_fnid diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py index a2dff87e..ddab8b0a 100644 --- a/lib/logitech_receiver/settings_templates.py +++ b/lib/logitech_receiver/settings_templates.py @@ -1584,6 +1584,190 @@ class ADCPower(settings.Setting): validator_options = {"byte_count": 1} +class HeadsetEcoMode(settings.Setting): + name = "headset-eco-mode" + label = _("Eco Mode") + description = _("Battery saver mode.") + feature = _F.HEADSET_BATTERY_SAVER + validator_class = settings_validator.BooleanValidator + + +class HeadsetDoNotDisturb(settings.Setting): + name = "headset-do-not-disturb" + label = _("Do Not Disturb") + description = _("Suppress notification sounds.") + feature = _F.HEADSET_DO_NOT_DISTURB + validator_class = settings_validator.BooleanValidator + + +class HeadsetMicMute(settings.Setting): + name = "headset-mic-mute" + label = _("Mic Mute") + description = _("Mute the microphone.") + feature = _F.HEADSET_MIC_MUTE + validator_class = settings_validator.BooleanValidator + + +class HeadsetMicSNR(settings.Setting): + name = "headset-mic-snr" + label = _("Mic SNR") + description = _("Microphone signal-to-noise ratio enhancement.") + feature = _F.HEADSET_MIC_SNR + validator_class = settings_validator.BooleanValidator + + +class HeadsetAINR(settings.Setting): + name = "headset-ai-nr" + label = _("AI Noise Reduction") + description = _("Enable AI noise reduction.") + feature = _F.HEADSET_AI_NOISE_REDUCTION + validator_class = settings_validator.BooleanValidator + + +class HeadsetAINRLevel(settings.Setting): + name = "headset-ai-nr-level" + label = _("AI Noise Reduction Level") + description = _("AI noise reduction intensity.") + feature = _F.HEADSET_AI_NOISE_REDUCTION + rw_options = {"read_fnid": 0x20, "write_fnid": 0x30} + validator_class = settings_validator.ChoicesValidator + choices_universe = common.NamedInts(Off=0, Low=1, Medium=2, High=3) + + +class HeadsetSidetone(settings.Setting): + name = "headset-sidetone" + label = _("Headset Sidetone") + description = _("Sidetone level (0 = off, 100 = max).") + feature = _F.HEADSET_AUDIO_SIDETONE + rw_options = {"read_fnid": 0x00, "write_fnid": 0x10} + validator_class = settings_validator.RangeValidator + min_value = 0 + max_value = 100 + + @classmethod + def build(cls, device): + # Version <= 1: GetSidetone returns [mic_count, mic_id, level]; SetSidetone takes [mic_id, level] + # Version > 1: GetSidetone returns [mic_count, mic_id, reserved, level]; SetSidetone takes [mic_id, 0xFF, level] + version = device.features.get_feature_version(cls.feature) or 0 + if version > 1: + skip, prefix = 3, b"\x01\xff" + else: + skip, prefix = 2, b"\x01" + rw = settings.FeatureRW(cls.feature, **cls.rw_options) + validator = cls.validator_class.build(cls, device, read_skip_byte_count=skip, write_prefix_bytes=prefix) + if validator: + return cls(device, rw, validator) + + +class HeadsetMicGain(settings.Setting): + name = "headset-mic-gain" + label = _("Mic Gain") + description = _("Microphone gain level.") + feature = _F.HEADSET_MIC_GAIN + rw_options = {"read_fnid": 0x10, "write_fnid": 0x20} + validator_class = settings_validator.RangeValidator + min_value = -128 + max_value = 127 + validator_options = {"byte_count": 1, "signed": True} + + +class HeadsetMixBalance(settings.Setting): + name = "headset-mix-balance" + label = _("Audio Mix Balance") + description = _("Balance between game and chat audio.") + feature = _F.HEADSET_MIX + validator_class = settings_validator.RangeValidator + min_value = 0 + max_value = 255 + validator_options = {"byte_count": 1} + + +class HeadsetAutoSleep(settings.Setting): + name = "headset-auto-sleep" + label = _("Auto Sleep Timeout") + description = _("Idle time in minutes before the headset enters sleep mode (0 = disabled).") + feature = _F.CENTURION_AUTO_SLEEP + rw_options = {"read_fnid": 0x00, "write_fnid": 0x10} + validator_class = settings_validator.RangeValidator + min_value = 0 + max_value = 255 + validator_options = {"byte_count": 1} + + +class HeadsetOnboardEQ(settings.RangeFieldSetting): + name = "headset-onboard-eq" + label = _("Headset Equalizer") + description = _("Set equalizer levels.") + feature = _F.HEADSET_ONBOARD_EQ + rw_options = {"read_fnid": 0x10, "write_fnid": 0x20, "read_prefix": b"\x00"} + keys_universe = [] + + class validator_class(settings_validator.PackedRangeValidator): + kind = settings.Kind.GRAPHIC_EQ + + @classmethod + def build(cls, setting_class, device): + info = hidpp20.get_onboard_eq_info(device) + if not info: + return None + _has_hw_eq, num_bands = info + bands = hidpp20.get_onboard_eq_params(device, slot=0x00) + if not bands or len(bands) != num_bands: + return None + keys = common.NamedInts() + for i, (freq, _gain, _q) in enumerate(bands): + keys[i] = str(freq) + _("Hz") + v = cls(keys, min_value=-12, max_value=12, count=num_bands, byte_count=1) + v._band_freqs = [freq for freq, _g, _q in bands] + v._band_qs = [q for _f, _g, q in bands] + return v + + def validate_read(self, reply_bytes): + if reply_bytes is None or len(reply_bytes) < 2: + return {} + band_count = reply_bytes[1] + result = {} + offset = 2 + for i in range(band_count): + if offset + 4 > len(reply_bytes): + break + freq = struct.unpack(">H", reply_bytes[offset : offset + 2])[0] + gain = struct.unpack("b", bytes([reply_bytes[offset + 2]]))[0] + q = reply_bytes[offset + 3] + result[i] = gain + # Update stored freq/Q arrays if they exist + if hasattr(self, "_band_freqs") and i < len(self._band_freqs): + self._band_freqs[i] = freq + if hasattr(self, "_band_qs") and i < len(self._band_qs): + self._band_qs[i] = q + offset += 4 + return result + + def prepare_write(self, new_values): + if not hasattr(self, "_band_freqs") or not hasattr(self, "_band_qs"): + return None + bands = [] + for i in range(self.count): + freq = self._band_freqs[i] if i < len(self._band_freqs) else 1000 + q = self._band_qs[i] if i < len(self._band_qs) else 10 + gain = new_values.get(i, 0) + bands.append((freq, gain, q)) + self._pending_bands = bands # stash for persist step + return hidpp20._build_set_eq_payload(0x00, bands) + + def write(self, map, save=True): + result = super().write(map, save) + # Also persist to device flash (slot 0x80) so EQ survives power cycle + if result is not None and hasattr(self._validator, "_pending_bands"): + bands = self._validator._pending_bands + del self._validator._pending_bands + try: + self._device.feature_request(_F.HEADSET_ONBOARD_EQ, 0x20, hidpp20._build_set_eq_payload(0x80, bands)) + except Exception: + logger.warning("HeadsetOnboardEQ: failed to persist EQ to slot 0x80") + return result + + class BrightnessControl(settings.Setting): name = "brightness_control" label = _("Brightness Control") @@ -2063,6 +2247,17 @@ SETTINGS: list[settings.Setting] = [ Sidetone, Equalizer, ADCPower, + HeadsetEcoMode, + HeadsetDoNotDisturb, + HeadsetMicMute, + HeadsetMicSNR, + HeadsetAINR, + HeadsetAINRLevel, + HeadsetSidetone, + HeadsetMicGain, + HeadsetMixBalance, + HeadsetAutoSleep, + HeadsetOnboardEQ, ] diff --git a/lib/logitech_receiver/settings_validator.py b/lib/logitech_receiver/settings_validator.py index ddc164ff..d9e3f278 100644 --- a/lib/logitech_receiver/settings_validator.py +++ b/lib/logitech_receiver/settings_validator.py @@ -531,12 +531,13 @@ class RangeValidator(Validator): kwargs["max_value"] = setting_class.max_value return cls(**kwargs) - def __init__(self, min_value=0, max_value=255, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b""): + def __init__(self, min_value=0, max_value=255, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b"", signed=False): assert max_value > min_value self.min_value = min_value self.max_value = max_value self.read_skip_byte_count = read_skip_byte_count self.write_prefix_bytes = write_prefix_bytes + self._signed = signed self.needs_current_value = True # read and check before write (needed for ADC power and probably a good idea anyway) self._byte_count = math.ceil(math.log(max_value + 1, 256)) if byte_count: @@ -545,7 +546,9 @@ class RangeValidator(Validator): assert self._byte_count < 8 def validate_read(self, reply_bytes): - reply_value = common.bytes2int(reply_bytes[self.read_skip_byte_count : self.read_skip_byte_count + self._byte_count]) + reply_value = common.bytes2int( + reply_bytes[self.read_skip_byte_count : self.read_skip_byte_count + self._byte_count], signed=self._signed + ) assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" return reply_value @@ -554,7 +557,7 @@ class RangeValidator(Validator): if new_value < self.min_value or new_value > self.max_value: raise ValueError(f"invalid choice {new_value!r}") current_value = self.validate_read(current_value) if current_value is not None else None - to_write = self.write_prefix_bytes + common.int2bytes(new_value, self._byte_count) + to_write = self.write_prefix_bytes + common.int2bytes(new_value, self._byte_count, signed=self._signed) # current value is known and same as value to be written return None to signal not to write it return None if current_value is not None and current_value == new_value else to_write diff --git a/lib/solaar/cli/__init__.py b/lib/solaar/cli/__init__.py index a6b6818c..99312a9b 100644 --- a/lib/solaar/cli/__init__.py +++ b/lib/solaar/cli/__init__.py @@ -128,7 +128,14 @@ def _receivers_and_devices(dev_path=None): continue try: if dev_info.isDevice: - d = device.create_device(base, dev_info) + if getattr(dev_info, "centurion", False): + d = device.create_centurion_receiver(base, dev_info) + if d is not None: + d.notify_devices() + else: + d = device.create_device(base, dev_info) + else: + d = device.create_device(base, dev_info) else: d = receiver.create_receiver(base, dev_info) diff --git a/lib/solaar/cli/show.py b/lib/solaar/cli/show.py index 79f9bfb9..d1a1f84c 100644 --- a/lib/solaar/cli/show.py +++ b/lib/solaar/cli/show.py @@ -25,6 +25,7 @@ from logitech_receiver import settings_templates from logitech_receiver.common import LOGITECH_VENDOR_ID from logitech_receiver.common import NamedInt from logitech_receiver.common import strhex +from logitech_receiver.device import CenturionReceiver from logitech_receiver.hidpp20_constants import SupportedFeature from solaar import NAME @@ -35,36 +36,80 @@ _hidpp20 = hidpp20.Hidpp20() def _print_receiver(receiver): + is_centurion = isinstance(receiver, CenturionReceiver) paired_count = receiver.count() print(receiver.name) print(" Device path :", receiver.path) print(f" USB id : {LOGITECH_VENDOR_ID:04x}:{receiver.product_id}") - print(" Serial :", receiver.serial) - pending = hidpp10.get_configuration_pending_flags(receiver) - if pending: - print(f" C Pending : {pending:02x}") + if is_centurion: + print(" Protocol : Centurion") + if receiver.serial: + print(" Serial :", receiver.serial) + if not is_centurion: + pending = hidpp10.get_configuration_pending_flags(receiver) + if pending: + print(f" C Pending : {pending:02x}") if receiver.firmware: for f in receiver.firmware: print(" %-11s: %s" % (f.kind, f.version)) - print(" Has", paired_count, f"paired device(s) out of a maximum of {int(receiver.max_devices)}.") - if receiver.remaining_pairings() and receiver.remaining_pairings() >= 0: - print(f" Has {int(receiver.remaining_pairings())} successful pairing(s) remaining.") + if is_centurion: + print(" Has", paired_count, f"device(s) out of a maximum of {int(receiver.max_devices)}.") + else: + print(" Has", paired_count, f"paired device(s) out of a maximum of {int(receiver.max_devices)}.") + if receiver.remaining_pairings() and receiver.remaining_pairings() >= 0: + print(f" Has {int(receiver.remaining_pairings())} successful pairing(s) remaining.") - notification_flags = _hidpp10.get_notification_flags(receiver) - if notification_flags is not None: - if notification_flags: - notification_names = hidpp10_constants.NotificationFlag.flag_names(notification_flags) - print(f" Notifications: {', '.join(notification_names)} (0x{notification_flags.value:06X})") + if is_centurion: + _print_centurion_dongle_features(receiver) + else: + notification_flags = _hidpp10.get_notification_flags(receiver) + if notification_flags is not None: + if notification_flags: + notification_names = hidpp10_constants.NotificationFlag.flag_names(notification_flags) + print(f" Notifications: {', '.join(notification_names)} (0x{notification_flags.value:06X})") + else: + print(" Notifications: (none)") + + activity = receiver.read_register(hidpp10_constants.Registers.DEVICES_ACTIVITY) + if activity: + activity = [(d, ord(activity[d - 1 : d])) for d in range(1, receiver.max_devices)] + activity_text = ", ".join(f"{int(d)}={int(a)}" for d, a in activity if a > 0) + print(" Device activity counters:", activity_text or "(empty)") + + +def _print_centurion_dongle_features(receiver): + """Print dongle-level features, probed independently on the dongle hardware.""" + features = receiver.dongle_features + if not features: + return + print(f" Supports {len(features)} dongle features:") + for feature, feat_id, index in features: + display_name = "CENTPP BRIDGE" if feat_id == 0x0003 else feature + feat_bytes = feat_id.to_bytes(2, byteorder="big") + try: + flags_resp = receiver.request(0x0000, feat_bytes[0], feat_bytes[1]) + except Exception: + flags_resp = None + if flags_resp is not None and len(flags_resp) >= 2: + flags = flags_resp[1] + flag_names = common.flag_names(hidpp20_constants.FeatureFlag, flags) + print(" %2d: %-22s {%04X} %s " % (index, display_name, feat_id, ", ".join(flag_names))) else: - print(" Notifications: (none)") - - activity = receiver.read_register(hidpp10_constants.Registers.DEVICES_ACTIVITY) - if activity: - activity = [(d, ord(activity[d - 1 : d])) for d in range(1, receiver.max_devices)] - activity_text = ", ".join(f"{int(d)}={int(a)}" for d, a in activity if a > 0) - print(" Device activity counters:", activity_text or "(empty)") + print(" %2d: %-22s {%04X}" % (index, display_name, feat_id)) + if feature == SupportedFeature.CENTURION_DEVICE_INFO: + fw_list = _hidpp20.get_firmware_centurion(receiver) + serial = _hidpp20.get_serial_centurion(receiver) + hw_info = _hidpp20.get_hardware_info_centurion(receiver) + if fw_list: + for fw in fw_list: + print(f" Firmware: {(str(fw.kind) + ' ' + fw.name).strip()} {fw.version}") + if serial and serial.strip() and serial.strip().isprintable(): + print(f" Serial: {serial}") + if hw_info: + model_id, hw_rev, product_id = hw_info + print(f" Hardware: model {model_id}" f" rev {hw_rev} product {product_id:04X}") def _battery_text(level) -> str: @@ -91,9 +136,11 @@ def _battery_line(dev): def _print_device(dev, num=None): assert dev is not None + is_centurion = getattr(dev, "centurion", False) + is_centurion_child = is_centurion and isinstance(getattr(dev, "receiver", None), CenturionReceiver) # try to ping the device to see if it actually exists and to wake it up try: - dev.ping() + online = dev.ping() except exceptions.NoSuchDevice: print(f" {num}: Device not found" or dev.number) return @@ -102,18 +149,29 @@ def _print_device(dev, num=None): print(f" {int(num or dev.number)}: {dev.name}") else: print(f"{dev.name}") - print(" Device path :", dev.path) - if dev.wpid: + + if not online: + print(" Device is offline.") + return + # Centurion child has no separate hidraw path — show receiver's path + device_path = dev.path or (dev.receiver.path if is_centurion_child else None) + print(" Device path :", device_path) + if dev.wpid and not is_centurion_child: print(f" WPID : {dev.wpid}") if dev.product_id: print(f" USB id : {LOGITECH_VENDOR_ID:04x}:{dev.product_id}") print(" Codename :", dev.codename) print(" Kind :", dev.kind) if dev.protocol: - print(f" Protocol : HID++ {dev.protocol:1.1f}") + proto_name = "Centurion" if is_centurion else "HID++" + cent_proto = getattr(dev, "_centurion_protocol", None) + if cent_proto: + print(f" Protocol : {proto_name} {cent_proto[0]}.{cent_proto[1]}") + else: + print(f" Protocol : {proto_name} {dev.protocol:1.1f}") else: print(" Protocol : unknown (device is offline)") - if dev.polling_rate: + if not is_centurion and dev.polling_rate: print(" Report Rate :", dev.polling_rate) print(" Serial number:", dev.serial) if dev.modelId: @@ -127,7 +185,8 @@ def _print_device(dev, num=None): if dev.power_switch_location: print(f" The power switch is located on the {dev.power_switch_location}.") - if dev.online: + # Skip HID++ 1.0 register reads for centurion devices — they don't support these + if dev.online and not is_centurion: notification_flags = _hidpp10.get_notification_flags(dev) if notification_flags is not None: if notification_flags: @@ -144,25 +203,56 @@ def _print_device(dev, num=None): print(" Features: (none)") if dev.online and dev.features: - print(f" Supports {len(dev.features)} HID++ 2.0 features:") + is_centurion = getattr(dev, "centurion", False) + parent_count = dev.features.count + sub_count = getattr(dev.features, "_sub_feature_count", 0) + # For centurion child devices, dongle features are shown on the receiver — + # only show sub-device (headset) features here. + if is_centurion_child and sub_count > 0: + print(f" Supports {sub_count} HID++ 2.0 features:") + elif is_centurion and sub_count > 0: + print(f" Supports {parent_count} dongle + {sub_count} headset features:") + else: + print(f" Supports {len(dev.features)} HID++ 2.0 features:") dev_settings = [] settings_templates.check_feature_settings(dev, dev_settings) + feature_num = 0 + in_sub_device = False for feature, index in dev.features.enumerate(): + if is_centurion and not in_sub_device and feature_num >= parent_count: + in_sub_device = True + if not is_centurion_child: + print(" Headset (via CentPPBridge):") + feature_num += 1 + # For centurion child, skip dongle features (already shown on the receiver) + if is_centurion_child and not in_sub_device: + continue if isinstance(feature, str): feature_bytes = bytes.fromhex(feature[-4:]) else: feature_bytes = feature.to_bytes(2, byteorder="little") feature_int = int.from_bytes(feature_bytes, byteorder="little") - try: - flags = dev.request(0x0000, feature_bytes) - except Exception: - print(" %2d: %-22s {%04X} - can't retrieve" % (index, feature, feature_int)) - continue - flags = 0 if flags is None else ord(flags[1:2]) - flags = common.flag_names(hidpp20_constants.FeatureFlag, flags) - version = dev.features.get_feature_version(feature_int) - version = version if version else 0 - print(" %2d: %-22s {%04X} V%s %s " % (index, feature, feature_int, version, ", ".join(flags))) + display_name = feature + if is_centurion_child and in_sub_device: + # Use cached version — skip slow bridge ROOT queries + version = dev.features.get_feature_version(feature_int) or 0 + print(" %2d: %-22s {%04X} V%s" % (index, display_name, feature_int, version)) + else: + try: + flags = dev.request(0x0000, feature_bytes) + except Exception: + flags = None + if flags is not None: + flags = ord(flags[1:2]) + flag_names = common.flag_names(hidpp20_constants.FeatureFlag, flags) + version = dev.features.get_feature_version(feature_int) + version = version if version else 0 + print( + " %2d: %-22s {%04X} V%s %s " + % (index, display_name, feature_int, version, ", ".join(flag_names)) + ) + else: + print(" %2d: %-22s {%04X}" % (index, display_name, feature_int)) if feature == SupportedFeature.HIRES_WHEEL: wheel = _hidpp20.get_hires_wheel(dev) if wheel: @@ -230,7 +320,25 @@ def _print_device(dev, num=None): print(f" Kind: {_hidpp20.get_kind(dev)}") elif feature == SupportedFeature.DEVICE_FRIENDLY_NAME: print(f" Friendly Name: {_hidpp20.get_friendly_name(dev)}") - elif feature == SupportedFeature.DEVICE_FW_VERSION: + elif feature == SupportedFeature.CENTURION_DEVICE_INFO: + if in_sub_device: + # Use cached device properties to avoid redundant bridge requests + fw_list = dev.firmware + serial = dev.serial + hw_info = _hidpp20.get_hardware_info_centurion_sub(dev) + else: + fw_list = _hidpp20.get_firmware_centurion(dev) + serial = _hidpp20.get_serial_centurion(dev) + hw_info = _hidpp20.get_hardware_info_centurion(dev) + if fw_list: + for fw in fw_list: + print(f" Firmware: {(str(fw.kind) + ' ' + fw.name).strip()} {fw.version}") + if serial and serial.strip() and serial.strip().isprintable(): + print(f" Serial: {serial}") + if hw_info: + model_id, hw_rev, product_id = hw_info + print(f" Hardware: model {model_id}" f" rev {hw_rev} product {product_id:04X}") + elif isinstance(feature, SupportedFeature) and feature == SupportedFeature.DEVICE_FW_VERSION: for fw in _hidpp20.get_firmware(dev): extras = strhex(fw.extras) if fw.extras else "" print(f" Firmware: {fw.kind} {fw.name} {fw.version} {extras}") @@ -251,6 +359,10 @@ def _print_device(dev, num=None): else: mode = "On-Board" print(f" Device Mode: {mode}") + elif feature == SupportedFeature.HEADSET_ONBOARD_EQ: + bands = hidpp20.get_onboard_eq_params(dev) + if bands: + print(f" EQ: {', '.join(f'{f}Hz:{g:+d}dB' for f, g, _q in bands)}") elif hidpp20.battery_functions.get(feature, None): print("", end=" ") _battery_line(dev) @@ -324,7 +436,7 @@ def run(devices, args, find_receiver, find_device): if device_name == "all": for d in devices: - if isinstance(d, receiver.Receiver): + if isinstance(d, (receiver.Receiver, CenturionReceiver)): _print_receiver(d) count = d.count() if count: @@ -336,8 +448,8 @@ def run(devices, args, find_receiver, find_device): break print("") else: - print("") _print_device(d) + print("") return dev = find_receiver(devices, device_name) diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index 74d2cd89..4aee6700 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -115,7 +115,6 @@ class SolaarListener(listener.EventsListener): reason or "", ) else: - device.ping() logger.info( "status_changed %r: %s %s (%X) %s", device, @@ -149,6 +148,12 @@ class SolaarListener(listener.EventsListener): def _notifications_handler(self, n): assert self.receiver if n.devnumber == 0xFF: + # For CenturionReceiver, intercept bridge notifications and dispatch to child device + from logitech_receiver.device import CenturionReceiver + + if isinstance(self.receiver, CenturionReceiver): + self._handle_centurion_notification(n) + return # a receiver notification notifications.process(self.receiver, n) return @@ -228,6 +233,102 @@ class SolaarListener(listener.EventsListener): elif dev.online is None: dev.ping() + def _handle_centurion_notification(self, n): + """Handle notifications from a CenturionReceiver dongle. + + Bridge events have sub_id == bridge_index. The event function number + is in bits 7-4 of n.address: + - Function 0: ConnectionStateChangedEvent — sub-device connect/disconnect + - Function 1: MessageEvent — wrapped sub-device HID++ notification + + ConnectionStateChangedEvent payload (same format as getConnectionInfo): + n.data[0]: high nibble = connection type, low nibble = len_hi + n.data[1]: len_lo + n.data[2+]: sub-device descriptors (if any) + Empty sub-device list (length=0) means disconnected. + + MessageEvent data layout: + n.data[0:2] = dev_id<<4|len_hi, len_lo + n.data[2] = sub_cpl (0x00 for both responses and notifications) + n.data[3] = sub_feat_idx + n.data[4] = sub_func_sw (sw_id=0 for unsolicited notifications) + n.data[5:] = payload + """ + child = self.receiver._devices.get(1) + if not child: + if logger.isEnabledFor(logging.DEBUG): + logger.debug("CenturionReceiver: notification ignored (no child device): %s", n) + return + + bridge_idx = getattr(child, "_centurion_bridge_index", None) + if bridge_idx is None or n.sub_id != bridge_idx: + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "CenturionReceiver: non-bridge notification sub_id=%d addr=0x%02X data=%s", + n.sub_id, + n.address, + n.data[:12].hex() if n.data else "", + ) + return + + event_func = (n.address >> 4) & 0x0F + + if event_func == 0: + # ConnectionStateChangedEvent — parse sub-device list length + if len(n.data) < 2: + return + data_len = ((n.data[0] & 0x0F) << 8) | n.data[1] + if data_len > 0 and not child.online: + if logger.isEnabledFor(logging.INFO): + logger.info("CenturionReceiver: headset connected (ConnectionStateChangedEvent, len=%d)", data_len) + child.changed(active=True) + self._status_changed(child) + elif data_len == 0 and child.online: + if logger.isEnabledFor(logging.INFO): + logger.info("CenturionReceiver: headset disconnected (ConnectionStateChangedEvent, len=0)") + child.changed(active=False) + self._status_changed(child) + return + + if event_func == 1: + # MessageEvent — unwrap sub-device notification + # n.data layout: [dev_id<<4|len_hi, len_lo, sub_cpl, sub_feat_idx, sub_func_sw, payload...] + if len(n.data) < 5: + return + # A MessageEvent from the headset proves it's online. If we missed the + # ConnectionStateChangedEvent (e.g. cold-start power-on), bring it online now. + if not child.online: + if logger.isEnabledFor(logging.INFO): + logger.info("CenturionReceiver: headset online (MessageEvent received while offline)") + child.changed(active=True) + self._status_changed(child) + sub_feat_idx = n.data[3] + sub_func_sw = n.data[4] + payload = n.data[5:] + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "CenturionReceiver: bridge MessageEvent sub_feat=%d func=0x%02X payload=%s -> child %s", + sub_feat_idx, + sub_func_sw, + payload[:8].hex() if payload else "", + child, + ) + # Create synthetic notification and dispatch directly to feature processing. + # Sub-device features use 0x100 offset in FeaturesArray.inverse. + synthetic = base.HIDPPNotification(n.report_id, child.number, sub_feat_idx + 0x100, sub_func_sw, payload) + child.online = True + if child.features: + notifications._process_feature_notification(child, synthetic) + return + + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "CenturionReceiver: unhandled bridge event func=%d addr=0x%02X data=%s", + event_func, + n.address, + n.data[:12].hex() if n.data else "", + ) + def __str__(self): return f"" @@ -260,6 +361,13 @@ def _start(device_info: DeviceInfo): if not device_info.isDevice: receiver_ = logitech_receiver.receiver.create_receiver(base, device_info, _setting_callback) + elif getattr(device_info, "centurion", False): + receiver_ = logitech_receiver.device.create_centurion_receiver(base, device_info, _setting_callback) + if receiver_ is None: + # No bridge found — treat as a direct-connected centurion device (e.g., wired headset) + receiver_ = logitech_receiver.device.create_device(base, device_info, _setting_callback) + if receiver_: + configuration.attach_to(receiver_) else: receiver_ = logitech_receiver.device.create_device(base, device_info, _setting_callback) if receiver_: diff --git a/lib/solaar/ui/config_panel.py b/lib/solaar/ui/config_panel.py index 4c6e1a51..7622dcba 100644 --- a/lib/solaar/ui/config_panel.py +++ b/lib/solaar/ui/config_panel.py @@ -545,6 +545,69 @@ class PackedRangeControl(MultipleRangeControl): self._button.set_tooltip_text(b) +class GraphicEQControl(MultipleControl): + def setup(self, setting): + self._items = [] + validator = setting._validator + row = Gtk.ListBoxRow() + hbox = Gtk.HBox(homogeneous=True, spacing=8) + for item in range(validator.count): + vbox = Gtk.VBox(homogeneous=False, spacing=2) + scale = Gtk.Scale.new_with_range(Gtk.Orientation.VERTICAL, validator.min_value, validator.max_value, 1) + scale.set_inverted(True) + scale.set_round_digits(0) + scale.set_digits(0) + scale.set_draw_value(True) + scale.connect("format-value", lambda s, v: f"{int(v)} dB") + scale.set_has_origin(True) + scale.set_size_request(-1, 150) + scale.add_mark(0, Gtk.PositionType.LEFT, "0") + scale.connect(GtkSignal.VALUE_CHANGED.value, self._changed, validator.keys[item]) + lbl = Gtk.Label(label=str(validator.keys[item])) + lbl.set_line_wrap(True) + lbl.set_justify(Gtk.Justification.CENTER) + vbox.pack_start(scale, True, True, 0) + vbox.pack_end(lbl, False, False, 0) + vbox._setting_item = validator.keys[item] + vbox.control = scale + hbox.pack_start(vbox, True, True, 0) + self._items.append(vbox) + row.add(hbox) + self.add(row) + + def _changed(self, control, item): + if control.get_sensitive(): + if hasattr(control, "_timer"): + control._timer.cancel() + control._timer = Timer(0.5, lambda: GLib.idle_add(self._write, control, item)) + control._timer.start() + + def _write(self, control, item): + control._timer.cancel() + delattr(control, "_timer") + new_state = int(control.get_value()) + if self.sbox.setting._value[int(item)] != new_state: + self.sbox.setting._value[int(item)] = new_state + _write_async(self.sbox.setting, self.sbox.setting._value[int(item)], self.sbox, key=int(item)) + + def set_value(self, value): + if value is None: + return + b = "" + n = len(self._items) + for vbox in self._items: + item = vbox._setting_item + v = value.get(int(item), None) + if v is not None: + vbox.control.set_value(v) + else: + v = self.sbox.setting._value[int(item)] + b += f"{str(item)}: ({str(v)}) " + lbl_text = ngettext("%d value", "%d values", n) % n + self._button.set_label(lbl_text) + self._button.set_tooltip_text(b) + + # control with an ID key that determines what else to show class HeteroKeyControl(Gtk.HBox, Control): def __init__(self, sbox, delegate=None): @@ -715,6 +778,8 @@ def _create_sbox(s, _device): control = MultipleRangeControl(sbox, change) elif s.kind == settings.Kind.PACKED_RANGE: control = PackedRangeControl(sbox, change) + elif s.kind == settings.Kind.GRAPHIC_EQ: + control = GraphicEQControl(sbox, change) elif s.kind == settings.Kind.HETERO: control = HeteroKeyControl(sbox, change) else: diff --git a/lib/solaar/ui/window.py b/lib/solaar/ui/window.py index a9f117e1..f2d5ed5f 100644 --- a/lib/solaar/ui/window.py +++ b/lib/solaar/ui/window.py @@ -536,7 +536,13 @@ def _update_details(button): if device.product_id: yield _("Product ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id hid_version = device.protocol - yield _("Protocol"), f"HID++ {hid_version:1.1f}" if hid_version else _("Unknown") + cent_proto = getattr(device, "_centurion_protocol", None) + if cent_proto: + yield _("Protocol"), f"Centurion {cent_proto[0]}.{cent_proto[1]}" + elif hid_version: + yield _("Protocol"), f"HID++ {hid_version:1.1f}" + else: + yield _("Protocol"), _("Unknown") if read_all and device.polling_rate: yield _("Polling rate"), device.polling_rate diff --git a/tests/logitech_receiver/fake_hidpp.py b/tests/logitech_receiver/fake_hidpp.py index 01f2d7a6..7758f2f5 100644 --- a/tests/logitech_receiver/fake_hidpp.py +++ b/tests/logitech_receiver/fake_hidpp.py @@ -386,6 +386,7 @@ class Device: version: Optional[int] = 0 wpid: Optional[str] = "0000" setting_callback: Any = None + centurion: bool = False sliding = profiles = _backlight = _keys = _remap_keys = _led_effects = _gestures = None _gestures_lock = threading.Lock() number = "d1" @@ -400,6 +401,7 @@ class Device: gestures = device.Device.gestures __hash__ = device.Device.__hash__ feature_request = device.Device.feature_request + _infer_kind_centurion = device.Device._infer_kind_centurion def __post_init__(self): self._name = self.name @@ -424,6 +426,9 @@ class Device: if self.setting_callback is None: self.setting_callback = lambda x, y, z: None self.add_notification_handler = lambda x, y: None + # Centurion bridge responses: keyed by (sub_feat_idx, sub_function) -> response bytes + if not hasattr(self, "_bridge_responses"): + self._bridge_responses = {} def request(self, id, *params, no_reply=False, long_message=False, protocol=2.0): params = b"".join(pack("B", p) if isinstance(p, int) else p for p in params) @@ -434,6 +439,24 @@ class Device: return bytes.fromhex(r.response) if isinstance(r.response, str) else r.response print("RESPONSE", self._name, None) + def centurion_bridge_request(self, sub_feat_idx, sub_function=0x00, *params, no_reply=False): + """Fake bridge request — looks up (sub_feat_idx, sub_function, params) in _bridge_responses.""" + params_bytes = b"".join(pack("B", p) if isinstance(p, int) else p for p in params) if params else b"" + key = (sub_feat_idx, sub_function, params_bytes.hex().upper()) + print("BRIDGE ", self._name, f"sub_idx={sub_feat_idx} func={sub_function} params={params_bytes.hex().upper()}") + result = self._bridge_responses.get(key) + if result is not None: + print("BRIDGE_R", self._name, result.hex().upper()) + return result + # Try without params for convenience + key_no_params = (sub_feat_idx, sub_function, "") + result = self._bridge_responses.get(key_no_params) + if result is not None: + print("BRIDGE_R", self._name, result.hex().upper()) + return result + print("BRIDGE_R", self._name, None) + return None + def ping(self, handle=None, devnumber=None, long_message=False): print("PING", self._protocol) return self._protocol @@ -451,6 +474,25 @@ class Device: pass +# Centurion headset (PRO X 2 LIGHTSPEED) parent device responses. +# Parent has 5 features: ROOT(0), FeatureSet(1), DeviceInfo(2), CentPPBridge(3), GenericDFU(4) +# Feature 0x0003 on Centurion = CentPPBridge (NOT FirmwareInfo). +r_centurion_headset = [ + Response(2.6, 0x0010), # ping (protocol 2.6) + Response("010001", 0x0000, "0001"), # FeatureSet at index 1 + Response("020001", 0x0000, "0100"), # DeviceInfo at index 2 + Response("030001", 0x0000, "0003"), # CentPPBridge at index 3 + Response("040001", 0x0000, "010A"), # GenericDFU at index 4 + Response("05", 0x0100), # feature count = 5 (includes ROOT on Centurion) + # FeatureSet.getFeatureID responses: [remaining_count, feat_hi, feat_lo, type, flags] + Response("0400000000", 0x0110, "00"), # index 0: ROOT (0x0000) + Response("0300010001", 0x0110, "01"), # index 1: FeatureSet (0x0001) + Response("0201000001", 0x0110, "02"), # index 2: DeviceInfo (0x0100) + Response("0100030001", 0x0110, "03"), # index 3: CentPPBridge (0x0003) + Response("00010A0001", 0x0110, "04"), # index 4: GenericDFU (0x010A) +] + + def match_requests(number, responses, call_args_list): for i in range(0 - number, 0): param = b"".join(pack("B", p) if isinstance(p, int) else p for p in call_args_list[i][0][1:]).hex().upper() diff --git a/tests/logitech_receiver/test_device.py b/tests/logitech_receiver/test_device.py index 0d45d69c..b291dba9 100644 --- a/tests/logitech_receiver/test_device.py +++ b/tests/logitech_receiver/test_device.py @@ -60,6 +60,7 @@ class DeviceInfoStub: hidpp_long: bool = True bus_id: int = 0x0003 # USB serial: str = "aa:aa:aa;aa" + centurion: bool = False di_bad_handle = DeviceInfoStub(None, product_id="CCCC") @@ -70,6 +71,7 @@ di_B530 = DeviceInfoStub("11", product_id="B350", bus_id=0x0005) di_C068 = DeviceInfoStub("11", product_id="C06B") di_C08A = DeviceInfoStub("11", product_id="C08A") di_DDDD = DeviceInfoStub("11", product_id="DDDD") +di_0AF7 = DeviceInfoStub("11", product_id="0AF7", centurion=True) @pytest.mark.parametrize( @@ -78,6 +80,7 @@ di_DDDD = DeviceInfoStub("11", product_id="DDDD") (di_bad_handle, fake_hidpp.r_empty, None), (di_error, fake_hidpp.r_empty, False), (di_CCCC, fake_hidpp.r_empty, True), + (di_0AF7, fake_hidpp.r_empty, True), ], ) def test_create_device(device_info, responses, expected_success): @@ -93,6 +96,22 @@ def test_create_device(device_info, responses, expected_success): assert bool(test_device) == expected_success +def test_create_centurion_device(): + """Test that a centurion device gets hidpp_long forced to True and centurion flag set.""" + from logitech_receiver import base + + low_level_mock = LowLevelInterfaceFake(fake_hidpp.r_empty) + test_device = device.create_device(low_level_mock, di_0AF7) + + assert test_device is not None + assert test_device.centurion is True + assert test_device.hidpp_long is True + assert int(test_device.handle) in base._centurion_handles + + # Clean up + base._centurion_handles.discard(int(test_device.handle)) + + @pytest.mark.parametrize( "device_info, responses, expected_codename, expected_name, expected_kind", [(di_CCCC, fake_hidpp.r_empty, "?? (CCCC)", "Unknown device CCCC", "?")], diff --git a/tests/logitech_receiver/test_hidpp20_complex.py b/tests/logitech_receiver/test_hidpp20_complex.py index dc7d4b85..1e3ef9dc 100644 --- a/tests/logitech_receiver/test_hidpp20_complex.py +++ b/tests/logitech_receiver/test_hidpp20_complex.py @@ -22,6 +22,7 @@ from logitech_receiver import exceptions from logitech_receiver import hidpp20 from logitech_receiver import hidpp20_constants from logitech_receiver import special_keys +from logitech_receiver.device import CenturionReceiver from logitech_receiver.hidpp20 import KeyFlag from logitech_receiver.hidpp20 import MappingFlag from logitech_receiver.hidpp20_constants import GestureId @@ -101,10 +102,9 @@ def test_FeaturesArray_get_feature(device, expected0, expected1, expected2, expe (hidpp20_constants.SupportedFeature.FEATURE_SET, 1), (hidpp20_constants.SupportedFeature.CONFIG_CHANGE, 2), (hidpp20_constants.SupportedFeature.DEVICE_FW_VERSION, 3), - (common.NamedInt(256, "unknown:0100"), 4), + (hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO, 4), (hidpp20_constants.SupportedFeature.REPROG_CONTROLS_V4, 5), - (None, 6), - (None, 7), + # Indices 6 and 7 have no responses — get_feature returns None, enumerate skips them (hidpp20_constants.SupportedFeature.BATTERY_STATUS, 8), ], ), @@ -922,3 +922,361 @@ def test_onboard_profiles_device(responses, name, count, buttons, gbuttons, sect yml_dump = yaml.dump(profiles) assert yaml.safe_load(yml_dump).to_bytes().hex() == profiles.to_bytes().hex() + + +# --- Centurion (PRO X 2 LIGHTSPEED headset) tests --- + +device_centurion = fake_hidpp.Device("CENTURION", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + + +def test_centurion_parent_feature_discovery(): + """Parent feature enumeration discovers CentPPBridge at index 3 and stores bridge index.""" + dev = fake_hidpp.Device("CENT_PARENT", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + featuresarray = hidpp20.FeaturesArray(dev) + dev.features = featuresarray + + result = featuresarray._check() + + assert result is True + assert featuresarray.count == 5 + # Parent features registered + assert featuresarray[hidpp20_constants.SupportedFeature.ROOT] == 0 + assert featuresarray[hidpp20_constants.SupportedFeature.FEATURE_SET] == 1 + assert featuresarray[hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO] == 2 + assert featuresarray[hidpp20_constants.SupportedFeature.CENTURION_GENERIC_DFU] == 4 + # Feature 0x0003 = CentPPBridge on Centurion (stored as DEVICE_FW_VERSION since same ID) + assert featuresarray[hidpp20_constants.SupportedFeature.DEVICE_FW_VERSION] == 3 + # Bridge index stored on device + assert dev._centurion_bridge_index == 3 + assert hasattr(dev, "_centurion_sub_features") + + +def test_centurion_sub_device_feature_discovery(): + """Sub-device feature discovery routes through bridge and populates _centurion_sub_features.""" + dev = fake_hidpp.Device("CENT_SUB", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + # Set up bridge responses for sub-device discovery: + # 1. CenturionRoot.GetFeature(0x0001) -> FeatureSet at sub-index 1 + # 2. CenturionFeatureSet.GetFeatureId(index=0) -> bulk feature list + dev._bridge_responses = { + # CenturionRoot(idx=0).GetFeature(func=0) with feature_id=0x0001 -> sub_fs_index=1 + (0x00, 0x00, "0001"): bytes([0x01, 0x00, 0x00]), + # CenturionFeatureSet(idx=1).GetFeatureId(func=0x10, start=0) -> 3 features + # Response: [count, (feat_hi, feat_lo, type, flags) × count] + (0x01, 0x10, "00"): bytes( + [ + 0x03, # 3 features + 0x06, + 0x04, + 0x00, + 0x00, # HEADSET_AUDIO_SIDETONE (0x0604) at sub-idx 0 + 0x06, + 0x01, + 0x00, + 0x00, # HEADSET_MIC_MUTE (0x0601) at sub-idx 1 + 0x06, + 0x11, + 0x00, + 0x00, # HEADSET_MIC_GAIN (0x0611) at sub-idx 2 + ] + ), + } + featuresarray = hidpp20.FeaturesArray(dev) + dev.features = featuresarray + + featuresarray._check() + + # Sub-device features should be discovered + assert hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE in dev._centurion_sub_features + assert hidpp20_constants.SupportedFeature.HEADSET_MIC_MUTE in dev._centurion_sub_features + assert hidpp20_constants.SupportedFeature.HEADSET_MIC_GAIN in dev._centurion_sub_features + # Sub-device features should be in features array with their sub-device indices + assert featuresarray[hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE] == 0 + assert featuresarray[hidpp20_constants.SupportedFeature.HEADSET_MIC_MUTE] == 1 + assert featuresarray[hidpp20_constants.SupportedFeature.HEADSET_MIC_GAIN] == 2 + # _centurion_sub_indices should map ALL sub-device features to their sub-device indices + assert dev._centurion_sub_indices[hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE] == 0 + assert dev._centurion_sub_indices[hidpp20_constants.SupportedFeature.HEADSET_MIC_MUTE] == 1 + assert dev._centurion_sub_indices[hidpp20_constants.SupportedFeature.HEADSET_MIC_GAIN] == 2 + + +def test_centurion_feature_request_routes_sub_device(): + """feature_request() routes sub-device features through centurion_bridge_request().""" + dev = fake_hidpp.Device("CENT_ROUTE", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + # Manually set up sub-device state (simulating completed discovery) + dev.features.count = 5 # mark discovery as complete so _check() short-circuits + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = {hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE} + dev.features[hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE] = 7 # sub-device index + # Set up bridge response for GetSidetoneLevel + dev._bridge_responses = { + (7, 0x00, ""): bytes([0x01, 0x00, 0x32]), # mic_id=1, mute=0, level=50 + } + + result = dev.feature_request(hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE, 0x00) + + assert result is not None + assert result == bytes([0x01, 0x00, 0x32]) + + +def test_centurion_feature_request_parent_not_routed(): + """feature_request() does NOT route parent features through bridge.""" + dev = fake_hidpp.Device("CENT_PARENT2", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = {hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE} + + # FeatureSet is a parent feature — should go through normal request(), not bridge + featuresarray = hidpp20.FeaturesArray(dev) + dev.features = featuresarray + featuresarray._check() + + # CENTURION_DEVICE_INFO is a parent feature at index 2 — requesting it should + # NOT go through bridge, it should go through the normal hidpp20.feature_request path + assert hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO not in dev._centurion_sub_features + + +def test_centurion_bridge_request_write(): + """centurion_bridge_request with no_reply=True returns None immediately.""" + dev = fake_hidpp.Device("CENT_WRITE", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = {hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE} + dev.features[hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE] = 7 + dev._bridge_responses = {} # no responses needed for no_reply + + result = dev.centurion_bridge_request(7, 0x10, 0x32, no_reply=True) + + assert result is None + + +def test_centurion_firmware_dedup(): + """get_firmware_centurion() deduplicates identical firmware entries.""" + # Simulate parent device that returns the same firmware for every entity index + fw_response = "00" + "00" + "0105" + "04" + "44303031" + "00" * 20 # type=0, ver=1.05, name="D001" + responses = fake_hidpp.r_centurion_headset + [fake_hidpp.Response(fw_response, 0x0210, f"{i:02X}") for i in range(8)] + dev = fake_hidpp.Device("CENT_DEDUP", True, 2.6, responses, centurion=True) + + fw = _hidpp20.get_firmware_centurion(dev) + + # Should only get 1 entry, not 8 + assert fw is not None + assert len(fw) == 1 + assert fw[0].name == "D001" + + +def test_centurion_sub_device_firmware(): + """get_firmware_centurion_sub() queries sub-device firmware via bridge.""" + dev = fake_hidpp.Device("CENT_SUBFW", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = set() + dev._centurion_sub_indices = {hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO: 2} + # Sub-device firmware: type=0 (firmware), ver=3.02, name="H001" + dev._bridge_responses = { + (2, 0x10, "00"): bytes([0x00, 0x00, 0x03, 0x02, 0x04]) + b"H001", + (2, 0x10, "01"): bytes([0x00, 0x00, 0x03, 0x02, 0x04]) + b"H001", # duplicate → dedup stops + } + + fw = _hidpp20.get_firmware_centurion_sub(dev) + + assert fw is not None + assert len(fw) == 1 + assert fw[0].name == "H001" + assert fw[0].version == "3.02" + + +def test_centurion_sub_device_serial(): + """get_serial_centurion_sub() queries sub-device serial via bridge.""" + dev = fake_hidpp.Device("CENT_SUBSER", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = set() + dev._centurion_sub_indices = {hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO: 2} + dev._bridge_responses = { + (2, 0x20, ""): bytes([0x0C]) + b"ABC123DEF456", + } + + serial = _hidpp20.get_serial_centurion_sub(dev) + + assert serial == "ABC123DEF456" + + +def test_centurion_sub_device_hardware_info(): + """get_hardware_info_centurion_sub() queries sub-device hardware info via bridge.""" + dev = fake_hidpp.Device("CENT_SUBHW", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_bridge_index = 3 + dev._centurion_sub_features = set() + dev._centurion_sub_indices = {hidpp20_constants.SupportedFeature.CENTURION_DEVICE_INFO: 2} + dev._bridge_responses = { + (2, 0x00, ""): bytes([0x01, 0x03, 0x0A, 0xF7]), + } + + hw_info = _hidpp20.get_hardware_info_centurion_sub(dev) + + assert hw_info is not None + model_id, hw_rev, product_id = hw_info + assert model_id == 1 + assert hw_rev == 3 + assert product_id == 0x0AF7 + + +def test_centurion_kind_inference(): + """Centurion device with 0x06xx audio features infers kind=headset.""" + dev = fake_hidpp.Device("CENT_KIND", True, 2.6, fake_hidpp.r_centurion_headset, centurion=True) + dev._centurion_sub_features = { + hidpp20_constants.SupportedFeature.HEADSET_AUDIO_SIDETONE, + hidpp20_constants.SupportedFeature.HEADSET_MIC_MUTE, + } + + kind = dev._infer_kind_centurion() + + from logitech_receiver import hidpp10_constants + + assert kind == hidpp10_constants.DEVICE_KIND.headset + + +# --- CenturionReceiver tests --- + + +class FakeCenturionDeviceInfo: + """Minimal device_info for CenturionReceiver tests.""" + + def __init__(self, path="/dev/hidraw99", product_id="0AF0", product=None, centurion=True): + self.path = path + self.product_id = product_id + self.product = product + self.centurion = centurion + self.isDevice = True + + +class FakeLowLevel: + """Minimal low_level for CenturionReceiver tests.""" + + def __init__(self, ping_protocol=2.6): + self.ping_protocol = ping_protocol + self.opened_paths = [] + self.closed_handles = [] + + def open_path(self, path): + self.opened_paths.append(path) + return 0x99 + + def ping(self, handle, number, long_message=False): + return self.ping_protocol + + def request(self, handle, devnumber, request_id, *params, **kwargs): + return None + + def close(self, handle, *args, **kwargs): + self.closed_handles.append(handle) + return True + + def find_paired_node(self, receiver_path, index, timeout): + return None + + +def test_centurion_receiver_attributes(): + """CenturionReceiver has correct receiver-like attributes.""" + info = FakeCenturionDeviceInfo(product="PRO X 2 LIGHTSPEED") + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + assert recv.kind is None + assert recv.isDevice is False + assert recv.number == 0xFF + assert recv.max_devices == 1 + assert recv.may_unpair is False + assert recv.re_pairs is False + assert recv.handle == 0x99 + assert recv.path == "/dev/hidraw99" + assert recv.product_id == "0AF0" + assert recv.name == "Centurion Receiver" + assert recv.serial is None + assert recv.pairing is not None + assert recv.pairing.lock_open is False + assert bool(recv) is True + + +def test_centurion_receiver_container_empty(): + """Empty CenturionReceiver has correct container behavior.""" + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + assert len(recv) == 0 + assert recv.count() == 0 + assert 1 not in recv + assert list(recv) == [] + assert recv.status_string() == "No devices." + + +def test_centurion_receiver_container_with_device(): + """CenturionReceiver with a child device has correct container behavior.""" + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + # Simulate adding a child device (a simple mock) + class FakeChild: + number = 1 + + def close(self): + pass + + recv._devices[1] = FakeChild() + + assert len(recv) == 1 + assert recv.count() == 1 + assert 1 in recv + assert 2 not in recv + assert recv[1] is not None + assert recv.status_string() == "1 device connected." + with pytest.raises(IndexError): + recv[2] + + +def test_centurion_receiver_enable_connection_notifications(): + """CenturionReceiver.enable_connection_notifications() returns False.""" + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + assert recv.enable_connection_notifications() is False + assert recv.remaining_pairings() is None + + +def test_centurion_receiver_device_codename(): + """CenturionReceiver.device_codename() returns USB product name.""" + info = FakeCenturionDeviceInfo(product="PRO X 2 LIGHTSPEED") + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + + assert recv.device_codename(1) == "PRO X 2 LIGHTSPEED" + + +def test_centurion_receiver_close(): + """CenturionReceiver.close() closes handle and clears devices.""" + low_level = FakeLowLevel() + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(low_level, 0x99, info) + + class FakeChild: + closed = False + + def close(self): + self.closed = True + + child = FakeChild() + recv._devices[1] = child + + recv.close() + + assert recv.handle is None + assert len(recv._devices) == 0 + assert child.closed is True + assert 0x99 in low_level.closed_handles + assert bool(recv) is False + + +def test_centurion_receiver_changed_callback(): + """CenturionReceiver.changed() invokes status_callback.""" + info = FakeCenturionDeviceInfo() + recv = CenturionReceiver(FakeLowLevel(), 0x99, info) + calls = [] + recv.status_callback = lambda *args, **kwargs: calls.append((args, kwargs)) + + recv.changed() + + assert len(calls) == 1 + assert calls[0][0][0] is recv diff --git a/tests/logitech_receiver/test_setting_templates.py b/tests/logitech_receiver/test_setting_templates.py index b1cff158..dbfb9b05 100644 --- a/tests/logitech_receiver/test_setting_templates.py +++ b/tests/logitech_receiver/test_setting_templates.py @@ -638,6 +638,60 @@ key_tests = [ fake_hidpp.Response("E010", 0x0430, "02E010"), fake_hidpp.Response("E018", 0x0430, "02E018"), ), + Setup( # HeadsetOnboardEQ: 2 bands, 128Hz/-2dB/Q10 and 256Hz/+3dB/Q10 + FeatureTest(settings_templates.HeadsetOnboardEQ, {0: -2, 1: 3}, {1: 5}, 2), + [-12, 12], + fake_hidpp.Response("8000000002", 0x0400), # GetEQInfos: has_hw_eq, 2 bands + fake_hidpp.Response("00020080FE0A0100030A", 0x0410, "00"), # GetEQParameters + fake_hidpp.Response( # SetEQParameters: write initial values back (slot 0x00) + "00", + 0x0420, + "00020080FE0A0100030A" + "055AE300" # mystery bytes (from pcap) + "030E00020000000100170002" + "007A6B00D69D94008C516B00C82380005DC27F0075" + "906B0022B6940002226B00B44080007EA37F00C1C3040044" + "0200170002" + "00506B00900F95006ED56A00D185800060477F00CA" + "906B00229D950052496A00A12E810085EC7E0075C40400AC", + ), + fake_hidpp.Response( # SetEQParameters: persist initial values (slot 0x80) + "00", + 0x0420, + "80020080FE0A0100030A" + "055AE300" + "030E00020000000100170002" + "007A6B00D69D94008C516B00C82380005DC27F0075" + "906B0022B6940002226B00B44080007EA37F00C1C3040044" + "0200170002" + "00506B00900F95006ED56A00D185800060477F00CA" + "906B00229D950052496A00A12E810085EC7E0075C40400AC", + ), + fake_hidpp.Response( # SetEQParameters: write updated band 1 gain=5 (slot 0x00) + "00", + 0x0420, + "00020080FE0A0100050A" + "055AE300" + "030E00020000000100170002" + "006F6B00F5A894006A466B00EB2380005DC27F0075" + "906B0022BC9400AA156B00613B80007CAD7F00C6C30400BF" + "0200170002" + "00306B00272F9500BAB56A008C85800060477F00CA" + "906B0022B1950002226A000E1F8100AB0A7F004FC604001D", + ), + fake_hidpp.Response( # SetEQParameters: persist updated values (slot 0x80) + "00", + 0x0420, + "80020080FE0A0100050A" + "055AE300" + "030E00020000000100170002" + "006F6B00F5A894006A466B00EB2380005DC27F0075" + "906B0022BC9400AA156B00613B80007CAD7F00C6C30400BF" + "0200170002" + "00306B00272F9500BAB56A008C85800060477F00CA" + "906B0022B1950002226A000E1F8100AB0A7F004FC604001D", + ), + ), Setup( FeatureTest(settings_templates.PerKeyLighting, {1: -1, 2: -1, 9: -1, 10: -1, 113: -1}, {2: 0xFF0000}, 4, 4, 0, 1), {