Solaar/lib/logitech_receiver/settings.py

1572 lines
62 KiB
Python

## Copyright (C) 2012-2013 Daniel Pavel
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging
import math
from struct import unpack as _unpack
from time import sleep as _sleep
from . import hidpp20_constants as _hidpp20_constants
from .common import NamedInt as _NamedInt
from .common import NamedInts as _NamedInts
from .common import bytes2int as _bytes2int
from .common import int2bytes as _int2bytes
from .i18n import _
logger = logging.getLogger(__name__)
#
#
#
SENSITIVITY_IGNORE = "ignore"
KIND = _NamedInts(
toggle=0x01,
choice=0x02,
range=0x04,
map_choice=0x0A,
multiple_toggle=0x10,
packed_range=0x20,
multiple_range=0x40,
hetero=0x80,
)
def bool_or_toggle(current, new):
if isinstance(new, bool):
return new
try:
return bool(int(new))
except (TypeError, ValueError):
new = str(new).lower()
if new in ("true", "yes", "on", "t", "y"):
return True
if new in ("false", "no", "off", "f", "n"):
return False
if new in ("~", "toggle"):
return not current
return None
class Setting:
"""A setting descriptor. Needs to be instantiated for each specific device."""
name = label = description = ""
feature = register = kind = None
min_version = 0
persist = True
rw_options = {}
validator_class = None
validator_options = {}
def __init__(self, device, rw, validator):
self._device = device
self._rw = rw
self._validator = validator
self.kind = getattr(self._validator, "kind", None)
self._value = None
@classmethod
def build(cls, device):
assert cls.feature or cls.register, "Settings require either a feature or a register"
rw_class = cls.rw_class if hasattr(cls, "rw_class") else FeatureRW if cls.feature else RegisterRW
rw = rw_class(cls.feature if cls.feature else cls.register, **cls.rw_options)
p = device.protocol
if p == 1.0: # HID++ 1.0 devices do not support features
assert rw.kind == RegisterRW.kind
elif p >= 2.0: # HID++ 2.0 devices do not support registers
assert rw.kind == FeatureRW.kind
validator_class = cls.validator_class
validator = validator_class.build(cls, device, **cls.validator_options)
if validator:
assert cls.kind is None or cls.kind & validator.kind != 0
return cls(device, rw, validator)
def val_to_string(self, value):
return self._validator.to_string(value)
@property
def choices(self):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
return self._validator.choices if self._validator and self._validator.kind & KIND.choice else None
@property
def range(self):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
if self._validator.kind == KIND.range:
return (self._validator.min_value, self._validator.max_value)
def _pre_read(self, cached, key=None):
if self.persist and self._value is None and getattr(self._device, "persister", None):
# We haven't read a value from the device yet,
# maybe we have something in the configuration.
self._value = self._device.persister.get(self.name)
if cached and self._value is not None:
if getattr(self._device, "persister", None) and self.name not in self._device.persister:
# If this is a new device (or a new setting for an old device),
# make sure to save its current value for the next time.
self._device.persister[self.name] = self._value if self.persist else None
def read(self, cached=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
self._pre_read(cached)
if cached and self._value is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: cached value %r on %s", self.name, self._value, self._device)
return self._value
if self._device.online:
reply = self._rw.read(self._device)
if reply:
self._value = self._validator.validate_read(reply)
if self._value is not None and self._device.persister and self.name not in self._device.persister:
# Don't update the persister if it already has a value,
# otherwise the first read might overwrite the value we wanted.
self._device.persister[self.name] = self._value if self.persist else None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: read value %r on %s", self.name, self._value, self._device)
return self._value
def _pre_write(self, save=True):
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
if self._device.persister and save:
self._device.persister[self.name] = self._value if self.persist else None
def update(self, value, save=True):
self._value = value
self._pre_write(save)
def write(self, value, save=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert value is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: write %r to %s", self.name, value, self._device)
if self._device.online:
if self._value != value:
self.update(value, save)
current_value = None
if self._validator.needs_current_value:
# the _validator needs the current value, possibly to merge flag values
current_value = self._rw.read(self._device)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: current value %r on %s", self.name, current_value, self._device)
data_bytes = self._validator.prepare_write(value, current_value)
if data_bytes is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: prepare write(%s) => %r", self.name, value, data_bytes)
reply = self._rw.write(self._device, data_bytes)
if not reply:
# tell whomever is calling that the write failed
return None
return value
def acceptable(self, args, current):
return self._validator.acceptable(args, current) if self._validator else None
def compare(self, args, current):
return self._validator.compare(args, current) if self._validator else None
def apply(self):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: apply (%s)", self.name, self._device)
value = self.read(self.persist) # Don't use persisted value if setting doesn't persist
if self.persist and value is not None: # If setting doesn't persist no need to write value just read
try:
self.write(value, save=False)
except Exception as e:
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"%s: error applying value %s so ignore it (%s): %s", self.name, self._value, self._device, repr(e)
)
def __str__(self):
if hasattr(self, "_value"):
assert hasattr(self, "_device")
return "<Setting([%s:%s] %s:%s=%s)>" % (
self._rw.kind,
self._validator.kind if self._validator else None,
self._device.codename,
self.name,
self._value,
)
return f"<Setting([{self._rw.kind}:{self._validator.kind if self._validator else None}] {self.name})>"
__repr__ = __str__
class Settings(Setting):
"""A setting descriptor for multiple choices, being a map from keys to values.
Needs to be instantiated for each specific device."""
def read(self, cached=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings read %r from %s", self.name, self._value, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value
if self._device.online:
reply_map = {}
for key in self._validator.choices:
reply = self._rw.read(self._device, key)
if reply:
reply_map[int(key)] = self._validator.validate_read(reply, key)
self._value = reply_map
if getattr(self._device, "persister", None) and self.name not in self._device.persister:
# Don't update the persister if it already has a value,
# otherwise the first read might overwrite the value we wanted.
self._device.persister[self.name] = self._value if self.persist else None
return self._value
def read_key(self, key, cached=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert key is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings read %r key %r from %s", self.name, self._value, key, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value[int(key)]
if self._device.online:
reply = self._rw.read(self._device, key)
if reply:
self._value[int(key)] = self._validator.validate_read(reply, key)
if getattr(self._device, "persister", None) and self.name not in self._device.persister:
self._device.persister[self.name] = self._value if self.persist else None
return self._value[int(key)]
def write(self, map, save=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert map is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings write %r to %s", self.name, map, self._device)
if self._device.online:
self.update(map, save)
for key, value in map.items():
data_bytes = self._validator.prepare_write(int(key), value)
if data_bytes is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings prepare map write(%s,%s) => %r", self.name, key, value, data_bytes)
reply = self._rw.write(self._device, int(key), data_bytes)
if not reply:
return None
return map
def update_key_value(self, key, value, save=True):
self._value[int(key)] = value
self._pre_write(save)
def write_key_value(self, key, value, save=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert key is not None
assert value is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings write key %r value %r to %s", self.name, key, value, self._device)
if self._device.online:
if not self._value:
self.read()
try:
data_bytes = self._validator.prepare_write(int(key), value)
# always need to write to configuration because dictionary is shared and could have changed
self.update_key_value(key, value, save)
except ValueError:
data_bytes = value = None
if data_bytes is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings prepare key value write(%s,%s) => %r", self.name, key, value, data_bytes)
reply = self._rw.write(self._device, int(key), data_bytes)
if not reply:
return None
return value
class LongSettings(Setting):
"""A setting descriptor for multiple choices, being a map from keys to values.
Allows multiple write requests, if the options don't fit in 16 bytes.
The validator must return a list.
Needs to be instantiated for each specific device."""
def read(self, cached=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings read %r from %s", self.name, self._value, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value
if self._device.online:
reply_map = {}
# Reading one item at a time. This can probably be optimised
for item in self._validator.items:
r = self._validator.prepare_read_item(item)
reply = self._rw.read(self._device, r)
if reply:
reply_map[int(item)] = self._validator.validate_read_item(reply, item)
self._value = reply_map
if getattr(self._device, "persister", None) and self.name not in self._device.persister:
# Don't update the persister if it already has a value,
# otherwise the first read might overwrite the value we wanted.
self._device.persister[self.name] = self._value if self.persist else None
return self._value
def read_item(self, item, cached=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert item is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings read %r item %r from %s", self.name, self._value, item, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value[int(item)]
if self._device.online:
r = self._validator.prepare_read_item(item)
reply = self._rw.read(self._device, r)
if reply:
self._value[int(item)] = self._validator.validate_read_item(reply, item)
if getattr(self._device, "persister", None) and self.name not in self._device.persister:
self._device.persister[self.name] = self._value if self.persist else None
return self._value[int(item)]
def write(self, map, save=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert map is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: long settings write %r to %s", self.name, map, self._device)
if self._device.online:
self.update(map, save)
for item, value in map.items():
data_bytes_list = self._validator.prepare_write(self._value)
if data_bytes_list is not None:
for data_bytes in data_bytes_list:
if data_bytes is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings prepare map write(%s,%s) => %r", self.name, item, value, data_bytes)
reply = self._rw.write(self._device, data_bytes)
if not reply:
return None
return map
def update_key_value(self, key, value, save=True):
self._value[int(key)] = value
self._pre_write(save)
def write_key_value(self, item, value, save=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert item is not None
assert value is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: long settings write item %r value %r to %s", self.name, item, value, self._device)
if self._device.online:
if not self._value:
self.read()
data_bytes = self._validator.prepare_write_item(item, value)
self.update_key_value(item, value, save)
if data_bytes is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings prepare item value write(%s,%s) => %r", self.name, item, value, data_bytes)
reply = self._rw.write(self._device, data_bytes)
if not reply:
return None
return value
class BitFieldSetting(Setting):
"""A setting descriptor for a set of choices represented by one bit each, being a map from options to booleans.
Needs to be instantiated for each specific device."""
def read(self, cached=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings read %r from %s", self.name, self._value, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value
if self._device.online:
reply_map = {}
reply = self._do_read()
if reply:
reply_map = self._validator.validate_read(reply)
self._value = reply_map
if getattr(self._device, "persister", None) and self.name not in self._device.persister:
# Don't update the persister if it already has a value,
# otherwise the first read might overwrite the value we wanted.
self._device.persister[self.name] = self._value if self.persist else None
return self._value
def _do_read(self):
return self._rw.read(self._device)
def read_key(self, key, cached=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert key is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings read %r key %r from %s", self.name, self._value, key, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value[int(key)]
if self._device.online:
reply = self._do_read_key(key)
if reply:
self._value = self._validator.validate_read(reply)
if getattr(self._device, "persister", None) and self.name not in self._device.persister:
self._device.persister[self.name] = self._value if self.persist else None
return self._value[int(key)]
def _do_read_key(self, key):
return self._rw.read(self._device, key)
def write(self, map, save=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert map is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: bit field settings write %r to %s", self.name, map, self._device)
if self._device.online:
self.update(map, save)
data_bytes = self._validator.prepare_write(self._value)
if data_bytes is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings prepare map write(%s) => %r", self.name, self._value, data_bytes)
# if prepare_write returns a list, write one item at a time
seq = data_bytes if isinstance(data_bytes, list) else [data_bytes]
for b in seq:
reply = self._rw.write(self._device, b)
if not reply:
return None
return map
def update_key_value(self, key, value, save=True):
self._value[int(key)] = value
self._pre_write(save)
def write_key_value(self, key, value, save=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert key is not None
assert value is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: bit field settings write key %r value %r to %s", self.name, key, value, self._device)
if self._device.online:
if not self._value:
self.read()
value = bool(value)
self.update_key_value(key, value, save)
data_bytes = self._validator.prepare_write(self._value)
if data_bytes is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings prepare key value write(%s,%s) => %r", self.name, key, str(value), data_bytes)
# if prepare_write returns a list, write one item at a time
seq = data_bytes if isinstance(data_bytes, list) else [data_bytes]
for b in seq:
reply = self._rw.write(self._device, b)
if not reply:
return None
return value
class BitFieldWithOffsetAndMaskSetting(BitFieldSetting):
"""A setting descriptor for a set of choices represented by one bit each,
each one having an offset, being a map from options to booleans.
Needs to be instantiated for each specific device."""
def _do_read(self):
return {r: self._rw.read(self._device, r) for r in self._validator.prepare_read()}
def _do_read_key(self, key):
r = self._validator.prepare_read_key(key)
return {r: self._rw.read(self._device, r)}
class RangeFieldSetting(Setting):
"""A setting descriptor for a set of choices represented by one field each, with map from option names to range(0,n).
Needs to be instantiated for each specific device."""
def read(self, cached=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: settings read %r from %s", self.name, self._value, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value
if self._device.online:
reply_map = {}
reply = self._do_read()
if reply:
reply_map = self._validator.validate_read(reply)
self._value = reply_map
if getattr(self._device, "persister", None) and self.name not in self._device.persister:
# Don't update the persister if it already has a value,
# otherwise the first read might overwrite the value we wanted.
self._device.persister[self.name] = self._value if self.persist else None
return self._value
def _do_read(self):
return self._rw.read(self._device)
def read_key(self, key, cached=True):
return self.read(cached)[int(key)]
def write(self, map, save=True):
assert hasattr(self, "_value")
assert hasattr(self, "_device")
assert map is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: range field setting write %r to %s", self.name, map, self._device)
if self._device.online:
self.update(map, save)
data_bytes = self._validator.prepare_write(self._value)
if data_bytes is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: range field setting prepare map write(%s) => %r", self.name, self._value, data_bytes)
reply = self._rw.write(self._device, data_bytes)
if not reply:
return None
elif logger.isEnabledFor(logging.WARNING):
logger.warning("%s: range field setting no data to write", self.name)
return map
def write_key_value(self, key, value, save=True):
assert key is not None
assert value is not None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s: range field setting write key %r value %r to %s", self.name, key, value, self._device)
if self._device.online:
if not self._value:
self.read()
map = self._value
map[int(key)] = value
self.write(map, save)
return value
#
# read/write low-level operators
#
class RegisterRW:
__slots__ = ("register",)
kind = _NamedInt(0x01, _("register"))
def __init__(self, register):
assert isinstance(register, int)
self.register = register
def read(self, device):
return device.read_register(self.register)
def write(self, device, data_bytes):
return device.write_register(self.register, data_bytes)
class FeatureRW:
kind = _NamedInt(0x02, _("feature"))
default_read_fnid = 0x00
default_write_fnid = 0x10
def __init__(self, feature, read_fnid=0x00, write_fnid=0x10, prefix=b"", suffix=b"", read_prefix=b"", no_reply=False):
assert isinstance(feature, _NamedInt)
self.feature = feature
self.read_fnid = read_fnid
self.write_fnid = write_fnid
self.no_reply = no_reply
self.prefix = prefix
self.suffix = suffix
self.read_prefix = read_prefix
def read(self, device, data_bytes=b""):
assert self.feature is not None
return device.feature_request(self.feature, self.read_fnid, self.prefix, self.read_prefix, data_bytes)
def write(self, device, data_bytes):
assert self.feature is not None
reply = device.feature_request(
self.feature, self.write_fnid, self.prefix, data_bytes, self.suffix, no_reply=self.no_reply
)
return reply if not self.no_reply else True
class FeatureRWMap(FeatureRW):
kind = _NamedInt(0x02, _("feature"))
default_read_fnid = 0x00
default_write_fnid = 0x10
default_key_byte_count = 1
def __init__(
self,
feature,
read_fnid=default_read_fnid,
write_fnid=default_write_fnid,
key_byte_count=default_key_byte_count,
no_reply=False,
):
assert isinstance(feature, _NamedInt)
self.feature = feature
self.read_fnid = read_fnid
self.write_fnid = write_fnid
self.key_byte_count = key_byte_count
self.no_reply = no_reply
def read(self, device, key):
assert self.feature is not None
key_bytes = _int2bytes(key, self.key_byte_count)
return device.feature_request(self.feature, self.read_fnid, key_bytes)
def write(self, device, key, data_bytes):
assert self.feature is not None
key_bytes = _int2bytes(key, self.key_byte_count)
reply = device.feature_request(self.feature, self.write_fnid, key_bytes, data_bytes, no_reply=self.no_reply)
return reply if not self.no_reply else True
class Validator:
@classmethod
def build(cls, setting_class, device, **kwargs):
return cls(**kwargs)
@classmethod
def to_string(cls, value):
return str(value)
def compare(self, args, current):
if len(args) != 1:
return False
return args[0] == current
class BooleanValidator(Validator):
__slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value")
kind = KIND.toggle
default_true = 0x01
default_false = 0x00
# mask specifies all the affected bits in the value
default_mask = 0xFF
def __init__(
self,
true_value=default_true,
false_value=default_false,
mask=default_mask,
read_skip_byte_count=0,
write_prefix_bytes=b"",
):
if isinstance(true_value, int):
assert isinstance(false_value, int)
if mask is None:
mask = self.default_mask
else:
assert isinstance(mask, int)
assert true_value & false_value == 0
assert true_value & mask == true_value
assert false_value & mask == false_value
self.needs_current_value = mask != self.default_mask
elif isinstance(true_value, bytes):
if false_value is None or false_value == self.default_false:
false_value = b"\x00" * len(true_value)
else:
assert isinstance(false_value, bytes)
if mask is None or mask == self.default_mask:
mask = b"\xFF" * len(true_value)
else:
assert isinstance(mask, bytes)
assert len(mask) == len(true_value) == len(false_value)
tv = _bytes2int(true_value)
fv = _bytes2int(false_value)
mv = _bytes2int(mask)
assert tv != fv # true and false might be something other than bit values
assert tv & mv == tv
assert fv & mv == fv
self.needs_current_value = any(m != 0xFF for m in mask)
else:
raise Exception(f"invalid mask '{mask!r}', type {type(mask)}")
self.true_value = true_value
self.false_value = false_value
self.mask = mask
self.read_skip_byte_count = read_skip_byte_count
self.write_prefix_bytes = write_prefix_bytes
def validate_read(self, reply_bytes):
reply_bytes = reply_bytes[self.read_skip_byte_count :]
if isinstance(self.mask, int):
reply_value = ord(reply_bytes[:1]) & self.mask
if logger.isEnabledFor(logging.DEBUG):
logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value)
if reply_value == self.true_value:
return True
if reply_value == self.false_value:
return False
logger.warning(
"BooleanValidator: reply %02X mismatched %02X/%02X/%02X",
reply_value,
self.true_value,
self.false_value,
self.mask,
)
return False
count = len(self.mask)
mask = _bytes2int(self.mask)
reply_value = _bytes2int(reply_bytes[:count]) & mask
true_value = _bytes2int(self.true_value)
if reply_value == true_value:
return True
false_value = _bytes2int(self.false_value)
if reply_value == false_value:
return False
logger.warning(
"BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask
)
return False
def prepare_write(self, new_value, current_value=None):
if new_value is None:
new_value = False
else:
assert isinstance(new_value, bool), f"New value {new_value} for boolean setting is not a boolean"
to_write = self.true_value if new_value else self.false_value
if isinstance(self.mask, int):
if current_value is not None and self.needs_current_value:
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
if current_value is not None and to_write == ord(current_value[:1]):
return None
to_write = bytes([to_write])
else:
to_write = bytearray(to_write)
count = len(self.mask)
for i in range(0, count):
b = ord(to_write[i : i + 1])
m = ord(self.mask[i : i + 1])
assert b & m == b
# b &= m
if current_value is not None and self.needs_current_value:
b |= ord(current_value[i : i + 1]) & (0xFF ^ m)
to_write[i] = b
to_write = bytes(to_write)
if current_value is not None and to_write == current_value[: len(to_write)]:
return None
if logger.isEnabledFor(logging.DEBUG):
logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write)
return self.write_prefix_bytes + to_write
def acceptable(self, args, current):
if len(args) != 1:
return None
val = bool_or_toggle(current, args[0])
return [val] if val is not None else None
class BitFieldValidator(Validator):
__slots__ = ("byte_count", "options")
kind = KIND.multiple_toggle
def __init__(self, options, byte_count=None):
assert isinstance(options, list)
self.options = options
self.byte_count = (max(x.bit_length() for x in options) + 7) // 8
if byte_count:
assert isinstance(byte_count, int) and byte_count >= self.byte_count
self.byte_count = byte_count
def to_string(self, value):
def element_to_string(key, val):
k = next((k for k in self.options if int(key) == k), None)
return str(k) + ":" + str(val) if k is not None else "?"
return "{" + ", ".join([element_to_string(k, value[k]) for k in value]) + "}"
def validate_read(self, reply_bytes):
r = _bytes2int(reply_bytes[: self.byte_count])
value = {int(k): False for k in self.options}
m = 1
for _ignore in range(8 * self.byte_count):
if m in self.options:
value[int(m)] = bool(r & m)
m <<= 1
return value
def prepare_write(self, new_value):
assert isinstance(new_value, dict)
w = 0
for k, v in new_value.items():
if v:
w |= int(k)
return _int2bytes(w, self.byte_count)
def get_options(self):
return self.options
def acceptable(self, args, current):
if len(args) != 2:
return None
key = next((key for key in self.options if key == args[0]), None)
if key is None:
return None
val = bool_or_toggle(current[int(key)], args[1])
return None if val is None else [int(key), val]
def compare(self, args, current):
if len(args) != 2:
return False
key = next((key for key in self.options if key == args[0]), None)
if key is None:
return False
return args[1] == current[int(key)]
class BitFieldWithOffsetAndMaskValidator(Validator):
__slots__ = ("byte_count", "options", "_option_from_key", "_mask_from_offset", "_option_from_offset_mask")
kind = KIND.multiple_toggle
sep = 0x01
def __init__(self, options, om_method=None, byte_count=None):
assert isinstance(options, list)
# each element of options is an instance of a class
# that has an id (which is used as an index in other dictionaries)
# and where om_method is a method that returns a byte offset and byte mask
# that says how to access and modify the bit toggle for the option
self.options = options
self.om_method = om_method
# to retrieve the options efficiently:
self._option_from_key = {}
self._mask_from_offset = {}
self._option_from_offset_mask = {}
for opt in options:
offset, mask = om_method(opt)
self._option_from_key[int(opt)] = opt
try:
self._mask_from_offset[offset] |= mask
except KeyError:
self._mask_from_offset[offset] = mask
try:
mask_to_opt = self._option_from_offset_mask[offset]
except KeyError:
mask_to_opt = {}
self._option_from_offset_mask[offset] = mask_to_opt
mask_to_opt[mask] = opt
self.byte_count = (max(om_method(x)[1].bit_length() for x in options) + 7) // 8 # is this correct??
if byte_count:
assert isinstance(byte_count, int) and byte_count >= self.byte_count
self.byte_count = byte_count
def prepare_read(self):
r = []
for offset, mask in self._mask_from_offset.items():
b = offset << (8 * (self.byte_count + 1))
b |= (self.sep << (8 * self.byte_count)) | mask
r.append(_int2bytes(b, self.byte_count + 2))
return r
def prepare_read_key(self, key):
option = self._option_from_key.get(key, None)
if option is None:
return None
offset, mask = option.om_method(option)
b = offset << (8 * (self.byte_count + 1))
b |= (self.sep << (8 * self.byte_count)) | mask
return _int2bytes(b, self.byte_count + 2)
def validate_read(self, reply_bytes_dict):
values = {int(k): False for k in self.options}
for query, b in reply_bytes_dict.items():
offset = _bytes2int(query[0:1])
b += (self.byte_count - len(b)) * b"\x00"
value = _bytes2int(b[: self.byte_count])
mask_to_opt = self._option_from_offset_mask.get(offset, {})
m = 1
for _ignore in range(8 * self.byte_count):
if m in mask_to_opt:
values[int(mask_to_opt[m])] = bool(value & m)
m <<= 1
return values
def prepare_write(self, new_value):
assert isinstance(new_value, dict)
w = {}
for k, v in new_value.items():
option = self._option_from_key[int(k)]
offset, mask = self.om_method(option)
if offset not in w:
w[offset] = 0
if v:
w[offset] |= mask
return [
_int2bytes(
(offset << (8 * (2 * self.byte_count + 1)))
| (self.sep << (16 * self.byte_count))
| (self._mask_from_offset[offset] << (8 * self.byte_count))
| value,
2 * self.byte_count + 2,
)
for offset, value in w.items()
]
def get_options(self):
return [int(opt) if isinstance(opt, int) else opt.as_int() for opt in self.options]
def acceptable(self, args, current):
if len(args) != 2:
return None
key = next((option.id for option in self.options if option.as_int() == args[0]), None)
if key is None:
return None
val = bool_or_toggle(current[int(key)], args[1])
return None if val is None else [int(key), val]
def compare(self, args, current):
if len(args) != 2:
return False
key = next((option.id for option in self.options if option.as_int() == args[0]), None)
if key is None:
return False
return args[1] == current[int(key)]
class ChoicesValidator(Validator):
"""Translates between NamedInts and a byte sequence.
:param choices: a list of NamedInts
:param byte_count: the size of the derived byte sequence. If None, it
will be calculated from the choices."""
kind = KIND.choice
def __init__(self, choices=None, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b""):
assert choices is not None
assert isinstance(choices, _NamedInts)
assert len(choices) > 1
self.choices = choices
self.needs_current_value = False
max_bits = max(x.bit_length() for x in choices)
self._byte_count = (max_bits // 8) + (1 if max_bits % 8 else 0)
if byte_count:
assert self._byte_count <= byte_count
self._byte_count = byte_count
assert self._byte_count < 8
self._read_skip_byte_count = read_skip_byte_count
self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b""
assert self._byte_count + self._read_skip_byte_count <= 14
assert self._byte_count + len(self._write_prefix_bytes) <= 14
def to_string(self, value):
return str(self.choices[value]) if isinstance(value, int) else str(value)
def validate_read(self, reply_bytes):
reply_value = _bytes2int(reply_bytes[self._read_skip_byte_count : self._read_skip_byte_count + self._byte_count])
valid_value = self.choices[reply_value]
assert valid_value is not None, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
return valid_value
def prepare_write(self, new_value, current_value=None):
if new_value is None:
value = self.choices[:][0]
else:
value = self.choice(new_value)
if value is None:
raise ValueError(f"invalid choice {new_value!r}")
assert isinstance(value, _NamedInt)
return self._write_prefix_bytes + value.bytes(self._byte_count)
def choice(self, value):
if isinstance(value, int):
return self.choices[value]
try:
int(value)
if int(value) in self.choices:
return self.choices[int(value)]
except Exception:
pass
if value in self.choices:
return self.choices[value]
else:
return None
def acceptable(self, args, current):
choice = self.choice(args[0]) if len(args) == 1 else None
return None if choice is None else [choice]
class ChoicesMapValidator(ChoicesValidator):
kind = KIND.map_choice
def __init__(
self,
choices_map,
key_byte_count=0,
key_postfix_bytes=b"",
byte_count=0,
read_skip_byte_count=0,
write_prefix_bytes=b"",
extra_default=None,
mask=-1,
activate=0,
):
assert choices_map is not None
assert isinstance(choices_map, dict)
max_key_bits = 0
max_value_bits = 0
for key, choices in choices_map.items():
assert isinstance(key, _NamedInt)
assert isinstance(choices, _NamedInts)
max_key_bits = max(max_key_bits, key.bit_length())
for key_value in choices:
assert isinstance(key_value, _NamedInt)
max_value_bits = max(max_value_bits, key_value.bit_length())
self._key_byte_count = (max_key_bits + 7) // 8
if key_byte_count:
assert self._key_byte_count <= key_byte_count
self._key_byte_count = key_byte_count
self._byte_count = (max_value_bits + 7) // 8
if byte_count:
assert self._byte_count <= byte_count
self._byte_count = byte_count
self.choices = choices_map
self.needs_current_value = False
self.extra_default = extra_default
self._key_postfix_bytes = key_postfix_bytes
self._read_skip_byte_count = read_skip_byte_count if read_skip_byte_count else 0
self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b""
self.activate = activate
self.mask = mask
assert self._byte_count + self._read_skip_byte_count + self._key_byte_count <= 14
assert self._byte_count + len(self._write_prefix_bytes) + self._key_byte_count <= 14
def to_string(self, value):
def element_to_string(key, val):
k, c = next(((k, c) for k, c in self.choices.items() if int(key) == k), (None, None))
return str(k) + ":" + str(c[val]) if k is not None else "?"
return "{" + ", ".join([element_to_string(k, value[k]) for k in sorted(value)]) + "}"
def validate_read(self, reply_bytes, key):
start = self._key_byte_count + self._read_skip_byte_count
end = start + self._byte_count
reply_value = _bytes2int(reply_bytes[start:end]) & self.mask
# reprogrammable keys starts out as 0, which is not a choice, so don't use assert here
if self.extra_default is not None and self.extra_default == reply_value:
return int(self.choices[key][0])
if reply_value not in self.choices[key]:
assert reply_value in self.choices[key], "%s: failed to validate read value %02X" % (
self.__class__.__name__,
reply_value,
)
return reply_value
def prepare_key(self, key):
return key.to_bytes(self._key_byte_count, "big") + self._key_postfix_bytes
def prepare_write(self, key, new_value):
choices = self.choices.get(key)
if choices is None or (new_value not in choices and new_value != self.extra_default):
logger.error("invalid choice %r for %s", new_value, key)
return None
new_value = new_value | self.activate
return self._write_prefix_bytes + new_value.to_bytes(self._byte_count, "big")
def acceptable(self, args, current):
if len(args) != 2:
return None
key, choices = next(((key, item) for key, item in self.choices.items() if key == args[0]), (None, None))
if choices is None or args[1] not in choices:
return None
choice = next((item for item in choices if item == args[1]), None)
return [int(key), int(choice)] if choice is not None else None
def compare(self, args, current):
if len(args) != 2:
return False
key = next((key for key in self.choices if key == int(args[0])), None)
if key is None:
return False
return args[1] == current[int(key)]
class RangeValidator(Validator):
kind = KIND.range
"""Translates between integers and a byte sequence.
:param min_value: minimum accepted value (inclusive)
:param max_value: maximum accepted value (inclusive)
:param byte_count: the size of the derived byte sequence. If None, it
will be calculated from the range."""
min_value = 0
max_value = 255
@classmethod
def build(cls, setting_class, device, **kwargs):
kwargs["min_value"] = setting_class.min_value
kwargs["max_value"] = setting_class.max_value
return cls(**kwargs)
def __init__(self, min_value=0, max_value=255, byte_count=1):
assert max_value > min_value
self.min_value = min_value
self.max_value = max_value
self.needs_current_value = True # read and check before write (needed for ADC power and probably a good idea anyway)
self._byte_count = math.ceil(math.log(max_value + 1, 256))
if byte_count:
assert self._byte_count <= byte_count
self._byte_count = byte_count
assert self._byte_count < 8
def validate_read(self, reply_bytes):
reply_value = _bytes2int(reply_bytes[: self._byte_count])
assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
return reply_value
def prepare_write(self, new_value, current_value=None):
if new_value < self.min_value or new_value > self.max_value:
raise ValueError(f"invalid choice {new_value!r}")
current_value = self.validate_read(current_value) if current_value is not None else None
to_write = _int2bytes(new_value, self._byte_count)
# current value is known and same as value to be written return None to signal not to write it
return None if current_value is not None and current_value == new_value else to_write
def acceptable(self, args, current):
arg = args[0]
# None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args)
return None if len(args) != 1 or isinstance(arg, int) or arg < self.min_value or arg > self.max_value else args
def compare(self, args, current):
if len(args) == 1:
return args[0] == current
elif len(args) == 2:
return args[0] <= current and current <= args[1]
else:
return False
class HeteroValidator(Validator):
kind = KIND.hetero
@classmethod
def build(cls, setting_class, device, **kwargs):
return cls(**kwargs)
def __init__(self, data_class=None, options=None, readable=True):
assert data_class is not None and options is not None
self.data_class = data_class
self.options = options
self.readable = readable
self.needs_current_value = False
def validate_read(self, reply_bytes):
if self.readable:
reply_value = self.data_class.from_bytes(reply_bytes, options=self.options)
return reply_value
def prepare_write(self, new_value, current_value=None):
to_write = new_value.to_bytes(options=self.options)
return to_write
def acceptable(self, args, current): # should this actually do some checking?
return True
class PackedRangeValidator(Validator):
kind = KIND.packed_range
"""Several range values, all the same size, all the same min and max"""
min_value = 0
max_value = 255
count = 1
rsbc = 0
write_prefix_bytes = b""
def __init__(
self, keys, min_value=0, max_value=255, count=1, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b""
):
assert max_value > min_value
self.needs_current_value = True
self.keys = keys
self.min_value = min_value
self.max_value = max_value
self.count = count
self.bc = math.ceil(math.log(max_value + 1 - min(0, min_value), 256))
if byte_count:
assert self.bc <= byte_count
self.bc = byte_count
assert self.bc * self.count
self.rsbc = read_skip_byte_count
self.write_prefix_bytes = write_prefix_bytes
def validate_read(self, reply_bytes):
rvs = {
n: _bytes2int(reply_bytes[self.rsbc + n * self.bc : self.rsbc + (n + 1) * self.bc], signed=True)
for n in range(self.count)
}
for n in range(self.count):
assert rvs[n] >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}"
assert rvs[n] <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}"
return rvs
def prepare_write(self, new_values):
if len(new_values) != self.count:
raise ValueError(f"wrong number of values {new_values!r}")
for new_value in new_values:
if new_value < self.min_value or new_value > self.max_value:
raise ValueError(f"invalid value {new_value!r}")
bytes = self.write_prefix_bytes + b"".join(_int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count))
return bytes
def acceptable(self, args, current):
if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count:
return None
return None if not isinstance(args[1], int) or args[1] < self.min_value or args[1] > self.max_value else args
def compare(self, args, current):
logger.warning("compare not implemented for packed range settings")
return False
class MultipleRangeValidator(Validator):
kind = KIND.multiple_range
def __init__(self, items, sub_items):
assert isinstance(items, list) # each element must have .index and its __int__ must return its id (not its index)
assert isinstance(sub_items, dict)
# sub_items: items -> class with .minimum, .maximum, .length (in bytes), .id (a string) and .widget (e.g. 'Scale')
self.items = items
self.keys = _NamedInts(**{str(item): int(item) for item in items})
self._item_from_id = {int(k): k for k in items}
self.sub_items = sub_items
def prepare_read_item(self, item):
return _int2bytes((self._item_from_id[int(item)].index << 1) | 0xFF, 2)
def validate_read_item(self, reply_bytes, item):
item = self._item_from_id[int(item)]
start = 0
value = {}
for sub_item in self.sub_items[item]:
r = reply_bytes[start : start + sub_item.length]
if len(r) < sub_item.length:
r += b"\x00" * (sub_item.length - len(value))
v = _bytes2int(r)
if not (sub_item.minimum < v < sub_item.maximum):
logger.warning(
f"{self.__class__.__name__}: failed to validate read value for {item}.{sub_item}: "
+ f"{v} not in [{sub_item.minimum}..{sub_item.maximum}]"
)
value[str(sub_item)] = v
start += sub_item.length
return value
def prepare_write(self, value):
seq = []
w = b""
for item in value.keys():
_item = self._item_from_id[int(item)]
b = _int2bytes(_item.index, 1)
for sub_item in self.sub_items[_item]:
try:
v = value[int(item)][str(sub_item)]
except KeyError:
return None
if not (sub_item.minimum <= v <= sub_item.maximum):
raise ValueError(
f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]"
)
b += _int2bytes(v, sub_item.length)
if len(w) + len(b) > 15:
seq.append(b + b"\xFF")
w = b""
w += b
seq.append(w + b"\xFF")
return seq
def prepare_write_item(self, item, value):
_item = self._item_from_id[int(item)]
w = _int2bytes(_item.index, 1)
for sub_item in self.sub_items[_item]:
try:
v = value[str(sub_item)]
except KeyError:
return None
if not (sub_item.minimum <= v <= sub_item.maximum):
raise ValueError(f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]")
w += _int2bytes(v, sub_item.length)
return w + b"\xFF"
def acceptable(self, args, current):
# just one item, with at least one sub-item
if not isinstance(args, list) or len(args) != 2 or not isinstance(args[1], dict):
return None
item = next((p for p in self.items if p.id == args[0] or str(p) == args[0]), None)
if not item:
return None
for sub_key, value in args[1].items():
sub_item = next((it for it in self.sub_items[item] if it.id == sub_key), None)
if not sub_item:
return None
if not isinstance(value, int) or not (sub_item.minimum <= value <= sub_item.maximum):
return None
return [int(item), {**args[1]}]
def compare(self, args, current):
logger.warning("compare not implemented for multiple range settings")
return False
class ActionSettingRW:
"""Special RW class for settings that turn on and off special processing when a key or button is depressed"""
def __init__(self, feature, name="", divert_setting_name="divert-keys"):
self.feature = feature # not used?
self.name = name
self.divert_setting_name = divert_setting_name
self.kind = FeatureRW.kind # pretend to be FeatureRW as required for HID++ 2.0 devices
self.device = None
self.key = None
self.active = False
self.pressed = False
def activate_action(self): # action to take when setting is activated (write non-false)
pass
def deactivate_action(self): # action to take when setting is deactivated (write false)
pass
def press_action(self): # action to take when key is pressed
pass
def release_action(self): # action to take when key is released
pass
def move_action(self, dx, dy): # action to take when mouse is moved while key is down
pass
def key_action(self, key): # acction to take when some other diverted key is pressed
pass
def read(self, device): # need to return bytes, as if read from device
return _int2bytes(self.key.key, 2) if self.active and self.key else b"\x00\x00"
def write(self, device, data_bytes):
def handler(device, n): # Called on notification events from the device
if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == _hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
if n.address == 0x00:
cids = _unpack("!HHHH", n.data[:8])
if not self.pressed and int(self.key.key) in cids: # trigger key pressed
self.pressed = True
self.press_action()
elif self.pressed:
if int(self.key.key) not in cids: # trigger key released
self.pressed = False
self.release_action()
else:
for key in cids:
if key and not key == self.key.key: # some other diverted key pressed
self.key_action(key)
elif n.address == 0x10:
if self.pressed:
dx, dy = _unpack("!hh", n.data[:4])
self.move_action(dx, dy)
divertSetting = next(filter(lambda s: s.name == self.divert_setting_name, device.settings), None)
if divertSetting is None:
logger.warning("setting %s not found on %s", self.divert_setting_name, device.name)
return None
self.device = device
key = _bytes2int(data_bytes)
if key: # Enable
self.key = next((k for k in device.keys if k.key == key), None)
if self.key:
self.active = True
if divertSetting:
divertSetting.write_key_value(int(self.key.key), 1)
if self.device.setting_callback:
self.device.setting_callback(device, type(divertSetting), [self.key.key, 1])
device.add_notification_handler(self.name, handler)
self.activate_action()
else:
logger.error("cannot enable %s on %s for key %s", self.name, device, key)
else: # Disable
if self.active:
self.active = False
if divertSetting:
divertSetting.write_key_value(int(self.key.key), 0)
if self.device.setting_callback:
self.device.setting_callback(device, type(divertSetting), [self.key.key, 0])
try:
device.remove_notification_handler(self.name)
except Exception:
if logger.isEnabledFor(logging.WARNING):
logger.warning("cannot disable %s on %s", self.name, device)
self.deactivate_action()
return True
class RawXYProcessing:
"""Special class for processing RawXY action messages initiated by pressing a key with rawXY diversion capability"""
def __init__(self, device, name=""):
self.device = device
self.name = name
self.keys = [] # the keys that can initiate processing
self.initiating_key = None # the key that did initiate processing
self.active = False
self.feature_offset = device.features[_hidpp20_constants.FEATURE.REPROG_CONTROLS_V4]
assert self.feature_offset is not False
def handler(self, device, n): # Called on notification events from the device
if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == _hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
if n.address == 0x00:
cids = _unpack("!HHHH", n.data[:8])
## generalize to list of keys
if not self.initiating_key: # no initiating key pressed
for k in self.keys:
if int(k.key) in cids: # initiating key that was pressed
self.initiating_key = k
if self.initiating_key:
self.press_action(self.initiating_key)
else:
if int(self.initiating_key.key) not in cids: # initiating key released
self.initiating_key = None
self.release_action()
else:
for key in cids:
if key and key != self.initiating_key.key:
self.key_action(key)
elif n.address == 0x10:
if self.initiating_key:
dx, dy = _unpack("!hh", n.data[:4])
self.move_action(dx, dy)
def start(self, key):
device_key = next((k for k in self.device.keys if k.key == key), None)
if device_key:
self.keys.append(device_key)
if not self.active:
self.active = True
self.activate_action()
self.device.add_notification_handler(self.name, self.handler)
device_key.set_rawXY_reporting(True)
def stop(self, key): # only stop if this is the active key
if self.active:
processing_key = next((k for k in self.keys if k.key == key), None)
if processing_key:
processing_key.set_rawXY_reporting(False)
self.keys.remove(processing_key)
if not self.keys:
try:
self.device.remove_notification_handler(self.name)
except Exception:
if logger.isEnabledFor(logging.WARNING):
logger.warning("cannot disable %s on %s", self.name, self.device)
self.deactivate_action()
self.active = False
def activate_action(self): # action to take when processing is activated
pass
def deactivate_action(self): # action to take when processing is deactivated
pass
def press_action(self, key): # action to take when an initiating key is pressed
pass
def release_action(self): # action to take when key is released
pass
def move_action(self, dx, dy): # action to take when mouse is moved while key is down
pass
def key_action(self, key): # acction to take when some other diverted key is pressed
pass
def apply_all_settings(device):
if device.features and _hidpp20_constants.FEATURE.HIRES_WHEEL in device.features:
_sleep(0.2) # delay to try to get out of race condition with Linux HID++ driver
persister = getattr(device, "persister", None)
sensitives = persister.get("_sensitive", {}) if persister else {}
for s in device.settings:
ignore = sensitives.get(s.name, False)
if ignore != SENSITIVITY_IGNORE:
s.apply()
Setting.validator_class = BooleanValidator