## Copyright (C) 2012-2013 Daniel Pavel ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import logging import math import struct import time from solaar.i18n import _ from . import common from . import hidpp20_constants from .common import NamedInt from .common import NamedInts logger = logging.getLogger(__name__) SENSITIVITY_IGNORE = "ignore" KIND = NamedInts( toggle=0x01, choice=0x02, range=0x04, map_choice=0x0A, multiple_toggle=0x10, packed_range=0x20, multiple_range=0x40, hetero=0x80, ) def bool_or_toggle(current, new): if isinstance(new, bool): return new try: return bool(int(new)) except (TypeError, ValueError): new = str(new).lower() if new in ("true", "yes", "on", "t", "y"): return True if new in ("false", "no", "off", "f", "n"): return False if new in ("~", "toggle"): return not current return None class Setting: """A setting descriptor. 
Needs to be instantiated for each specific device.""" name = label = description = "" feature = register = kind = None min_version = 0 persist = True rw_options = {} validator_class = None validator_options = {} def __init__(self, device, rw, validator): self._device = device self._rw = rw self._validator = validator self.kind = getattr(self._validator, "kind", None) self._value = None @classmethod def build(cls, device): assert cls.feature or cls.register, "Settings require either a feature or a register" rw_class = cls.rw_class if hasattr(cls, "rw_class") else FeatureRW if cls.feature else RegisterRW rw = rw_class(cls.feature if cls.feature else cls.register, **cls.rw_options) p = device.protocol if p == 1.0: # HID++ 1.0 devices do not support features assert rw.kind == RegisterRW.kind elif p >= 2.0: # HID++ 2.0 devices do not support registers assert rw.kind == FeatureRW.kind validator_class = cls.validator_class validator = validator_class.build(cls, device, **cls.validator_options) if validator: assert cls.kind is None or cls.kind & validator.kind != 0 return cls(device, rw, validator) def val_to_string(self, value): return self._validator.to_string(value) @property def choices(self): assert hasattr(self, "_value") assert hasattr(self, "_device") return self._validator.choices if self._validator and self._validator.kind & KIND.choice else None @property def range(self): assert hasattr(self, "_value") assert hasattr(self, "_device") if self._validator.kind == KIND.range: return (self._validator.min_value, self._validator.max_value) def _pre_read(self, cached, key=None): if self.persist and self._value is None and getattr(self._device, "persister", None): # We haven't read a value from the device yet, # maybe we have something in the configuration. 
self._value = self._device.persister.get(self.name) if cached and self._value is not None: if getattr(self._device, "persister", None) and self.name not in self._device.persister: # If this is a new device (or a new setting for an old device), # make sure to save its current value for the next time. self._device.persister[self.name] = self._value if self.persist else None def read(self, cached=True): assert hasattr(self, "_value") assert hasattr(self, "_device") self._pre_read(cached) if cached and self._value is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: cached value %r on %s", self.name, self._value, self._device) return self._value if self._device.online: reply = self._rw.read(self._device) if reply: self._value = self._validator.validate_read(reply) if self._value is not None and self._device.persister and self.name not in self._device.persister: # Don't update the persister if it already has a value, # otherwise the first read might overwrite the value we wanted. self._device.persister[self.name] = self._value if self.persist else None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: read value %r on %s", self.name, self._value, self._device) return self._value def _pre_write(self, save=True): # Remember the value we're trying to set, even if the write fails. # This way even if the device is offline or some other error occurs, # the last value we've tried to write is remembered in the configuration. 
if self._device.persister and save: self._device.persister[self.name] = self._value if self.persist else None def update(self, value, save=True): self._value = value self._pre_write(save) def write(self, value, save=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert value is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: write %r to %s", self.name, value, self._device) if self._device.online: if self._value != value: self.update(value, save) current_value = None if self._validator.needs_current_value: # the _validator needs the current value, possibly to merge flag values current_value = self._rw.read(self._device) if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: current value %r on %s", self.name, current_value, self._device) data_bytes = self._validator.prepare_write(value, current_value) if data_bytes is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: prepare write(%s) => %r", self.name, value, data_bytes) reply = self._rw.write(self._device, data_bytes) if not reply: # tell whomever is calling that the write failed return None return value def acceptable(self, args, current): return self._validator.acceptable(args, current) if self._validator else None def compare(self, args, current): return self._validator.compare(args, current) if self._validator else None def apply(self): assert hasattr(self, "_value") assert hasattr(self, "_device") if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: apply (%s)", self.name, self._device) try: value = self.read(self.persist) # Don't use persisted value if setting doesn't persist if self.persist and value is not None: # If setting doesn't persist no need to write value just read self.write(value, save=False) except Exception as e: if logger.isEnabledFor(logging.WARNING): logger.warning("%s: error applying %s so ignore it (%s): %s", self.name, self._value, self._device, repr(e)) def __str__(self): if hasattr(self, "_value"): assert hasattr(self, 
"_device") return "" % ( self._rw.kind, self._validator.kind if self._validator else None, self._device.codename, self.name, self._value, ) return f"" __repr__ = __str__ class Settings(Setting): """A setting descriptor for multiple choices, being a map from keys to values. Needs to be instantiated for each specific device.""" def read(self, cached=True): assert hasattr(self, "_value") assert hasattr(self, "_device") if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings read %r from %s", self.name, self._value, self._device) self._pre_read(cached) if cached and self._value is not None: return self._value if self._device.online: reply_map = {} for key in self._validator.choices: reply = self._rw.read(self._device, key) if reply: reply_map[int(key)] = self._validator.validate_read(reply, key) self._value = reply_map if getattr(self._device, "persister", None) and self.name not in self._device.persister: # Don't update the persister if it already has a value, # otherwise the first read might overwrite the value we wanted. 
self._device.persister[self.name] = self._value if self.persist else None return self._value def read_key(self, key, cached=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert key is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings read %r key %r from %s", self.name, self._value, key, self._device) self._pre_read(cached) if cached and self._value is not None: return self._value[int(key)] if self._device.online: reply = self._rw.read(self._device, key) if reply: self._value[int(key)] = self._validator.validate_read(reply, key) if getattr(self._device, "persister", None) and self.name not in self._device.persister: self._device.persister[self.name] = self._value if self.persist else None return self._value[int(key)] def write(self, map, save=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert map is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings write %r to %s", self.name, map, self._device) if self._device.online: self.update(map, save) for key, value in map.items(): data_bytes = self._validator.prepare_write(int(key), value) if data_bytes is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings prepare map write(%s,%s) => %r", self.name, key, value, data_bytes) reply = self._rw.write(self._device, int(key), data_bytes) if not reply: return None return map def update_key_value(self, key, value, save=True): self._value[int(key)] = value self._pre_write(save) def write_key_value(self, key, value, save=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert key is not None assert value is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings write key %r value %r to %s", self.name, key, value, self._device) if self._device.online: if not self._value: self.read() try: data_bytes = self._validator.prepare_write(int(key), value) # always need to write to configuration because dictionary is shared and could 
have changed self.update_key_value(key, value, save) except ValueError: data_bytes = value = None if data_bytes is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings prepare key value write(%s,%s) => %r", self.name, key, value, data_bytes) reply = self._rw.write(self._device, int(key), data_bytes) if not reply: return None return value class LongSettings(Setting): """A setting descriptor for multiple choices, being a map from keys to values. Allows multiple write requests, if the options don't fit in 16 bytes. The validator must return a list. Needs to be instantiated for each specific device.""" def read(self, cached=True): assert hasattr(self, "_value") assert hasattr(self, "_device") if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings read %r from %s", self.name, self._value, self._device) self._pre_read(cached) if cached and self._value is not None: return self._value if self._device.online: reply_map = {} # Reading one item at a time. This can probably be optimised for item in self._validator.items: r = self._validator.prepare_read_item(item) reply = self._rw.read(self._device, r) if reply: reply_map[int(item)] = self._validator.validate_read_item(reply, item) self._value = reply_map if getattr(self._device, "persister", None) and self.name not in self._device.persister: # Don't update the persister if it already has a value, # otherwise the first read might overwrite the value we wanted. 
self._device.persister[self.name] = self._value if self.persist else None return self._value def read_item(self, item, cached=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert item is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings read %r item %r from %s", self.name, self._value, item, self._device) self._pre_read(cached) if cached and self._value is not None: return self._value[int(item)] if self._device.online: r = self._validator.prepare_read_item(item) reply = self._rw.read(self._device, r) if reply: self._value[int(item)] = self._validator.validate_read_item(reply, item) if getattr(self._device, "persister", None) and self.name not in self._device.persister: self._device.persister[self.name] = self._value if self.persist else None return self._value[int(item)] def write(self, map, save=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert map is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: long settings write %r to %s", self.name, map, self._device) if self._device.online: self.update(map, save) for item, value in map.items(): data_bytes_list = self._validator.prepare_write(self._value) if data_bytes_list is not None: for data_bytes in data_bytes_list: if data_bytes is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings prepare map write(%s,%s) => %r", self.name, item, value, data_bytes) reply = self._rw.write(self._device, data_bytes) if not reply: return None return map def update_key_value(self, key, value, save=True): self._value[int(key)] = value self._pre_write(save) def write_key_value(self, item, value, save=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert item is not None assert value is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: long settings write item %r value %r to %s", self.name, item, value, self._device) if self._device.online: if not self._value: self.read() data_bytes = 
self._validator.prepare_write_item(item, value) self.update_key_value(item, value, save) if data_bytes is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings prepare item value write(%s,%s) => %r", self.name, item, value, data_bytes) reply = self._rw.write(self._device, data_bytes) if not reply: return None return value class BitFieldSetting(Setting): """A setting descriptor for a set of choices represented by one bit each, being a map from options to booleans. Needs to be instantiated for each specific device.""" def read(self, cached=True): assert hasattr(self, "_value") assert hasattr(self, "_device") if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings read %r from %s", self.name, self._value, self._device) self._pre_read(cached) if cached and self._value is not None: return self._value if self._device.online: reply_map = {} reply = self._do_read() if reply: reply_map = self._validator.validate_read(reply) self._value = reply_map if getattr(self._device, "persister", None) and self.name not in self._device.persister: # Don't update the persister if it already has a value, # otherwise the first read might overwrite the value we wanted. 
self._device.persister[self.name] = self._value if self.persist else None return self._value def _do_read(self): return self._rw.read(self._device) def read_key(self, key, cached=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert key is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings read %r key %r from %s", self.name, self._value, key, self._device) self._pre_read(cached) if cached and self._value is not None: return self._value[int(key)] if self._device.online: reply = self._do_read_key(key) if reply: self._value = self._validator.validate_read(reply) if getattr(self._device, "persister", None) and self.name not in self._device.persister: self._device.persister[self.name] = self._value if self.persist else None return self._value[int(key)] def _do_read_key(self, key): return self._rw.read(self._device, key) def write(self, map, save=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert map is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: bit field settings write %r to %s", self.name, map, self._device) if self._device.online: self.update(map, save) data_bytes = self._validator.prepare_write(self._value) if data_bytes is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings prepare map write(%s) => %r", self.name, self._value, data_bytes) # if prepare_write returns a list, write one item at a time seq = data_bytes if isinstance(data_bytes, list) else [data_bytes] for b in seq: reply = self._rw.write(self._device, b) if not reply: return None return map def update_key_value(self, key, value, save=True): self._value[int(key)] = value self._pre_write(save) def write_key_value(self, key, value, save=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert key is not None assert value is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: bit field settings write key %r value %r to %s", self.name, key, value, 
self._device) if self._device.online: if not self._value: self.read() value = bool(value) self.update_key_value(key, value, save) data_bytes = self._validator.prepare_write(self._value) if data_bytes is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings prepare key value write(%s,%s) => %r", self.name, key, str(value), data_bytes) # if prepare_write returns a list, write one item at a time seq = data_bytes if isinstance(data_bytes, list) else [data_bytes] for b in seq: reply = self._rw.write(self._device, b) if not reply: return None return value class BitFieldWithOffsetAndMaskSetting(BitFieldSetting): """A setting descriptor for a set of choices represented by one bit each, each one having an offset, being a map from options to booleans. Needs to be instantiated for each specific device.""" def _do_read(self): return {r: self._rw.read(self._device, r) for r in self._validator.prepare_read()} def _do_read_key(self, key): r = self._validator.prepare_read_key(key) return {r: self._rw.read(self._device, r)} class RangeFieldSetting(Setting): """A setting descriptor for a set of choices represented by one field each, with map from option names to range(0,n). Needs to be instantiated for each specific device.""" def read(self, cached=True): assert hasattr(self, "_value") assert hasattr(self, "_device") if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: settings read %r from %s", self.name, self._value, self._device) self._pre_read(cached) if cached and self._value is not None: return self._value if self._device.online: reply_map = {} reply = self._do_read() if reply: reply_map = self._validator.validate_read(reply) self._value = reply_map if getattr(self._device, "persister", None) and self.name not in self._device.persister: # Don't update the persister if it already has a value, # otherwise the first read might overwrite the value we wanted. 
self._device.persister[self.name] = self._value if self.persist else None return self._value def _do_read(self): return self._rw.read(self._device) def read_key(self, key, cached=True): return self.read(cached)[int(key)] def write(self, map, save=True): assert hasattr(self, "_value") assert hasattr(self, "_device") assert map is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: range field setting write %r to %s", self.name, map, self._device) if self._device.online: self.update(map, save) data_bytes = self._validator.prepare_write(self._value) if data_bytes is not None: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: range field setting prepare map write(%s) => %r", self.name, self._value, data_bytes) reply = self._rw.write(self._device, data_bytes) if not reply: return None elif logger.isEnabledFor(logging.WARNING): logger.warning("%s: range field setting no data to write", self.name) return map def write_key_value(self, key, value, save=True): assert key is not None assert value is not None if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: range field setting write key %r value %r to %s", self.name, key, value, self._device) if self._device.online: if not self._value: self.read() map = self._value map[int(key)] = value self.write(map, save) return value # # read/write low-level operators # class RegisterRW: __slots__ = ("register",) kind = NamedInt(0x01, _("register")) def __init__(self, register: int): assert isinstance(register, int) self.register = register def read(self, device): return device.read_register(self.register) def write(self, device, data_bytes): return device.write_register(self.register, data_bytes) class FeatureRW: kind = NamedInt(0x02, _("feature")) default_read_fnid = 0x00 default_write_fnid = 0x10 def __init__(self, feature, read_fnid=0x00, write_fnid=0x10, prefix=b"", suffix=b"", read_prefix=b"", no_reply=False): assert isinstance(feature, NamedInt) self.feature = feature self.read_fnid = read_fnid 
self.write_fnid = write_fnid self.no_reply = no_reply self.prefix = prefix self.suffix = suffix self.read_prefix = read_prefix def read(self, device, data_bytes=b""): assert self.feature is not None return device.feature_request(self.feature, self.read_fnid, self.prefix, self.read_prefix, data_bytes) def write(self, device, data_bytes): assert self.feature is not None write_bytes = self.prefix + (data_bytes.to_bytes(1) if isinstance(data_bytes, int) else data_bytes) + self.suffix reply = device.feature_request(self.feature, self.write_fnid, write_bytes, no_reply=self.no_reply) return reply if not self.no_reply else True class FeatureRWMap(FeatureRW): kind = NamedInt(0x02, _("feature")) default_read_fnid = 0x00 default_write_fnid = 0x10 default_key_byte_count = 1 def __init__( self, feature, read_fnid=default_read_fnid, write_fnid=default_write_fnid, key_byte_count=default_key_byte_count, no_reply=False, ): assert isinstance(feature, NamedInt) self.feature = feature self.read_fnid = read_fnid self.write_fnid = write_fnid self.key_byte_count = key_byte_count self.no_reply = no_reply def read(self, device, key): assert self.feature is not None key_bytes = common.int2bytes(key, self.key_byte_count) return device.feature_request(self.feature, self.read_fnid, key_bytes) def write(self, device, key, data_bytes): assert self.feature is not None key_bytes = common.int2bytes(key, self.key_byte_count) reply = device.feature_request(self.feature, self.write_fnid, key_bytes, data_bytes, no_reply=self.no_reply) return reply if not self.no_reply else True class Validator: @classmethod def build(cls, setting_class, device, **kwargs): return cls(**kwargs) @classmethod def to_string(cls, value): return str(value) def compare(self, args, current): if len(args) != 1: return False return args[0] == current class BooleanValidator(Validator): __slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value") kind = KIND.toggle 
default_true = 0x01 default_false = 0x00 # mask specifies all the affected bits in the value default_mask = 0xFF def __init__( self, true_value=default_true, false_value=default_false, mask=default_mask, read_skip_byte_count=0, write_prefix_bytes=b"", ): if isinstance(true_value, int): assert isinstance(false_value, int) if mask is None: mask = self.default_mask else: assert isinstance(mask, int) assert true_value & false_value == 0 assert true_value & mask == true_value assert false_value & mask == false_value self.needs_current_value = mask != self.default_mask elif isinstance(true_value, bytes): if false_value is None or false_value == self.default_false: false_value = b"\x00" * len(true_value) else: assert isinstance(false_value, bytes) if mask is None or mask == self.default_mask: mask = b"\xff" * len(true_value) else: assert isinstance(mask, bytes) assert len(mask) == len(true_value) == len(false_value) tv = common.bytes2int(true_value) fv = common.bytes2int(false_value) mv = common.bytes2int(mask) assert tv != fv # true and false might be something other than bit values assert tv & mv == tv assert fv & mv == fv self.needs_current_value = any(m != 0xFF for m in mask) else: raise Exception(f"invalid mask '{mask!r}', type {type(mask)}") self.true_value = true_value self.false_value = false_value self.mask = mask self.read_skip_byte_count = read_skip_byte_count self.write_prefix_bytes = write_prefix_bytes def validate_read(self, reply_bytes): reply_bytes = reply_bytes[self.read_skip_byte_count :] if isinstance(self.mask, int): reply_value = ord(reply_bytes[:1]) & self.mask if logger.isEnabledFor(logging.DEBUG): logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value) if reply_value == self.true_value: return True if reply_value == self.false_value: return False logger.warning( "BooleanValidator: reply %02X mismatched %02X/%02X/%02X", reply_value, self.true_value, self.false_value, self.mask, ) return False count = len(self.mask) mask 
= common.bytes2int(self.mask) reply_value = common.bytes2int(reply_bytes[:count]) & mask true_value = common.bytes2int(self.true_value) if reply_value == true_value: return True false_value = common.bytes2int(self.false_value) if reply_value == false_value: return False logger.warning( "BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask ) return False def prepare_write(self, new_value, current_value=None): if new_value is None: new_value = False else: assert isinstance(new_value, bool), f"New value {new_value} for boolean setting is not a boolean" to_write = self.true_value if new_value else self.false_value if isinstance(self.mask, int): if current_value is not None and self.needs_current_value: to_write |= ord(current_value[:1]) & (0xFF ^ self.mask) if current_value is not None and to_write == ord(current_value[:1]): return None to_write = bytes([to_write]) else: to_write = bytearray(to_write) count = len(self.mask) for i in range(0, count): b = ord(to_write[i : i + 1]) m = ord(self.mask[i : i + 1]) assert b & m == b # b &= m if current_value is not None and self.needs_current_value: b |= ord(current_value[i : i + 1]) & (0xFF ^ m) to_write[i] = b to_write = bytes(to_write) if current_value is not None and to_write == current_value[: len(to_write)]: return None if logger.isEnabledFor(logging.DEBUG): logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write) return self.write_prefix_bytes + to_write def acceptable(self, args, current): if len(args) != 1: return None val = bool_or_toggle(current, args[0]) return [val] if val is not None else None class BitFieldValidator(Validator): __slots__ = ("byte_count", "options") kind = KIND.multiple_toggle def __init__(self, options, byte_count=None): assert isinstance(options, list) self.options = options self.byte_count = (max(x.bit_length() for x in options) + 7) // 8 if byte_count: assert isinstance(byte_count, int) and 
byte_count >= self.byte_count self.byte_count = byte_count def to_string(self, value): def element_to_string(key, val): k = next((k for k in self.options if int(key) == k), None) return str(k) + ":" + str(val) if k is not None else "?" return "{" + ", ".join([element_to_string(k, value[k]) for k in value]) + "}" def validate_read(self, reply_bytes): r = common.bytes2int(reply_bytes[: self.byte_count]) value = {int(k): False for k in self.options} m = 1 for _ignore in range(8 * self.byte_count): if m in self.options: value[int(m)] = bool(r & m) m <<= 1 return value def prepare_write(self, new_value): assert isinstance(new_value, dict) w = 0 for k, v in new_value.items(): if v: w |= int(k) return common.int2bytes(w, self.byte_count) def get_options(self): return self.options def acceptable(self, args, current): if len(args) != 2: return None key = next((key for key in self.options if key == args[0]), None) if key is None: return None val = bool_or_toggle(current[int(key)], args[1]) return None if val is None else [int(key), val] def compare(self, args, current): if len(args) != 2: return False key = next((key for key in self.options if key == args[0]), None) if key is None: return False return args[1] == current[int(key)] class BitFieldWithOffsetAndMaskValidator(Validator): __slots__ = ("byte_count", "options", "_option_from_key", "_mask_from_offset", "_option_from_offset_mask") kind = KIND.multiple_toggle sep = 0x01 def __init__(self, options, om_method=None, byte_count=None): assert isinstance(options, list) # each element of options is an instance of a class # that has an id (which is used as an index in other dictionaries) # and where om_method is a method that returns a byte offset and byte mask # that says how to access and modify the bit toggle for the option self.options = options self.om_method = om_method # to retrieve the options efficiently: self._option_from_key = {} self._mask_from_offset = {} self._option_from_offset_mask = {} for opt in options: 
offset, mask = om_method(opt) self._option_from_key[int(opt)] = opt try: self._mask_from_offset[offset] |= mask except KeyError: self._mask_from_offset[offset] = mask try: mask_to_opt = self._option_from_offset_mask[offset] except KeyError: mask_to_opt = {} self._option_from_offset_mask[offset] = mask_to_opt mask_to_opt[mask] = opt self.byte_count = (max(om_method(x)[1].bit_length() for x in options) + 7) // 8 # is this correct?? if byte_count: assert isinstance(byte_count, int) and byte_count >= self.byte_count self.byte_count = byte_count def prepare_read(self): r = [] for offset, mask in self._mask_from_offset.items(): b = offset << (8 * (self.byte_count + 1)) b |= (self.sep << (8 * self.byte_count)) | mask r.append(common.int2bytes(b, self.byte_count + 2)) return r def prepare_read_key(self, key): option = self._option_from_key.get(key, None) if option is None: return None offset, mask = option.om_method(option) b = offset << (8 * (self.byte_count + 1)) b |= (self.sep << (8 * self.byte_count)) | mask return common.int2bytes(b, self.byte_count + 2) def validate_read(self, reply_bytes_dict): values = {int(k): False for k in self.options} for query, b in reply_bytes_dict.items(): offset = common.bytes2int(query[0:1]) b += (self.byte_count - len(b)) * b"\x00" value = common.bytes2int(b[: self.byte_count]) mask_to_opt = self._option_from_offset_mask.get(offset, {}) m = 1 for _ignore in range(8 * self.byte_count): if m in mask_to_opt: values[int(mask_to_opt[m])] = bool(value & m) m <<= 1 return values def prepare_write(self, new_value): assert isinstance(new_value, dict) w = {} for k, v in new_value.items(): option = self._option_from_key[int(k)] offset, mask = self.om_method(option) if offset not in w: w[offset] = 0 if v: w[offset] |= mask return [ common.int2bytes( (offset << (8 * (2 * self.byte_count + 1))) | (self.sep << (16 * self.byte_count)) | (self._mask_from_offset[offset] << (8 * self.byte_count)) | value, 2 * self.byte_count + 2, ) for offset, value in 
class ChoicesValidator(Validator):
    """Validator for settings whose value is one entry of a NamedInts collection.

    Converts between the chosen NamedInt and the bytes exchanged with the device.

    :param choices: the NamedInts collection of allowed values (more than one entry)
    :param byte_count: width in bytes of the encoded value; when falsy it is
        derived from the largest ``bit_length()`` among the choices
    :param read_skip_byte_count: number of leading reply bytes skipped before the value
    :param write_prefix_bytes: bytes placed in front of the encoded value on write
    """

    kind = KIND.choice

    def __init__(self, choices=None, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b""):
        assert choices is not None
        assert isinstance(choices, NamedInts)
        assert len(choices) > 1
        self.choices = choices
        self.needs_current_value = False

        # smallest whole number of bytes able to hold the widest choice
        widest = max(entry.bit_length() for entry in choices)
        self._byte_count = (widest + 7) // 8
        if byte_count:
            # an explicit width may only enlarge the derived one
            assert self._byte_count <= byte_count
            self._byte_count = byte_count
        assert self._byte_count < 8

        self._read_skip_byte_count = read_skip_byte_count
        self._write_prefix_bytes = write_prefix_bytes or b""
        assert self._byte_count + self._read_skip_byte_count <= 14
        assert self._byte_count + len(self._write_prefix_bytes) <= 14

    def to_string(self, value):
        """Return the text for ``value``: the matching choice for ints, ``str(value)`` otherwise."""
        if isinstance(value, int):
            return str(self.choices[value])
        return str(value)

    def validate_read(self, reply_bytes):
        """Decode the value field of ``reply_bytes`` and return the matching choice."""
        first = self._read_skip_byte_count
        field = reply_bytes[first : first + self._byte_count]
        reply_value = common.bytes2int(field)
        valid_value = self.choices[reply_value]
        assert valid_value is not None, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
        return valid_value

    def prepare_write(self, new_value, current_value=None):
        """Encode ``new_value`` for writing.

        ``None`` selects the first entry of ``self.choices``.

        :raises ValueError: if ``new_value`` does not resolve to a choice
        """
        value = self.choices[:][0] if new_value is None else self.choice(new_value)
        if value is None:
            raise ValueError(f"invalid choice {new_value!r}")
        assert isinstance(value, NamedInt)
        return self._write_prefix_bytes + value.bytes(self._byte_count)

    def choice(self, value):
        """Resolve ``value`` to an entry of ``self.choices``; None when a non-int value does not match."""
        if isinstance(value, int):
            return self.choices[value]
        try:
            as_int = int(value)
            if as_int in self.choices:
                return self.choices[as_int]
        except Exception:
            # integer-based lookup not possible; fall through to the direct lookup
            pass
        return self.choices[value] if value in self.choices else None

    def acceptable(self, args, current):
        """Return ``[choice]`` for a single resolvable argument, otherwise None."""
        if len(args) != 1:
            return None
        found = self.choice(args[0])
        if found is None:
            return None
        return [found]
class ChoicesMapValidator(ChoicesValidator):
    """Validator for settings that map each key to one choice from a per-key list.

    :param choices_map: dict from NamedInt keys to the NamedInts allowed for that key
    :param key_byte_count: width in bytes of an encoded key; derived from the keys when falsy
    :param key_postfix_bytes: bytes appended after the encoded key by ``prepare_key``
    :param byte_count: width in bytes of an encoded value; derived from the choices when falsy
    :param read_skip_byte_count: reply bytes skipped, in addition to the key bytes, before the value
    :param write_prefix_bytes: bytes placed in front of the encoded value on write
    :param extra_default: extra raw value accepted although it is not a listed choice
    :param mask: bit mask applied to the raw value read from the device
    :param activate: bits OR-ed into every value written
    """

    kind = KIND.map_choice

    def __init__(
        self,
        choices_map,
        key_byte_count=0,
        key_postfix_bytes=b"",
        byte_count=0,
        read_skip_byte_count=0,
        write_prefix_bytes=b"",
        extra_default=None,
        mask=-1,
        activate=0,
    ):
        assert choices_map is not None
        assert isinstance(choices_map, dict)

        # widest key and widest choice decide the default field widths
        key_bits = 0
        value_bits = 0
        for map_key, map_choices in choices_map.items():
            assert isinstance(map_key, NamedInt)
            assert isinstance(map_choices, NamedInts)
            if map_key.bit_length() > key_bits:
                key_bits = map_key.bit_length()
            for candidate in map_choices:
                assert isinstance(candidate, NamedInt)
                if candidate.bit_length() > value_bits:
                    value_bits = candidate.bit_length()

        self._key_byte_count = (key_bits + 7) // 8
        if key_byte_count:
            assert self._key_byte_count <= key_byte_count
            self._key_byte_count = key_byte_count

        self._byte_count = (value_bits + 7) // 8
        if byte_count:
            assert self._byte_count <= byte_count
            self._byte_count = byte_count

        self.choices = choices_map
        self.needs_current_value = False
        self.extra_default = extra_default
        self._key_postfix_bytes = key_postfix_bytes
        self._read_skip_byte_count = read_skip_byte_count or 0
        self._write_prefix_bytes = write_prefix_bytes or b""
        self.activate = activate
        self.mask = mask
        assert self._byte_count + self._read_skip_byte_count + self._key_byte_count <= 14
        assert self._byte_count + len(self._write_prefix_bytes) + self._key_byte_count <= 14

    def to_string(self, value):
        """Render a ``{key: choice}`` mapping as ``{key:choice, ...}`` in sorted key order.

        Keys without an entry in ``self.choices`` are rendered as ``?``.
        """
        rendered = []
        for key in sorted(value):
            val = value[key]
            known_key, known_choices = next(((k, c) for k, c in self.choices.items() if int(key) == k), (None, None))
            if known_key is None:
                rendered.append("?")
            else:
                rendered.append(str(known_key) + ":" + str(known_choices[val]))
        return "{" + ", ".join(rendered) + "}"

    def validate_read(self, reply_bytes, key):
        """Decode the value stored for ``key`` from ``reply_bytes``.

        :returns: the masked raw value, or the integer of the first choice of
            ``key`` when the raw value equals ``extra_default``
        """
        start = self._key_byte_count + self._read_skip_byte_count
        field = reply_bytes[start : start + self._byte_count]
        reply_value = common.bytes2int(field) & self.mask
        key_choices = self.choices[key]
        # reprogrammable keys starts out as 0, which is not a choice, so don't use assert here
        if self.extra_default is not None and self.extra_default == reply_value:
            return int(key_choices[0])
        assert reply_value in key_choices, "%s: failed to validate read value %02X" % (
            self.__class__.__name__,
            reply_value,
        )
        return reply_value

    def prepare_key(self, key):
        """Encode ``key`` big-endian in ``_key_byte_count`` bytes followed by the key postfix."""
        encoded_key = key.to_bytes(self._key_byte_count, "big")
        return encoded_key + self._key_postfix_bytes

    def prepare_write(self, key, new_value):
        """Encode ``new_value`` for ``key``; log an error and return None when it is not allowed."""
        choices = self.choices.get(key)
        allowed = choices is not None and (new_value in choices or new_value == self.extra_default)
        if not allowed:
            logger.error("invalid choice %r for %s", new_value, key)
            return None
        new_value = new_value | self.activate
        return self._write_prefix_bytes + new_value.to_bytes(self._byte_count, "big")

    def acceptable(self, args, current):
        """Return ``[int(key), int(choice)]`` for a valid ``[key, choice]`` pair, otherwise None."""
        if len(args) != 2:
            return None
        wanted_key, wanted_choice = args[0], args[1]
        key, choices = next(((k, c) for k, c in self.choices.items() if k == wanted_key), (None, None))
        if choices is None or wanted_choice not in choices:
            return None
        choice = next((entry for entry in choices if entry == wanted_choice), None)
        if choice is None:
            return None
        return [int(key), int(choice)]

    def compare(self, args, current):
        """Return whether ``args[1]`` equals the current value stored for the key ``args[0]``."""
        if len(args) != 2:
            return False
        wanted = int(args[0])
        key = next((k for k in self.choices if k == wanted), None)
        if key is None:
            return False
        return args[1] == current[int(key)]
class RangeValidator(Validator):
    """Translates between integers and a byte sequence.

    :param min_value: minimum accepted value (inclusive)
    :param max_value: maximum accepted value (inclusive)
    :param byte_count: the size of the derived byte sequence. If falsy, it
        will be calculated from the range.
    """

    kind = KIND.range
    min_value = 0
    max_value = 255

    @classmethod
    def build(cls, setting_class, device, **kwargs):
        """Create a validator whose bounds are taken from ``setting_class``."""
        kwargs["min_value"] = setting_class.min_value
        kwargs["max_value"] = setting_class.max_value
        return cls(**kwargs)

    def __init__(self, min_value=0, max_value=255, byte_count=1):
        assert max_value > min_value
        self.min_value = min_value
        self.max_value = max_value
        self.needs_current_value = True  # read and check before write (needed for ADC power and probably a good idea anyway)

        # number of base-256 digits needed for max_value
        self._byte_count = math.ceil(math.log(max_value + 1, 256))
        if byte_count:
            # an explicit width may only enlarge the derived one
            assert self._byte_count <= byte_count
            self._byte_count = byte_count
        assert self._byte_count < 8

    def validate_read(self, reply_bytes):
        """Decode the first ``_byte_count`` bytes and assert the value is within range."""
        reply_value = common.bytes2int(reply_bytes[: self._byte_count])
        assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
        assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
        return reply_value

    def prepare_write(self, new_value, current_value=None):
        """Encode ``new_value`` for writing.

        :param current_value: raw bytes of the current value, if known
        :returns: the encoded bytes, or None when the current value already
            equals ``new_value`` (signals that nothing needs to be written)
        :raises ValueError: if ``new_value`` is outside ``[min_value, max_value]``
        """
        if new_value < self.min_value or new_value > self.max_value:
            raise ValueError(f"invalid choice {new_value!r}")
        current_value = self.validate_read(current_value) if current_value is not None else None
        to_write = common.int2bytes(new_value, self._byte_count)
        # current value is known and same as value to be written return None to signal not to write it
        return None if current_value is not None and current_value == new_value else to_write

    def acceptable(self, args, current):
        """Return ``args`` when it holds exactly one in-range integer, otherwise None."""
        # check the length first so an empty argument list is rejected, not an IndexError
        if len(args) != 1:
            return None
        arg = args[0]
        # the value must be an integer; the previous test was inverted and
        # rejected every integer argument
        if not isinstance(arg, int) or arg < self.min_value or arg > self.max_value:
            return None
        return args

    def compare(self, args, current):
        """Compare ``current`` with one value (equality) or two values (inclusive interval)."""
        if len(args) == 1:
            return args[0] == current
        elif len(args) == 2:
            return args[0] <= current and current <= args[1]
        else:
            return False
class HeteroValidator(Validator):
    """Validator that delegates conversion to a data class.

    Reading calls ``data_class.from_bytes(reply_bytes, options=options)``;
    writing calls ``to_bytes(options=options)`` on the new value.

    :param data_class: class providing ``from_bytes`` (required)
    :param options: passed through as the ``options`` keyword (required)
    :param readable: when false, ``validate_read`` returns None
    """

    kind = KIND.hetero

    @classmethod
    def build(cls, setting_class, device, **kwargs):
        """Create the validator directly from the keyword options."""
        return cls(**kwargs)

    def __init__(self, data_class=None, options=None, readable=True):
        assert data_class is not None and options is not None
        self.data_class = data_class
        self.options = options
        self.readable = readable
        self.needs_current_value = False

    def validate_read(self, reply_bytes):
        """Convert ``reply_bytes`` through the data class; None when not readable."""
        if not self.readable:
            return None
        return self.data_class.from_bytes(reply_bytes, options=self.options)

    def prepare_write(self, new_value, current_value=None):
        """Return the bytes produced by ``new_value.to_bytes``."""
        return new_value.to_bytes(options=self.options)

    def acceptable(self, args, current):
        """Accept anything."""
        # should this actually do some checking?
        return True


class PackedRangeValidator(Validator):
    """Several range values, all the same size, all the same min and max"""

    kind = KIND.packed_range
    min_value = 0
    max_value = 255
    count = 1
    rsbc = 0
    write_prefix_bytes = b""

    def __init__(
        self, keys, min_value=0, max_value=255, count=1, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b""
    ):
        assert max_value > min_value
        self.needs_current_value = True
        self.keys = keys
        self.min_value = min_value
        self.max_value = max_value
        self.count = count

        # bytes per value: base-256 digits for the span, widened for negative minimums
        span = max_value + 1 - min(0, min_value)
        self.bc = math.ceil(math.log(span, 256))
        if byte_count:
            assert self.bc <= byte_count
            self.bc = byte_count
        assert self.bc * self.count

        self.rsbc = read_skip_byte_count
        self.write_prefix_bytes = write_prefix_bytes

    def validate_read(self, reply_bytes):
        """Decode ``count`` signed values of ``bc`` bytes each, after skipping ``rsbc`` bytes.

        :returns: dict from position (0..count-1) to the decoded value
        """
        rvs = {}
        for n in range(self.count):
            first = self.rsbc + n * self.bc
            last = self.rsbc + (n + 1) * self.bc
            rvs[n] = common.bytes2int(reply_bytes[first:last], signed=True)
        # range checks run only after every value has been decoded
        for n in range(self.count):
            value = rvs[n]
            assert value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {value:02X}"
            assert value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {value:02X}"
        return rvs

    def prepare_write(self, new_values):
        """Encode the mapping ``position -> value`` as prefix plus ``count`` signed fields.

        :raises ValueError: on a wrong number of values or an out-of-range value
        """
        if len(new_values) != self.count:
            raise ValueError(f"wrong number of values {new_values!r}")
        for new_value in new_values.values():
            if new_value < self.min_value or new_value > self.max_value:
                raise ValueError(f"invalid value {new_value!r}")
        fields = (common.int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count))
        return self.write_prefix_bytes + b"".join(fields)

    def acceptable(self, args, current):
        """Return ``args`` for a valid ``[position, value]`` pair, otherwise None."""
        if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count:
            return None
        candidate = args[1]
        if not isinstance(candidate, int) or candidate < self.min_value or candidate > self.max_value:
            return None
        return args

    def compare(self, args, current):
        """Not implemented: logs a warning and returns False."""
        logger.warning("compare not implemented for packed range settings")
        return False
class MultipleRangeValidator(Validator):
    """Validator for settings made of several items, each with ranged sub-items.

    :param items: list of item objects; each must have ``.index`` and its
        ``__int__`` must return its id (not its index)
    :param sub_items: dict from item to a list of sub-item objects with
        ``.minimum``, ``.maximum``, ``.length`` (in bytes), ``.id`` (a string)
        and ``.widget`` (e.g. 'Scale')
    """

    kind = KIND.multiple_range

    def __init__(self, items, sub_items):
        assert isinstance(items, list)  # each element must have .index and its __int__ must return its id (not its index)
        assert isinstance(sub_items, dict)
        # sub_items: items -> class with .minimum, .maximum, .length (in bytes), .id (a string) and .widget (e.g. 'Scale')
        self.items = items
        self.keys = NamedInts(**{str(item): int(item) for item in items})
        self._item_from_id = {int(k): k for k in items}
        self.sub_items = sub_items

    def prepare_read_item(self, item):
        """Build the two-byte read request for ``item``."""
        return common.int2bytes((self._item_from_id[int(item)].index << 1) | 0xFF, 2)

    def validate_read_item(self, reply_bytes, item):
        """Decode the sub-item values of ``item`` from ``reply_bytes``.

        Sub-items are read back to back, each ``sub_item.length`` bytes wide.
        A value outside the sub-item's range is logged but still returned.

        :returns: dict from ``str(sub_item)`` to the decoded integer
        """
        item = self._item_from_id[int(item)]
        start = 0
        value = {}
        for sub_item in self.sub_items[item]:
            r = reply_bytes[start : start + sub_item.length]
            if len(r) < sub_item.length:
                # pad a short field up to its declared length; the pad size must
                # come from the field itself, not from the number of decoded values
                r += b"\x00" * (sub_item.length - len(r))
            v = common.bytes2int(r)
            # bounds are inclusive, as in the message below and in the write path
            if not (sub_item.minimum <= v <= sub_item.maximum):
                logger.warning(
                    f"{self.__class__.__name__}: failed to validate read value for {item}.{sub_item}: "
                    + f"{v} not in [{sub_item.minimum}..{sub_item.maximum}]"
                )
            value[str(sub_item)] = v
            start += sub_item.length
        return value

    def prepare_write(self, value):
        """Encode all items of ``value`` into a sequence of write payloads.

        Each item becomes its index byte followed by its sub-item values. Items
        are packed together while a payload stays within 15 bytes; every payload
        is terminated by ``0xFF``.

        :param value: dict from item id to ``{str(sub_item): int}``
        :returns: list of payloads, or None when a sub-item value is missing
        :raises ValueError: if a sub-item value is out of range
        """
        seq = []
        w = b""
        for item in value.keys():
            _item = self._item_from_id[int(item)]
            b = common.int2bytes(_item.index, 1)
            for sub_item in self.sub_items[_item]:
                try:
                    v = value[int(item)][str(sub_item)]
                except KeyError:
                    return None
                if not (sub_item.minimum <= v <= sub_item.maximum):
                    raise ValueError(
                        f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]"
                    )
                b += common.int2bytes(v, sub_item.length)
            if w and len(w) + len(b) > 15:
                # payload full: flush what has been accumulated so far and start
                # a new payload with the current item (previously the current
                # item was flushed instead, losing the accumulated ones)
                seq.append(w + b"\xff")
                w = b""
            w += b
        seq.append(w + b"\xff")
        return seq

    def prepare_write_item(self, item, value):
        """Encode a single item: index byte, sub-item values, ``0xFF`` terminator.

        :param value: dict ``{str(sub_item): int}`` for this item
        :returns: the payload, or None when a sub-item value is missing
        :raises ValueError: if a sub-item value is out of range
        """
        _item = self._item_from_id[int(item)]
        w = common.int2bytes(_item.index, 1)
        for sub_item in self.sub_items[_item]:
            try:
                v = value[str(sub_item)]
            except KeyError:
                return None
            if not (sub_item.minimum <= v <= sub_item.maximum):
                raise ValueError(f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]")
            w += common.int2bytes(v, sub_item.length)
        return w + b"\xff"

    def acceptable(self, args, current):
        """Check a ``[item, {sub_item_id: int}]`` argument pair.

        :returns: ``[int(item), copy of the sub-item dict]`` when every entry
            names a sub-item of the item and is an in-range integer, else None
        """
        # just one item, with at least one sub-item
        if not isinstance(args, list) or len(args) != 2 or not isinstance(args[1], dict):
            return None
        item = next((p for p in self.items if p.id == args[0] or str(p) == args[0]), None)
        # test for "not found" explicitly so a falsy-but-valid item is not rejected
        if item is None:
            return None
        for sub_key, value in args[1].items():
            sub_item = next((it for it in self.sub_items[item] if it.id == sub_key), None)
            if sub_item is None:
                return None
            if not isinstance(value, int) or not (sub_item.minimum <= value <= sub_item.maximum):
                return None
        return [int(item), {**args[1]}]

    def compare(self, args, current):
        """Not implemented: logs a warning and returns False."""
        logger.warning("compare not implemented for multiple range settings")
        return False
class ActionSettingRW:
    """Special RW class for settings that turn on and off special processing when a key or button is depressed.

    Subclasses override the ``*_action`` hooks.  ``write`` with a non-zero key
    diverts that key through the setting named ``divert_setting_name`` and
    registers a notification handler; ``write`` with zero undoes this.

    :param feature: stored as ``self.feature``
    :param name: name under which the notification handler is registered
    :param divert_setting_name: name of the device setting used to divert the key
    """

    def __init__(self, feature, name="", divert_setting_name="divert-keys"):
        self.feature = feature  # not used?
        self.name = name
        self.divert_setting_name = divert_setting_name
        self.kind = FeatureRW.kind  # pretend to be FeatureRW as required for HID++ 2.0 devices
        self.device = None
        self.key = None
        self.active = False
        self.pressed = False

    def activate_action(self):  # action to take when setting is activated (write non-false)
        pass

    def deactivate_action(self):  # action to take when setting is deactivated (write false)
        pass

    def press_action(self):  # action to take when key is pressed
        pass

    def release_action(self):  # action to take when key is released
        pass

    def move_action(self, dx, dy):  # action to take when mouse is moved while key is down
        pass

    def key_action(self, key):  # action to take when some other diverted key is pressed
        pass

    def read(self, device):
        """Return the active key as two bytes, or two zero bytes when inactive."""
        # need to return bytes, as if read from device
        if self.active and self.key:
            return common.int2bytes(self.key.key, 2)
        return b"\x00\x00"

    def write(self, device, data_bytes):
        """Enable processing for the key encoded in ``data_bytes`` or disable it when zero.

        :returns: ``data_bytes``, or None when the divert setting is not
            present on ``device``
        """

        def handler(device, n):  # Called on notification events from the device
            if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == hidpp20_constants.FEATURE.REPROG_CONTROLS_V4:
                if n.address == 0x00:
                    # four 16-bit control ids of the currently pressed diverted keys
                    cids = struct.unpack("!HHHH", n.data[:8])
                    if not self.pressed and int(self.key.key) in cids:  # trigger key pressed
                        self.pressed = True
                        self.press_action()
                    elif self.pressed:
                        if int(self.key.key) not in cids:  # trigger key released
                            self.pressed = False
                            self.release_action()
                        else:
                            for key in cids:
                                if key and key != self.key.key:  # some other diverted key pressed
                                    self.key_action(key)
                elif n.address == 0x10:
                    if self.pressed:
                        dx, dy = struct.unpack("!hh", n.data[:4])
                        self.move_action(dx, dy)

        divertSetting = next(filter(lambda s: s.name == self.divert_setting_name, device.settings), None)
        if divertSetting is None:
            logger.warning("setting %s not found on %s", self.divert_setting_name, device.name)
            return None
        self.device = device
        key = common.bytes2int(data_bytes)
        if key:  # Enable
            # Look the key up in a local first: a failed lookup must not replace
            # self.key with None while a handler from an earlier enable is still
            # registered and dereferences self.key.key.
            requested = next((k for k in device.keys if k.key == key), None)
            if requested:
                self.key = requested
                self.active = True
                if divertSetting:
                    divertSetting.write_key_value(int(self.key.key), 1)
                    if self.device.setting_callback:
                        self.device.setting_callback(device, type(divertSetting), [self.key.key, 1])
                device.add_notification_handler(self.name, handler)
                self.activate_action()
            else:
                logger.error("cannot enable %s on %s for key %s", self.name, device, key)
        else:  # Disable
            if self.active:
                self.active = False
                if divertSetting:
                    divertSetting.write_key_value(int(self.key.key), 0)
                    if self.device.setting_callback:
                        self.device.setting_callback(device, type(divertSetting), [self.key.key, 0])
                try:
                    device.remove_notification_handler(self.name)
                except Exception:
                    # best effort: a failure to unregister is only reported
                    if logger.isEnabledFor(logging.WARNING):
                        logger.warning("cannot disable %s on %s", self.name, device)
                self.deactivate_action()
        return data_bytes
class RawXYProcessing:
    """Special class for processing RawXY action messages initiated by pressing a key with rawXY diversion capability.

    Subclasses override the ``*_action`` hooks; ``start``/``stop`` add and
    remove keys that can initiate processing.
    """

    def __init__(self, device, name=""):
        self.device = device
        self.name = name
        self.keys = []  # the keys that can initiate processing
        self.initiating_key = None  # the key that did initiate processing
        self.active = False
        self.feature_offset = device.features[hidpp20_constants.FEATURE.REPROG_CONTROLS_V4]
        assert self.feature_offset is not False

    def handler(self, device, n):
        """Dispatch a device notification to the press/release/move/key hooks."""
        # Called on notification events from the device
        is_reprog = n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == hidpp20_constants.FEATURE.REPROG_CONTROLS_V4
        if not is_reprog:
            return

        if n.address == 0x10:
            # movement is only forwarded while an initiating key is held
            if self.initiating_key:
                dx, dy = struct.unpack("!hh", n.data[:4])
                self.move_action(dx, dy)
            return
        if n.address != 0x00:
            return

        cids = struct.unpack("!HHHH", n.data[:8])
        ## generalize to list of keys
        if not self.initiating_key:  # no initiating key pressed
            for candidate in self.keys:
                if int(candidate.key) in cids:  # initiating key that was pressed
                    self.initiating_key = candidate
            if self.initiating_key:
                self.press_action(self.initiating_key)
            return

        if int(self.initiating_key.key) not in cids:  # initiating key released
            self.initiating_key = None
            self.release_action()
            return
        for cid in cids:
            if cid and cid != self.initiating_key.key:
                self.key_action(cid)

    def start(self, key):
        """Add the device key matching ``key`` and turn on its rawXY reporting.

        The first started key also activates processing and registers the
        notification handler.  Unknown keys are ignored.
        """
        device_key = None
        for candidate in self.device.keys:
            if candidate.key == key:
                device_key = candidate
                break
        if not device_key:
            return
        self.keys.append(device_key)
        if not self.active:
            self.active = True
            self.activate_action()
            self.device.add_notification_handler(self.name, self.handler)
        device_key.set_rawXY_reporting(True)

    def stop(self, key):
        """Remove the key matching ``key``; deactivate once no keys remain."""
        # only stop if this is the active key
        if not self.active:
            return
        processing_key = None
        for candidate in self.keys:
            if candidate.key == key:
                processing_key = candidate
                break
        if processing_key:
            processing_key.set_rawXY_reporting(False)
            self.keys.remove(processing_key)
        if self.keys:
            return
        try:
            self.device.remove_notification_handler(self.name)
        except Exception:
            if logger.isEnabledFor(logging.WARNING):
                logger.warning("cannot disable %s on %s", self.name, self.device)
        self.deactivate_action()
        self.active = False

    def activate_action(self):  # action to take when processing is activated
        pass

    def deactivate_action(self):  # action to take when processing is deactivated
        pass

    def press_action(self, key):  # action to take when an initiating key is pressed
        pass

    def release_action(self):  # action to take when key is released
        pass

    def move_action(self, dx, dy):  # action to take when mouse is moved while key is down
        pass

    def key_action(self, key):  # action to take when some other diverted key is pressed
        pass
def apply_all_settings(device):
    """Call ``apply()`` on every setting of ``device`` that is not marked to be ignored.

    A setting is skipped when the persister's ``"_sensitive"`` mapping holds
    ``SENSITIVITY_IGNORE`` under the setting's name.  Devices exposing the
    HIRES_WHEEL feature get a short delay first.

    :param device: object with ``features``, ``settings`` and optionally ``persister``
    """
    if device.features and hidpp20_constants.FEATURE.HIRES_WHEEL in device.features:
        time.sleep(0.2)  # delay to try to get out of race condition with Linux HID++ driver

    persister = getattr(device, "persister", None)
    sensitives = {}
    if persister:
        sensitives = persister.get("_sensitive", {})

    for setting in device.settings:
        if sensitives.get(setting.name, False) == SENSITIVITY_IGNORE:
            continue
        setting.apply()


# NOTE(review): presumably assigned here because BooleanValidator is defined
# after the Setting class body - confirm against the full file.
Setting.validator_class = BooleanValidator