receiver: add key reprogramming setting

This commit is contained in:
Peter F. Patel-Schneider 2020-06-08 09:57:46 -04:00
parent 89995656cd
commit 9beb13a305
3 changed files with 272 additions and 4 deletions

View File

@ -19,7 +19,7 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from logging import getLogger, DEBUG as _DEBUG
from logging import getLogger, DEBUG as _DEBUG, INFO as _INFO
_log = getLogger(__name__)
del getLogger
@ -37,7 +37,7 @@ from .common import (
#
#
KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x04)
KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x04, map_choice=0x0A)
class Setting(object):
"""A setting descriptor.
@ -171,6 +171,125 @@ class Setting(object):
return '<Setting([%s:%s] %s)>' % (self._rw.kind, self._validator.kind, self.name)
__unicode__ = __repr__ = __str__
class Settings(Setting):
"""A setting descriptor for multiple choices, being a map from keys to values.
Needs to be instantiated for each specific device."""
def read(self, cached=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
if _log.isEnabledFor(_DEBUG):
_log.debug("%s: settings read %r from %s", self.name, self._value, self._device)
if self._value is None and getattr(self._device,'persister',None):
# We haven't read a value from the device yet,
# maybe we have something in the configuration.
self._value = self._device.persister.get(self.name)
if cached and self._value is not None:
if getattr(self._device,'persister',None) and self.name not in self._device.persister:
# If this is a new device (or a new setting for an old device),
# make sure to save its current value for the next time.
self._device.persister[self.name] = self._value
return self._value
if self._device.online:
reply_map = dict()
for key, value in self._validator.choices.items():
reply = self._rw.read(self._device, key)
if reply:
# keys are ints, because that is what the device uses,
# encoded into strings because JSON requires strings as keys
reply_map[str(int(key))] = self._validator.validate_read(reply,key)
self._value = reply_map
if getattr(self._device,'persister',None) and self.name not in self._device.persister:
# Don't update the persister if it already has a value,
# otherwise the first read might overwrite the value we wanted.
self._device.persister[self.name] = self._value
return self._value
def read_key(self, key, cached=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
assert key is not None
if _log.isEnabledFor(_DEBUG):
_log.debug("%s: settings read %r key %r from %s", self.name, self._value, key, self._device)
if self._value is None and getattr(self._device,'persister',None):
self._value = self._device.persister.get(self.name)
if cached and self._value is not None:
if getattr(self._device,'persister',None) and self.name not in self._device.persister:
self._device.persister[self.name] = self._value
return self._value[str(int(key))]
if self._device.online:
reply = self._rw.read(self._device, key)
if reply:
self._value[str(int(key))] = self._validator.validate_read(reply,key)
if getattr(self._device,'persister',None) and self.name not in self._device.persister:
self._device.persister[self.name] = self._value
return self._value[str(int(key))]
def write(self, map):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
assert map is not None
if _log.isEnabledFor(_DEBUG):
_log.debug("%s: settings write %r to %s", self.name, map, self._device)
if self._device.online:
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
self._value = map
if self._device.persister:
self._device.persister[self.name] = map
for key, value in map.items():
data_bytes = self._validator.prepare_write(int(key), value)
if data_bytes is not None:
if _log.isEnabledFor(_DEBUG):
_log.debug("%s: settings prepare map write(%s,%s) => %r", self.name, key, value, data_bytes)
reply = self._rw.write(self._device, int(key), data_bytes)
if not reply:
return None
return map
def write_key_value(self, key, value):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
assert key is not None
assert value is not None
if _log.isEnabledFor(_DEBUG):
_log.debug("%s: settings write key %r value %r to %s", self.name, key, value, self._device)
if self._device.online:
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
self._value[str(key)] = value
if self._device.persister:
self._device.persister[self.name] = self._value
data_bytes = self._validator.prepare_write(int(key), value)
if data_bytes is not None:
if _log.isEnabledFor(_DEBUG):
_log.debug("%s: settings prepare key value write(%s,%s) => %r", self.name, key, value, data_bytes)
reply = self._rw.write(self._device, int(key), data_bytes)
if not reply:
# tell whomever is calling that the write failed
return None
return value
#
# read/write low-level operators
#
@ -212,6 +331,31 @@ class FeatureRW(object):
assert self.feature is not None
return device.feature_request(self.feature, self.write_fnid, data_bytes)
class FeatureRWMap(FeatureRW):
kind = _NamedInt(0x02, 'feature')
default_read_fnid = 0x00
default_write_fnid = 0x10
default_key_bytes = 1
def __init__(self, feature, read_fnid=default_read_fnid, write_fnid=default_write_fnid, key_bytes=default_key_bytes):
assert isinstance(feature, _NamedInt)
self.feature = feature
self.read_fnid = read_fnid
self.write_fnid = write_fnid
self.key_bytes = key_bytes
def read(self, device, key):
assert self.feature is not None
key_bytes = _int2bytes(key, self.key_bytes)
return device.feature_request(self.feature, self.read_fnid, key_bytes)
def write(self, device, key, data_bytes):
assert self.feature is not None
key_bytes = _int2bytes(key, self.key_bytes)
return device.feature_request(self.feature, self.write_fnid, key_bytes, data_bytes)
#
# value validators
# handle the conversion from read bytes, to setting value, and back
@ -372,6 +516,53 @@ class ChoicesValidator(object):
assert isinstance(choice, _NamedInt)
return choice.bytes(self._bytes_count)
class ChoicesMapValidator(ChoicesValidator):
kind = KIND.map_choice
def __init__(self, choices_map, key_bytes_count=None, skip_bytes_count=None, value_bytes_count=None, extra_default=None):
assert choices_map is not None
assert isinstance(choices_map, dict)
max_key_bits = 0
max_value_bits = 0
for key, choices in choices_map.items():
assert isinstance(key, _NamedInt)
assert isinstance(choices, list)
max_key_bits = max(max_key_bits, key.bit_length())
for key_value in choices:
assert isinstance(key_value, _NamedInt)
max_value_bits = max(max_value_bits, key_value.bit_length())
self.choices = choices_map
self.needs_current_value = False
self.extra_default=extra_default
self._key_bytes_count = (max_key_bits+7) // 8
if key_bytes_count:
assert self._key_bytes_count <= key_bytes_count
self._key_bytes_count = key_bytes_count
self._value_bytes_count = (max_value_bits+7) // 8
if value_bytes_count:
assert self._value_bytes_count <= value_bytes_count
self._value_bytes_count = value_bytes_count
self._skip_bytes_count = skip_bytes_count if skip_bytes_count is not None else 0
self._bytes_count = self._key_bytes_count + self._skip_bytes_count + self._value_bytes_count
def validate_read(self, reply_bytes, key):
start = self._key_bytes_count + self._skip_bytes_count
end = start + self._value_bytes_count
reply_value = _bytes2int(reply_bytes[start:end])
# reprogrammable keys starts out as 0, which is not a choice, so don't use assert here
if self.extra_default is not None and self.extra_default==reply_value:
return self.choices[key][0]
assert reply_value in self.choices[key], "%s: failed to validate read value %02X" % (self.__class__.__name__, reply_value)
return reply_value
def prepare_write(self, key, new_value):
choices = self.choices[key]
if new_value not in choices and new_value != self.extra_default:
raise ValueError("invalid choice %r" % new_value)
return _int2bytes(new_value, self._skip_bytes_count + self._value_bytes_count)
class RangeValidator(object):
__slots__ = ('min_value', 'max_value', 'flag', '_bytes_count', 'needs_current_value')

View File

@ -26,19 +26,25 @@ del getLogger
from .i18n import _
from . import hidpp10 as _hidpp10
from . import hidpp20 as _hidpp20
from . import special_keys as _special_keys
from .common import (
bytes2int as _bytes2int,
int2bytes as _int2bytes,
NamedInt as _NamedInt,
NamedInts as _NamedInts,
unpack as _unpack,
ReprogrammableKeyInfoV4 as _ReprogrammableKeyInfoV4,
)
from .settings import (
KIND as _KIND,
Setting as _Setting,
Settings as _Settings,
RegisterRW as _RegisterRW,
FeatureRW as _FeatureRW,
FeatureRWMap as _FeatureRWMap,
BooleanValidator as _BooleanV,
ChoicesValidator as _ChoicesV,
ChoicesMapValidator as _ChoicesMapV,
RangeValidator as _RangeV,
)
@ -104,6 +110,34 @@ def feature_choices_dynamic(name, feature, choices_callback,
return setting(device)
return instantiate
# maintain a mapping from keys (NamedInts) to one of a list of choices (NamedInts), default is first one
# the setting is stored as a JSON-compatible object mapping the key int (as a string) to the choice int
# extra_default is an extra value that comes from the device that also means the default
def feature_map_choices(name, feature, choicesmap,
read_function_id, write_function_id,
key_bytes_count=None, skip_bytes_count=None, value_bytes_count=None,
label=None, description=None, device_kind=None, extra_default=None):
assert choicesmap
validator = _ChoicesMapV(choicesmap, key_bytes_count=key_bytes_count, skip_bytes_count=skip_bytes_count, value_bytes_count=value_bytes_count, extra_default=extra_default )
rw = _FeatureRWMap(feature, read_function_id, write_function_id, key_bytes=key_bytes_count)
return _Settings(name, rw, validator, feature=feature, kind=_KIND.map_choice, label=label, description=description, device_kind=device_kind)
def feature_map_choices_dynamic(name, feature, choices_callback,
read_function_id, write_function_id,
key_bytes_count=None, skip_bytes_count=None, value_bytes_count=None,
label=None, description=None, device_kind=None, extra_default=None):
# Proxy that obtains choices dynamically from a device
def instantiate(device):
choices = choices_callback(device)
if not choices: # no choices, so don't create a Setting
return None
setting = feature_map_choices(name, feature, choices,
read_function_id, write_function_id,
key_bytes_count=key_bytes_count, skip_bytes_count=skip_bytes_count, value_bytes_count=value_bytes_count,
label=label, description=description, device_kind=device_kind, extra_default=extra_default)
return setting(device)
return instantiate
def feature_range(name, feature, min_value, max_value,
read_function_id=_FeatureRW.default_read_fnid,
write_function_id=_FeatureRW.default_write_fnid,
@ -149,6 +183,8 @@ _BACKLIGHT = ('backlight', _("Backlight"),
_SMART_SHIFT = ('smart-shift', _("Smart Shift"),
_("Automatically switch the mouse wheel between ratchet and freespin mode.\n"
"The mouse wheel is always free at 0, and always locked at 50"))
_REPROGRAMMABLE_KEYS = ('reprogrammable-keys', _("Actions"), _("Change the action for the key"))
#
#
#
@ -306,6 +342,42 @@ def _feature_pointer_speed():
bytes_count=2,
label=_POINTER_SPEED[1], description=_POINTER_SPEED[2],
device_kind=(_DK.mouse, _DK.trackball))
# the keys for the choice map are Logitech controls (from special_keys)
# each choice value is a NamedInt with the string from a task (to be shown to the user)
# and the integer being the control number for that task (to be written to the device)
def _feature_reprogrammable_keys_choices(device):
count = device.feature_request(_F.REPROG_CONTROLS_V4)
assert count, 'Oops, reprogrammable key count cannot be retrieved!'
count = ord(count[:1]) # the number of key records
keys = [None] * count
groups = [ [] for i in range(0,9) ]
choices = {}
for i in range(0,count): # get the data for each key record on device
keydata = device.feature_request(_F.REPROG_CONTROLS_V4, 0x10, i)
key, key_task, flags, pos, group, gmask = _unpack('!HHBBBB', keydata[:8])
action =_NamedInt(key, str(_special_keys.TASK[key_task]))
keys[i] = ( _special_keys.CONTROL[key], action, flags, gmask )
groups[group].append(action)
for k in keys:
if k[2] & _special_keys.KEY_FLAG.reprogrammable:
key_choices = [ k[1] ] # it should always be possible to map the key to itself
for g in range(1,9):
if k[3] & 2**(g-1):
for gm in groups[g]:
if int(gm) != int(k[0]): # don't put itself in twice
key_choices.append(gm)
choices[k[0]] = key_choices
return choices
def _feature_reprogrammable_keys():
return feature_map_choices_dynamic(_REPROGRAMMABLE_KEYS[0], _F.REPROG_CONTROLS_V4,
_feature_reprogrammable_keys_choices,
read_function_id=0x20, write_function_id=0x30,
key_bytes_count=2, skip_bytes_count=1, value_bytes_count=2,
label=_REPROGRAMMABLE_KEYS[1], description=_REPROGRAMMABLE_KEYS[2],
device_kind=(_DK.keyboard,), extra_default=0)
#
#
#
@ -327,6 +399,7 @@ _SETTINGS_LIST = namedtuple('_SETTINGS_LIST', [
'backlight',
'typing_illumination',
'smart_shift',
'reprogrammable_keys',
])
del namedtuple
@ -346,6 +419,7 @@ RegisterSettings = _SETTINGS_LIST(
backlight=None,
typing_illumination=None,
smart_shift=None,
reprogrammable_keys=None,
)
FeatureSettings = _SETTINGS_LIST(
fn_swap=_feature_fn_swap,
@ -363,6 +437,7 @@ FeatureSettings = _SETTINGS_LIST(
backlight=_feature_backlight2,
typing_illumination=None,
smart_shift=_feature_smart_shift,
reprogrammable_keys=_feature_reprogrammable_keys,
)
del _SETTINGS_LIST
@ -400,7 +475,8 @@ def check_feature_settings(device, already_known):
detected = feature(device)
if _log.isEnabledFor(_DEBUG):
_log.debug("check_feature[%s] detected %s", featureId, detected)
already_known.append(detected)
if detected:
already_known.append(detected)
except Exception as reason:
_log.error("check_feature[%s] inconsistent feature %s", featureId, reason)
@ -415,4 +491,5 @@ def check_feature_settings(device, already_known):
check_feature(_POINTER_SPEED[0], _F.POINTER_SPEED)
check_feature(_SMART_SHIFT[0], _F.SMART_SHIFT)
check_feature(_BACKLIGHT[0], _F.BACKLIGHT2)
check_feature(_REPROGRAMMABLE_KEYS[0], _F.REPROG_CONTROLS_V4)
return True

View File

@ -119,7 +119,7 @@ def run(receivers, args, find_receiver, find_device):
raise Exception("can't interpret '%s' as integer" % args.value)
else:
raise NotImplemented
raise Exception("NotImplemented")
result = setting.write(value)
if result is None: