"""Read-only corpus probe for the headset RGB feature pair:

  - HEADSET_RGB_ONBOARD_EFFECTS   (0x0621)
  - HEADSET_RGB_SIGNATURE_EFFECTS (0x0622)

Logs raw response bytes and lengths at DEBUG level, so debug-level logging
must be enabled to capture the data. All calls are strictly read-side — no
setters are invoked. If a feature isn't present the probe short-circuits
cleanly.

Pcap analysis of G HUB's color-set traffic confirmed that on 0x0621,
``setRGBClusterEffect`` (fn 0x30) takes a 10-byte payload
``[cluster, effect_id_BE_u16, R, G, B, ...]`` where ``effect_id=0x0000``
means "Static (with RGB)" — this is also the slot-0 entry in the fn 0x10
``getRGBClusterInfo`` reply, which we decode structurally so the test
corpus shows effect-id semantics in plaintext.
"""

from __future__ import annotations

import logging

from . import exceptions
from .hidpp20_constants import SupportedFeature

logger = logging.getLogger(__name__)


def _hex_or_none(data) -> str | None:
    """Return ``data.hex()``, or None when ``data`` is None or empty."""
    return data.hex() if data else None


def _format_feature(feat) -> str:
    """Render a feature for the log: 0x{id:04X}{:NAME} when known, else raw.

    Unknown features are stored as the string "unknown:HHHH" by the feature
    discovery code, so handle that shape explicitly — int(feat) on those
    raises ValueError. Wrap the rest in a broad except so a future unhandled
    feature shape can't kill the whole table dump.

    Returns "?" for None, the hex suffix for "unknown:HHHH" strings, any
    other string unchanged, and ``repr(feat)`` as the last resort.
    """
    if feat is None:
        return "?"
    if isinstance(feat, str):
        # len("unknown:") == 8, so the slice below is the hex suffix.
        if feat.startswith("unknown:") and len(feat) > 8:
            return f"0x{feat[8:].upper()}"
        return feat
    try:
        return f"0x{int(feat):04X}:{feat.name}"
    except (AttributeError, TypeError, ValueError):
        # No usable ``.name`` (or int() failed): fall back to the bare id,
        # and finally to repr() if even that is not possible.
        try:
            return f"0x{int(feat):04X}"
        except (TypeError, ValueError):
            return repr(feat)


def _log_feature_table(device) -> None:
    """Dump the device's feature tables to the debug log.

    Does nothing when ``device.features`` is falsy. Any exception raised
    while building the dump is logged and swallowed, so a failed dump can
    never abort the probe.
    """
    if not device.features:
        return
    try:
        # Parent features live in FeaturesArray.inverse, indexed by their
        # parent feature-set position. On Centurion devices these are the
        # ones the dongle itself exposes (typically 5-6 entries).
        # Index access is kept deliberately: the container's iteration
        # protocol is not visible from this module.
        parent = [f"{idx}:{_format_feature(device.features[idx])}" for idx in range(len(device.features))]
        logger.debug("RGB probe: parent features for %s: %s", device, ", ".join(parent))

        # Centurion sub-device features live in FeaturesArray.sub_inverse,
        # keyed by sub-device feature index. These are where the actual
        # headset features (0x0620/0x0621/0x0622, LogiVoice, EQ, mic mute,
        # …) live — without dumping them the log shows only the dongle's
        # parent features and gives the wrong impression that the device
        # has nothing else.
        sub_inverse = getattr(device.features, "sub_inverse", None)
        if sub_inverse:
            sub = [f"{idx}:{_format_feature(feat)}" for idx, feat in sorted(sub_inverse.items())]
            logger.debug("RGB probe: sub-device features for %s: %s", device, ", ".join(sub))
    except Exception as e:
        logger.debug("RGB probe: feature-table dump failed: %s", e)


def _call(device, feature: SupportedFeature, fn: int, *params):
    """Wrap feature_request with uniform DEBUG logging.

    Returns the raw bytes on success, None on transport/no-feature, and
    doesn't raise — FeatureCallError is caught and logged as an error code,
    and any other exception is logged with its repr.
    """
    label = f"0x{int(feature):04X}.fn{fn:02X}"
    if params:
        label += "(" + ",".join(f"{b:02X}" for b in params) + ")"
    try:
        resp = device.feature_request(feature, fn, *params)
    except exceptions.FeatureCallError as e:
        # ``error`` is not guaranteed to be an int from what is visible
        # here; coerce anything else to 0 so this handler itself cannot
        # raise and break the "doesn't raise" contract.
        err_code = getattr(e, "error", 0)
        if not isinstance(err_code, int):
            err_code = 0
        logger.debug("RGB probe: %s err=0x%02X", label, err_code & 0xFF)
        return None
    except Exception as e:
        logger.debug("RGB probe: %s raised %r", label, e)
        return None
    if resp is None:
        logger.debug("RGB probe: %s no reply (feature unsupported or transport failure)", label)
        return None
    logger.debug("RGB probe: %s resp=%s len=%d", label, _hex_or_none(resp), len(resp))
    return resp


