From 9beb13a3053bd595959304840f33fd4bfef1ede5 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Mon, 8 Jun 2020 09:57:46 -0400 Subject: [PATCH] receiver: add key reprogramming setting --- lib/logitech_receiver/settings.py | 195 +++++++++++++++++++- lib/logitech_receiver/settings_templates.py | 79 +++++++- lib/solaar/cli/config.py | 2 +- 3 files changed, 272 insertions(+), 4 deletions(-) diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index f117f45e..0f94a361 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -19,7 +19,7 @@ from __future__ import absolute_import, division, print_function, unicode_literals -from logging import getLogger, DEBUG as _DEBUG +from logging import getLogger, DEBUG as _DEBUG, INFO as _INFO _log = getLogger(__name__) del getLogger @@ -37,7 +37,7 @@ from .common import ( # # -KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x04) +KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x04, map_choice=0x0A) class Setting(object): """A setting descriptor. @@ -171,6 +171,125 @@ class Setting(object): return '' % (self._rw.kind, self._validator.kind, self.name) __unicode__ = __repr__ = __str__ + +class Settings(Setting): + """A setting descriptor for multiple choices, being a map from keys to values. + Needs to be instantiated for each specific device.""" + + def read(self, cached=True): + assert hasattr(self, '_value') + assert hasattr(self, '_device') + + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: settings read %r from %s", self.name, self._value, self._device) + + if self._value is None and getattr(self._device,'persister',None): + # We haven't read a value from the device yet, + # maybe we have something in the configuration. + self._value = self._device.persister.get(self.name) + + if cached and self._value is not None: + if getattr(self._device,'persister',None) and self.name not in self._device.persister: + # If this is a new device (or a new setting for an old device), + # make sure to save its current value for the next time. + self._device.persister[self.name] = self._value + return self._value + + if self._device.online: + reply_map = dict() + for key, value in self._validator.choices.items(): + reply = self._rw.read(self._device, key) + if reply: + # keys are ints, because that is what the device uses, + # encoded into strings because JSON requires strings as keys + reply_map[str(int(key))] = self._validator.validate_read(reply,key) + self._value = reply_map + if getattr(self._device,'persister',None) and self.name not in self._device.persister: + # Don't update the persister if it already has a value, + # otherwise the first read might overwrite the value we wanted. + self._device.persister[self.name] = self._value + return self._value + + def read_key(self, key, cached=True): + assert hasattr(self, '_value') + assert hasattr(self, '_device') + assert key is not None + + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: settings read %r key %r from %s", self.name, self._value, key, self._device) + + if self._value is None and getattr(self._device,'persister',None): + self._value = self._device.persister.get(self.name) + + if cached and self._value is not None: + if getattr(self._device,'persister',None) and self.name not in self._device.persister: + self._device.persister[self.name] = self._value + return self._value[str(int(key))] + + if self._device.online: + reply = self._rw.read(self._device, key) + if reply: + self._value[str(int(key))] = self._validator.validate_read(reply,key) + if getattr(self._device,'persister',None) and self.name not in self._device.persister: + self._device.persister[self.name] = self._value + return self._value[str(int(key))] + + def write(self, map): + assert hasattr(self, '_value') + assert hasattr(self, '_device') + assert map is not None + + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: settings write %r to %s", self.name, map, self._device) + + if self._device.online: + # Remember the value we're trying to set, even if the write fails. + # This way even if the device is offline or some other error occurs, + # the last value we've tried to write is remembered in the configuration. + self._value = map + if self._device.persister: + self._device.persister[self.name] = map + + for key, value in map.items(): + data_bytes = self._validator.prepare_write(int(key), value) + if data_bytes is not None: + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: settings prepare map write(%s,%s) => %r", self.name, key, value, data_bytes) + reply = self._rw.write(self._device, int(key), data_bytes) + if not reply: + return None + + return map + + def write_key_value(self, key, value): + assert hasattr(self, '_value') + assert hasattr(self, '_device') + assert key is not None + assert value is not None + + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: settings write key %r value %r to %s", self.name, key, value, self._device) + + if self._device.online: + # Remember the value we're trying to set, even if the write fails. + # This way even if the device is offline or some other error occurs, + # the last value we've tried to write is remembered in the configuration. + self._value[str(key)] = value + if self._device.persister: + self._device.persister[self.name] = self._value + + data_bytes = self._validator.prepare_write(int(key), value) + if data_bytes is not None: + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: settings prepare key value write(%s,%s) => %r", self.name, key, value, data_bytes) + reply = self._rw.write(self._device, int(key), data_bytes) + if not reply: + # tell whomever is calling that the write failed + return None + + return value + + # # read/write low-level operators # @@ -212,6 +331,31 @@ class FeatureRW(object): assert self.feature is not None return device.feature_request(self.feature, self.write_fnid, data_bytes) + +class FeatureRWMap(FeatureRW): + kind = _NamedInt(0x02, 'feature') + default_read_fnid = 0x00 + default_write_fnid = 0x10 + default_key_bytes = 1 + + def __init__(self, feature, read_fnid=default_read_fnid, write_fnid=default_write_fnid, key_bytes=default_key_bytes): + assert isinstance(feature, _NamedInt) + self.feature = feature + self.read_fnid = read_fnid + self.write_fnid = write_fnid + self.key_bytes = key_bytes + + def read(self, device, key): + assert self.feature is not None + key_bytes = _int2bytes(key, self.key_bytes) + return device.feature_request(self.feature, self.read_fnid, key_bytes) + + def write(self, device, key, data_bytes): + assert self.feature is not None + key_bytes = _int2bytes(key, self.key_bytes) + return device.feature_request(self.feature, self.write_fnid, key_bytes, data_bytes) + + # # value validators # handle the conversion from read bytes, to setting value, and back @@ -372,6 +516,53 @@ class ChoicesValidator(object): assert isinstance(choice, _NamedInt) return choice.bytes(self._bytes_count) +class ChoicesMapValidator(ChoicesValidator): + kind = KIND.map_choice + + def __init__(self, choices_map, key_bytes_count=None, skip_bytes_count=None, value_bytes_count=None, extra_default=None): + assert choices_map is not None + assert isinstance(choices_map, dict) + max_key_bits = 0 + max_value_bits = 0 + for key, choices in choices_map.items(): + assert isinstance(key, _NamedInt) + assert isinstance(choices, list) + max_key_bits = max(max_key_bits, key.bit_length()) + for key_value in choices: + assert isinstance(key_value, _NamedInt) + max_value_bits = max(max_value_bits, key_value.bit_length()) + self.choices = choices_map + self.needs_current_value = False + self.extra_default=extra_default + + self._key_bytes_count = (max_key_bits+7) // 8 + if key_bytes_count: + assert self._key_bytes_count <= key_bytes_count + self._key_bytes_count = key_bytes_count + self._value_bytes_count = (max_value_bits+7) // 8 + if value_bytes_count: + assert self._value_bytes_count <= value_bytes_count + self._value_bytes_count = value_bytes_count + self._skip_bytes_count = skip_bytes_count if skip_bytes_count is not None else 0 + self._bytes_count = self._key_bytes_count + self._skip_bytes_count + self._value_bytes_count + + def validate_read(self, reply_bytes, key): + start = self._key_bytes_count + self._skip_bytes_count + end = start + self._value_bytes_count + reply_value = _bytes2int(reply_bytes[start:end]) + # reprogrammable keys starts out as 0, which is not a choice, so don't use assert here + if self.extra_default is not None and self.extra_default==reply_value: + return self.choices[key][0] + assert reply_value in self.choices[key], "%s: failed to validate read value %02X" % (self.__class__.__name__, reply_value) + return reply_value + + def prepare_write(self, key, new_value): + choices = self.choices[key] + if new_value not in choices and new_value != self.extra_default: + raise ValueError("invalid choice %r" % new_value) + return _int2bytes(new_value, self._skip_bytes_count + self._value_bytes_count) + + class RangeValidator(object): __slots__ = ('min_value', 'max_value', 'flag', '_bytes_count', 'needs_current_value') diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py index 677bcbd6..b2c46a96 100644 --- a/lib/logitech_receiver/settings_templates.py +++ b/lib/logitech_receiver/settings_templates.py @@ -26,19 +26,25 @@ del getLogger from .i18n import _ from . import hidpp10 as _hidpp10 from . import hidpp20 as _hidpp20 +from . import special_keys as _special_keys from .common import ( bytes2int as _bytes2int, int2bytes as _int2bytes, + NamedInt as _NamedInt, NamedInts as _NamedInts, unpack as _unpack, + ReprogrammableKeyInfoV4 as _ReprogrammableKeyInfoV4, ) from .settings import ( KIND as _KIND, Setting as _Setting, + Settings as _Settings, RegisterRW as _RegisterRW, FeatureRW as _FeatureRW, + FeatureRWMap as _FeatureRWMap, BooleanValidator as _BooleanV, ChoicesValidator as _ChoicesV, + ChoicesMapValidator as _ChoicesMapV, RangeValidator as _RangeV, ) @@ -104,6 +110,34 @@ def feature_choices_dynamic(name, feature, choices_callback, return setting(device) return instantiate +# maintain a mapping from keys (NamedInts) to one of a list of choices (NamedInts), default is first one +# the setting is stored as a JSON-compatible object mapping the key int (as a string) to the choice int +# extra_default is an extra value that comes from the device that also means the default +def feature_map_choices(name, feature, choicesmap, + read_function_id, write_function_id, + key_bytes_count=None, skip_bytes_count=None, value_bytes_count=None, + label=None, description=None, device_kind=None, extra_default=None): + assert choicesmap + validator = _ChoicesMapV(choicesmap, key_bytes_count=key_bytes_count, skip_bytes_count=skip_bytes_count, value_bytes_count=value_bytes_count, extra_default=extra_default ) + rw = _FeatureRWMap(feature, read_function_id, write_function_id, key_bytes=key_bytes_count) + return _Settings(name, rw, validator, feature=feature, kind=_KIND.map_choice, label=label, description=description, device_kind=device_kind) + +def feature_map_choices_dynamic(name, feature, choices_callback, + read_function_id, write_function_id, + key_bytes_count=None, skip_bytes_count=None, value_bytes_count=None, + label=None, description=None, device_kind=None, extra_default=None): + # Proxy that obtains choices dynamically from a device + def instantiate(device): + choices = choices_callback(device) + if not choices: # no choices, so don't create a Setting + return None + setting = feature_map_choices(name, feature, choices, + read_function_id, write_function_id, + key_bytes_count=key_bytes_count, skip_bytes_count=skip_bytes_count, value_bytes_count=value_bytes_count, + label=label, description=description, device_kind=device_kind, extra_default=extra_default) + return setting(device) + return instantiate + def feature_range(name, feature, min_value, max_value, read_function_id=_FeatureRW.default_read_fnid, write_function_id=_FeatureRW.default_write_fnid, @@ -149,6 +183,8 @@ _BACKLIGHT = ('backlight', _("Backlight"), _SMART_SHIFT = ('smart-shift', _("Smart Shift"), _("Automatically switch the mouse wheel between ratchet and freespin mode.\n" "The mouse wheel is always free at 0, and always locked at 50")) +_REPROGRAMMABLE_KEYS = ('reprogrammable-keys', _("Actions"), _("Change the action for the key")) + # # # @@ -306,6 +342,42 @@ def _feature_pointer_speed(): bytes_count=2, label=_POINTER_SPEED[1], description=_POINTER_SPEED[2], device_kind=(_DK.mouse, _DK.trackball)) + +# the keys for the choice map are Logitech controls (from special_keys) +# each choice value is a NamedInt with the string from a task (to be shown to the user) +# and the integer being the control number for that task (to be written to the device) +def _feature_reprogrammable_keys_choices(device): + count = device.feature_request(_F.REPROG_CONTROLS_V4) + assert count, 'Oops, reprogrammable key count cannot be retrieved!' + count = ord(count[:1]) # the number of key records + keys = [None] * count + groups = [ [] for i in range(0,9) ] + choices = {} + for i in range(0,count): # get the data for each key record on device + keydata = device.feature_request(_F.REPROG_CONTROLS_V4, 0x10, i) + key, key_task, flags, pos, group, gmask = _unpack('!HHBBBB', keydata[:8]) + action =_NamedInt(key, str(_special_keys.TASK[key_task])) + keys[i] = ( _special_keys.CONTROL[key], action, flags, gmask ) + groups[group].append(action) + for k in keys: + if k[2] & _special_keys.KEY_FLAG.reprogrammable: + key_choices = [ k[1] ] # it should always be possible to map the key to itself + for g in range(1,9): + if k[3] & 2**(g-1): + for gm in groups[g]: + if int(gm) != int(k[0]): # don't put itself in twice + key_choices.append(gm) + choices[k[0]] = key_choices + return choices + +def _feature_reprogrammable_keys(): + return feature_map_choices_dynamic(_REPROGRAMMABLE_KEYS[0], _F.REPROG_CONTROLS_V4, + _feature_reprogrammable_keys_choices, + read_function_id=0x20, write_function_id=0x30, + key_bytes_count=2, skip_bytes_count=1, value_bytes_count=2, + label=_REPROGRAMMABLE_KEYS[1], description=_REPROGRAMMABLE_KEYS[2], + device_kind=(_DK.keyboard,), extra_default=0) + # # # @@ -327,6 +399,7 @@ _SETTINGS_LIST = namedtuple('_SETTINGS_LIST', [ 'backlight', 'typing_illumination', 'smart_shift', + 'reprogrammable_keys', ]) del namedtuple @@ -346,6 +419,7 @@ RegisterSettings = _SETTINGS_LIST( backlight=None, typing_illumination=None, smart_shift=None, + reprogrammable_keys=None, ) FeatureSettings = _SETTINGS_LIST( fn_swap=_feature_fn_swap, @@ -363,6 +437,7 @@ FeatureSettings = _SETTINGS_LIST( backlight=_feature_backlight2, typing_illumination=None, smart_shift=_feature_smart_shift, + reprogrammable_keys=_feature_reprogrammable_keys, ) del _SETTINGS_LIST @@ -400,7 +475,8 @@ def check_feature_settings(device, already_known): detected = feature(device) if _log.isEnabledFor(_DEBUG): _log.debug("check_feature[%s] detected %s", featureId, detected) - already_known.append(detected) + if detected: + already_known.append(detected) except Exception as reason: _log.error("check_feature[%s] inconsistent feature %s", featureId, reason) @@ -415,4 +491,5 @@ def check_feature_settings(device, already_known): check_feature(_POINTER_SPEED[0], _F.POINTER_SPEED) check_feature(_SMART_SHIFT[0], _F.SMART_SHIFT) check_feature(_BACKLIGHT[0], _F.BACKLIGHT2) + check_feature(_REPROGRAMMABLE_KEYS[0], _F.REPROG_CONTROLS_V4) return True diff --git a/lib/solaar/cli/config.py b/lib/solaar/cli/config.py index 60e63d50..301c6829 100644 --- a/lib/solaar/cli/config.py +++ b/lib/solaar/cli/config.py @@ -119,7 +119,7 @@ def run(receivers, args, find_receiver, find_device): raise Exception("can't interpret '%s' as integer" % args.value) else: - raise NotImplemented + raise Exception("NotImplemented") result = setting.write(value) if result is None: