From 3df2a30f30b31f459656af31bc5b0795063b9641 Mon Sep 17 00:00:00 2001 From: Ken Sanislo Date: Wed, 13 May 2026 15:02:09 -0700 Subject: [PATCH] Add RGB lighting persistence and software LED power management for G515 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Software-managed LED persistence and power management for devices that expose RGBEffects (0x8071) — primarily G515 LIGHTSPEED TKL, but the same infrastructure works on any 0x8071 device that supports SW takeover. Core mechanism: RGBControl toggle drives a Set SWControl(mode=3, flags) handshake. While SW control is held, the host owns the LED pipeline: zone effects, per-key paint, idle/sleep transitions, and the NvConfig boot/exit animations. On release, the firmware resumes its onboard profile. Major pieces: - rgb_power.py — new module hosting the software RGB power manager: ACTIVE / DIMMING / IDLE / SLEEPING state machine driven by firmware onUserActivity events, smooth 5-second dim ramp (zone or per-key), idle Static color snap, software sleep timer, wake handler that re-pushes saved state. Includes the cleanup hook that runs on device close (and, optionally, fires the cap 0x0040 shutdown trigger). - RGBControl (settings_templates) — switch-style render via BooleanValidator (true_value=3 / false_value=0) plus a full _claim_sw_control / _release_sw_control pair: profile-management mode, SetSWControl, per-key flag reset, manager start, cleanup registration, and a post-claim repaint pass so the device immediately reflects Solaar's saved zone + per-key state. - RGBEffectSetting — zone-effect Setting subclass for 0x8071. Handles per-key/zone coexistence: per-key paint dominates only when zone is Static and the user has explicitly opted in via the lock icon; under animations or before opt-in, the zone wire push is the visible layer. - RGBIdleEffect / RGBIdleTimeout / RGBSleepTimeout — Solaar-managed idle behavior. Choice list: "No change" → Dim → Static (snap to color) → device-specific animations. Static idle substitutes the idle color for unset per-key cells via effective_zone_base_color's state-aware lookup. - RgbStartupAnimation / RgbShutdownAnimation — toggle-and-color rows for RGBEffects NvConfig caps 0x0001 and 0x0040, exposed only on devices that answer the probe. Shutdown trigger fires SetRgbPowerMode(0) at cleanup time so the firmware plays the configured animation on exit. - PerKeyLighting — per-key painter improvements: explicit-opt-in dominance over zone effects, BUSY retry, FrameEnd suppression on per-cell failure, single-shot prep sequence (SetEffectByIndex into the out-of-range slot) on mice with firmware effect cards. - device_quirks.py — small per-model quirks table keyed by device.modelId (stable across USB/BLE/wireless). Currently used to mark RGBEffects NvConfig color slots as inert on devices where the firmware accepts but ignores the bytes (G502 X PLUS startup colors). - config_panel.py — HeteroKeyControl gains a TOGGLE field kind that renders as a Gtk.Switch (used by the boot-effect rows). Visual gate greys out RGB settings whose prerequisites aren't met: rgb_zone_* / rgb_idle_* / rgb_sleep_timeout require LED Control = Solaar; per-key additionally requires every zone effect to be Static. Visual-only — doesn't touch the persister's user lock-icon state. - tests/test_rgb_power.py — coverage for the power manager state machine, dim ramp, idle effects, wake path, and per-key/zone coexistence. Closes pwr-Solaar/Solaar#3149. --- lib/logitech_receiver/device.py | 10 +- lib/logitech_receiver/device_quirks.py | 47 + lib/logitech_receiver/hidpp20.py | 87 +- lib/logitech_receiver/notifications.py | 9 + lib/logitech_receiver/rgb_power.py | 975 ++++++++++++++++++ lib/logitech_receiver/settings_templates.py | 910 +++++++++++++++- lib/logitech_receiver/settings_validator.py | 3 +- lib/solaar/ui/config_panel.py | 214 +++- lib/solaar/ui/perkey/control.py | 14 +- tests/logitech_receiver/fake_hidpp.py | 3 + tests/logitech_receiver/test_rgb_power.py | 679 ++++++++++++ .../test_setting_templates.py | 109 +- 12 files changed, 2981 insertions(+), 79 deletions(-) create mode 100644 lib/logitech_receiver/device_quirks.py create mode 100644 lib/logitech_receiver/rgb_power.py create mode 100644 tests/logitech_receiver/test_rgb_power.py diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index 8df492d6..1de933eb 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -940,12 +940,16 @@ class Device: pass def close(self): - handle, self.handle = self.handle, None - if self in Device.instances: - Device.instances.remove(self) + # Run device.cleanups before clearing self.handle — cleanup callbacks + # typically need to issue final feature_request() writes (e.g. release + # SW control, restore device-side state) and feature_request() relies + # on self.handle being set. if hasattr(self, "cleanups"): for cleanup in self.cleanups: cleanup(self) + handle, self.handle = self.handle, None + if self in Device.instances: + Device.instances.remove(self) return handle and self.low_level.close(handle) def __index__(self): diff --git a/lib/logitech_receiver/device_quirks.py b/lib/logitech_receiver/device_quirks.py new file mode 100644 index 00000000..978e0505 --- /dev/null +++ b/lib/logitech_receiver/device_quirks.py @@ -0,0 +1,47 @@ +"""Per-device-model quirks. + +Keyed by ``device.modelId``, which Logitech composes by concatenating every +transport-specific PID (btid + btleid + wpid + usbid) for a single physical +model. That makes one entry cover the same device regardless of how it is +currently connected — no transport-aliasing gotchas. + +Quirks are hand-curated. Devices do not self-report behaviors like "this +firmware ignores the bytes I wrote", so each entry is observation-derived. +Keep entries narrow and document the observation in a comment. +""" + +from __future__ import annotations + +# Quirk keys are named after the doc-canonical feature/function path so a +# grep for the HID++ feature name (e.g. "RGBEffects", "NvConfig") lands here. +# +# Default-allow: each quirk lists what is KNOWN to be broken or ignored on +# that device model. Unlisted models / unlisted entries get the full UI. +# +# rgb_effects_nvconfig_colors_inert +# Feature 0x8071 RGBEffects, Function 3 NvConfig — cap-id → set of +# color slot names ({"color1", "color2"}) whose bytes the firmware +# accepts but does not visibly apply. UI hides color pickers for +# slots in the set. +QUIRKS: dict[str, dict[str, object]] = { + # G502 X PLUS — RGBEffects NvConfig startup effect (cap 0x0001): color + # bytes are entirely ignored; only the enabled flag is honored. Shutdown + # cap (0x0040) is not supported and is suppressed via build()-time probe. + "4099C0950000": { + "rgb_effects_nvconfig_colors_inert": { + 0x0001: {"color1", "color2"}, + }, + }, +} + + +def get(device, key, default=None): + """Look up a quirk by key for the device's model. + + Returns ``default`` when either the model has no quirks entry or the + entry lacks the requested key. + """ + model_id = getattr(device, "modelId", None) + if not model_id: + return default + return QUIRKS.get(model_id, {}).get(key, default) diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index 992b3433..3c9d467e 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -1128,22 +1128,37 @@ class LEDParam: ramp = "ramp" form = "form" saturation = "saturation" + direction = "direction" -class LedRampChoice(IntEnum): - DEFAULT = 0 - YES = 1 - NO = 2 +# NamedInts (not IntEnum) so the GTK ComboBoxText shows readable labels. +LedRampChoice = common.NamedInts(Default=0, Yes=1, No=2) +LedFormChoices = common.NamedInts( + Default=0, + Sine=1, + Square=2, + Triangle=3, + Sawtooth=4, + Shark_fin=5, + Exponential=6, +) -class LedFormChoices(IntEnum): - DEFAULT = 0 - SINE = 1 - SQUARE = 2 - TRIANGLE = 3 - SAWTOOTH = 4 - SHARKFIN = 5 - EXPONENTIAL = 6 +LedDirectionChoices = common.NamedInts() +LedDirectionChoices[0] = _("Cycle") +LedDirectionChoices[1] = _("Right") +LedDirectionChoices[2] = _("Down") +LedDirectionChoices[3] = _("Center Out") +LedDirectionChoices[4] = _("In") +LedDirectionChoices[5] = _("Out") +LedDirectionChoices[6] = _("Left") +LedDirectionChoices[7] = _("Up") +LedDirectionChoices[8] = _("Center In") + +# Direction values to hide on devices whose LED grid can't render them. +LedDirectionBlocklist = { + "40B4": {4, 5}, # G515 LS TKL — no edge-radiating wave geometry +} LEDParamSize = { @@ -1154,26 +1169,62 @@ LEDParamSize = { LEDParam.ramp: 1, LEDParam.form: 1, LEDParam.saturation: 1, + LEDParam.direction: 1, } -# not implemented from x8070 Wave=4, Stars=5, Press=6, Audio=7 -# not implemented from x8071 Custom=12, Kitt=13, HSVPulsing=20, -# WaveC=22, RippleC=23, SignatureActive=24, SignaturePassive=25 +# Entry: [NamedInt, params, defaults, ranges] — trailing dicts optional. +# ranges overrides a field's global min/max, e.g. period: (2, 200). LEDEffects = { 0x00: [NamedInt(0x00, _("Disabled")), {}], 0x01: [NamedInt(0x01, _("Static")), {LEDParam.color: 0, LEDParam.ramp: 3}], 0x02: [NamedInt(0x02, _("Pulse")), {LEDParam.color: 0, LEDParam.speed: 3}], - 0x03: [NamedInt(0x03, _("Cycle")), {LEDParam.period: 5, LEDParam.intensity: 7}], + 0x03: [ + NamedInt(0x03, _("Cycle")), + {LEDParam.period: 5, LEDParam.intensity: 7}, + {LEDParam.period: 5000, LEDParam.intensity: 100}, + ], + # No probe device enumerates base Wave; assume the 0x16 layout so the + # UI matches what 0x16-capable hardware shows. + 0x04: [ + NamedInt(0x04, _("Wave")), + {LEDParam.period: 6, LEDParam.direction: 9}, + {LEDParam.period: 5000}, + ], 0x08: [NamedInt(0x08, _("Boot")), {}], 0x09: [NamedInt(0x09, _("Demo")), {}], 0x0A: [ NamedInt(0x0A, _("Breathe")), {LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6}, + {LEDParam.period: 5000, LEDParam.intensity: 100}, + ], + 0x0B: [ + NamedInt(0x0B, _("Ripple")), + {LEDParam.color: 0, LEDParam.period: 4}, + {LEDParam.period: 20}, + {LEDParam.period: (2, 200)}, ], - 0x0B: [NamedInt(0x0B, _("Ripple")), {LEDParam.color: 0, LEDParam.period: 4}], 0x0E: [NamedInt(0x0E, _("Decomposition")), {LEDParam.period: 6, LEDParam.intensity: 8}], 0x0F: [NamedInt(0x0F, _("Signature1")), {LEDParam.period: 5, LEDParam.intensity: 7}], 0x10: [NamedInt(0x10, _("Signature2")), {LEDParam.period: 5, LEDParam.intensity: 7}], - 0x15: [NamedInt(0x15, _("CycleS")), {LEDParam.saturation: 1, LEDParam.period: 6, LEDParam.intensity: 8}], + 0x15: [ + NamedInt(0x15, _("Cycle")), + {LEDParam.saturation: 1, LEDParam.period: 6, LEDParam.intensity: 8}, + {LEDParam.saturation: 255, LEDParam.period: 5000, LEDParam.intensity: 100}, + ], + 0x16: [ + NamedInt(0x16, _("Wave")), + {LEDParam.saturation: 1, LEDParam.period: 6, LEDParam.intensity: 8, LEDParam.direction: 9}, + {LEDParam.saturation: 255, LEDParam.period: 5000, LEDParam.intensity: 100}, + ], + # Saturation derivative of Ripple 0x0B; pcap layout: color @ 0-2, + # saturation @ 3, period @ 6-7. + 0x17: [ + NamedInt(0x17, _("Ripple")), + {LEDParam.color: 0, LEDParam.saturation: 3, LEDParam.period: 6}, + {LEDParam.saturation: 255, LEDParam.period: 20}, + {LEDParam.period: (2, 200)}, + ], + # Synthetic — host-side dim ramp, no wire effect. + 0x80: [NamedInt(0x80, _("Dim")), {LEDParam.intensity: 0}], } diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 21ede6c5..d2323a26 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -34,6 +34,7 @@ from . import diversion from . import hidpp10 from . import hidpp10_constants from . import hidpp20 +from . import rgb_power from . import settings_templates from .common import Alert from .common import BatteryStatus @@ -427,6 +428,14 @@ def _process_feature_notification(device: Device, notification: HIDPPNotificatio brightness = struct.unpack("!H", device.feature_request(SupportedFeature.BRIGHTNESS_CONTROL, 0x10)[:2])[0] device.setting_callback(device, settings_templates.BrightnessControl, [brightness]) + elif feature == SupportedFeature.RGB_EFFECTS: + fn = notification.address >> 4 + if fn == 1: # onUserActivity: type=0 is IDLE, type!=0 is ACTIVE + activity_type = notification.data[0] if notification.data else 0xFF + rgb_power.on_user_activity(device, activity_type) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: RGB_EFFECTS notification addr=%02x: %s", device, notification.address, notification) + diversion.process_notification(device, notification, feature) return True diff --git a/lib/logitech_receiver/rgb_power.py b/lib/logitech_receiver/rgb_power.py new file mode 100644 index 00000000..2aece87c --- /dev/null +++ b/lib/logitech_receiver/rgb_power.py @@ -0,0 +1,975 @@ +## Copyright (C) 2012-2013 Daniel Pavel +## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/ +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License along +## with this program; if not, write to the Free Software Foundation, Inc., +## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Software-driven RGB power management for devices that hand off LED +control to the host (RGB_EFFECTS / 0x8071). + +Handles the firmware onUserActivity events, the two-stage idle effect +(smooth dim ramp or animation), and the software sleep timer that fires +after idle_timeout has elapsed. +""" + +from __future__ import annotations + +import logging + +from time import sleep + +from . import exceptions +from . import hidpp20_constants +from . import settings +from . import special_keys +from .hidpp20_constants import SupportedFeature + +logger = logging.getLogger(__name__) + +try: + from gi.repository import GLib + + _has_glib = True +except ImportError: + _has_glib = False + + +# SetSWControl flag bits for RGB_EFFECTS (0x8071). +FLAG_EFFECT = 0x01 +FLAG_POWER = 0x02 +FLAG_NVCONFIG = 0x04 + +# SetSWControl payloads: [subfn=set, mode=3, flags] +SW_ACTIVE = bytes([0x01, 0x03, FLAG_NVCONFIG]) # firmware monitors idle +SW_IDLE = bytes([0x01, 0x03, FLAG_POWER]) # firmware monitors activity +SW_RELEASE = bytes([0x01, 0x00, 0x00]) + + +_managers = {} # keyed by id(device) + + +def get_manager(device): + """Return the active RGBPowerManager for `device`, or None.""" + return _managers.get(id(device)) + + +def on_user_activity(device, activity_type): + """Dispatch firmware onUserActivity events to the device's power manager.""" + mgr = _managers.get(id(device)) + if mgr: + mgr.on_user_activity(activity_type) + + +def translate_color_for_display(color, state, dim_pct, dim_step, dim_steps): + """Map a saved (undimmed) color to the display color for `state`. + Returns None for SLEEPING.""" + if state == RGBPowerManager.ACTIVE: + return color + if state == RGBPowerManager.SLEEPING: + return None + target = RGBPowerManager._compute_dim_color(color, dim_pct) + if state == RGBPowerManager.IDLE: + return target + # DIMMING — interpolate from saved toward dimmed target by ramp progress. + t = (dim_step / dim_steps) if dim_steps else 1.0 + return RGBPowerManager._interpolate_color(color, target, t) + + +def translate_for_device(device, color): + """Translate `color` through the device's RGBPowerManager state, or + return it unchanged when no manager is registered. None signals SLEEPING.""" + mgr = _managers.get(id(device)) + if mgr is None: + return color + return mgr.translate_color(color) + + +_EFFECT_STATIC = 0x01 + + +def perkey_has_paint(device): + """Return ``(perkey_setting, has_paint)``. has_paint is True when the + per-key buffer has at least one real color, a usable zone set, and the + user has *explicitly enabled* per-key via the lock icon (sensitivity + is True). Default-sensitivity (False) and ignore both yield False so + zone effects remain the primary mechanism on devices where the user + hasn't opted in to per-key dominance.""" + perkey = None + for s in getattr(device, "settings", []) or []: + if s.name == "per-key-lighting": + perkey = s + break + if perkey is None: + return None, False + validator = getattr(perkey, "_validator", None) + choices = getattr(validator, "choices", None) + if not choices: + return perkey, False + # Apply path runs rgb_zone_ before per-key, so _value may still be None + # when this gate is consulted — fall back to the persister. + value = getattr(perkey, "_value", None) + persister = getattr(device, "persister", None) + if value is None and persister is not None: + value = persister.get("per-key-lighting") + if not value: + return perkey, False + no_change = special_keys.COLORSPLUS["No change"] + if not any(c != no_change and isinstance(c, int) and c >= 0 for c in value.values()): + return perkey, False + if persister is None or persister.get_sensitivity("per-key-lighting") is not True: + return perkey, False + return perkey, True + + +def zone_effect_is_static(device): + """True when the persisted zone effect is Static, or when no + rgb_zone_* setting exists at all (per-key-only hardware).""" + has_zone = False + persister = getattr(device, "persister", None) + for s in getattr(device, "settings", []) or []: + if s.name.startswith("rgb_zone_"): + has_zone = True + value = getattr(s, "_value", None) + if value is None and persister is not None: + value = persister.get(s.name) + if value is not None and int(getattr(value, "ID", 0) or 0) == _EFFECT_STATIC: + return True + return not has_zone + + +def zone_effect_is_ignored(device): + """True when every rgb_zone_* setting on `device` is marked + SENSITIVITY_IGNORE in the persister.""" + persister = getattr(device, "persister", None) + if persister is None: + return False + zones = [s for s in getattr(device, "settings", []) or [] if s.name.startswith("rgb_zone_")] + if not zones: + return False + return all(persister.get_sensitivity(s.name) == settings.SENSITIVITY_IGNORE for s in zones) + + +def effective_zone_base_color(device): + """Color to use for per-key unset cells: 0 (off/black) when the zone + effect is ignored (or unavailable), the persisted zone color otherwise. + Reads through the persister so we still get the saved color even before + apply has populated _value. + + During an idle-Static transition the saved color is substituted with + the idle effect's color so unset cells track the idle primary. Reverts + on wake when state returns to ACTIVE.""" + if zone_effect_is_ignored(device): + return 0 + mgr = _managers.get(id(device)) + if mgr is not None and mgr._state == RGBPowerManager.IDLE and mgr._idle_effect_id() == 0x01: + return int(getattr(mgr._idle_effect, "color", 0) or 0) + persister = getattr(device, "persister", None) + for s in getattr(device, "settings", []) or []: + if not s.name.startswith("rgb_zone_"): + continue + value = getattr(s, "_value", None) + if value is None and persister is not None: + value = persister.get(s.name) + if value is not None: + color = getattr(value, "color", None) + if isinstance(color, int): + return int(color) + return 0 + + +_RETRY_BUSY_BACKOFF_MS = (30, 60, 90) + + +def feature_request_acked(device, feature, function, data=b"", retries=3): + """feature_request with BUSY/timeout retries. Returns reply bytes + on ACK, None on hard failure (logged WARNING).""" + busy_attempt = 0 + max_busy = len(_RETRY_BUSY_BACKOFF_MS) + for attempt in range(retries + 1): + try: + reply = device.feature_request(feature, function, data) + except exceptions.FeatureCallError as e: + if getattr(e, "error", None) == hidpp20_constants.ErrorCode.BUSY and busy_attempt < max_busy: + delay_ms = _RETRY_BUSY_BACKOFF_MS[busy_attempt] + busy_attempt += 1 + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: feature 0x%04x fn 0x%02x BUSY, retry %d/%d after %dms", + device, + int(feature), + function, + busy_attempt, + max_busy, + delay_ms, + ) + sleep(delay_ms / 1000.0) + continue + logger.warning("%s: feature 0x%04x fn 0x%02x rejected: %s", device, int(feature), function, e) + return None + if reply is not None: + if (attempt > 0 or busy_attempt > 0) and logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: feature 0x%04x fn 0x%02x succeeded after %d timeout retries, %d BUSY retries", + device, + int(feature), + function, + attempt, + busy_attempt, + ) + return reply + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: feature 0x%04x fn 0x%02x timed out (attempt %d/%d)", + device, + int(feature), + function, + attempt + 1, + retries + 1, + ) + logger.warning("%s: feature 0x%04x fn 0x%02x no ACK after %d attempts", device, int(feature), function, retries + 1) + return None + + +def _probe_tmpl_bytes(device): + """GetEffectSpecificInfo page 1: returns (tmpl_0, tmpl_1) or + (None, None) if the device has no firmware effect cards.""" + try: + reply = device.feature_request(SupportedFeature.RGB_EFFECTS, 0x00, 0xFF, 0x00, 0x01, 0x00, 0x01) + except exceptions.FeatureCallError: + return (None, None) + if reply is None or len(reply) < 12: + return (None, None) + return (reply[10], reply[11]) + + +def push_artanis_perkey_prep(device): + """Disable the firmware effects engine on mice with firmware effect cards. + Returns True if the call ACKed.""" + infos = getattr(device, "led_effects", None) + if not infos or not infos.zones: + return False + num_effects = len(infos.zones[0].effects) + # SetEffectByIndex: cluster + effectIdx + 10 param bytes + persist. + # Shipping with call 2 only — sufficient on tested hardware (G502 X PLUS). + # Call 1 (TMPL-handshake) left commented for reactivation if broader + # testing turns up a device that needs it; uncomment the _probe_tmpl_bytes + # use and the call1 block together. + # tmpl_0, tmpl_1 = _probe_tmpl_bytes(device) + # if tmpl_0 is None: + # return False + # call1 = b"\xff\x02" + b"\x00" * 6 + bytes([tmpl_0, tmpl_1]) + b"\x00\x00" + b"\x01" + # if feature_request_acked(device, SupportedFeature.RGB_EFFECTS, 0x10, call1) is None: + # return False + call2 = b"\xff" + bytes([num_effects]) + b"\x00" * 10 + b"\x01" + return feature_request_acked(device, SupportedFeature.RGB_EFFECTS, 0x10, call2) is not None + + +def start(device): + """Begin software RGB power management for `device`. No-op without GLib.""" + if not _has_glib: + return + key = id(device) + if key not in _managers: + mgr = RGBPowerManager(device) + _managers[key] = mgr + mgr.start() + else: + mgr = _managers[key] + mgr.reset() + # Push persisted settings into the manager. Settings marked ignore via the + # lock icon are skipped so the manager keeps its built-in default. + from . import hidpp20 + + persister = getattr(device, "persister", None) + + def _ignored(name): + return persister is not None and persister.get_sensitivity(name) == settings.SENSITIVITY_IGNORE + + for s in device.settings: + if _ignored(s.name): + continue + if s.name == "rgb_idle_timeout": + val = s._value if s._value is not None else 60 + mgr.set_idle_timeout(int(val)) + elif s.name == "rgb_sleep_timeout": + val = s._value if s._value is not None else 300 + mgr.set_sleep_timeout(int(val)) + elif s.name == "rgb_idle_effect": + val = s._value if s._value is not None else hidpp20.LEDEffectSetting(ID=0x80, intensity=50) + mgr.set_idle_effect(val) + + +def stop(device): + """End software RGB power management for `device`.""" + key = id(device) + mgr = _managers.pop(key, None) + if mgr: + mgr.stop() + + +def cleanup(device): + """device.cleanups handler — restore firmware control on device close. + + On devices that support NvConfig cap 0x0040 (shutdown effect), also fires + SetRgbPowerMode(0) as the final step so the firmware plays the configured + shutdown animation during the active→off transition. If the cap is + disabled, the firmware powers down LEDs silently. Matches LGHUB exit. + See solaar_shutdown_effect_trigger_spec.md. + """ + stop(device) + try: + device.feature_request(SupportedFeature.RGB_EFFECTS, 0x50, SW_RELEASE) + if device.features and SupportedFeature.PROFILE_MANAGEMENT in device.features: + device.feature_request(SupportedFeature.PROFILE_MANAGEMENT, 0x60, b"\x03") + elif device.features and SupportedFeature.ONBOARD_PROFILES in device.features: + device.feature_request(SupportedFeature.ONBOARD_PROFILES, 0x10, b"\x01") + except Exception: + pass # Device may already be offline + if getattr(device, "_rgb_has_shutdown_cap", False): + try: + # SetRgbPowerMode(set=1, mode=0) — firmware off transition. + # no_reply: device goes offline; don't block waiting for an ACK. + device.feature_request(SupportedFeature.RGB_EFFECTS, 0x90, b"\x01\x00", no_reply=True) + except Exception: + pass + + +class RGBPowerManager: + """Two-stage idle handler driven by firmware onUserActivity events. + + State machine: ACTIVE → DIMMING → IDLE → SLEEPING. + Stage 1 (idle) runs a host-side dim ramp or hands off to a firmware + animation. Stage 2 (sleep) is a software timer that fires + sleep_timeout - idle_timeout after IDLE. + """ + + ACTIVE = 0 + DIMMING = 1 + IDLE = 2 + SLEEPING = 3 + + _DIM_INTERVAL_MS = 200 + _DIM_STEPS = 25 # ~5s dim ramp + + def __init__(self, device): + self._device = device + self._state = self.ACTIVE + self._idle_timeout = 60 + self._sleep_timeout = 300 + # LEDEffectSetting with ID in {0x00 Disabled, 0x80 Dim, 0x0A + # Breathe, 0x0B Ripple}. Populated by start() from the persister. + self._idle_effect = None + self._sleep_timer_id = None + self._dim_timer_id = None + self._dim_step = 0 + self._dim_zones = [] + self._dim_perkey = None + + def start(self): + self._state = self.ACTIVE + self._read_firmware_timers() + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: RGB power manager started (firmware idle=%ds, sleep=%ds)", + self._device, + self._idle_timeout, + self._sleep_timeout, + ) + + def stop(self): + self._cancel_dim_timer() + self._cancel_sleep_timer() + if self._state != self.ACTIVE: + try: + self._wake() + except Exception: + pass # Best effort during shutdown + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: RGB power manager stopped", self._device) + + def reset(self): + """Reset to ACTIVE on device reconnect. Re-reads firmware timers so + externally-updated values (other tool wrote NV between sessions) + are picked up even when our settings are ignored.""" + self._cancel_dim_timer() + self._cancel_sleep_timer() + self._state = self.ACTIVE + self._read_firmware_timers() + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: RGB power manager reset to ACTIVE", self._device) + + def set_idle_timeout(self, seconds): + self._idle_timeout = seconds + self._cancel_sleep_timer() + if seconds == 0 and self._state in (self.DIMMING, self.IDLE): + self._wake() + self._write_firmware_idle_timeout(seconds) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: RGB idle timeout set to %ss", self._device, seconds) + + def set_sleep_timeout(self, seconds): + """0 disables sleep.""" + self._sleep_timeout = seconds + self._cancel_sleep_timer() + if seconds == 0 and self._state == self.SLEEPING: + self._wake() + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: RGB sleep timeout set to %ss", self._device, seconds) + + def set_idle_effect(self, effect): + """`effect` is an LEDEffectSetting. Wake immediately if the user + switched to Disabled while we're mid-idle.""" + self._idle_effect = effect + if self._idle_effect_id() == 0x00 and self._state in (self.DIMMING, self.IDLE): + self._wake() + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: RGB idle effect set to ID=0x%02X (period=%s, intensity=%s)", + self._device, + self._idle_effect_id(), + getattr(self._idle_effect, "period", None), + getattr(self._idle_effect, "intensity", None), + ) + + def _idle_effect_id(self): + """Return the ID of the current idle effect, or 0 if unset.""" + return int(getattr(self._idle_effect, "ID", 0) or 0) + + # --- Firmware activity events --- + + def on_user_activity(self, activity_type): + """Handle firmware onUserActivity event from RGB_EFFECTS (0x8071). + + activity_type=0: IDLE — user stopped typing, firmware idle timer expired. + activity_type!=0: ACTIVE — user resumed typing after being idle. + + The firmware sends a burst of ~8 events with exponential backoff. + Only the first event matters; subsequent events for the same transition are ignored. + """ + if not self._device.online: + return + + if activity_type == 0: + # IDLE event — firmware detected inactivity at idle_timeout + if self._state != self.ACTIVE: + return # Already idle/dimming/sleeping, ignore burst + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: firmware IDLE event — starting idle sequence", self._device) + # Switch to flags=3 so firmware monitors for activity during dim/idle + try: + self._device.feature_request(SupportedFeature.RGB_EFFECTS, 0x50, SW_IDLE) + except Exception: + pass + idle_enabled = self._idle_effect_id() != 0 and self._idle_timeout > 0 and not self._is_ignored("rgb_idle_effect") + if idle_enabled: + self._start_idle_effect() + else: + self._state = self.IDLE + # Sleep is host-driven only — schedule whenever _sleep_timeout > 0, + # regardless of the setting's ignore flag (which only blocks pushing + # the user value to firmware, see start()). + sleep_enabled = self._sleep_timeout > 0 + if sleep_enabled: + delay = max(self._sleep_timeout - self._idle_timeout, 0) + if delay == 0: + self._start_sleep() + else: + self._sleep_timer_id = GLib.timeout_add_seconds(delay, self._sleep_timer_fired) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: sleep timer scheduled in %ds", self._device, delay) + else: + # ACTIVE event — user resumed typing + if self._state == self.ACTIVE: + return # Already active, ignore burst + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: firmware ACTIVE event — waking", self._device) + self._cancel_sleep_timer() + self._wake() + + def _sleep_timer_fired(self): + """GLib callback — software sleep timer expired after IDLE.""" + self._sleep_timer_id = None + if self._state in (self.IDLE, self.DIMMING) and self._device.online: + self._start_sleep() + return False # One-shot timer + + def _cancel_sleep_timer(self): + if self._sleep_timer_id is not None: + GLib.source_remove(self._sleep_timer_id) + self._sleep_timer_id = None + + def _read_firmware_timers(self): + """Read idle/sleep timeouts from firmware as the manager's defaults.""" + try: + resp = self._device.feature_request(SupportedFeature.RGB_EFFECTS, 0x70, b"\x00") + if resp and len(resp) >= 7: + idle_s = (resp[3] << 8) | resp[4] + sleep_s = (resp[5] << 8) | resp[6] + if idle_s > 0: + self._idle_timeout = idle_s + if sleep_s > 0: + self._sleep_timeout = sleep_s + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: firmware timers: idle=%ds, sleep=%ds", + self._device, + idle_s, + sleep_s, + ) + except Exception as e: + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: could not read firmware timers, using defaults: %s", self._device, e) + + def _write_firmware_idle_timeout(self, seconds): + """Push idle/sleep timeouts back to firmware so it fires IDLE on time.""" + try: + idle_hi = (seconds >> 8) & 0xFF + idle_lo = seconds & 0xFF + sleep_hi = (self._sleep_timeout >> 8) & 0xFF + sleep_lo = self._sleep_timeout & 0xFF + payload = bytes([0x01, 0x00, 0x00, idle_hi, idle_lo, sleep_hi, sleep_lo]) + self._device.feature_request(SupportedFeature.RGB_EFFECTS, 0x70, payload) + except Exception as e: + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: could not write firmware idle timeout: %s", self._device, e) + + # --- Idle effect --- + + def _start_idle_effect(self): + idle_id = self._idle_effect_id() + if idle_id == 0x80: # Dim + dim_pct = int(getattr(self._idle_effect, "intensity", 50) or 50) + self._start_dim_ramp(dim_pct) + elif idle_id == 0x01: # Static — snap to idle color + self._start_static_idle() + elif idle_id != 0x00: + self._apply_animation(idle_id) + + def _start_static_idle(self): + """Snap to the idle effect's color exactly as if the user had set + the active Static zone color to it. Per-key paint continues to + display; unset cells repaint to the idle primary color via + effective_zone_base_color's IDLE-state substitution. No animation + — instant transition. Wake reverts via _restore_colors().""" + idle_color = int(getattr(self._idle_effect, "color", 0) or 0) + infos = getattr(self._device, "led_effects", None) + if not infos or not infos.zones: + self._state = self.IDLE + return + self._state = self.IDLE + perkey_setting, has_paint = perkey_has_paint(self._device) + perkey_dominates = has_paint and zone_effect_is_static(self._device) + if perkey_dominates and perkey_setting is not None: + # Per-key is the visible layer — repaint unset cells with the + # idle color (effective_zone_base_color now returns it because + # state == IDLE and idle effect ID == Static). + try: + if perkey_setting._fill_unset_zones_with_base_color(): + perkey_setting._send_with_retry(0x70, b"\x00") # FrameEnd + except Exception as e: + if logger.isEnabledFor(logging.WARNING): + logger.warning("%s: static idle per-key repaint failed: %s", self._device, e) + return + # Zone is the visible layer — push Static at idle.color to each zone. + for zone in infos.zones: + if 0x01 in (e.ID for e in zone.effects): + try: + self._push_static_effect(zone, idle_color) + except Exception as e: + if logger.isEnabledFor(logging.WARNING): + logger.warning("%s: static idle zone push failed: %s", self._device, e) + + def _start_dim_ramp(self, dim_pct): + """Smooth ~5s dim ramp. Dims the per-key buffer when it's the visible + layer (any real per-key paint), otherwise the zone effect.""" + infos = getattr(self._device, "led_effects", None) + if not infos or not infos.zones: + self._state = self.IDLE + return + + perkey_setting, has_paint = perkey_has_paint(self._device) + # Per-key only dominates when zone is Static. Under animations, the + # firmware engine owns the visible layer — dim the zone instead. + perkey_active = has_paint and zone_effect_is_static(self._device) + + self._dim_perkey = None + if perkey_active: + self._dim_zones = [] + self._dim_perkey = self._build_full_perkey_dim_map(perkey_setting, dim_pct) + if self._dim_perkey: + # Push base color to unset cells first so they don't start from stale. + self._init_unset_perkey_zones(perkey_setting) + else: + self._dim_zones = [] + for zone in infos.zones: + if 0x01 in (e.ID for e in zone.effects): + start_color = self._get_zone_color(zone) + target_color = self._compute_dim_color(start_color, dim_pct) + self._dim_zones.append((zone, start_color, target_color)) + + if not self._dim_zones and not self._dim_perkey: + self._state = self.IDLE + return + self._dim_step = 0 + self._state = self.DIMMING + self._dim_timer_id = GLib.timeout_add(self._DIM_INTERVAL_MS, self._dim_ramp_step) + if logger.isEnabledFor(logging.DEBUG): + n_zones = len(self._dim_zones) + n_perkey = len(self._dim_perkey) if self._dim_perkey else 0 + logger.debug( + "%s: starting dim ramp to %d%% brightness (%d zones, %d per-key%s)", + self._device, + dim_pct, + n_zones, + n_perkey, + ", per-key masking zones" if perkey_active else "", + ) + + def _dim_ramp_step(self): + if self._state != self.DIMMING or not self._device.online: + self._dim_timer_id = None + return False + self._dim_step += 1 + t = self._dim_step / self._DIM_STEPS + for zone, start_color, target_color in self._dim_zones: + try: + self._push_static_effect(zone, self._interpolate_color(start_color, target_color, t)) + except Exception as e: + if logger.isEnabledFor(logging.WARNING): + logger.warning("%s: dim ramp step failed for zone %s: %s", self._device, zone.index, e) + if self._dim_perkey: + try: + self._push_perkey_dimmed(t) + except Exception as e: + if logger.isEnabledFor(logging.WARNING): + logger.warning("%s: dim ramp step failed for per-key: %s", self._device, e) + if self._dim_step >= self._DIM_STEPS: + self._state = self.IDLE + self._dim_timer_id = None + return False + return True + + def _push_static_effect(self, zone, color): + """Non-persistent Static effect, one zone.""" + static_effect = next((e for e in zone.effects if e.ID == 0x01), None) + if static_effect is None: + return + r = (color >> 16) & 0xFF + g = (color >> 8) & 0xFF + b = color & 0xFF + params = bytes([r, g, b, 0, 0, 0, 0, 0, 0, 0]) + payload = bytes([zone.index, static_effect.index]) + params + b"\x01" + self._device.feature_request(SupportedFeature.RGB_EFFECTS, 0x10, payload) + + def _push_perkey_dimmed(self, t): + """Push interpolated per-key colors for one dim ramp step. + + Groups keys by their interpolated color and uses SetRgbZonesSingleValue + (0x8081 function 6) for efficient bulk writes — up to 13 zone IDs per + HID message when multiple keys share the same dimmed color. + """ + # Build color -> [zone_ids] map for this interpolation step + color_groups = {} + for zone_id, (start_color, target_color) in self._dim_perkey.items(): + color = self._interpolate_color(start_color, target_color, t) + if color not in color_groups: + color_groups[color] = [] + color_groups[color].append(zone_id) + + feat = SupportedFeature.PER_KEY_LIGHTING_V2 + for color, zone_ids in color_groups.items(): + r = (color >> 16) & 0xFF + g = (color >> 8) & 0xFF + b = color & 0xFF + # Function 6: SetRgbZonesSingleValue — color(3) + zone_ids (up to 13 per report) + while zone_ids: + batch = zone_ids[:13] + zone_ids = zone_ids[13:] + data = bytes([r, g, b]) + bytes(batch) + self._device.feature_request(feat, 0x60, data) + # Commit the frame + self._device.feature_request(feat, 0x70, b"\x00\x00\x00\x00\x00") + + def _apply_animation(self, effect_id): + """Hand off to a firmware animation. Generic over any effect in + hidpp20.LEDEffects: builds the 10-byte param block from the + effect's param map, sourcing color from the zone and other + params from the persisted _idle_effect.""" + from . import hidpp20 + + infos = getattr(self._device, "led_effects", None) + if not infos or not infos.zones: + self._state = self.IDLE + return + entry = hidpp20.LEDEffects.get(effect_id) + if entry is None: + self._state = self.IDLE + return + param_map = entry[1] + for zone in infos.zones: + effect_info = next((e for e in zone.effects if e.ID == effect_id), None) + if effect_info is None: + continue + color = self._get_zone_color(zone) + params = bytearray(10) + if hidpp20.LEDParam.color in param_map: + offset = param_map[hidpp20.LEDParam.color] + params[offset] = (color >> 16) & 0xFF + params[offset + 1] = (color >> 8) & 0xFF + params[offset + 2] = color & 0xFF + for pname, poff in param_map.items(): + if pname == hidpp20.LEDParam.color: + continue + psize = hidpp20.LEDParamSize.get(pname, 1) + user_val = getattr(self._idle_effect, str(pname), None) + if user_val is None: + user_val = effect_info.period or 3000 if pname == hidpp20.LEDParam.period else 0 + params[poff : poff + psize] = int(user_val).to_bytes(psize, "big") + if effect_id == 0x01: + params[3] = 0x02 # Static fixed-color marker + payload = bytes([zone.index, effect_info.index]) + bytes(params) + b"\x01" + try: + self._device.feature_request(SupportedFeature.RGB_EFFECTS, 0x10, payload) + except Exception as exc: + if logger.isEnabledFor(logging.WARNING): + logger.warning( + "%s: failed to apply animation 0x%02x to zone %d: %s", + self._device, + effect_id, + zone.index, + exc, + ) + self._state = self.IDLE + + # --- Sleep --- + + def _start_sleep(self): + """Enter firmware-managed sleep. Firmware fades from current level.""" + self._cancel_dim_timer() + try: + self._device.feature_request(SupportedFeature.RGB_EFFECTS, 0x80, b"\x01\x03\x00") + self._state = self.SLEEPING + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: RGB entering sleep (firmware power-down)", self._device) + except Exception as e: + if logger.isEnabledFor(logging.WARNING): + logger.warning("%s: failed to enter RGB sleep: %s", self._device, e) + + # --- Wake --- + + def _wake(self): + """Restore full lighting from any non-ACTIVE state.""" + if self._state == self.ACTIVE: + return + prev_state = self._state + self._cancel_dim_timer() + self._cancel_sleep_timer() + # State must be ACTIVE before _restore_colors() — the paint paths + # translate through it, and writes would otherwise go at the old dim. + self._state = self.ACTIVE + try: + if prev_state == self.SLEEPING: + self._set_power_mode_with_retry(1) + # Re-claim full LED pipeline control + self._device.feature_request(SupportedFeature.RGB_EFFECTS, 0x50, SW_ACTIVE) + # Firmware engine has re-engaged during sleep — re-arm per-key + # one-shots so the next write re-fires the prep + double-send. + for s in self._device.settings: + if s.name == "per-key-lighting": + s._frame_settled = False + s._prep_pushed = False + break + self._restore_colors() + if logger.isEnabledFor(logging.DEBUG): + state_names = {self.DIMMING: "dimming", self.IDLE: "idle", self.SLEEPING: "sleep"} + logger.debug("%s: RGB woken from %s", self._device, state_names.get(prev_state, "unknown")) + except Exception as e: + if logger.isEnabledFor(logging.WARNING): + logger.warning("%s: failed to wake RGB LEDs: %s", self._device, e) + + def _cancel_dim_timer(self): + if self._dim_timer_id is not None: + GLib.source_remove(self._dim_timer_id) + self._dim_timer_id = None + + def _get_zone_color(self, zone): + location = int(zone.location) + setting_name = f"rgb_zone_{location}" + for s in self._device.settings: + if s.name == setting_name and s._value is not None: + return getattr(s._value, "color", 0xFFFFFF) + return 0xFFFFFF + + def _get_zone_base_color(self): + """Color used as the base for unset per-key cells. Black when the + zone effect is marked ignore, the saved zone color otherwise.""" + return effective_zone_base_color(self._device) + + @staticmethod + def _has_real_perkey_colors(perkey_setting): + if not perkey_setting._value: + return False + no_change = special_keys.COLORSPLUS["No change"] + return any(color != no_change and isinstance(color, int) and color >= 0 for color in perkey_setting._value.values()) + + def _build_full_perkey_dim_map(self, perkey_setting, dim_pct): + """{zone_id: (start, target)} for every zone — user-set keys from + their color, unset from the zone base.""" + no_change = special_keys.COLORSPLUS["No change"] + zone_base = self._get_zone_base_color() + user_colors = {int(k): c for k, c in perkey_setting._value.items() if c != no_change and isinstance(c, int) and c >= 0} + return { + int(k): (start, self._compute_dim_color(start, dim_pct)) + for k in perkey_setting._validator.choices + for start in (user_colors.get(int(k), zone_base),) + } + + def _init_unset_perkey_zones(self, perkey_setting): + """Push the zone base color to per-key cells the user hasn't painted — + avoids the white-default flash when per-key takes over the buffer.""" + no_change = special_keys.COLORSPLUS["No change"] + zone_base = self._get_zone_base_color() + r = (zone_base >> 16) & 0xFF + g = (zone_base >> 8) & 0xFF + b = zone_base & 0xFF + + user_set = {int(k) for k, c in perkey_setting._value.items() if c != no_change and isinstance(c, int) and c >= 0} + unset_zones = [int(k) for k in perkey_setting._validator.choices if int(k) not in user_set] + if not unset_zones: + return + + feat = SupportedFeature.PER_KEY_LIGHTING_V2 + remaining = list(unset_zones) + try: + while remaining: + batch = remaining[:13] + remaining = remaining[13:] + self._device.feature_request(feat, 0x60, bytes([r, g, b]) + bytes(batch)) + self._device.feature_request(feat, 0x70, b"\x00\x00\x00\x00\x00") + except exceptions.FeatureCallError as e: + logger.warning("%s: per-key zone init failed (device busy?): %s", self._device, e) + + @staticmethod + def _compute_dim_color(color, dim_pct): + r = ((color >> 16) & 0xFF) * dim_pct // 100 + g = ((color >> 8) & 0xFF) * dim_pct // 100 + b = (color & 0xFF) * dim_pct // 100 + return (r << 16) | (g << 8) | b + + @staticmethod + def _interpolate_color(start, target, t): + r_s, g_s, b_s = (start >> 16) & 0xFF, (start >> 8) & 0xFF, start & 0xFF + r_t, g_t, b_t = (target >> 16) & 0xFF, (target >> 8) & 0xFF, target & 0xFF + r = int(r_s + (r_t - r_s) * t) + g = int(g_s + (g_t - g_s) * t) + b = int(b_s + (b_t - b_s) * t) + return (r << 16) | (g << 8) | b + + def _current_dim_pct(self): + """100 unless we're in Dim mode — animations run at firmware brightness.""" + if self._idle_effect_id() != 0x80: + return 100 + return int(getattr(self._idle_effect, "intensity", 50) or 50) + + def translate_color(self, color): + """Map a saved (undimmed) per-key color to what should be displayed + on the device right now, given the current power-management state. + Returns None to signal SLEEPING — caller should persist and skip the + wire write; _restore_colors on wake will re-push the saved value.""" + # Static idle is a color swap, not a brightness change — user-painted + # cells render their saved color unchanged, and the unset-cell + # substitution happens upstream via effective_zone_base_color. + if self._state == self.IDLE and self._idle_effect_id() == 0x01: + return color + return translate_color_for_display(color, self._state, self._current_dim_pct(), self._dim_step, self._DIM_STEPS) + + def notify_perkey_changed(self, zone_id, new_color): + """Resync a per-key zone's dim ramp entry to a user-repainted color.""" + if self._state != self.DIMMING or not self._dim_perkey or zone_id not in self._dim_perkey: + return + self._dim_perkey[zone_id] = ( + new_color, + self._compute_dim_color(new_color, self._current_dim_pct()), + ) + + def notify_perkey_bulk_changed(self, color_map): + """Bulk notify_perkey_changed, skipping 'No change' entries.""" + if self._state != self.DIMMING or not self._dim_perkey: + return + no_change = special_keys.COLORSPLUS["No change"] + for zone_id, color in color_map.items(): + if color == no_change or not isinstance(color, int) or color < 0: + continue + self.notify_perkey_changed(int(zone_id), int(color)) + + def notify_zone_changed(self, cluster_index, new_color): + """Resync a zone-effect dim ramp entry to a user-repainted color.""" + if self._state != self.DIMMING or not self._dim_zones: + return + dim_pct = self._current_dim_pct() + for i, (zone, _start, _target) in enumerate(self._dim_zones): + if int(zone.index) == int(cluster_index): + self._dim_zones[i] = (zone, new_color, self._compute_dim_color(new_color, dim_pct)) + return + + def _set_power_mode_with_retry(self, mode): + """First command after wake may fail; retry.""" + params = bytes([0x01, mode, 0x00]) + for attempt in range(3): + try: + self._device.feature_request(SupportedFeature.RGB_EFFECTS, 0x80, params) + return + except Exception: + if attempt == 2: + raise + import time as _time + + _time.sleep(0.1) + + def _is_ignored(self, setting_name): + """True if marked ignore via the lock icon.""" + persister = getattr(self._device, "persister", None) + if persister is None: + return False + return persister.get_sensitivity(setting_name) == settings.SENSITIVITY_IGNORE + + def _restore_colors(self): + """Re-push lighting state after waking. Per-key dominates only when + zone is Static — under animations, the zone wire push goes through + and per-key is skipped.""" + _perkey_setting, has_paint = perkey_has_paint(self._device) + zone_static = zone_effect_is_static(self._device) + perkey_dominates = has_paint and zone_static + for s in self._device.settings: + if s._value is None: + continue + if self._is_ignored(s.name): + continue + if s.name == "per-key-lighting": + if not self._has_real_perkey_colors(s): + continue + if not zone_static: + continue # firmware animation owns the visible layer + elif s.name.startswith("rgb_zone_"): + if perkey_dominates: + continue + else: + continue + try: + s.write(s._value, save=False) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("%s: restored %s after wake", self._device, s.name) + except Exception as e: + if logger.isEnabledFor(logging.WARNING): + logger.warning("%s: failed to restore %s: %s", self._device, s.name, e) diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py index 27f5e8da..4d24d71a 100644 --- a/lib/logitech_receiver/settings_templates.py +++ b/lib/logitech_receiver/settings_templates.py @@ -21,20 +21,26 @@ import socket import struct import traceback +from time import sleep from time import time from typing import Callable from typing import Protocol +import yaml + from solaar.i18n import _ from . import base from . import common from . import descriptors from . import desktop_notifications +from . import device_quirks from . import diversion from . import exceptions +from . import hidpp10_constants from . import hidpp20 from . import hidpp20_constants +from . import rgb_power from . import settings from . import settings_new from . import settings_validator @@ -51,6 +57,42 @@ _hidpp20 = hidpp20.Hidpp20() _F = hidpp20_constants.SupportedFeature +def halving_marks(max_value, step_count): + """Halving series from max_value down by powers of two, plus 0. + + For max=100, count=5 → [0, 13, 25, 50, 100], matching the G515 FN-F8 cycle. + """ + if step_count < 2 or max_value <= 0: + return [] + levels = [-(-max_value // (1 << i)) for i in range(step_count - 1)] + return sorted({0, *levels}) + + +def auto_step_count(max_value, low_pct=10): + """Step count for halving_marks that stops before dropping below low_pct of max.""" + if max_value <= 0: + return 0 + threshold = max(1, max_value * low_pct // 100) + n = 0 + while -(-max_value // (1 << n)) >= threshold: + n += 1 + return n + 1 + + +def _possible_fields_with_direction_filter(device, possible_fields, direction_field): + """Filter direction_field's choices against the per-device blocklist for + Wave directions the firmware accepts but doesn't render.""" + blocked = hidpp20.LedDirectionBlocklist.get(device.wpid) + if not blocked: + return possible_fields + filtered = common.NamedInts() + for v in hidpp20.LedDirectionChoices: + if int(v) not in blocked: + filtered[int(v)] = str(v) + device_direction_field = dict(direction_field, choices=filtered) + return [device_direction_field if f is direction_field else f for f in possible_fields] + + class State(enum.Enum): IDLE = "idle" PRESSED = "pressed" @@ -1782,6 +1824,19 @@ class BrightnessControl(settings.Setting): rw.min_nonzero_value = validator.min_value validator.min_value = 0 if validator.on_off else validator.min_value + def write(self, value, save=True): + # Snap to firmware-driven halving levels (off only at exact 0). + steps = getattr(self._validator, "steps", 0) + marks = halving_marks(self._validator.max_value, steps) + if value is not None and marks: + if value <= 0: + value = 0 + else: + nonzero = [m for m in marks if m > 0] + if nonzero: + value = min(reversed(nonzero), key=lambda m: abs(m - value)) + return super().write(value, save) + class rw_class(settings.FeatureRW): def read(self, device, data_bytes=b""): if self.on_off: @@ -1805,10 +1860,25 @@ class BrightnessControl(settings.Setting): assert reply, "Oops, brightness range cannot be retrieved!" if reply: max_value = int.from_bytes(reply[0:2], byteorder="big") + steps_and_flags = reply[2] + caps = reply[3] min_value = int.from_bytes(reply[4:6], byteorder="big") - on_off = bool(reply[3] & 0x04) # separate on/off control + on_off = bool(caps & 0x04) + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s BrightnessControl getInfo: %s — max=%d min=%d steps_and_flags=0x%02x caps=0x%02x on_off=%s", + device, + reply[:8].hex(), + max_value, + min_value, + steps_and_flags, + caps, + on_off, + ) validator = cls(min_value=min_value, max_value=max_value, byte_count=2) validator.on_off = on_off + validator.steps = steps_and_flags & 0x0F + device._brightness_steps = validator.steps # for sibling settings (e.g. idle Dim) return validator @@ -1843,14 +1913,41 @@ class LEDZoneSetting(settings.Setting): feature = _F.COLOR_LED_EFFECTS color_field = {"name": _LEDP.color, "kind": settings.Kind.COLOR, "label": _("Color")} speed_field = {"name": _LEDP.speed, "kind": settings.Kind.RANGE, "label": _("Speed"), "min": 0, "max": 255} - period_field = {"name": _LEDP.period, "kind": settings.Kind.RANGE, "label": _("Period"), "min": 100, "max": 5000} + period_field = { + "name": _LEDP.period, + "kind": settings.Kind.RANGE, + "label": _("Period"), + "min": 1000, + "max": 20000, + "display_seconds": True, + } intensity_field = {"name": _LEDP.intensity, "kind": settings.Kind.RANGE, "label": _("Intensity"), "min": 0, "max": 100} ramp_field = {"name": _LEDP.ramp, "kind": settings.Kind.CHOICE, "label": _("Ramp"), "choices": hidpp20.LedRampChoice} - possible_fields = [color_field, speed_field, period_field, intensity_field, ramp_field] + saturation_field = {"name": _LEDP.saturation, "kind": settings.Kind.RANGE, "label": _("Saturation"), "min": 0, "max": 255} + form_field = {"name": _LEDP.form, "kind": settings.Kind.CHOICE, "label": _("Waveform"), "choices": hidpp20.LedFormChoices} + direction_field = { + "name": _LEDP.direction, + "kind": settings.Kind.CHOICE, + "label": _("Direction"), + "choices": hidpp20.LedDirectionChoices, + } + # Per-widget visibility driven by LEDEffects[ID][1]; RGBEffectSetting + # overrides this list to drop ramp/form on 0x8071. + possible_fields = [ + color_field, + speed_field, + period_field, + intensity_field, + ramp_field, + saturation_field, + form_field, + direction_field, + ] @classmethod def setup(cls, device, read_fnid, write_fnid, suffix): infos = device.led_effects + possible_fields = cls._device_possible_fields(device) settings_ = [] for zone in infos.zones: prefix = common.int2bytes(zone.index, 1) @@ -1863,11 +1960,15 @@ class LEDZoneSetting(settings.Setting): setting.label = _("LEDs") + " " + str(hidpp20.LEDZoneLocations[zone.location]) choices = [hidpp20.LEDEffects[e.ID][0] for e in zone.effects if e.ID in hidpp20.LEDEffects] ID_field = {"name": "ID", "kind": settings.Kind.CHOICE, "label": None, "choices": choices} - setting.possible_fields = [ID_field] + cls.possible_fields + setting.possible_fields = [ID_field] + possible_fields setting.fields_map = hidpp20.LEDEffects settings_.append(setting) return settings_ + @classmethod + def _device_possible_fields(cls, device): + return _possible_fields_with_direction_filter(device, cls.possible_fields, cls.direction_field) + @classmethod def build(cls, device): return cls.setup(device, 0xE0, 0x30, b"") @@ -1880,34 +1981,533 @@ class RGBControl(settings.Setting): feature = _F.RGB_EFFECTS rw_options = {"read_fnid": 0x50, "write_fnid": 0x50} # Two-state setting — render as a Gtk.Switch rather than a 2-option combo. - # true_value=1 / false_value=0 are the wire bytes for Solaar / Device mode - # returned by GetSWControl after the 1-byte sub-fn echo. + # true_value=3 / false_value=0 are the wire bytes for Solaar / Device mode + # returned by GetSWControl after the 1-byte sub-fn echo. Mode 3 is the + # full SW takeover the claim handshake below expects. validator_class = settings_validator.BooleanValidator - validator_options = {"true_value": 1, "false_value": 0, "write_prefix_bytes": b"\x01", "read_skip_byte_count": 1} + validator_options = {"true_value": 3, "false_value": 0, "write_prefix_bytes": b"\x01", "read_skip_byte_count": 1} def _pre_read(self, cached, key=None): - # Migrate legacy int values (0/1) stored under the old ChoicesValidator + # Migrate legacy int values (0/3) stored under the old ChoicesValidator # to bool so the switch widget gets a value it can set_state() on. super()._pre_read(cached, key) if isinstance(self._value, int) and not isinstance(self._value, bool): self._value = self._value != 0 + def write(self, value, save=True): + assert hasattr(self, "_value") + assert hasattr(self, "_device") + assert value is not None + device = self._device + if not device.online: + return None + if self._value != value: + self.update(value, save) + claiming = int(value) != 0 # any non-zero value is a Solaar-side claim + if claiming: + self._claim_sw_control(device) + else: + self._release_sw_control(device) + return value + + def _claim_sw_control(self, device): + # Disable firmware power management via profile management or onboard profiles + if device.features and _F.PROFILE_MANAGEMENT in device.features: + device.feature_request(_F.PROFILE_MANAGEMENT, 0x60, b"\x05") + elif device.features and _F.ONBOARD_PROFILES in device.features: + device.feature_request(_F.ONBOARD_PROFILES, 0x10, b"\x02") + # Claim LED pipeline: SetSWControl(mode=3, flags=5) + device.feature_request(_F.RGB_EFFECTS, 0x50, rgb_power.SW_ACTIVE) + # Reset per-key one-shot flags so the first write after this claim + # re-fires the prep + double-send. + for s in device.settings: + if s.name == "per-key-lighting": + s._frame_settled = False + s._prep_pushed = False + break + # Start software power management + rgb_power.start(device) + # Register cleanup for graceful release on device close + if rgb_power.cleanup not in device.cleanups: + device.cleanups.append(rgb_power.cleanup) + # Repaint LEDs with Solaar's saved state. Without this the firmware's + # last-active onboard profile keeps showing until the user changes + # something — the takeover would look like it did nothing. + self._repaint_after_claim(device) + + def _repaint_after_claim(self, device): + """Push saved zone effects and (if opted in) per-key buffer to the + device after a fresh SW claim. Best-effort: individual failures get + logged but don't abort the rest of the repaint.""" + for s in device.settings: + if s.name.startswith("rgb_zone_") and s._value is not None: + try: + s.write(s._value, save=False) + except Exception as e: + logger.warning("%s: post-claim repaint of %s failed: %s", device, s.name, e) + perkey, has_paint = rgb_power.perkey_has_paint(device) + if has_paint and perkey._value is not None: + try: + perkey.write(perkey._value, save=False) + except Exception as e: + logger.warning("%s: post-claim per-key repaint failed: %s", device, e) + + def _release_sw_control(self, device): + # Stop software power management + rgb_power.stop(device) + # Release LED pipeline: SetSWControl(mode=0, flags=0) + device.feature_request(_F.RGB_EFFECTS, 0x50, rgb_power.SW_RELEASE) + # Restore firmware power management + if device.features and _F.PROFILE_MANAGEMENT in device.features: + device.feature_request(_F.PROFILE_MANAGEMENT, 0x60, b"\x03") + elif device.features and _F.ONBOARD_PROFILES in device.features: + device.feature_request(_F.ONBOARD_PROFILES, 0x10, b"\x01") + # Keep cleanup registered on devices that support the shutdown effect + # cap — it also fires the firmware shutdown animation trigger on exit. + if not getattr(device, "_rgb_has_shutdown_cap", False): + if rgb_power.cleanup in device.cleanups: + device.cleanups.remove(rgb_power.cleanup) + + +class RGBIdleTimeout(settings.Setting): + name = "rgb_idle_timeout" + label = _("Idle Timeout") + description = _("Time without input before LED idle effect starts.") + "\n" + _("LED Control needs to be enabled.") + feature = _F.RGB_EFFECTS + choices_universe = common.NamedInts( + **{ + "Disabled": 0, + "15 Seconds": 15, + "30 Seconds": 30, + "1 Minute": 60, + "2 Minutes": 120, + "5 Minutes": 300, + } + ) + validator_class = settings_validator.ChoicesValidator + validator_options = {"choices": choices_universe} + + class rw_class: + def __init__(self, feature, **kwargs): + self.feature = feature + self.kind = settings.FeatureRW.kind + + def read(self, device): + return common.int2bytes(60, 2) # default 1 minute + + def write(self, device, data_bytes): + timeout = int.from_bytes(data_bytes, byteorder="big") + mgr = rgb_power.get_manager(device) + if mgr: + mgr.set_idle_timeout(timeout) + return True + + +class RGBIdleEffect(settings.Setting): + """Idle-effect setting with per-effect sub-widgets. Persisted value is an + LEDEffectSetting; legacy bare-int values are migrated in `_pre_read`.""" + + name = "rgb_idle_effect" + label = _("Idle Effect") + description = ( + _("What happens to LEDs when idle — dim to a percentage, change the base color, or play an animation.") + + "\n" + + _("LED Control needs to be enabled.") + ) + feature = _F.RGB_EFFECTS + # Reuse zone fields so idle controls match the active-zone setup exactly. + # Idle-specific intensity override carries the halving marks for Dim. + intensity_field = {**LEDZoneSetting.intensity_field, "halving": True} + period_field = LEDZoneSetting.period_field + saturation_field = LEDZoneSetting.saturation_field + speed_field = LEDZoneSetting.speed_field + direction_field = LEDZoneSetting.direction_field + color_field = LEDZoneSetting.color_field + possible_fields = [color_field, speed_field, period_field, intensity_field, saturation_field, direction_field] + + class rw_class: + def __init__(self, feature, **kwargs): + self.feature = feature + self.kind = settings.FeatureRW.kind + + def read(self, device): + return hidpp20.LEDEffectSetting(ID=0x80, intensity=50).to_bytes() + + def write(self, device, data_bytes): + value = hidpp20.LEDEffectSetting.from_bytes(data_bytes) + mgr = rgb_power.get_manager(device) + if mgr: + mgr.set_idle_effect(value) + return True + + def _pre_read(self, cached, key=None): + """Migrate legacy bare-int values to LEDEffectSetting on first read.""" + super()._pre_read(cached, key) + if self._value is None or isinstance(self._value, hidpp20.LEDEffectSetting): + return + if not isinstance(self._value, int): + return + legacy = self._value + if legacy == 0: + migrated = hidpp20.LEDEffectSetting(ID=0x00) + elif legacy in (25, 50, 75): + migrated = hidpp20.LEDEffectSetting(ID=0x80, intensity=legacy) + elif legacy == 0x0A: + migrated = hidpp20.LEDEffectSetting(ID=0x0A, period=3000, intensity=100) + elif legacy == 0x0B: + migrated = hidpp20.LEDEffectSetting(ID=0x0B, period=3000) + else: + return # unrecognized — leave alone, write() will error informatively + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: migrating legacy bare-int %s to LEDEffectSetting on %s", + self.name, + legacy, + self._device, + ) + self._value = migrated + if getattr(self._device, "persister", None) is not None: + self._device.persister[self.name] = self._value + + # Ripple needs keyboard input to animate so it can't run while idle. + # Disabled (0x00) and Dim (0x80) are seeded into choice_ids directly. + # Static (0x01) is also seeded so it appears right below Dim regardless + # of probe-derived ID ordering. + _IDLE_EXCLUDED_IDS = frozenset({0x00, 0x0B, 0x17}) + + @classmethod + def build(cls, device): + rw = cls.rw_class(cls.feature) + # No change → Dim → Static, then any probed device-specific effects. + choice_ids = [0x00, 0x80, 0x01] + probed = set() + try: + infos = device.led_effects + if infos and infos.zones: + probed = {int(e.ID) for e in infos.zones[0].effects} + except Exception: + pass + for eid in sorted(probed): + if eid in cls._IDLE_EXCLUDED_IDS or eid in choice_ids: + continue + if eid not in hidpp20.LEDEffects: + continue + choice_ids.append(eid) + idle_disabled = common.NamedInt(0x00, _("No change")) + choices = [idle_disabled if i == 0x00 else hidpp20.LEDEffects[i][0] for i in choice_ids] + ID_field = {"name": "ID", "kind": settings.Kind.CHOICE, "label": None, "choices": choices} + fields_map = { + 0x00: [idle_disabled, {}], + 0x80: [ + hidpp20.LEDEffects[0x80][0], + {hidpp20.LEDParam.intensity: 0}, + {hidpp20.LEDParam.intensity: 50}, + ], + } + for eid in choice_ids: + if eid not in fields_map: + fields_map[eid] = hidpp20.LEDEffects[eid] + validator = settings_validator.HeteroValidator(data_class=hidpp20.LEDEffectSetting, options=None, readable=True) + setting = cls(device, rw, validator) + setting.possible_fields = [ID_field] + _possible_fields_with_direction_filter( + device, cls.possible_fields, cls.direction_field + ) + setting.fields_map = fields_map + return setting + + +class RGBSleepTimeout(settings.Setting): + name = "rgb_sleep_timeout" + label = _("Sleep Timeout") + description = _("Time without input before LEDs fade off completely.") + "\n" + _("LED Control needs to be enabled.") + feature = _F.RGB_EFFECTS + choices_universe = common.NamedInts( + **{ + "Disabled": 0, + "2 Minutes": 120, + "5 Minutes": 300, + "10 Minutes": 600, + "15 Minutes": 900, + "30 Minutes": 1800, + } + ) + validator_class = settings_validator.ChoicesValidator + validator_options = {"choices": choices_universe} + + class rw_class: + def __init__(self, feature, **kwargs): + self.feature = feature + self.kind = settings.FeatureRW.kind + + def read(self, device): + return common.int2bytes(300, 2) # default 5 minutes + + def write(self, device, data_bytes): + timeout = int.from_bytes(data_bytes, byteorder="big") + mgr = rgb_power.get_manager(device) + if mgr: + mgr.set_sleep_timeout(timeout) + return True + + +class _RgbBootEffect: + """NvConfig persistent boot/shutdown effect payload on RGBEffects (0x8071). + + Wire format (7 bytes, NvConfig cap 0x0001 startup / 0x0040 shutdown): + [enabled, R1, G1, B1, R2, G2, B2] + enabled: 0x01 on, 0x02 off. Colors are kept editable in both states so + toggling Off doesn't lose the user's chosen color. + """ + + _COLOR_ATTRS = ("color1", "color2") + + def __init__(self, ID=1, color1=0, color2=0): + self.ID = int(ID) + for k, v in (("color1", color1), ("color2", color2)): + iv = int(v) & 0xFFFFFF + setattr(self, k, common.ColorInt(iv)) + + @classmethod + def from_bytes(cls, data, options=None): + if data is None or len(data) < 7: + return cls() + c1 = (data[1] << 16) | (data[2] << 8) | data[3] + c2 = (data[4] << 16) | (data[5] << 8) | data[6] + return cls(ID=data[0], color1=c1, color2=c2) + + def to_bytes(self, options=None): + return bytes( + [ + self.ID & 0xFF, + (self.color1 >> 16) & 0xFF, + (self.color1 >> 8) & 0xFF, + self.color1 & 0xFF, + (self.color2 >> 16) & 0xFF, + (self.color2 >> 8) & 0xFF, + self.color2 & 0xFF, + ] + ) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.to_bytes() == other.to_bytes() + + def __str__(self): + return yaml.dump(self, width=float("inf")).rstrip("\n") + + @classmethod + def from_yaml(cls, loader, node): + return cls(**loader.construct_mapping(node)) + + @classmethod + def to_yaml(cls, dumper, data): + return dumper.represent_mapping("!RgbBootEffect", data.__dict__, flow_style=True) + + +yaml.SafeLoader.add_constructor("!RgbBootEffect", _RgbBootEffect.from_yaml) +yaml.add_representer(_RgbBootEffect, _RgbBootEffect.to_yaml) + + +class _RgbBootEffectSetting(settings.Setting): + """Base for NvConfig persistent effect toggles on RGBEffects (0x8071). + + Subclasses set cap_id (0x0001 startup, 0x0040 shutdown). Build probes the + cap once and suppresses the setting if the device doesn't answer — so the + setting auto-appears on any 0x8071 device that supports the cap, without + needing per-model gating. + """ + + feature = _F.RGB_EFFECTS + cap_id: int = 0 + + _ENABLED_CHOICES = common.NamedInts(**{"On": 1, "Off": 2}) + _COLOR1_FIELD = {"name": "color1", "kind": settings.Kind.COLOR, "label": _("Primary")} + _COLOR2_FIELD = {"name": "color2", "kind": settings.Kind.COLOR, "label": _("Secondary")} + + class rw_class: + kind = settings.FeatureRW.kind + + def __init__(self, feature, cap_id): + self.feature = feature + self.cap_id = cap_id + self._cap_bytes = bytes([(cap_id >> 8) & 0xFF, cap_id & 0xFF]) + + def read(self, device): + reply = device.feature_request(self.feature, 0x30, b"\x00" + self._cap_bytes) + if reply is None or len(reply) < 10: + return None + return reply[3:10] # strip [sub-fn, capHi, capLo] echo + + def write(self, device, data_bytes): + return device.feature_request(self.feature, 0x30, b"\x01" + self._cap_bytes + bytes(data_bytes)) + + @classmethod + def build(cls, device): + cap_bytes = bytes([(cls.cap_id >> 8) & 0xFF, cls.cap_id & 0xFF]) + try: + reply = device.feature_request(_F.RGB_EFFECTS, 0x30, b"\x00" + cap_bytes) + except exceptions.FeatureCallError: + return None # device rejects this cap — gate the setting off + if reply is None or len(reply) < 10: + return None + rw = cls.rw_class(cls.feature, cls.cap_id) + validator = settings_validator.HeteroValidator(data_class=_RgbBootEffect, options=None) + setting = cls(device, rw, validator) + # Render the enabled byte as a right-aligned Gtk.Switch (like a plain + # TOGGLE setting) rather than an inline Off/On combo. on_value/off_value + # carry the wire-format byte (0x01 = on, 0x02 = off) through to the + # data class without changing the byte semantics. + id_field = {"name": "ID", "kind": settings.Kind.TOGGLE, "label": None, "on_value": 1, "off_value": 2} + # Keep all color widgets in possible_fields so reads still populate + # them and writes still carry their values — the firmware may store + # bytes it doesn't visibly use. fields_map controls UI visibility. + setting.possible_fields = [id_field, cls._COLOR1_FIELD, cls._COLOR2_FIELD] + # Default-allow: show both color pickers unless this device model is + # known to ignore one or both. Most 0x8071 devices honor the bytes. + inert = device_quirks.get(device, "rgb_effects_nvconfig_colors_inert", {}).get(cls.cap_id, set()) + visible = {n: o for n, o in (("color1", 1), ("color2", 4)) if n not in inert} + # Both On/Off map to the same visible field set so colors stay editable + # when the effect is Off (pre-stages them for next enable). + setting.fields_map = { + int(cls._ENABLED_CHOICES["On"]): (cls._ENABLED_CHOICES["On"], visible), + int(cls._ENABLED_CHOICES["Off"]): (cls._ENABLED_CHOICES["Off"], visible), + } + # Register the firmware shutdown trigger on cap 0x0040 devices so the + # animation plays on Solaar exit. rgb_power.cleanup fires mode 0 at + # its end when _rgb_has_shutdown_cap is set. See + # solaar_shutdown_effect_trigger_spec.md. + if cls.cap_id == 0x0040: + device._rgb_has_shutdown_cap = True + if rgb_power.cleanup not in device.cleanups: + device.cleanups.append(rgb_power.cleanup) + return setting + + +class RgbStartupAnimation(_RgbBootEffectSetting): + name = "rgb_startup_animation" + label = _("Startup Animation") + description = _( + "Firmware-played animation when the keyboard wakes from deep sleep or powers on.\n" + "Setting persists on the device (non-volatile)." + ) + cap_id = 0x0001 + + +class RgbShutdownAnimation(_RgbBootEffectSetting): + name = "rgb_shutdown_animation" + label = _("Shutdown Animation") + description = _( + "Firmware-played animation when the keyboard powers off.\n" "Setting persists on the device (non-volatile)." + ) + cap_id = 0x0040 + class RGBEffectSetting(LEDZoneSetting): name = "rgb_zone_" # the trailing underscore signals that this setting creates other settings label = _("LED Zone Effects") description = _("Set effect for LED Zone") + "\n" + _("LED Control needs to be enabled.") feature = _F.RGB_EFFECTS + # 0x8071 firmware-fixes ramp/form bytes; drop those widgets here. + possible_fields = [ + LEDZoneSetting.color_field, + LEDZoneSetting.speed_field, + LEDZoneSetting.period_field, + LEDZoneSetting.intensity_field, + LEDZoneSetting.saturation_field, + LEDZoneSetting.direction_field, + ] @classmethod def build(cls, device): return cls.setup(device, None, 0x10, b"\x01") + def write(self, value, save=True): + """Push zone effect to wire unless per-key is the dominant layer. + + Per-key acts as a multi-color sub-mode of Static: when zone is Static + and per-key is opted in with paint, per-key owns the visible layer. + Non-Static zone effects (animations) always go to the wire — per-key + defers to the firmware animation. Transitions into Static still push + the Static wire so any running animation stops. + """ + assert hasattr(self, "_value") + assert hasattr(self, "_device") + assert value is not None + device = self._device + if not device.online: + return None + perkey, has_paint = rgb_power.perkey_has_paint(device) + new_is_static = int(getattr(value, "ID", 0) or 0) == rgb_power._EFFECT_STATIC + old_value = self._value + old_is_static = old_value is None or int(getattr(old_value, "ID", 0) or 0) == rgb_power._EFFECT_STATIC + if has_paint and new_is_static and old_is_static: + if save: + changed = old_value != value + self.update(value, save) + if changed and perkey._fill_unset_zones_with_base_color(): + perkey._send_with_retry(0x70, b"\x00") # FrameEnd + # Resync the dim ramp's start colors for unset cells. + mgr = rgb_power.get_manager(device) + if mgr is not None: + new_base = int(getattr(value, "color", 0) or 0) + unset_zones = perkey._unset_zone_ids() + if unset_zones: + mgr.notify_perkey_bulk_changed({z: new_base for z in unset_zones}) + return value + # Persist undimmed value first (single source of truth). + if self._value != value: + self.update(value, save) + wire_value = self._translate_for_wire(value) + if wire_value is None: # SLEEPING — _wake() will re-push at full brightness. + return value + current_value = None + if self._validator.needs_current_value: + current_value = self._rw.read(device) + data_bytes = self._validator.prepare_write(wire_value, current_value) + if data_bytes is None: + return None + reply = self._rw.write(device, data_bytes) + if not reply: + return None + # Animation → Static transition with per-key paint: the Static wire + # we just pushed stops the animation; now repaint the per-key + # multi-color overlay on top so the user gets a seamless switch. + if has_paint and new_is_static and perkey is not None: + try: + perkey.write(perkey._value, save=False) + except Exception as e: + logger.warning("%s: per-key repaint after Static restore failed: %s", device, e) + # Resync any in-flight dim ramp to the new color. + mgr = rgb_power.get_manager(device) + if mgr is not None and getattr(value, "color", None) is not None and self._rw.prefix: + mgr.notify_zone_changed(self._rw.prefix[0], int(value.color)) + return value + + def _translate_for_wire(self, value): + """Clone `value` with `.color` translated through rgb_power state. + Returns None for SLEEPING.""" + saved_color = getattr(value, "color", None) + if saved_color is None: + return value + wire_color = rgb_power.translate_for_device(self._device, int(saved_color)) + if wire_color is None: + return None + if int(wire_color) == int(saved_color): + return value # ACTIVE or no-op translation; reuse original + # Build a shallow clone with translated color so the persister and the + # in-memory _value keep the undimmed source-of-truth color. + wire_attrs = dict(value.__dict__) + wire_attrs["color"] = int(wire_color) + return hidpp20.LEDEffectSetting(**wire_attrs) + class PerKeyLighting(settings.Settings): name = "per-key-lighting" label = _("Per-key Lighting") - description = _("Control per-key lighting.") + description = ( + _("Control per-key lighting.") + + "\n" + + _("LED Control needs to be enabled and the zone effect set to Static for per-key paint to be visible.") + ) feature = _F.PER_KEY_LIGHTING_V2 keys_universe = special_keys.KEYCODES editor_class = "solaar.ui.perkey.control:PerKeyControl" @@ -1937,6 +2537,116 @@ class PerKeyLighting(settings.Settings): def update_key_value(self, key, value, save=True): super().update_key_value(key, self._wrap_color(value), save) + def _ensure_sw_control(self): + """Ensure SW control is claimed before writing per-key colors.""" + if getattr(self, "_has_rgb_effects", None) is None: + self._has_rgb_effects = bool(self._device.features and _F.RGB_EFFECTS in self._device.features) + if not self._has_rgb_effects: + return # No autonomous effect engine, no claim needed + for s in self._device.settings: + if s.name == "rgb_control": + # _value may be bool (current) or int (legacy persister value + # before BooleanValidator migration); both coerce cleanly. + if not s._value: # Not already claimed by Solaar + s.write(True) # Triggers full claim sequence in RGBControl + return + + # BUSY-retry backoff (ms). + _BUSY_BACKOFF_MS = (30, 60, 90) + + def _send_with_retry(self, function, data, retries=3): + # Retry on BUSY and timeout (both transient). Other FeatureCallError + # codes abort — they're real bugs we shouldn't paper over. + busy_attempt = 0 + max_busy = len(self._BUSY_BACKOFF_MS) + for attempt in range(retries + 1): + try: + reply = self._device.feature_request(self.feature, function, data) + except exceptions.FeatureCallError as e: + if getattr(e, "error", None) == hidpp20_constants.ErrorCode.BUSY and busy_attempt < max_busy: + delay_ms = self._BUSY_BACKOFF_MS[busy_attempt] + busy_attempt += 1 + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: per-key 0x%02x BUSY, retry %d/%d after %dms", + self._device, + function, + busy_attempt, + max_busy, + delay_ms, + ) + sleep(delay_ms / 1000.0) + continue + logger.warning("%s: per-key 0x%02x rejected by device", self._device, function) + return False + if reply is not None: + if (attempt > 0 or busy_attempt > 0) and logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: per-key 0x%02x succeeded after %d timeout retries, %d BUSY retries", + self._device, + function, + attempt, + busy_attempt, + ) + return True + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "%s: per-key 0x%02x timed out (attempt %d/%d)", + self._device, + function, + attempt + 1, + retries + 1, + ) + return False + + def _zone_base_color(self): + """Color used to fill per-key unset cells. Defers to + rgb_power.effective_zone_base_color which returns black when the + zone effect is ignored, otherwise the saved zone color.""" + return rgb_power.effective_zone_base_color(self._device) + + def _send_zone_color(self, zone_id, wire_color): + """Emit a single-zone color update (fn 0x10). `wire_color` is the + already-translated color that should appear on the device.""" + r = (wire_color >> 16) & 0xFF + g = (wire_color >> 8) & 0xFF + b = wire_color & 0xFF + return self._send_with_retry(0x10, bytes([zone_id, r, g, b])) + + def _unset_zone_ids(self): + """Per-key zone IDs that don't have a user-painted color.""" + no_change = special_keys.COLORSPLUS["No change"] + user_set = set() + if self._value: + for key, color in self._value.items(): + if color != no_change and isinstance(color, int) and color >= 0: + user_set.add(int(key)) + return [int(k) for k in self._validator.choices if int(k) not in user_set] + + def _fill_unset_zones_with_base_color(self): + """Push the zone base color (translated for current power state) to + any per-key cell the user hasn't painted. Caller commits FrameEnd.""" + if not self._has_rgb_effects: + return True + zone_base = self._zone_base_color() + wire_base = rgb_power.translate_for_device(self._device, zone_base) + if wire_base is None: + return True # SLEEPING — caller defers wire entirely + r = (wire_base >> 16) & 0xFF + g = (wire_base >> 8) & 0xFF + b = wire_base & 0xFF + unset_zones = self._unset_zone_ids() + if not unset_zones: + return True + remaining = list(unset_zones) + ok = True + while remaining: + batch = remaining[:13] + remaining = remaining[13:] + if not self._send_with_retry(0x60, bytes([r, g, b]) + bytes(batch)): + ok = False + return ok + def read(self, cached=True): # The 0x8081 protocol has no GetIndividualRgbZones — the device cannot # report its current per-key buffer back. So a "live" read is fictional: @@ -1955,47 +2665,144 @@ class PerKeyLighting(settings.Settings): self._value = reply_map return reply_map + def _send_perkey_frame(self, map, no_change): + """Send all per-key sub-packets for `map`, fill unset cells with the + zone base color, and commit with FrameEnd. Returns True on success.""" + # Bucket by color; single-bucket allows the range-update fast path. + table = {} + for key, value in map.items(): + if value in table: + table[value].append(key) + else: + table[value] = [key] + ok = True + if len(table) == 1: # use range update + for value, keys in table.items(): # only one, of course + if value != no_change: # this signals no change, so don't update at all + wire = rgb_power.translate_for_device(self._device, int(value)) + data_bytes = keys[0].to_bytes(1, "big") + keys[-1].to_bytes(1, "big") + wire.to_bytes(3, "big") + if not self._send_with_retry(0x50, data_bytes): # range update command to update all keys + ok = False + else: + data_bytes = b"" + for value, keys in table.items(): + if value != no_change: # this signals no change, so ignore it + wire = rgb_power.translate_for_device(self._device, int(value)) + while len(keys) > 3: # use an optimized update command that can update up to 13 keys + data = wire.to_bytes(3, "big") + b"".join([key.to_bytes(1, "big") for key in keys[0:13]]) + if not self._send_with_retry(0x60, data): # single-value multiple-keys update + ok = False + keys = keys[13:] + for key in keys: + data_bytes += key.to_bytes(1, "big") + wire.to_bytes(3, "big") + if len(data_bytes) >= 16: # up to four values are packed into a regular update + if not self._send_with_retry(0x10, data_bytes): + ok = False + data_bytes = b"" + if len(data_bytes) > 0: # update any remaining keys + if not self._send_with_retry(0x10, data_bytes): + ok = False + # Fill unset zones before FrameEnd so the frame commits atomically. + if not self._fill_unset_zones_with_base_color(): + ok = False + # Suppress FrameEnd on partial failure to avoid a visibly wrong commit. + if ok: + if not self._send_with_retry(0x70, b"\x00"): + logger.warning("%s: per-key FrameEnd failed; frame not committed", self._device) + ok = False + else: + logger.warning( + "%s: per-key frame had failed sub-packets; suppressing FrameEnd to avoid partial commit", + self._device, + ) + return ok + def write(self, map, save=True): if self._device.online: + self._ensure_sw_control() + # Persist undimmed (single source of truth). self.update(map, save) - table = {} - for key, value in map.items(): - if value in table: - table[value].append(key) # keys will be in order from small to large - else: - table[value] = [key] - if len(table) == 1: # use range update - for value, keys in table.items(): # only one, of course - if value != special_keys.COLORSPLUS["No change"]: # this signals no change, so don't update at all - data_bytes = keys[0].to_bytes(1, "big") + keys[-1].to_bytes(1, "big") + value.to_bytes(3, "big") - self._device.feature_request(self.feature, 0x50, data_bytes) # range update command to update all keys - self._device.feature_request(self.feature, 0x70, 0x00) # signal device to make the changes - else: - data_bytes = b"" - for value, keys in table.items(): # only one, of course - if value != special_keys.COLORSPLUS["No change"]: # this signals no change, so ignore it - while len(keys) > 3: # use an optimized update command that can update up to 13 keys - data = value.to_bytes(3, "big") + b"".join([key.to_bytes(1, "big") for key in keys[0:13]]) - self._device.feature_request(self.feature, 0x60, data) # single-value multiple-keys update - keys = keys[13:] - for key in keys: - data_bytes += key.to_bytes(1, "big") + value.to_bytes(3, "big") - if len(data_bytes) >= 16: # up to four values are packed into a regular update - self._device.feature_request(self.feature, 0x10, data_bytes) - data_bytes = b"" - if len(data_bytes) > 0: # update any remaining keys - self._device.feature_request(self.feature, 0x10, data_bytes) - self._device.feature_request(self.feature, 0x70, 0x00) # signal device to make the changes + # Per-key is a sub-mode of Static — when zone is animating, the + # firmware engine owns the visible layer. + if not rgb_power.zone_effect_is_static(self._device): + return map + no_change = special_keys.COLORSPLUS["No change"] + # SLEEPING — defer wire to wake. + for value in map.values(): + if value != no_change and rgb_power.translate_for_device(self._device, int(value)) is None: + return map + # Mouse-only prep, one-shot per claim — keyboards don't need it. + if ( + getattr(self, "_has_rgb_effects", False) + and not getattr(self, "_prep_pushed", False) + and int(getattr(self._device, "kind", -1) or -1) == int(hidpp10_constants.DEVICE_KIND.mouse) + ): + if rgb_power.push_artanis_perkey_prep(self._device): + self._prep_pushed = True + ok = self._send_perkey_frame(map, no_change) + # First frame after a claim sometimes lands before the firmware + # has fully transitioned out of onboard mode — replay it once. + if ok and not getattr(self, "_frame_settled", False): + self._send_perkey_frame(map, no_change) + self._frame_settled = True + if ok: + mgr = rgb_power.get_manager(self._device) + if mgr is not None: + mgr.notify_perkey_bulk_changed(map) return map def write_key_value(self, key, value, save=True): - if value != special_keys.COLORSPLUS["No change"]: # this signals no change - result = super().write_key_value(int(key), value, save) - if self._device.online: - self._device.feature_request(self.feature, 0x70, 0x00) # signal device to make the change - return result + self._ensure_sw_control() + no_change = special_keys.COLORSPLUS["No change"] + zone_id = int(key) + if value != no_change: + self.update_key_value(zone_id, value, save) + if not self._device.online: + return value + # Per-key is a sub-mode of Static — defer to firmware animation. + if not rgb_power.zone_effect_is_static(self._device): + return value + wire = rgb_power.translate_for_device(self._device, int(value)) + if wire is None: + return value # SLEEPING — wake re-pushes + ok = True + # Fill unset zones once so they don't show white-default. + if not getattr(self, "_base_filled", False): + if self._fill_unset_zones_with_base_color(): + self._base_filled = True + else: + ok = False + if ok and not self._send_zone_color(zone_id, wire): + ok = False + if ok: + mgr = rgb_power.get_manager(self._device) + if mgr is not None: + mgr.notify_perkey_changed(zone_id, int(value)) + if not self._send_with_retry(0x70, b"\x00"): + logger.warning("%s: per-key FrameEnd failed; frame not committed", self._device) + else: + logger.warning("%s: per-key write failed; suppressing FrameEnd", self._device) + return value else: - return True + # Un-set: store "No change", push the zone base color to that cell. + self.update_key_value(zone_id, no_change, save) + if not self._device.online: + return no_change + if not rgb_power.zone_effect_is_static(self._device): + return no_change + zone_base = self._zone_base_color() + wire_base = rgb_power.translate_for_device(self._device, zone_base) + if wire_base is None: + return no_change + if self._send_zone_color(zone_id, wire_base): + mgr = rgb_power.get_manager(self._device) + if mgr is not None: + mgr.notify_perkey_changed(zone_id, zone_base) + if not self._send_with_retry(0x70, b"\x00"): + logger.warning("%s: per-key FrameEnd failed; frame not committed", self._device) + else: + logger.warning("%s: per-key un-set write failed; suppressing FrameEnd", self._device) + return no_change class rw_class(settings.FeatureRWMap): pass @@ -2312,8 +3119,13 @@ SETTINGS: list[settings.Setting] = [ LEDZoneSetting, RGBControl, RGBEffectSetting, - BrightnessControl, PerKeyLighting, + BrightnessControl, + RGBIdleEffect, + RGBIdleTimeout, + RGBSleepTimeout, + RgbStartupAnimation, + RgbShutdownAnimation, FnSwap, # simple NewFnSwap, # simple K375sFnSwap, # working @@ -2476,7 +3288,15 @@ def check_feature_settings(device, already_known) -> bool: new_absent = [] for sclass in SETTINGS: if sclass.feature: - known_present = device.persister and sclass.name in device.persister + if device.persister: + if sclass.name.endswith("_"): + # Multi-setting prototype (e.g. rgb_zone_); persister stores child keys + # like rgb_zone_1, never the prototype name itself. + known_present = any(k.startswith(sclass.name) for k in device.persister) + else: + known_present = sclass.name in device.persister + else: + known_present = False if not any(s.name == sclass.name for s in already_known) and (known_present or sclass.name not in absent): try: setting = check_feature(device, sclass) diff --git a/lib/logitech_receiver/settings_validator.py b/lib/logitech_receiver/settings_validator.py index 267cf4e5..5e6d2eff 100644 --- a/lib/logitech_receiver/settings_validator.py +++ b/lib/logitech_receiver/settings_validator.py @@ -584,7 +584,8 @@ class HeteroValidator(Validator): return cls(**kwargs) def __init__(self, data_class=None, options=None, readable=True): - assert data_class is not None and options is not None + # options=None for purely host-side settings — data_class handles bytes[0] as the ID. + assert data_class is not None self.data_class = data_class self.options = options self.readable = readable diff --git a/lib/solaar/ui/config_panel.py b/lib/solaar/ui/config_panel.py index 990293da..e7e2c61b 100644 --- a/lib/solaar/ui/config_panel.py +++ b/lib/solaar/ui/config_panel.py @@ -24,6 +24,7 @@ import gi from logitech_receiver import hidpp20 from logitech_receiver import settings +from logitech_receiver import settings_templates from solaar.i18n import _ from solaar.i18n import ngettext @@ -144,6 +145,13 @@ class SliderControl(Gtk.Scale, Control): self.set_round_digits(0) self.set_digits(0) self.set_increments(1, 5) + # Halving tick marks are an intensity-slider feature only. + if self.sbox.setting.name == "brightness_control": + validator = getattr(self.sbox.setting, "_validator", None) + steps = getattr(validator, "steps", 0) if validator is not None else 0 + if steps: + for mark in settings_templates.halving_marks(validator.max_value, steps): + self.add_mark(mark, Gtk.PositionType.BOTTOM, None) self.connect(GtkSignal.VALUE_CHANGED.value, self.changed) def set_value(self, value): @@ -609,6 +617,26 @@ class GraphicEQControl(MultipleControl): # control with an ID key that determines what else to show +class _HeteroToggleSwitch(Gtk.Switch): + """Gtk.Switch with int-valued get/set_value for HeteroKeyControl. + + Maps switch True/False to the field's wire on/off integer values so the + surrounding control machinery (changed handler, get_value, set_value) can + stay int-based like every other field kind. + """ + + def __init__(self, on_value: int = 1, off_value: int = 2, **kwargs): + super().__init__(**kwargs) + self._on = int(on_value) + self._off = int(off_value) + + def get_value(self) -> int: + return self._on if self.get_state() else self._off + + def set_value(self, value) -> None: + self.set_state(int(value) == self._on) + + class HeteroKeyControl(Gtk.HBox, Control): def __init__(self, sbox, delegate=None): super().__init__(homogeneous=False, spacing=6) @@ -629,6 +657,17 @@ class HeteroKeyControl(Gtk.HBox, Control): item_box.set_active(0) item_box.connect(GtkSignal.CHANGED.value, self.changed) self.pack_start(item_box, False, False, 0) + elif item["kind"] == settings.Kind.TOGGLE: + # Right-align like standard TOGGLE settings — pack_end so the + # switch hugs the right edge while other fields stay left. + item_box = _HeteroToggleSwitch( + on_value=item.get("on_value", 1), + off_value=item.get("off_value", 2), + halign=Gtk.Align.CENTER, + valign=Gtk.Align.CENTER, + ) + item_box.connect(GtkSignal.NOTIFY_ACTIVE.value, self.changed) + self.pack_end(item_box, False, False, 0) elif item["kind"] == settings.Kind.COLOR: item_box = Gtk.ColorButton() item_box.connect(GtkSignal.COLOR_SET.value, self.changed) @@ -639,6 +678,15 @@ class HeteroKeyControl(Gtk.HBox, Control): item_box.set_round_digits(0) item_box.set_digits(0) item_box.set_increments(1, 5) + # Halving tick marks are an intensity-slider feature only. + if item.get("halving") and str(item.get("name")) == str(hidpp20.LEDParam.intensity): + steps = getattr(sbox.setting._device, "_brightness_steps", 0) or settings_templates.auto_step_count( + item["max"] + ) + for mark in settings_templates.halving_marks(item["max"], steps): + item_box.add_mark(mark, Gtk.PositionType.BOTTOM, None) + if item.get("display_seconds", False): + item_box.connect("format-value", lambda _s, v: f"{int(v) / 1000:.2f}s") item_box.connect(GtkSignal.VALUE_CHANGED.value, self.changed) self.pack_start(item_box, True, True, 0) item_box.set_visible(False) @@ -655,11 +703,14 @@ class HeteroKeyControl(Gtk.HBox, Control): result[str(k)] = (r << 16) | (g << 8) | b else: result[str(k)] = box.get_value() - result = hidpp20.LEDEffectSetting(**result) + data_class = getattr(self.sbox.setting._validator, "data_class", hidpp20.LEDEffectSetting) + result = data_class(**result) return result def set_value(self, value): self.set_sensitive(False) + id_ = value.ID if value is not None else 0 + self._apply_id_ranges(id_) if value is not None: for k, v in value.__dict__.items(): if k in self._items: @@ -673,7 +724,7 @@ class HeteroKeyControl(Gtk.HBox, Control): box.set_value(v) else: self.sbox._failed.set_visible(True) - self.setup_visibles(value.ID if value is not None else 0) + self.setup_visibles(id_) def setup_visibles(self, id_): fields = self.sbox.setting.fields_map[id_][1] if id_ in self.sbox.setting.fields_map else {} @@ -683,15 +734,61 @@ class HeteroKeyControl(Gtk.HBox, Control): lblbox.set_visible(visible) box.set_visible(visible) - def changed(self, control): + def changed(self, control, *_args): + # *_args swallows the extra GParamSpec passed by Gtk.Switch's + # "notify::active" signal — other field signals pass just (widget,). if self.get_sensitive() and control.get_sensitive(): if "ID" in self._items and control == self._items["ID"][1]: - self.setup_visibles(int(self._items["ID"][1].get_value())) + new_id = int(self._items["ID"][1].get_value()) + self.setup_visibles(new_id) + self._apply_id_ranges(new_id) + self._apply_id_defaults(new_id) if hasattr(control, "_timer"): control._timer.cancel() control._timer = Timer(0.3, lambda: GLib.idle_add(self._write, control)) control._timer.start() + def _apply_id_ranges(self, id_): + """Reset every RANGE widget to its field's global min/max, then apply + per-effect overrides from fields_map[id_][3]. Reset-first ensures + switching from an override (e.g. Ripple 2-200) to an effect without + one restores the global range instead of inheriting the narrow one.""" + fields_map = getattr(self.sbox.setting, "fields_map", None) + entry = fields_map.get(id_) if fields_map else None + ranges = entry[3] if entry and len(entry) > 3 else {} + for field in self.sbox.setting.possible_fields: + if field.get("kind") != settings.Kind.RANGE: + continue + name = str(field["name"]) + if name not in self._items: + continue + _, box = self._items[name] + lo, hi = ranges.get(field["name"], (field.get("min", 0), field.get("max", 0))) + box.set_range(lo, hi) + + def _apply_id_defaults(self, id_): + """Apply fields_map[id_][2] defaults to RANGE widgets sitting at min.""" + fields_map = getattr(self.sbox.setting, "fields_map", None) + if not fields_map or id_ not in fields_map: + return + entry = fields_map[id_] + if len(entry) < 3: + return + defaults = entry[2] + ranges = entry[3] if len(entry) > 3 else {} + field_by_name = {str(f["name"]): f for f in self.sbox.setting.possible_fields} + for param_name, default_value in defaults.items(): + name = str(param_name) + if name not in self._items: + continue + field = field_by_name.get(name) + if field is None or field.get("kind") != settings.Kind.RANGE: + continue + _, box = self._items[name] + effective_min = ranges[param_name][0] if param_name in ranges else field.get("min", 0) + if box.get_value() == effective_min: + box.set_value(default_value) + def _write(self, control): control._timer.cancel() delattr(control, "_timer") @@ -711,6 +808,81 @@ _icons_allowables = {v: k for k, v in _allowables_icons.items()} # clicking on the lock icon changes from changeable to unchangeable to ignore +# Settings whose operation depends on LED Control being set to Solaar. +# Zone settings (rgb_zone_*) are matched by prefix because their name carries +# the zone index (rgb_zone_1, rgb_zone_2, ...). +_SW_CONTROL_DEPENDENT_NAMES = ("rgb_idle_timeout", "rgb_idle_effect", "rgb_sleep_timeout") +_SW_CONTROL_DEPENDENT_PREFIXES = ("rgb_zone_",) + + +def _sw_control_blocked(device): + """True when LED Control is not Solaar. Reads from setting._value first, + then the persister, so the gate is right at panel load before live reads + have populated. Accepts either the current bool (BooleanValidator) or the + legacy int 3/0 (older ChoicesValidator persister entries).""" + persister = getattr(device, "persister", None) + if persister is None: + return False + value = None + for s in getattr(device, "settings", []) or []: + if s.name == "rgb_control": + value = s._value + break + if value is None: + value = persister.get("rgb_control") + if value is None: + return False + # Solaar = bool True or legacy int 3; everything else (False, 0, …) blocks. + return value not in (True, 3) + + +def _zone_effect_blocks_perkey(device): + """True when any zone effect's saved ID is not Static (0x01) — zone + animations mask the per-key buffer regardless of SW control state.""" + persister = getattr(device, "persister", None) + if persister is None: + return False + for s in getattr(device, "settings", []) or []: + if not s.name.startswith("rgb_zone_"): + continue + v = s._value if s._value is not None else persister.get(s.name) + if v is None: + continue + if int(getattr(v, "ID", 0) or 0) != 0x01: + return True + return False + + +def _set_row_sensitive(device, name, can_function): + """Apply sensitivity to a single setting's control row. Combines the + user's lock-icon opt-in (persister sensitivity) with the can-function + gate so neither alone can override the other.""" + device_id = (device.receiver.path if device.receiver else device.path, device.number) + sbox = _items.get((device_id[0], device_id[1], name)) + if sbox is None or not hasattr(sbox, "_control"): + return + persister = getattr(device, "persister", None) + user_allowed = persister.get_sensitivity(name) if persister else True + sbox._control.set_sensitive(user_allowed is True and can_function) + + +def _apply_rgb_gates(device): + """Grey out RGB settings whose prerequisites aren't met. Visual-only: + leaves persister _sensitive flags (user lock-icon opt-ins) intact. + + - rgb_zone_* and rgb_idle_*/rgb_sleep_timeout need LED Control = Solaar + (rgb_control == 3). + - per-key-lighting needs LED Control = Solaar AND every zone effect on + Static (0x01), because non-Static zone animations mask per-key writes. + """ + sw_blocked = _sw_control_blocked(device) + for s in getattr(device, "settings", []) or []: + if s.name in _SW_CONTROL_DEPENDENT_NAMES or any(s.name.startswith(p) for p in _SW_CONTROL_DEPENDENT_PREFIXES): + _set_row_sensitive(device, s.name, not sw_blocked) + perkey_blocked = sw_blocked or _zone_effect_blocks_perkey(device) + _set_row_sensitive(device, "per-key-lighting", not perkey_blocked) + + def _change_click(button, sbox): icon = button.get_children()[0] icon_name, _ = icon.get_icon_name() @@ -728,6 +900,35 @@ def _change_click(button, sbox): _write_async(setting, persisted, sbox) else: _read_async(setting, True, sbox, bool(sbox.setting._device.online), sbox._control.get_sensitive()) + elif new_allowed == settings.SENSITIVITY_IGNORE and sbox.setting.name == "per-key-lighting": + # User just opted out of per-key lighting. The firmware effect engine + # is currently in its OOR "direct mode" slot showing the per-key buffer + # (entered when the prep sequence wrote effectIdx=numEffects via + # SetEffectByIndex on 0x8071). Writing a regular in-range effectIdx + # with persist=1 displaces that slot and the saved zone effect becomes + # the visible layer again. See LOGITECH_HIDPP2_PROTOCOL.md + # "Per-key prep sequence" and 0x8071 SetEffectByIndex persist=1 + # requirement. + device = sbox.setting._device + for s in device.settings: + if s.name.startswith("rgb_zone_") and s._value is not None: + _write_async(s, s._value, None) + break # one zone-effect write is enough to flip the engine + if sbox.setting.name.startswith("rgb_zone_"): + # Toggling zone-effect sensitivity changes the effective base color + # for per-key unset cells (zone color ↔ black). When per-key is + # opted-in, repaint it so the unset cells pick up the new base. + from logitech_receiver import rgb_power + + device = sbox.setting._device + perkey, has_paint = rgb_power.perkey_has_paint(device) + if has_paint: + _write_async(perkey, perkey._value, None) + # The lock icon on rgb_control, any zone, or per-key itself can change + # whether per-key is functional — re-evaluate the gate. + name = sbox.setting.name + if name == "rgb_control" or name == "per-key-lighting" or name.startswith("rgb_zone_"): + _apply_rgb_gates(sbox.setting._device) return True @@ -828,6 +1029,10 @@ def _update_setting_item(sbox, value, is_online=True, sensitive=True, null_okay= logger.warning("%s: error setting control value (%s): %s", sbox.setting.name, sbox.setting._device, repr(e)) sbox._control.set_sensitive(sensitive is True) _change_icon(sensitive, sbox._change_icon) + # rgb_control and rgb_zone_* state gate per-key sensitivity. + name = sbox.setting.name + if name == "rgb_control" or name.startswith("rgb_zone_"): + _apply_rgb_gates(sbox.setting._device) def _disable_listbox_highlight_bg(lb): @@ -887,6 +1092,7 @@ def update(device, is_online=None): sensitive = device.persister.get_sensitivity(s.name) if device.persister else True _read_async(s, False, sbox, is_online, sensitive) + _apply_rgb_gates(device) _box.set_visible(True) diff --git a/lib/solaar/ui/perkey/control.py b/lib/solaar/ui/perkey/control.py index 0c3dd557..3377821f 100644 --- a/lib/solaar/ui/perkey/control.py +++ b/lib/solaar/ui/perkey/control.py @@ -137,15 +137,15 @@ class _SettingSink: persister[self._palette_key()] = {"active": int(active), "previous": int(previous)} def zone_base_color(self) -> int | None: + """Color used to render per-key unset cells in the editor. Matches + rgb_power.effective_zone_base_color: black when zone is ignored, + the saved zone color otherwise.""" device = getattr(self._setting, "_device", None) - if device is None or not getattr(device, "settings", None): + if device is None: return None - for s in device.settings: - if s.name.startswith("rgb_zone_") and s._value is not None: - color = getattr(s._value, "color", None) - if isinstance(color, int): - return int(color) - return None + from logitech_receiver import rgb_power + + return int(rgb_power.effective_zone_base_color(device)) def _notify(self) -> None: snapshot = self.current diff --git a/tests/logitech_receiver/fake_hidpp.py b/tests/logitech_receiver/fake_hidpp.py index 7758f2f5..91fcabb8 100644 --- a/tests/logitech_receiver/fake_hidpp.py +++ b/tests/logitech_receiver/fake_hidpp.py @@ -387,6 +387,8 @@ class Device: wpid: Optional[str] = "0000" setting_callback: Any = None centurion: bool = False + path = None + cleanups = None sliding = profiles = _backlight = _keys = _remap_keys = _led_effects = _gestures = None _gestures_lock = threading.Lock() number = "d1" @@ -409,6 +411,7 @@ class Device: self.persister = configuration._DeviceEntry() self.features = hidpp20.FeaturesArray(self) self.settings = [] + self.cleanups = [] self.receiver = [] if self.feature is not None: self.features = hidpp20.FeaturesArray(self) diff --git a/tests/logitech_receiver/test_rgb_power.py b/tests/logitech_receiver/test_rgb_power.py new file mode 100644 index 00000000..cdaf74c8 --- /dev/null +++ b/tests/logitech_receiver/test_rgb_power.py @@ -0,0 +1,679 @@ +## Copyright (C) Solaar contributors +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. + +"""Tests for the display-state-aware color translation in rgb_power. + +These cover the pure math (`translate_color_for_display`) and the small +manager hooks that PerKeyLighting calls into (`translate_color`, +`notify_perkey_changed`) without requiring a GLib main loop. +""" + +import pytest + +from logitech_receiver import rgb_power + +M = rgb_power.RGBPowerManager + + +# --- translate_color_for_display (pure function) ---------------------------- + + +@pytest.mark.parametrize( + "color", + [0x000000, 0x123456, 0xFF0000, 0xFFFFFF], +) +def test_translate_active_is_identity(color): + assert rgb_power.translate_color_for_display(color, M.ACTIVE, 50, 0, 25) == color + + +def test_translate_idle_50pct(): + # 255 * 50 // 100 == 127 + assert rgb_power.translate_color_for_display(0xFFFFFF, M.IDLE, 50, 0, 25) == 0x7F7F7F + + +def test_translate_idle_25pct_on_ff8800(): + # 0xFF * 25 // 100 = 63 (0x3F); 0x88 * 25 // 100 = 34 (0x22); 0 stays 0 + assert rgb_power.translate_color_for_display(0xFF8800, M.IDLE, 25, 0, 25) == 0x3F2200 + + +def test_translate_dimming_start_is_saved_color(): + # t = 0/25 = 0 → interpolation returns the start (saved) color + assert rgb_power.translate_color_for_display(0xABCDEF, M.DIMMING, 50, 0, 25) == 0xABCDEF + + +def test_translate_dimming_end_equals_idle(): + # t = 25/25 = 1 → interpolation returns the target (fully dimmed) + idle = rgb_power.translate_color_for_display(0xFFFFFF, M.IDLE, 50, 0, 25) + dimming_end = rgb_power.translate_color_for_display(0xFFFFFF, M.DIMMING, 50, 25, 25) + assert dimming_end == idle + + +def test_translate_dimming_midramp_between_full_and_dim(): + # At t=12/25 (≈0.48), white interpolates between 0xFF and 0x7F (50% target). + # Expected r = 255 + (127 - 255) * 12/25 = 255 - 61.44 → int(193.56) = 193 (0xC1) + assert rgb_power.translate_color_for_display(0xFFFFFF, M.DIMMING, 50, 12, 25) == 0xC1C1C1 + + +def test_translate_sleeping_returns_none(): + assert rgb_power.translate_color_for_display(0xFFFFFF, M.SLEEPING, 50, 0, 25) is None + + +# --- translate_for_device (manager lookup) ---------------------------------- + + +def test_translate_for_device_no_manager_is_identity(): + # An arbitrary object with no manager registered → returns input unchanged. + fake = object() + assert rgb_power.translate_for_device(fake, 0xABCDEF) == 0xABCDEF + + +def _dim(intensity): + """Shortcut: build a Dim-mode LEDEffectSetting for tests that used to + pass a bare dim-percent int as `_idle_effect`.""" + from logitech_receiver import hidpp20 + + return hidpp20.LEDEffectSetting(ID=0x80, intensity=intensity) + + +def _install_manager(monkeypatch, state, idle_effect=None, dim_step=0): + """Build an RGBPowerManager with state injected and register it for a + fake device id. Cleanup happens via monkeypatch's _managers swap. + + `idle_effect` defaults to Dim 50%. Pass a bare int (legacy) and it + will be wrapped as a Dim-mode LEDEffectSetting; pass an + LEDEffectSetting directly to use as-is. + """ + from logitech_receiver import hidpp20 + + fake_device = object() + + class _Dev: + pass + + mgr = M.__new__(M) # bypass __init__ (no GLib needed) + mgr._device = _Dev() + mgr._state = state + if idle_effect is None: + mgr._idle_effect = _dim(50) + elif isinstance(idle_effect, int): + mgr._idle_effect = _dim(idle_effect) + else: + mgr._idle_effect = idle_effect + mgr._dim_step = dim_step + mgr._dim_perkey = None + # Avoid circular import surprise — make sure the LEDEffectSetting is + # imported here so type checks in helpers don't crash. + _ = hidpp20.LEDEffectSetting + + saved_managers = dict(rgb_power._managers) + monkeypatch.setattr(rgb_power, "_managers", {id(fake_device): mgr}) + yield_data = (fake_device, mgr, saved_managers) + return yield_data + + +def test_translate_for_device_idle_routes_through_manager(monkeypatch): + fake_device, mgr, _ = _install_manager(monkeypatch, M.IDLE, idle_effect=50) + assert rgb_power.translate_for_device(fake_device, 0xFFFFFF) == 0x7F7F7F + + +def test_translate_for_device_sleeping_returns_none(monkeypatch): + fake_device, _, _ = _install_manager(monkeypatch, M.SLEEPING) + assert rgb_power.translate_for_device(fake_device, 0xFFFFFF) is None + + +def test_current_dim_pct_falls_back_to_100_for_non_dim_effects(): + from logitech_receiver import hidpp20 + + mgr = M.__new__(M) + # Dim is the only host-side idle effect — it's the only one Solaar can + # render itself (by interpolating colors toward a dimmer target on a + # Static zone color or in the per-key buffer). Disabled does nothing. + # Breathe and Ripple hand off to the firmware effect engine, which + # runs the animation at its own brightness; Solaar applies no dim + # translation in those cases, so _current_dim_pct returns 100. + for fw_or_disabled in (0x00, 0x0A, 0x0B): + mgr._idle_effect = hidpp20.LEDEffectSetting(ID=fw_or_disabled) + assert mgr._current_dim_pct() == 100 + # Dim mode — intensity carries the dim percentage. + for dim_pct in (25, 50, 75): + mgr._idle_effect = hidpp20.LEDEffectSetting(ID=0x80, intensity=dim_pct) + assert mgr._current_dim_pct() == dim_pct + + +# --- notify_perkey_changed -------------------------------------------------- + + +def test_notify_perkey_changed_updates_dim_map_during_dimming(): + mgr = M.__new__(M) + mgr._state = M.DIMMING + mgr._idle_effect = _dim(50) + mgr._dim_perkey = {5: (0xFFFFFF, 0x7F7F7F)} + mgr.notify_perkey_changed(5, 0x00FF00) + start, target = mgr._dim_perkey[5] + assert start == 0x00FF00 + # target = dim(0x00FF00, 50) = (0, 0xFF*50//100, 0) = (0, 0x7F, 0) + assert target == 0x007F00 + + +def test_notify_perkey_changed_noop_when_not_dimming(): + mgr = M.__new__(M) + mgr._state = M.IDLE + mgr._idle_effect = _dim(50) + mgr._dim_perkey = {5: (0xFFFFFF, 0x7F7F7F)} + mgr.notify_perkey_changed(5, 0x00FF00) + assert mgr._dim_perkey[5] == (0xFFFFFF, 0x7F7F7F) + + +def test_notify_perkey_changed_noop_for_unknown_zone(): + mgr = M.__new__(M) + mgr._state = M.DIMMING + mgr._idle_effect = _dim(50) + mgr._dim_perkey = {5: (0xFFFFFF, 0x7F7F7F)} + mgr.notify_perkey_changed(99, 0x00FF00) # not in _dim_perkey + assert mgr._dim_perkey == {5: (0xFFFFFF, 0x7F7F7F)} + + +def test_notify_perkey_bulk_changed_skips_no_change(): + from logitech_receiver import special_keys + + no_change = special_keys.COLORSPLUS["No change"] + mgr = M.__new__(M) + mgr._state = M.DIMMING + mgr._idle_effect = _dim(50) + mgr._dim_perkey = {5: (0xFFFFFF, 0x7F7F7F), 7: (0x000000, 0x000000)} + mgr.notify_perkey_bulk_changed({5: 0x00FF00, 7: no_change}) + # Zone 5 updated, zone 7 (No change) left alone. + assert mgr._dim_perkey[5][0] == 0x00FF00 + assert mgr._dim_perkey[7] == (0x000000, 0x000000) + + +# --- notify_zone_changed (zone-effect dim ramp) ----------------------------- + + +class _FakeZone: + """Minimal stand-in for hidpp20.LEDZoneInfo — only `.index` is consulted + by RGBPowerManager.notify_zone_changed.""" + + def __init__(self, index): + self.index = index + + +def test_notify_zone_changed_updates_matching_cluster_during_dimming(): + mgr = M.__new__(M) + mgr._state = M.DIMMING + mgr._idle_effect = _dim(50) + zone_a = _FakeZone(0) + zone_b = _FakeZone(1) + mgr._dim_zones = [ + (zone_a, 0xFFFFFF, 0x7F7F7F), + (zone_b, 0xFF0000, 0x7F0000), + ] + mgr.notify_zone_changed(1, 0x00FF00) + # zone_a unchanged + assert mgr._dim_zones[0] == (zone_a, 0xFFFFFF, 0x7F7F7F) + # zone_b updated: new start + recomputed target at 50% + zone, start, target = mgr._dim_zones[1] + assert zone is zone_b + assert start == 0x00FF00 + # _compute_dim_color(0x00FF00, 50) = (0, 0xFF*50//100, 0) = (0, 0x7F, 0) + assert target == 0x007F00 + + +def test_notify_zone_changed_noop_when_not_dimming(): + mgr = M.__new__(M) + mgr._state = M.IDLE + mgr._idle_effect = _dim(50) + zone = _FakeZone(0) + mgr._dim_zones = [(zone, 0xFFFFFF, 0x7F7F7F)] + mgr.notify_zone_changed(0, 0x00FF00) + assert mgr._dim_zones[0] == (zone, 0xFFFFFF, 0x7F7F7F) + + +def test_notify_zone_changed_noop_for_unknown_cluster(): + mgr = M.__new__(M) + mgr._state = M.DIMMING + mgr._idle_effect = _dim(50) + zone = _FakeZone(0) + mgr._dim_zones = [(zone, 0xFFFFFF, 0x7F7F7F)] + mgr.notify_zone_changed(99, 0x00FF00) # cluster 99 not in _dim_zones + assert mgr._dim_zones[0] == (zone, 0xFFFFFF, 0x7F7F7F) + + +def test_notify_zone_changed_noop_when_dim_zones_empty(): + mgr = M.__new__(M) + mgr._state = M.DIMMING + mgr._idle_effect = _dim(50) + mgr._dim_zones = [] # e.g. per-key active, zone path skipped + mgr.notify_zone_changed(0, 0x00FF00) # should not raise + assert mgr._dim_zones == [] + + +# --- perkey_has_paint / zone_effect_is_static (module-level predicates) ----- + + +class _FakePerKey: + """Minimal stand-in for PerKeyLighting: holds a _value map and a + _validator with .choices that perkey_has_paint inspects.""" + + name = "per-key-lighting" + + def __init__(self, value, choices=(1, 2, 3)): + self._value = value + + class _V: + pass + + self._validator = _V() + self._validator.choices = list(choices) + + +class _FakeZoneSetting: + """Minimal stand-in for an RGBEffectSetting child instance: holds a + name starting with "rgb_zone_" and a _value with an .ID attribute.""" + + def __init__(self, name, value): + self.name = name + self._value = value + + +class _ValueWithID: + """Stand-in for hidpp20.LEDEffectSetting — only .ID is consulted by + zone_effect_is_static.""" + + def __init__(self, ID): + self.ID = ID + + +class _FakePersister: + def __init__(self, sensitivities=None): + self._s = sensitivities or {} + + def get_sensitivity(self, name): + return self._s.get(name, False) + + +class _FakeDevice: + def __init__(self, settings_list, persister=None): + self.settings = settings_list + self.persister = persister + + +def test_perkey_has_paint_with_real_colors(): + pk = _FakePerKey({1: 0xFF0000, 2: -1, 3: -1}) # one real color, rest "No change" + dev = _FakeDevice([pk], _FakePersister({"per-key-lighting": True})) + found, has_paint = rgb_power.perkey_has_paint(dev) + assert found is pk + assert has_paint is True + + +def test_perkey_has_paint_requires_explicit_opt_in(): + # Paint exists but sensitivity is the default (False) — has_paint must + # be False so zone effects remain primary on devices where the user + # hasn't opted in to per-key dominance (e.g. G502X mouse out of the box). + pk = _FakePerKey({1: 0xFF0000}) + dev = _FakeDevice([pk], _FakePersister()) # sensitivity defaults to False + _, has_paint = rgb_power.perkey_has_paint(dev) + assert has_paint is False + + +def test_perkey_has_paint_only_no_change(): + pk = _FakePerKey({1: -1, 2: -1, 3: -1}) # all "No change" + dev = _FakeDevice([pk]) + found, has_paint = rgb_power.perkey_has_paint(dev) + assert found is pk + assert has_paint is False + + +def test_perkey_has_paint_no_perkey_setting(): + dev = _FakeDevice([]) + found, has_paint = rgb_power.perkey_has_paint(dev) + assert found is None + assert has_paint is False + + +def test_perkey_has_paint_validator_choices_empty(): + pk = _FakePerKey({1: 0xFF0000}, choices=()) + dev = _FakeDevice([pk]) + found, has_paint = rgb_power.perkey_has_paint(dev) + assert found is pk + assert has_paint is False + + +def test_perkey_has_paint_user_ignores_perkey(): + from logitech_receiver import settings as _settings + + pk = _FakePerKey({1: 0xFF0000}) + dev = _FakeDevice([pk], _FakePersister({"per-key-lighting": _settings.SENSITIVITY_IGNORE})) + found, has_paint = rgb_power.perkey_has_paint(dev) + assert found is pk + assert has_paint is False + + +def test_perkey_has_paint_with_no_persister(): + # No persister means no place to record the user opting in, so per-key + # dominance never engages — zone effects remain primary. + pk = _FakePerKey({1: 0xFF0000}) + dev = _FakeDevice([pk], persister=None) + _, has_paint = rgb_power.perkey_has_paint(dev) + assert has_paint is False + + +def test_zone_effect_is_static_true_for_static(): + z = _FakeZoneSetting("rgb_zone_1", _ValueWithID(0x01)) + assert rgb_power.zone_effect_is_static(_FakeDevice([z])) is True + + +def test_zone_effect_is_static_false_for_animated(): + for eff_id in (0x02, 0x03, 0x0A, 0x0B, 0x0E, 0x15): + z = _FakeZoneSetting("rgb_zone_1", _ValueWithID(eff_id)) + assert rgb_power.zone_effect_is_static(_FakeDevice([z])) is False, eff_id + + +def test_zone_effect_is_static_false_for_disabled(): + # Disabled (0x00) means the user wants no zone effect — including + # suppressing the per-key paint, since per-key push would re-light + # the device. + z = _FakeZoneSetting("rgb_zone_1", _ValueWithID(0x00)) + assert rgb_power.zone_effect_is_static(_FakeDevice([z])) is False + + +def test_zone_effect_is_static_true_when_no_zone_setting(): + # Devices that only enumerate PER_KEY_LIGHTING_V2 (no RGB_EFFECTS) have + # no zone-effect setting at all — per-key paint should be free to drive. + assert rgb_power.zone_effect_is_static(_FakeDevice([])) is True + + +def test_zone_effect_is_static_true_when_any_zone_is_static(): + # Multi-zone device with one Static and one Cycle zone — at least one + # Static slot is enough for per-key to overlay on. + a = _FakeZoneSetting("rgb_zone_1", _ValueWithID(0x01)) # Static + b = _FakeZoneSetting("rgb_zone_2", _ValueWithID(0x03)) # Cycle + assert rgb_power.zone_effect_is_static(_FakeDevice([a, b])) is True + + +# --- RGBEffectSetting.write divert / push -------------------------------- + + +def _make_rgb_effect_setting(device, current_value): + """Build a partial RGBEffectSetting bound to `device` without running the + setup classmethod (which requires a real led_effects descriptor). Only + the fields touched by RGBEffectSetting.write are populated.""" + from unittest.mock import MagicMock + + from logitech_receiver import settings_templates + + s = settings_templates.RGBEffectSetting.__new__(settings_templates.RGBEffectSetting) + s._device = device + s._value = current_value + s._rw = MagicMock() + s._rw.prefix = b"\x01" + s._validator = MagicMock() + s._validator.needs_current_value = False + # update() persists onto _value; mimic with a simple assignment. + s.update = lambda v, save=True: setattr(s, "_value", v) + return s + + +def test_rgb_effect_write_static_to_static_repaints_unset_zones(monkeypatch): + """User tweaks the Static base color while per-key is the visible + layer: persist, repaint per-key's unset zones, send FrameEnd. No + SetEffectByIndex (would reclaim the engine and overwrite per-key). + Also notifies any in-flight dim ramp so unset cells' start_color + tracks the new base.""" + from unittest.mock import MagicMock + + from logitech_receiver import hidpp20 + + perkey = MagicMock() + perkey._fill_unset_zones_with_base_color.return_value = True + perkey._send_with_retry.return_value = True + perkey._unset_zone_ids.return_value = [5, 7, 9] + device = MagicMock() + device.online = True + + mgr = MagicMock() + monkeypatch.setattr(rgb_power, "perkey_has_paint", lambda d: (perkey, True)) + monkeypatch.setattr(rgb_power, "get_manager", lambda d: mgr) + + old = hidpp20.LEDEffectSetting(ID=1, color=0xFF0000) + new = hidpp20.LEDEffectSetting(ID=1, color=0x00FF00) + s = _make_rgb_effect_setting(device, old) + + result = s.write(new) + + assert result is new + assert s._value is new + perkey._fill_unset_zones_with_base_color.assert_called_once() + perkey._send_with_retry.assert_called_once_with(0x70, b"\x00") + s._rw.write.assert_not_called() + # No follow-up per-key push — we were already in Static. + perkey.write.assert_not_called() + # In-flight dim ramp gets notified that unset cells now interpolate + # from the new base (0x00FF00) rather than the old. + mgr.notify_perkey_bulk_changed.assert_called_once_with({5: 0x00FF00, 7: 0x00FF00, 9: 0x00FF00}) + + +def test_rgb_effect_write_static_to_static_no_notify_when_no_unset_zones(monkeypatch): + """If every per-key cell is user-painted, _unset_zone_ids returns + [] and the dim-ramp notification is skipped (nothing to update).""" + from unittest.mock import MagicMock + + from logitech_receiver import hidpp20 + + perkey = MagicMock() + perkey._fill_unset_zones_with_base_color.return_value = True + perkey._send_with_retry.return_value = True + perkey._unset_zone_ids.return_value = [] + device = MagicMock() + device.online = True + + mgr = MagicMock() + monkeypatch.setattr(rgb_power, "perkey_has_paint", lambda d: (perkey, True)) + monkeypatch.setattr(rgb_power, "get_manager", lambda d: mgr) + + old = hidpp20.LEDEffectSetting(ID=1, color=0xFF0000) + new = hidpp20.LEDEffectSetting(ID=1, color=0x00FF00) + s = _make_rgb_effect_setting(device, old) + + s.write(new) + + mgr.notify_perkey_bulk_changed.assert_not_called() + + +def test_rgb_effect_write_static_to_static_unchanged_is_noop(monkeypatch): + """Same value persisted: no repaint, no FrameEnd, no wire write.""" + from unittest.mock import MagicMock + + from logitech_receiver import hidpp20 + + perkey = MagicMock() + device = MagicMock() + device.online = True + monkeypatch.setattr(rgb_power, "perkey_has_paint", lambda d: (perkey, True)) + + same = hidpp20.LEDEffectSetting(ID=1, color=0xFF0000) + s = _make_rgb_effect_setting(device, same) + + s.write(same) + + perkey._fill_unset_zones_with_base_color.assert_not_called() + perkey._send_with_retry.assert_not_called() + s._rw.write.assert_not_called() + + +def test_rgb_effect_write_static_to_animation_pushes_wire(monkeypatch): + """User switches the zone effect dropdown from Static to an animation + while per-key has paint: push the new value to wire so the animation + starts. Per-key is a sub-mode of Static — animations take over the + visible layer when selected.""" + from unittest.mock import MagicMock + + from logitech_receiver import hidpp20 + + perkey = MagicMock() + perkey._value = {1: 0xFF0000} + device = MagicMock() + device.online = True + monkeypatch.setattr(rgb_power, "perkey_has_paint", lambda d: (perkey, True)) + monkeypatch.setattr(rgb_power, "translate_for_device", lambda d, c: c) + monkeypatch.setattr(rgb_power, "get_manager", lambda d: None) + + old = hidpp20.LEDEffectSetting(ID=0x01, color=0xFF0000) + new = hidpp20.LEDEffectSetting(ID=0x0A, color=0x00FF00) + s = _make_rgb_effect_setting(device, old) + s._validator.prepare_write.return_value = b"prepared" + s._rw.write.return_value = b"ack" + + s.write(new) + + s._rw.write.assert_called_once() + perkey._fill_unset_zones_with_base_color.assert_not_called() + + +def test_rgb_effect_write_repaints_perkey_unset_on_color_change(monkeypatch): + """When per-key has paint and the zone's color changes, repaint the + per-key unset cells with the new base so the visible result tracks + the user's color choice.""" + from unittest.mock import MagicMock + + from logitech_receiver import hidpp20 + + perkey = MagicMock() + perkey._value = {1: 0xFF0000, 2: -1} + perkey._fill_unset_zones_with_base_color.return_value = True + perkey._unset_zone_ids.return_value = [2] + device = MagicMock() + device.online = True + monkeypatch.setattr(rgb_power, "perkey_has_paint", lambda d: (perkey, True)) + monkeypatch.setattr(rgb_power, "translate_for_device", lambda d, c: c) + monkeypatch.setattr(rgb_power, "get_manager", lambda d: None) + + old = hidpp20.LEDEffectSetting(ID=0x01, color=0xFF0000) + new = hidpp20.LEDEffectSetting(ID=0x01, color=0x00FF00) + s = _make_rgb_effect_setting(device, old) + s._validator.prepare_write.return_value = b"prepared" + s._rw.write.return_value = b"ack" + + s.write(new) + + s._rw.write.assert_not_called() + perkey._fill_unset_zones_with_base_color.assert_called_once() + perkey._send_with_retry.assert_called_once_with(0x70, b"\x00") + + +def test_rgb_effect_write_apply_path_suppressed_when_perkey_has_paint(monkeypatch): + """save=False is the apply_all_settings path. When per-key has paint and + is opted in, per-key fully owns the visible layer — the zone wire push + is suppressed here too, not just on the save path.""" + from unittest.mock import MagicMock + + from logitech_receiver import hidpp20 + + perkey = MagicMock() + device = MagicMock() + device.online = True + monkeypatch.setattr(rgb_power, "perkey_has_paint", lambda d: (perkey, True)) + monkeypatch.setattr(rgb_power, "translate_for_device", lambda d, c: c) + monkeypatch.setattr(rgb_power, "get_manager", lambda d: None) + + new = hidpp20.LEDEffectSetting(ID=0x01, color=0x00FF00) + s = _make_rgb_effect_setting(device, None) + s._validator.prepare_write.return_value = b"prepared" + s._rw.write.return_value = b"ack" + + s.write(new, save=False) + + s._rw.write.assert_not_called() + # Apply path doesn't repaint either — that's reserved for explicit user + # color changes via save=True. + perkey._fill_unset_zones_with_base_color.assert_not_called() + + +def test_rgb_effect_write_inactive_perkey_falls_through(monkeypatch): + """No per-key paint (or per-key feature absent): existing + translate-through-power-state wire path runs unchanged.""" + from unittest.mock import MagicMock + + from logitech_receiver import hidpp20 + + perkey = MagicMock() + device = MagicMock() + device.online = True + monkeypatch.setattr(rgb_power, "perkey_has_paint", lambda d: (perkey, False)) + monkeypatch.setattr(rgb_power, "translate_for_device", lambda d, c: c) + monkeypatch.setattr(rgb_power, "get_manager", lambda d: None) + + new = hidpp20.LEDEffectSetting(ID=0x01, color=0x00FF00) + s = _make_rgb_effect_setting(device, None) + s._validator.prepare_write.return_value = b"prepared" + s._rw.write.return_value = b"ack" + + s.write(new) + + s._rw.write.assert_called_once() + perkey._fill_unset_zones_with_base_color.assert_not_called() + perkey.write.assert_not_called() + + +# --- PerKeyLighting.write defers to firmware animations ------------------- + + +def test_perkey_write_skipped_when_zone_is_animation(monkeypatch): + """Per-key is a sub-mode of Static. When the saved zone effect is an + animation (Breathe etc.), the firmware engine owns the visible layer + and per-key writes do not go to the wire.""" + from unittest.mock import MagicMock + + from logitech_receiver import settings_templates + + breathe_zone = _FakeZoneSetting("rgb_zone_1", _ValueWithID(0x0A)) + + s = settings_templates.PerKeyLighting.__new__(settings_templates.PerKeyLighting) + device = MagicMock() + device.online = True + device.settings = [breathe_zone] + s._device = device + s._value = {} + s._has_rgb_effects = True + s._ensure_sw_control = MagicMock() + s._send_with_retry = MagicMock(return_value=True) + s._fill_unset_zones_with_base_color = MagicMock(return_value=True) + monkeypatch.setattr(rgb_power, "translate_for_device", lambda d, c: c) + monkeypatch.setattr(rgb_power, "get_manager", lambda d: None) + s.update = lambda m, save=True: None + + s.write({1: 0xFF0000}) + + s._send_with_retry.assert_not_called() + + +def test_perkey_write_key_value_skipped_when_zone_is_animation(monkeypatch): + """Same: write_key_value defers to firmware animations.""" + from unittest.mock import MagicMock + + from logitech_receiver import settings_templates + + breathe_zone = _FakeZoneSetting("rgb_zone_1", _ValueWithID(0x0A)) + + s = settings_templates.PerKeyLighting.__new__(settings_templates.PerKeyLighting) + device = MagicMock() + device.online = True + device.settings = [breathe_zone] + s._device = device + s._value = {} + s._has_rgb_effects = True + s._ensure_sw_control = MagicMock() + s._send_with_retry = MagicMock(return_value=True) + s._send_zone_color = MagicMock(return_value=True) + s._fill_unset_zones_with_base_color = MagicMock(return_value=True) + monkeypatch.setattr(rgb_power, "translate_for_device", lambda d, c: c) + monkeypatch.setattr(rgb_power, "get_manager", lambda d: None) + s.update_key_value = lambda k, v, save=True: None + + s.write_key_value(7, 0xFF0000) + + s._send_zone_color.assert_not_called() diff --git a/tests/logitech_receiver/test_setting_templates.py b/tests/logitech_receiver/test_setting_templates.py index 44f8d844..e667a698 100644 --- a/tests/logitech_receiver/test_setting_templates.py +++ b/tests/logitech_receiver/test_setting_templates.py @@ -297,7 +297,27 @@ simple_tests = [ Setup( FeatureTest(settings_templates.RGBControl, False, True), fake_hidpp.Response("0000", 0x0450), - fake_hidpp.Response("010100", 0x0450, "0101"), + fake_hidpp.Response("010304", 0x0450, "010304"), + fake_hidpp.Response("00003C012C", 0x0470, "00"), # GetRgbPowerModeConfig: idle=60s, sleep=300s + ), + Setup( # RGBIdleEffect — software-only, no feature requests for read/write + # The setting is a HeteroValidator carrying a LEDEffectSetting. + # Default (read with empty persister) = Dim 50%. + FeatureTest( + settings_templates.RGBIdleEffect, + hidpp20.LEDEffectSetting(ID=0x80, intensity=50), + hidpp20.LEDEffectSetting(ID=0x80, intensity=75), + 0, + ), + fake_hidpp.Response("00", 0xFFFF), # placeholder — no device requests needed + ), + Setup( # RGBIdleTimeout — software-only, no feature requests for read/write + FeatureTest(settings_templates.RGBIdleTimeout, 60, 300, 0), + fake_hidpp.Response("00", 0xFFFF), # placeholder — no device requests needed + ), + Setup( # RGBSleepTimeout — software-only, no feature requests for read/write + FeatureTest(settings_templates.RGBSleepTimeout, 300, 600, 0), + fake_hidpp.Response("00", 0xFFFF), # placeholder — no device requests needed ), Setup( FeatureTest( @@ -911,3 +931,90 @@ def test_check_feature_setting(test, mocker): setting = settings_templates.check_feature_setting(device, tst.sclass.name) assert setting + + +# --- RGBIdleEffect._pre_read legacy bare-int migration --------------------- +# Solaar versions before the HeteroValidator refactor stored +# `rgb_idle_effect` as a bare int (0 / 25 / 50 / 75 / 0x0A / 0x0B). +# On first read after upgrade, RGBIdleEffect._pre_read should map +# each to the equivalent LEDEffectSetting and write the upgraded form +# back to the persister so subsequent reads return it directly. + + +def _idle_setting_with_persisted(value): + """Build a minimally-scaffolded RGBIdleEffect with `value` already + in the persister and `_value` unread. Returns (setting, device). + Avoids RGBIdleEffect.build's led_effects probe (which needs a + full device fixture); _pre_read is fully exercised regardless. + """ + from solaar import configuration + + class _Dev: + pass + + device = _Dev() + device.persister = configuration._DeviceEntry() + if value is not None: + device.persister[settings_templates.RGBIdleEffect.name] = value + setting = settings_templates.RGBIdleEffect.__new__(settings_templates.RGBIdleEffect) + setting._device = device + setting._value = None + return setting, device + + +@pytest.mark.parametrize( + "legacy, expected_id, expected_attrs", + [ + (0, 0x00, {}), # Disabled + (25, 0x80, {"intensity": 25}), # Dim 25% + (50, 0x80, {"intensity": 50}), # Dim 50% + (75, 0x80, {"intensity": 75}), # Dim 75% + (0x0A, 0x0A, {"period": 3000, "intensity": 100}), # Breathe + (0x0B, 0x0B, {"period": 3000}), # Ripple + ], +) +def test_RGBIdleEffect_legacy_int_migration(legacy, expected_id, expected_attrs): + setting, device = _idle_setting_with_persisted(legacy) + + setting._pre_read(cached=True) + + assert isinstance(setting._value, hidpp20.LEDEffectSetting) + assert int(setting._value.ID) == expected_id + for attr, val in expected_attrs.items(): + assert getattr(setting._value, attr) == val + # Persister was rewritten so subsequent reads return the migrated form. + persisted = device.persister[setting.name] + assert isinstance(persisted, hidpp20.LEDEffectSetting) + assert int(persisted.ID) == expected_id + + +def test_RGBIdleEffect_already_migrated_is_unchanged(): + """A persisted LEDEffectSetting passes through _pre_read untouched — + no double-migration on subsequent reads.""" + pre_migrated = hidpp20.LEDEffectSetting(ID=0x80, intensity=42) + setting, _ = _idle_setting_with_persisted(pre_migrated) + + setting._pre_read(cached=True) + + assert setting._value is pre_migrated # same instance, no rewrap + + +def test_RGBIdleEffect_none_value_passes_through(): + """Fresh install / nothing in persister: _pre_read leaves _value as + None instead of crashing in the migration branch.""" + setting, _ = _idle_setting_with_persisted(None) + + setting._pre_read(cached=True) + + assert setting._value is None + + +def test_RGBIdleEffect_unrecognized_int_passes_through(): + """An int outside the legacy mapping (corrupt config, future value) + falls through _pre_read without touching _value or the persister.""" + setting, device = _idle_setting_with_persisted(999) + + setting._pre_read(cached=True) + + assert setting._value == 999 + assert device.persister[setting.name] == 999