# Names for known effect_ids on the headset RGB cluster. Confirmed via
# pcap analysis of G HUB color-set traffic: setRGBClusterEffect with
# effect_id=0x0000 + RGB writes a static color, so 0x0000 is "Static"
# rather than the "Off / Disabled" we'd guessed from cluster ordering.
# Other ids haven't been observed on the wire yet — names are placeholders
# until further pcap traffic confirms them.
_EFFECT_ID_NAMES = {
    0x0000: "Static",
    0x0001: "Effect 0x0001",
    0x0006: "Effect 0x0006",
    0x0007: "Effect 0x0007",
    0x000F: "Effect 0x000F",
    0x007F: "Effect 0x007F",
}


def _decode_cluster_info(resp) -> str | None:
    """Decode a 0x0621 fn 0x10 getRGBClusterInfo reply into a readable summary.

    Best-effort — returns None on unexpected length/shape. Observed shape on
    G522: 4-byte records (effect_id LE u16, slot_idx LE u16). The effect_id
    at slot 0 is 0x0000 = "Static" (with RGB), confirmed by pcap of G HUB
    color-set traffic. Records continue until trailing-zero padding.

    Note: most HID++ multi-byte fields are BE, but this particular response
    uses LE — confirmed against captured factory-default bytes on G522 where
    the values 0x0001 / 0x000F / 0x007F appear at byte 0 of each record with
    byte 1 = 0x00 (consistent with LE u16).

    Returns a comma-separated ``slot=N:NAME`` list, or None when ``resp`` is
    empty or shorter than one record. Trailing bytes that do not fill a
    whole 4-byte record are ignored.
    """
    if not resp or len(resp) < 4:
        return None
    effects = []
    seen_static = False
    for i in range(0, len(resp) - 3, 4):
        eid = resp[i] | (resp[i + 1] << 8)
        slot = resp[i + 2] | (resp[i + 3] << 8)
        # Skip purely-zero padding once we've seen the (effect=0, slot=0) entry.
        # The first all-zero record is always reported as a real entry; only
        # the later ones are treated as padding.
        if eid == 0 and slot == 0:
            if seen_static:
                continue
            seen_static = True
        name = _EFFECT_ID_NAMES.get(eid, f"0x{eid:04X}")
        effects.append(f"slot={slot}:{name}")
    return ", ".join(effects) if effects else None


def probe_onboard_effects(device) -> None:
    """Probe 0x0621 RGBOnboardEffects read-side functions.

    Returns immediately when the device has no features or does not expose
    the feature. Results are only logged; nothing is returned.
    """
    feature = SupportedFeature.HEADSET_RGB_ONBOARD_EFFECTS
    if not device.features or feature not in device.features:
        return
    logger.debug("RGB probe: 0x0621 HEADSET_RGB_ONBOARD_EFFECTS present on %s", device)

    # fn 0x00 getInfo — empty payload
    _call(device, feature, 0x00)

    # fn 0x10 getRGBClusterInfo — iterate cluster indexes 0..7, stop on error.
    for cluster_idx in range(8):
        resp = _call(device, feature, 0x10, cluster_idx)
        if resp is None:
            break
        decoded = _decode_cluster_info(resp)
        if decoded:
            logger.debug("RGB probe: 0x0621.fn10(%02X) decoded: %s", cluster_idx, decoded)

    # fn 0x20 getRGBClusterEffect — current state per cluster.
    for cluster_idx in range(8):
        resp = _call(device, feature, 0x20, cluster_idx)
        if resp is None:
            break

    # fn 0x40 getRGBCustomEffectName — single call, documented.
    _call(device, feature, 0x40)


def probe_signature_effects(device) -> None:
    """Probe 0x0622 RGBSignatureEffects read-side functions.

    Returns immediately when the device has no features or does not expose
    the feature. Results are only logged; nothing is returned.
    """
    feature = SupportedFeature.HEADSET_RGB_SIGNATURE_EFFECTS
    if not device.features or feature not in device.features:
        return
    logger.debug("RGB probe: 0x0622 HEADSET_RGB_SIGNATURE_EFFECTS present on %s", device)

    # fn 0x00 getSignatureEffectsInfo.
    _call(device, feature, 0x00)

    # fn 0x10 getSignatureEffectParams — iterate effectId 0..2 (Startup/Shutdown/Passive).
    # effectId is u16 BE.
    for eid in range(3):
        _call(device, feature, 0x10, (eid >> 8) & 0xFF, eid & 0xFF)

    # fn 0x30 getSignatureEffectState — same effectId range.
    for eid in range(3):
        _call(device, feature, 0x30, (eid >> 8) & 0xFF, eid & 0xFF)


def probe(device) -> None:
    """Run both read-only RGB-effects probes once per device.

    Gated via ``_rgb_effects_probed`` so re-entry on reconnect / setting
    rebuild doesn't spam the log with duplicate corpus dumps. The flag is
    set before probing, so a probe that fails is not retried either.
    """
    if getattr(device, "_rgb_effects_probed", False):
        return
    device._rgb_effects_probed = True
    _log_feature_table(device)
    # Each probe is isolated so a failure in one still lets the other run.
    try:
        probe_onboard_effects(device)
    except Exception as e:
        logger.debug("RGB probe: onboard-effects probe raised %r", e)
    try:
        probe_signature_effects(device)
    except Exception as e:
        logger.debug("RGB probe: signature-effects probe raised %r", e)