## Copyright (C) 2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/ ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """AdvancedParaEQ (0x020D) helpers. The device handles biquad coefficient computation — we transmit only per-band filter-type + frequency + gain; the DSP does the rest. V0/V1 wire format: 3-byte band stride [freq_hi, freq_lo, gain_i8], gain is whole dB; getEQInfos returns 5 bytes [bandCount, dbRange, caps, dbMin, dbMax]. V2 wire format: 1-byte header [direction_echo], then N × 5-byte band stride [freq_hi, freq_lo, filter_type, gain_hi, gain_lo], with 0..3 trailer bytes (opaque, ignored). The parser consumes 5-byte chunks until <5 bytes remain or a freq=0 sentinel is hit. Frequency u16 BE in Hz; gain u16 BE in **offset-binary**: raw 0..(steps-1) maps linearly to gain_min..gain_max (so on G522 with steps=241 / gain=[-6..6], raw=120 = 0 dB). getEQInfos returns 13 bytes with gain bounds + step count, format enum, XY-support flag, and onboard preset counts. (The protocol spec lists a 2-byte header [dir_echo, slot_echo] for getCustomEQ, but G522 firmware via the centurion bridge omits the slot_echo and emits a 1-byte header that matches getEQDefaults. Verified against pcap traces of LGHUB ↔ G522 LIGHTSPEED traffic.) """ from __future__ import annotations import logging import struct from .hidpp20_constants import SupportedFeature logger = logging.getLogger(__name__) DIRECTION_PLAYBACK = 0 DIRECTION_CAPTURE = 1 # V2 filter-type taxonomy (byte [+0] of each band). 0x16 is observed on # G522 for every band of its factory custom slot at ISO third-octave # centers, treated as peaking. Other filter kinds (LP, shelf, notch …) # need a live probe per device firmware to enumerate. FILTER_TYPE_HP = 0x00 FILTER_TYPE_PEAKING_G522 = 0x16 FILTER_TYPE_PEAKING = 0x78 FILTER_TYPE_NAMES = { FILTER_TYPE_HP: "HP", FILTER_TYPE_PEAKING_G522: "peaking", FILTER_TYPE_PEAKING: "peaking", } def _get_version(device) -> int: return device.features.get_feature_version(SupportedFeature.HEADSET_ADVANCED_PARA_EQ) or 0 def get_advanced_eq_info(device): """Query getEQInfos (function 0). Returns a dict or None. Common fields: version int feature version (0, 1, 2) gain_min_db int signed whole-dB min gain_max_db int signed whole-dB max step_db float dB per raw LSB (1.0 on V0/V1) V0/V1 only: band_count int number of bands (from wire byte 0) db_range int raw byte 1 capabilities int raw byte 2 V2 only: gain_steps int discrete gain positions format int 0=CLASSIC, 1=STYLES supports_xy bool onboard_ro_preset_count int factory preset slots onboard_custom_preset_count int user-writable preset slots """ version = _get_version(device) result = device.feature_request(SupportedFeature.HEADSET_ADVANCED_PARA_EQ, 0x00) if result is None: logger.debug("AdvancedParaEQ getEQInfos V%d: feature_request returned None", version) return None if version >= 2: if len(result) < 13: logger.debug("AdvancedParaEQ getEQInfos V2: short response len=%d %s", len(result), result.hex()) return None gain_min = struct.unpack("b", bytes([result[2]]))[0] gain_max = struct.unpack("b", bytes([result[3]]))[0] gain_steps = struct.unpack(">H", result[4:6])[0] fmt = result[6] supports_xy = bool(result[7]) ro_presets = result[9] custom_presets = result[10] step_db = (gain_max - gain_min) / max(1, gain_steps - 1) info = { "version": 2, "gain_min_db": gain_min, "gain_max_db": gain_max, "gain_steps": gain_steps, "step_db": step_db, "format": fmt, "supports_xy": supports_xy, "onboard_ro_preset_count": ro_presets, "onboard_custom_preset_count": custom_presets, } logger.debug( "AdvancedParaEQ getEQInfos V2: gain=[%d,%d] steps=%d step_db=%.4f format=%d xy=%s " "presets_ro=%d presets_custom=%d", gain_min, gain_max, gain_steps, step_db, fmt, supports_xy, ro_presets, custom_presets, ) device._advanced_eq_info = info return info # V0 / V1 if len(result) < 5: logger.debug("AdvancedParaEQ getEQInfos V%d: short response len=%d %s", version, len(result), result.hex()) return None band_count = result[0] db_range = result[1] caps = result[2] gain_min = struct.unpack("b", bytes([result[3]]))[0] gain_max = struct.unpack("b", bytes([result[4]]))[0] info = { "version": version, "band_count": band_count, "db_range": db_range, "capabilities": caps, "gain_min_db": gain_min, "gain_max_db": gain_max, "step_db": 1.0, } logger.debug( "AdvancedParaEQ getEQInfos V%d: bands=%d dbRange=%d caps=0x%02X gain=[%d,%d]", version, band_count, db_range, caps, gain_min, gain_max, ) device._advanced_eq_info = info return info def get_advanced_eq_active_slot(device, direction=DIRECTION_PLAYBACK): """Query getActiveEQ (function 3). Returns the active slot index, or None.""" result = device.feature_request(SupportedFeature.HEADSET_ADVANCED_PARA_EQ, 0x30, direction) if result is None: logger.debug("AdvancedParaEQ getActiveEQ(dir=%d): feature_request returned None", direction) return None if len(result) < 1: logger.debug("AdvancedParaEQ getActiveEQ(dir=%d): empty response", direction) return None logger.debug("AdvancedParaEQ getActiveEQ(dir=%d): slot=%d", direction, result[0]) return result[0] def parse_v2_bands(result: bytes, info: dict | None): """Parse a V2 getCustomEQ / getEQDefaults response. Wire layout (see module docstring): [direction_echo] (1-byte header) N × [freq_hi, freq_lo, filter_type, gain_hi, gain_lo] (5 bytes) [trailer …] (0..3 bytes, ignored) Gain is offset-binary against `info`'s gain bounds: gain_db = gain_min + (gain_max - gain_min) * raw / (steps - 1) `info` is the dict returned by `get_advanced_eq_info`. If absent we fall back to step_db=1.0 (and log via the caller, not here) which is wrong but won't crash. Returns list of (filter_type_byte, freq_hz, gain_db) tuples, or None if the payload is too short to contain a header. Empty payload with valid header returns []. Bands with freq=0 are treated as the end-of-bands sentinel (matches V0/V1 behavior at lines below). """ if result is None or len(result) < 1: return None payload = result[1:] # skip [dir_echo] band_size = 5 if info: gain_min = info.get("gain_min_db", -6) gain_max = info.get("gain_max_db", 6) steps = info.get("gain_steps", 241) else: gain_min, gain_max, steps = 0, 0, 1 # produces gain_db=0 for any raw bands = [] for i in range(len(payload) // band_size): e = payload[i * band_size : (i + 1) * band_size] freq_hz = (e[0] << 8) | e[1] filter_type = e[2] gain_raw = (e[3] << 8) | e[4] if freq_hz == 0: break # disabled band — end-of-bands sentinel if steps > 1: gain_db = gain_min + (gain_max - gain_min) * gain_raw / (steps - 1) else: gain_db = 0.0 bands.append((filter_type, freq_hz, float(gain_db))) return bands def _band_label(filter_type_byte: int, freq_hz: int) -> str: kind = FILTER_TYPE_NAMES.get(filter_type_byte, f"type-0x{filter_type_byte:02X}") if filter_type_byte == FILTER_TYPE_HP: return f"HP {freq_hz} Hz" return f"{freq_hz} Hz" if kind == "peaking" else f"{kind} {freq_hz} Hz" def get_advanced_eq_defaults(device, direction=DIRECTION_PLAYBACK, slot=0): """Query getEQDefaults (function 5). Same per-band layout as getCustomEQ. Returns list of (filter_type_byte, freq_hz, gain_db) tuples, or None. V0/V1 callers receive (FILTER_TYPE_PEAKING, freq_hz, gain_db) for compatibility with the V2 tuple shape. """ version = _get_version(device) result = device.feature_request(SupportedFeature.HEADSET_ADVANCED_PARA_EQ, 0x50, direction, slot) if result is None: logger.debug( "AdvancedParaEQ getEQDefaults V%d (dir=%d slot=%d): feature_request returned None", version, direction, slot, ) return None if version >= 2: info = getattr(device, "_advanced_eq_info", None) bands = parse_v2_bands(result, info) if bands is None: logger.debug( "AdvancedParaEQ getEQDefaults V2 (dir=%d slot=%d): payload too short raw=%s", direction, slot, result.hex(), ) return None logger.debug( "AdvancedParaEQ getEQDefaults V2 (dir=%d slot=%d): %d band(s) raw=%s %s", direction, slot, len(bands), result.hex(), [_band_label(t, f) + f" {round(g, 2)}dB" for t, f, g in bands], ) return bands # V0/V1 legacy 3-byte stride. bands = [] offset = 0 while offset + 3 <= len(result): freq = struct.unpack(">H", result[offset : offset + 2])[0] if freq == 0: break gain_db = struct.unpack("b", bytes([result[offset + 2]]))[0] bands.append((FILTER_TYPE_PEAKING, freq, float(gain_db))) offset += 3 logger.debug( "AdvancedParaEQ getEQDefaults V%d (dir=%d slot=%d): %d band(s)", version, direction, slot, len(bands), ) return bands def get_advanced_eq_friendly_name(device, direction=DIRECTION_PLAYBACK, slot=0): """Query getEQFriendlyName (function 6). Returns the UTF-8 preset name or None.""" result = device.feature_request(SupportedFeature.HEADSET_ADVANCED_PARA_EQ, 0x60, direction, slot) if result is None or len(result) < 1: return None name_len = result[0] if 1 + name_len > len(result): name_len = len(result) - 1 try: name = bytes(result[1 : 1 + name_len]).decode("utf-8", errors="replace") except Exception: name = result[1 : 1 + name_len].hex() return name def probe_advanced_eq_slots(device, direction=DIRECTION_PLAYBACK, info=None): """Probe every advertised EQ slot via getCustomEQ and cache which respond. Some firmware (G522) advertises N slots via getEQInfos but only honors a subset for getCustomEQ / setActiveEQ — the rest return 0x0B NOT_SUPPORTED. This iterates 0..total-1 and records which slots actually have data. Result is cached on `device._advanced_eq_working_slots` as a list of `(slot_index, name, bands)` tuples. The HeadsetActiveEQPreset selector builds its choices from this list; the HeadsetAdvancedEQ panel uses it to skip dead slots in its diagnostic output. Logs each working slot's bands at INFO and a summary line indicating how many of the advertised slots are actually accessible. """ cached = getattr(device, "_advanced_eq_working_slots", None) if cached is not None: return cached if info is None: info = getattr(device, "_advanced_eq_info", None) or get_advanced_eq_info(device) if not info: return [] ro_count = info.get("onboard_ro_preset_count", 0) or 0 custom_count = info.get("onboard_custom_preset_count", 0) or 0 total = ro_count + custom_count if total == 0: return [] def probe(slot): bands = get_advanced_eq_params(device, direction=direction, slot=slot) if bands is None: return None name = get_advanced_eq_friendly_name(device, direction=direction, slot=slot) kind = "factory" if slot < ro_count else "custom" logger.debug( "AdvancedParaEQ %s preset slot=%d (dir=%d) name=%r: %s", kind, slot, direction, name, [f"{_band_label(t, f)} {round(g, 2)}dB" for t, f, g in bands], ) return (slot, name, bands) working = [] # Slot 0 is canonical. If it fails the device is unusable; bail. entry = probe(0) if entry is None: device._advanced_eq_working_slots = working return working working.append(entry) # Slot 1 acts as a "multi-slot capable?" canary. G522 firmware # advertises 16 slots but only honors slot 0; LGHUB itself never # touches slots > 0 on this device. When the canary fails, skip the # remaining 14 NOT_SUPPORTED probes. if total > 1: entry = probe(1) if entry is None: logger.debug( "AdvancedParaEQ: slot 1 returned NOT_SUPPORTED; " "firmware advertises %d slots but only honors slot 0", total, ) device._advanced_eq_working_slots = working return working working.append(entry) for slot in range(2, total): entry = probe(slot) if entry is not None: working.append(entry) device._advanced_eq_working_slots = working logger.debug( "AdvancedParaEQ working slots on dir=%d: %d of %d advertised %s", direction, len(working), total, [w[0] for w in working], ) return working # Backward-compat alias kept until external callers are migrated. probe_all_presets = probe_advanced_eq_slots def get_advanced_eq_params(device, direction=DIRECTION_PLAYBACK, slot=0): """Query getCustomEQ (function 1). Returns list of (filter_type, freq_hz, gain_db) or None. V0/V1: filter_type is always FILTER_TYPE_PEAKING (synthesized), freq is raw Hz from wire, gain is whole dB. V2: filter_type comes from the wire (0x00=HP, 0x78=peaking), freq is raw Hz, gain is int16 × step_db. step_db for V2 is cached on the device by get_advanced_eq_info. """ version = _get_version(device) result = device.feature_request(SupportedFeature.HEADSET_ADVANCED_PARA_EQ, 0x10, direction, slot) if result is None: logger.debug( "AdvancedParaEQ getCustomEQ V%d (dir=%d slot=%d): feature_request returned None", version, direction, slot, ) return None if version >= 2: info = getattr(device, "_advanced_eq_info", None) if not info: logger.warning("AdvancedParaEQ getCustomEQ V2: no cached getEQInfos — gain values will be wrong") bands = parse_v2_bands(result, info) if bands is None: logger.debug( "AdvancedParaEQ getCustomEQ V2 (dir=%d slot=%d): payload too short raw=%s", direction, slot, result.hex(), ) return None step_db = info["step_db"] if info and "step_db" in info else 1.0 # Log raw=... too so we can compare wire shapes across firmware # variants and across get-fns (getCustomEQ vs getEQDefaults). logger.debug( "AdvancedParaEQ getCustomEQ V2 (dir=%d slot=%d): %d band(s) step_db=%.4f raw=%s %s", direction, slot, len(bands), step_db, result.hex(), [f"{_band_label(t, f)} {round(g, 2)}dB" for t, f, g in bands], ) return bands # V0 / V1 bands = [] offset = 0 while offset + 3 <= len(result): freq = struct.unpack(">H", result[offset : offset + 2])[0] if freq == 0: break gain_db = struct.unpack("b", bytes([result[offset + 2]]))[0] bands.append((FILTER_TYPE_PEAKING, freq, float(gain_db))) offset += 3 logger.debug( "AdvancedParaEQ getCustomEQ V%d (dir=%d slot=%d): parsed %d band(s) %s", version, direction, slot, len(bands), bands, ) return bands