From 03de6fb2766c6ee0e886e430ac97175a723704e1 Mon Sep 17 00:00:00 2001 From: MattHag <16444067+MattHag@users.noreply.github.com> Date: Sun, 3 Nov 2024 18:42:41 +0100 Subject: [PATCH] Split up huge settings module - Move validators into their own module. - Convert Kind to IntEnum Related #2273 --- lib/logitech_receiver/settings.py | 738 +---------------- lib/logitech_receiver/settings_templates.py | 96 +-- lib/logitech_receiver/settings_validator.py | 744 ++++++++++++++++++ lib/solaar/cli/config.py | 22 +- lib/solaar/ui/config_panel.py | 16 +- lib/solaar/ui/diversion_rules.py | 36 +- ...settings.py => test_settings_validator.py} | 4 +- 7 files changed, 847 insertions(+), 809 deletions(-) create mode 100644 lib/logitech_receiver/settings_validator.py rename tests/logitech_receiver/{test_settings.py => test_settings_validator.py} (82%) diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index f6674eb8..cb1a6e6f 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -16,14 +16,16 @@ from __future__ import annotations import logging -import math import struct import time +from enum import IntEnum + from solaar.i18n import _ from . import common from . import hidpp20_constants +from . import settings_validator from .common import NamedInt from .common import NamedInts @@ -43,22 +45,15 @@ KIND = NamedInts( ) -def bool_or_toggle(current: bool | str, new: bool | str) -> bool: - if isinstance(new, bool): - return new - - try: - return bool(int(new)) - except (TypeError, ValueError): - new = str(new).lower() - - if new in ("true", "yes", "on", "t", "y"): - return True - if new in ("false", "no", "off", "f", "n"): - return False - if new in ("~", "toggle"): - return not current - return None +class Kind(IntEnum): + TOGGLE = 0x01 + CHOICE = 0x02 + RANGE = 0x04 + MAP_CHOICE = 0x0A + MULTIPLE_TOGGLE = 0x10 + PACKED_RANGE = 0x20 + MULTIPLE_RANGE = 0x40 + HETERO = 0x80 class Setting: @@ -103,14 +98,14 @@ class Setting: assert hasattr(self, "_value") assert hasattr(self, "_device") - return self._validator.choices if self._validator and self._validator.kind & KIND.choice else None + return self._validator.choices if self._validator and self._validator.kind & Kind.CHOICE else None @property def range(self): assert hasattr(self, "_value") assert hasattr(self, "_device") - if self._validator.kind == KIND.range: + if self._validator.kind == Kind.RANGE: return self._validator.min_value, self._validator.max_value def _pre_read(self, cached, key=None): @@ -692,709 +687,6 @@ class FeatureRWMap(FeatureRW): return reply if not self.no_reply else True -class Validator: - @classmethod - def build(cls, setting_class, device, **kwargs): - return cls(**kwargs) - - @classmethod - def to_string(cls, value): - return str(value) - - def compare(self, args, current): - if len(args) != 1: - return False - return args[0] == current - - -class BooleanValidator(Validator): - __slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value") - - kind = KIND.toggle - default_true = 0x01 - default_false = 0x00 - # mask specifies all the affected bits in the value - default_mask = 0xFF - - def __init__( - self, - true_value=default_true, - false_value=default_false, - mask=default_mask, - read_skip_byte_count=0, - write_prefix_bytes=b"", - ): - if isinstance(true_value, int): - assert isinstance(false_value, int) - if mask is None: - mask = self.default_mask - else: - assert isinstance(mask, int) - assert true_value & false_value == 0 - assert true_value & mask == true_value - assert false_value & mask == false_value - self.needs_current_value = mask != self.default_mask - elif isinstance(true_value, bytes): - if false_value is None or false_value == self.default_false: - false_value = b"\x00" * len(true_value) - else: - assert isinstance(false_value, bytes) - if mask is None or mask == self.default_mask: - mask = b"\xff" * len(true_value) - else: - assert isinstance(mask, bytes) - assert len(mask) == len(true_value) == len(false_value) - tv = common.bytes2int(true_value) - fv = common.bytes2int(false_value) - mv = common.bytes2int(mask) - assert tv != fv # true and false might be something other than bit values - assert tv & mv == tv - assert fv & mv == fv - self.needs_current_value = any(m != 0xFF for m in mask) - else: - raise Exception(f"invalid mask '{mask!r}', type {type(mask)}") - - self.true_value = true_value - self.false_value = false_value - self.mask = mask - self.read_skip_byte_count = read_skip_byte_count - self.write_prefix_bytes = write_prefix_bytes - - def validate_read(self, reply_bytes): - reply_bytes = reply_bytes[self.read_skip_byte_count :] - if isinstance(self.mask, int): - reply_value = ord(reply_bytes[:1]) & self.mask - if logger.isEnabledFor(logging.DEBUG): - logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value) - if reply_value == self.true_value: - return True - if reply_value == self.false_value: - return False - logger.warning( - "BooleanValidator: reply %02X mismatched %02X/%02X/%02X", - reply_value, - self.true_value, - self.false_value, - self.mask, - ) - return False - - count = len(self.mask) - mask = common.bytes2int(self.mask) - reply_value = common.bytes2int(reply_bytes[:count]) & mask - - true_value = common.bytes2int(self.true_value) - if reply_value == true_value: - return True - - false_value = common.bytes2int(self.false_value) - if reply_value == false_value: - return False - - logger.warning( - "BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask - ) - return False - - def prepare_write(self, new_value, current_value=None): - if new_value is None: - new_value = False - else: - assert isinstance(new_value, bool), f"New value {new_value} for boolean setting is not a boolean" - - to_write = self.true_value if new_value else self.false_value - - if isinstance(self.mask, int): - if current_value is not None and self.needs_current_value: - to_write |= ord(current_value[:1]) & (0xFF ^ self.mask) - if current_value is not None and to_write == ord(current_value[:1]): - return None - to_write = bytes([to_write]) - else: - to_write = bytearray(to_write) - count = len(self.mask) - for i in range(0, count): - b = ord(to_write[i : i + 1]) - m = ord(self.mask[i : i + 1]) - assert b & m == b - # b &= m - if current_value is not None and self.needs_current_value: - b |= ord(current_value[i : i + 1]) & (0xFF ^ m) - to_write[i] = b - to_write = bytes(to_write) - - if current_value is not None and to_write == current_value[: len(to_write)]: - return None - - if logger.isEnabledFor(logging.DEBUG): - logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write) - - return self.write_prefix_bytes + to_write - - def acceptable(self, args, current): - if len(args) != 1: - return None - val = bool_or_toggle(current, args[0]) - return [val] if val is not None else None - - -class BitFieldValidator(Validator): - __slots__ = ("byte_count", "options") - - kind = KIND.multiple_toggle - - def __init__(self, options, byte_count=None): - assert isinstance(options, list) - self.options = options - self.byte_count = (max(x.bit_length() for x in options) + 7) // 8 - if byte_count: - assert isinstance(byte_count, int) and byte_count >= self.byte_count - self.byte_count = byte_count - - def to_string(self, value): - def element_to_string(key, val): - k = next((k for k in self.options if int(key) == k), None) - return str(k) + ":" + str(val) if k is not None else "?" - - return "{" + ", ".join([element_to_string(k, value[k]) for k in value]) + "}" - - def validate_read(self, reply_bytes): - r = common.bytes2int(reply_bytes[: self.byte_count]) - value = {int(k): False for k in self.options} - m = 1 - for _ignore in range(8 * self.byte_count): - if m in self.options: - value[int(m)] = bool(r & m) - m <<= 1 - return value - - def prepare_write(self, new_value): - assert isinstance(new_value, dict) - w = 0 - for k, v in new_value.items(): - if v: - w |= int(k) - return common.int2bytes(w, self.byte_count) - - def get_options(self): - return self.options - - def acceptable(self, args, current): - if len(args) != 2: - return None - key = next((key for key in self.options if key == args[0]), None) - if key is None: - return None - val = bool_or_toggle(current[int(key)], args[1]) - return None if val is None else [int(key), val] - - def compare(self, args, current): - if len(args) != 2: - return False - key = next((key for key in self.options if key == args[0]), None) - if key is None: - return False - return args[1] == current[int(key)] - - -class BitFieldWithOffsetAndMaskValidator(Validator): - __slots__ = ("byte_count", "options", "_option_from_key", "_mask_from_offset", "_option_from_offset_mask") - - kind = KIND.multiple_toggle - sep = 0x01 - - def __init__(self, options, om_method=None, byte_count=None): - assert isinstance(options, list) - # each element of options is an instance of a class - # that has an id (which is used as an index in other dictionaries) - # and where om_method is a method that returns a byte offset and byte mask - # that says how to access and modify the bit toggle for the option - self.options = options - self.om_method = om_method - # to retrieve the options efficiently: - self._option_from_key = {} - self._mask_from_offset = {} - self._option_from_offset_mask = {} - for opt in options: - offset, mask = om_method(opt) - self._option_from_key[int(opt)] = opt - try: - self._mask_from_offset[offset] |= mask - except KeyError: - self._mask_from_offset[offset] = mask - try: - mask_to_opt = self._option_from_offset_mask[offset] - except KeyError: - mask_to_opt = {} - self._option_from_offset_mask[offset] = mask_to_opt - mask_to_opt[mask] = opt - self.byte_count = (max(om_method(x)[1].bit_length() for x in options) + 7) // 8 # is this correct?? - if byte_count: - assert isinstance(byte_count, int) and byte_count >= self.byte_count - self.byte_count = byte_count - - def prepare_read(self): - r = [] - for offset, mask in self._mask_from_offset.items(): - b = offset << (8 * (self.byte_count + 1)) - b |= (self.sep << (8 * self.byte_count)) | mask - r.append(common.int2bytes(b, self.byte_count + 2)) - return r - - def prepare_read_key(self, key): - option = self._option_from_key.get(key, None) - if option is None: - return None - offset, mask = option.om_method(option) - b = offset << (8 * (self.byte_count + 1)) - b |= (self.sep << (8 * self.byte_count)) | mask - return common.int2bytes(b, self.byte_count + 2) - - def validate_read(self, reply_bytes_dict): - values = {int(k): False for k in self.options} - for query, b in reply_bytes_dict.items(): - offset = common.bytes2int(query[0:1]) - b += (self.byte_count - len(b)) * b"\x00" - value = common.bytes2int(b[: self.byte_count]) - mask_to_opt = self._option_from_offset_mask.get(offset, {}) - m = 1 - for _ignore in range(8 * self.byte_count): - if m in mask_to_opt: - values[int(mask_to_opt[m])] = bool(value & m) - m <<= 1 - return values - - def prepare_write(self, new_value): - assert isinstance(new_value, dict) - w = {} - for k, v in new_value.items(): - option = self._option_from_key[int(k)] - offset, mask = self.om_method(option) - if offset not in w: - w[offset] = 0 - if v: - w[offset] |= mask - return [ - common.int2bytes( - (offset << (8 * (2 * self.byte_count + 1))) - | (self.sep << (16 * self.byte_count)) - | (self._mask_from_offset[offset] << (8 * self.byte_count)) - | value, - 2 * self.byte_count + 2, - ) - for offset, value in w.items() - ] - - def get_options(self): - return [int(opt) if isinstance(opt, int) else opt.as_int() for opt in self.options] - - def acceptable(self, args, current): - if len(args) != 2: - return None - key = next((option.id for option in self.options if option.as_int() == args[0]), None) - if key is None: - return None - val = bool_or_toggle(current[int(key)], args[1]) - return None if val is None else [int(key), val] - - def compare(self, args, current): - if len(args) != 2: - return False - key = next((option.id for option in self.options if option.as_int() == args[0]), None) - if key is None: - return False - return args[1] == current[int(key)] - - -class ChoicesValidator(Validator): - """Translates between NamedInts and a byte sequence. - :param choices: a list of NamedInts - :param byte_count: the size of the derived byte sequence. If None, it - will be calculated from the choices.""" - - kind = KIND.choice - - def __init__(self, choices=None, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b""): - assert choices is not None - assert isinstance(choices, NamedInts) - assert len(choices) > 1 - self.choices = choices - self.needs_current_value = False - - max_bits = max(x.bit_length() for x in choices) - self._byte_count = (max_bits // 8) + (1 if max_bits % 8 else 0) - if byte_count: - assert self._byte_count <= byte_count - self._byte_count = byte_count - assert self._byte_count < 8 - self._read_skip_byte_count = read_skip_byte_count - self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b"" - assert self._byte_count + self._read_skip_byte_count <= 14 - assert self._byte_count + len(self._write_prefix_bytes) <= 14 - - def to_string(self, value): - return str(self.choices[value]) if isinstance(value, int) else str(value) - - def validate_read(self, reply_bytes): - reply_value = common.bytes2int(reply_bytes[self._read_skip_byte_count : self._read_skip_byte_count + self._byte_count]) - valid_value = self.choices[reply_value] - assert valid_value is not None, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" - return valid_value - - def prepare_write(self, new_value, current_value=None): - if new_value is None: - value = self.choices[:][0] - else: - value = self.choice(new_value) - if value is None: - raise ValueError(f"invalid choice {new_value!r}") - assert isinstance(value, NamedInt) - return self._write_prefix_bytes + value.bytes(self._byte_count) - - def choice(self, value): - if isinstance(value, int): - return self.choices[value] - try: - int(value) - if int(value) in self.choices: - return self.choices[int(value)] - except Exception: - pass - if value in self.choices: - return self.choices[value] - else: - return None - - def acceptable(self, args, current): - choice = self.choice(args[0]) if len(args) == 1 else None - return None if choice is None else [choice] - - -class ChoicesMapValidator(ChoicesValidator): - kind = KIND.map_choice - - def __init__( - self, - choices_map, - key_byte_count=0, - key_postfix_bytes=b"", - byte_count=0, - read_skip_byte_count=0, - write_prefix_bytes=b"", - extra_default=None, - mask=-1, - activate=0, - ): - assert choices_map is not None - assert isinstance(choices_map, dict) - max_key_bits = 0 - max_value_bits = 0 - for key, choices in choices_map.items(): - assert isinstance(key, NamedInt) - assert isinstance(choices, NamedInts) - max_key_bits = max(max_key_bits, key.bit_length()) - for key_value in choices: - assert isinstance(key_value, NamedInt) - max_value_bits = max(max_value_bits, key_value.bit_length()) - self._key_byte_count = (max_key_bits + 7) // 8 - if key_byte_count: - assert self._key_byte_count <= key_byte_count - self._key_byte_count = key_byte_count - self._byte_count = (max_value_bits + 7) // 8 - if byte_count: - assert self._byte_count <= byte_count - self._byte_count = byte_count - - self.choices = choices_map - self.needs_current_value = False - self.extra_default = extra_default - self._key_postfix_bytes = key_postfix_bytes - self._read_skip_byte_count = read_skip_byte_count if read_skip_byte_count else 0 - self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b"" - self.activate = activate - self.mask = mask - assert self._byte_count + self._read_skip_byte_count + self._key_byte_count <= 14 - assert self._byte_count + len(self._write_prefix_bytes) + self._key_byte_count <= 14 - - def to_string(self, value): - def element_to_string(key, val): - k, c = next(((k, c) for k, c in self.choices.items() if int(key) == k), (None, None)) - return str(k) + ":" + str(c[val]) if k is not None else "?" - - return "{" + ", ".join([element_to_string(k, value[k]) for k in sorted(value)]) + "}" - - def validate_read(self, reply_bytes, key): - start = self._key_byte_count + self._read_skip_byte_count - end = start + self._byte_count - reply_value = common.bytes2int(reply_bytes[start:end]) & self.mask - # reprogrammable keys starts out as 0, which is not a choice, so don't use assert here - if self.extra_default is not None and self.extra_default == reply_value: - return int(self.choices[key][0]) - if reply_value not in self.choices[key]: - assert reply_value in self.choices[key], "%s: failed to validate read value %02X" % ( - self.__class__.__name__, - reply_value, - ) - return reply_value - - def prepare_key(self, key): - return key.to_bytes(self._key_byte_count, "big") + self._key_postfix_bytes - - def prepare_write(self, key, new_value): - choices = self.choices.get(key) - if choices is None or (new_value not in choices and new_value != self.extra_default): - logger.error("invalid choice %r for %s", new_value, key) - return None - new_value = new_value | self.activate - return self._write_prefix_bytes + new_value.to_bytes(self._byte_count, "big") - - def acceptable(self, args, current): - if len(args) != 2: - return None - key, choices = next(((key, item) for key, item in self.choices.items() if key == args[0]), (None, None)) - if choices is None or args[1] not in choices: - return None - choice = next((item for item in choices if item == args[1]), None) - return [int(key), int(choice)] if choice is not None else None - - def compare(self, args, current): - if len(args) != 2: - return False - key = next((key for key in self.choices if key == int(args[0])), None) - if key is None: - return False - return args[1] == current[int(key)] - - -class RangeValidator(Validator): - kind = KIND.range - """Translates between integers and a byte sequence. - :param min_value: minimum accepted value (inclusive) - :param max_value: maximum accepted value (inclusive) - :param byte_count: the size of the derived byte sequence. If None, it - will be calculated from the range.""" - min_value = 0 - max_value = 255 - - @classmethod - def build(cls, setting_class, device, **kwargs): - kwargs["min_value"] = setting_class.min_value - kwargs["max_value"] = setting_class.max_value - return cls(**kwargs) - - def __init__(self, min_value=0, max_value=255, byte_count=1): - assert max_value > min_value - self.min_value = min_value - self.max_value = max_value - self.needs_current_value = True # read and check before write (needed for ADC power and probably a good idea anyway) - - self._byte_count = math.ceil(math.log(max_value + 1, 256)) - if byte_count: - assert self._byte_count <= byte_count - self._byte_count = byte_count - assert self._byte_count < 8 - - def validate_read(self, reply_bytes): - reply_value = common.bytes2int(reply_bytes[: self._byte_count]) - assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" - assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" - return reply_value - - def prepare_write(self, new_value, current_value=None): - if new_value < self.min_value or new_value > self.max_value: - raise ValueError(f"invalid choice {new_value!r}") - current_value = self.validate_read(current_value) if current_value is not None else None - to_write = common.int2bytes(new_value, self._byte_count) - # current value is known and same as value to be written return None to signal not to write it - return None if current_value is not None and current_value == new_value else to_write - - def acceptable(self, args, current): - arg = args[0] - # None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args) - return None if len(args) != 1 or isinstance(arg, int) or arg < self.min_value or arg > self.max_value else args - - def compare(self, args, current): - if len(args) == 1: - return args[0] == current - elif len(args) == 2: - return args[0] <= current <= args[1] - else: - return False - - -class HeteroValidator(Validator): - kind = KIND.hetero - - @classmethod - def build(cls, setting_class, device, **kwargs): - return cls(**kwargs) - - def __init__(self, data_class=None, options=None, readable=True): - assert data_class is not None and options is not None - self.data_class = data_class - self.options = options - self.readable = readable - self.needs_current_value = False - - def validate_read(self, reply_bytes): - if self.readable: - reply_value = self.data_class.from_bytes(reply_bytes, options=self.options) - return reply_value - - def prepare_write(self, new_value, current_value=None): - to_write = new_value.to_bytes(options=self.options) - return to_write - - def acceptable(self, args, current): # should this actually do some checking? - return True - - -class PackedRangeValidator(Validator): - kind = KIND.packed_range - """Several range values, all the same size, all the same min and max""" - min_value = 0 - max_value = 255 - count = 1 - rsbc = 0 - write_prefix_bytes = b"" - - def __init__( - self, keys, min_value=0, max_value=255, count=1, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b"" - ): - assert max_value > min_value - self.needs_current_value = True - self.keys = keys - self.min_value = min_value - self.max_value = max_value - self.count = count - self.bc = math.ceil(math.log(max_value + 1 - min(0, min_value), 256)) - if byte_count: - assert self.bc <= byte_count - self.bc = byte_count - assert self.bc * self.count - self.rsbc = read_skip_byte_count - self.write_prefix_bytes = write_prefix_bytes - - def validate_read(self, reply_bytes): - rvs = { - n: common.bytes2int(reply_bytes[self.rsbc + n * self.bc : self.rsbc + (n + 1) * self.bc], signed=True) - for n in range(self.count) - } - for n in range(self.count): - assert rvs[n] >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}" - assert rvs[n] <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}" - return rvs - - def prepare_write(self, new_values): - if len(new_values) != self.count: - raise ValueError(f"wrong number of values {new_values!r}") - for new_value in new_values.values(): - if new_value < self.min_value or new_value > self.max_value: - raise ValueError(f"invalid value {new_value!r}") - bytes = self.write_prefix_bytes + b"".join( - common.int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count) - ) - return bytes - - def acceptable(self, args, current): - if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count: - return None - return None if not isinstance(args[1], int) or args[1] < self.min_value or args[1] > self.max_value else args - - def compare(self, args, current): - logger.warning("compare not implemented for packed range settings") - return False - - -class MultipleRangeValidator(Validator): - kind = KIND.multiple_range - - def __init__(self, items, sub_items): - assert isinstance(items, list) # each element must have .index and its __int__ must return its id (not its index) - assert isinstance(sub_items, dict) - # sub_items: items -> class with .minimum, .maximum, .length (in bytes), .id (a string) and .widget (e.g. 'Scale') - self.items = items - self.keys = NamedInts(**{str(item): int(item) for item in items}) - self._item_from_id = {int(k): k for k in items} - self.sub_items = sub_items - - def prepare_read_item(self, item): - return common.int2bytes((self._item_from_id[int(item)].index << 1) | 0xFF, 2) - - def validate_read_item(self, reply_bytes, item): - item = self._item_from_id[int(item)] - start = 0 - value = {} - for sub_item in self.sub_items[item]: - r = reply_bytes[start : start + sub_item.length] - if len(r) < sub_item.length: - r += b"\x00" * (sub_item.length - len(value)) - v = common.bytes2int(r) - if not (sub_item.minimum < v < sub_item.maximum): - logger.warning( - f"{self.__class__.__name__}: failed to validate read value for {item}.{sub_item}: " - + f"{v} not in [{sub_item.minimum}..{sub_item.maximum}]" - ) - value[str(sub_item)] = v - start += sub_item.length - return value - - def prepare_write(self, value): - seq = [] - w = b"" - for item in value.keys(): - _item = self._item_from_id[int(item)] - b = common.int2bytes(_item.index, 1) - for sub_item in self.sub_items[_item]: - try: - v = value[int(item)][str(sub_item)] - except KeyError: - return None - if not (sub_item.minimum <= v <= sub_item.maximum): - raise ValueError( - f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]" - ) - b += common.int2bytes(v, sub_item.length) - if len(w) + len(b) > 15: - seq.append(b + b"\xff") - w = b"" - w += b - seq.append(w + b"\xff") - return seq - - def prepare_write_item(self, item, value): - _item = self._item_from_id[int(item)] - w = common.int2bytes(_item.index, 1) - for sub_item in self.sub_items[_item]: - try: - v = value[str(sub_item)] - except KeyError: - return None - if not (sub_item.minimum <= v <= sub_item.maximum): - raise ValueError(f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]") - w += common.int2bytes(v, sub_item.length) - return w + b"\xff" - - def acceptable(self, args, current): - # just one item, with at least one sub-item - if not isinstance(args, list) or len(args) != 2 or not isinstance(args[1], dict): - return None - item = next((p for p in self.items if p.id == args[0] or str(p) == args[0]), None) - if not item: - return None - for sub_key, value in args[1].items(): - sub_item = next((it for it in self.sub_items[item] if it.id == sub_key), None) - if not sub_item: - return None - if not isinstance(value, int) or not (sub_item.minimum <= value <= sub_item.maximum): - return None - return [int(item), {**args[1]}] - - def compare(self, args, current): - logger.warning("compare not implemented for multiple range settings") - return False - - class ActionSettingRW: """Special RW class for settings that turn on and off special processing when a key or button is depressed""" @@ -1578,4 +870,4 @@ def apply_all_settings(device): s.apply() -Setting.validator_class = BooleanValidator +Setting.validator_class = settings_validator.BooleanValidator diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py index 5553dc66..a93294af 100644 --- a/lib/logitech_receiver/settings_templates.py +++ b/lib/logitech_receiver/settings_templates.py @@ -36,6 +36,7 @@ from . import hidpp10_constants from . import hidpp20 from . import hidpp20_constants from . import settings +from . import settings_validator from . import special_keys from .hidpp10_constants import Registers from .hidpp20_constants import GestureId @@ -177,7 +178,7 @@ class RegisterDpi(settings.Setting): description = _("Mouse movement sensitivity") register = Registers.MOUSE_DPI choices_universe = common.NamedInts.range(0x81, 0x8F, lambda x: str((x - 0x80) * 100)) - validator_class = settings.ChoicesValidator + validator_class = settings_validator.ChoicesValidator validator_options = {"choices": choices_universe} @@ -251,7 +252,7 @@ class Backlight(settings.Setting): description = _("Set illumination time for keyboard.") feature = _F.BACKLIGHT choices_universe = common.NamedInts(Off=0, Varying=2, VeryShort=5, Short=10, Medium=20, Long=60, VeryLong=180) - validator_class = settings.ChoicesValidator + validator_class = settings_validator.ChoicesValidator validator_options = {"choices": choices_universe} @@ -285,7 +286,7 @@ class Backlight2(settings.Setting): backlight.write() return True - class validator_class(settings.ChoicesValidator): + class validator_class(settings_validator.ChoicesValidator): @classmethod def build(cls, setting_class, device): backlight = device.backlight @@ -322,7 +323,7 @@ class Backlight2Level(settings.Setting): device.backlight.write() return True - class validator_class(settings.RangeValidator): + class validator_class(settings_validator.RangeValidator): @classmethod def build(cls, setting_class, device): reply = device.feature_request(_F.BACKLIGHT2, 0x20) @@ -334,7 +335,7 @@ class Backlight2Level(settings.Setting): class Backlight2Duration(settings.Setting): feature = _F.BACKLIGHT2 min_version = 3 - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator min_value = 1 max_value = 600 # 10 minutes - actual maximum is 2 hours validator_options = {"byte_count": 2} @@ -363,7 +364,7 @@ class Backlight2DurationHandsOut(Backlight2Duration): label = _("Backlight Delay Hands Out") description = _("Delay in seconds until backlight fades out with hands away from keyboard.") feature = _F.BACKLIGHT2 - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator rw_options = {"field": "dho"} @@ -372,7 +373,7 @@ class Backlight2DurationHandsIn(Backlight2Duration): label = _("Backlight Delay Hands In") description = _("Delay in seconds until backlight fades out with hands near keyboard.") feature = _F.BACKLIGHT2 - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator rw_options = {"field": "dhi"} @@ -381,7 +382,7 @@ class Backlight2DurationPowered(Backlight2Duration): label = _("Backlight Delay Powered") description = _("Delay in seconds until backlight fades out with external power.") feature = _F.BACKLIGHT2 - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator rw_options = {"field": "dpow"} @@ -391,7 +392,7 @@ class Backlight3(settings.Setting): description = _("Set illumination time for keyboard.") feature = _F.BACKLIGHT3 rw_options = {"read_fnid": 0x10, "write_fnid": 0x20, "suffix": b"\x09"} - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator min_value = 0 max_value = 1000 validator_options = {"byte_count": 2} @@ -455,7 +456,7 @@ class PointerSpeed(settings.Setting): label = _("Sensitivity (Pointer Speed)") description = _("Speed multiplier for mouse (256 is normal multiplier).") feature = _F.POINTER_SPEED - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator min_value = 0x002E max_value = 0x01FF validator_options = {"byte_count": 2} @@ -502,7 +503,7 @@ class OnboardProfiles(settings.Setting): for i in range(1, 16): choices_universe[i] = f"Profile {i}" choices_universe[i + 0x100] = f"Read-Only Profile {i}" - validator_class = settings.ChoicesValidator + validator_class = settings_validator.ChoicesValidator class rw_class: def __init__(self, feature): @@ -526,7 +527,7 @@ class OnboardProfiles(settings.Setting): profile_change(device, common.bytes2int(data_bytes)) return result - class validator_class(settings.ChoicesValidator): + class validator_class(settings_validator.ChoicesValidator): @classmethod def build(cls, setting_class, device): headers = hidpp20.OnboardProfiles.get_profile_headers(device) @@ -556,7 +557,7 @@ class ReportRate(settings.Setting): choices_universe[7] = "7ms" choices_universe[8] = "8ms" - class validator_class(settings.ChoicesValidator): + class validator_class(settings_validator.ChoicesValidator): @classmethod def build(cls, setting_class, device): # if device.wpid == '408E': @@ -588,7 +589,7 @@ class ExtendedReportRate(settings.Setting): choices_universe[5] = "250us" choices_universe[6] = "125us" - class validator_class(settings.ChoicesValidator): + class validator_class(settings_validator.ChoicesValidator): @classmethod def build(cls, setting_class, device): reply = device.feature_request(_F.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x10) @@ -640,7 +641,7 @@ class ScrollRatchet(settings.Setting): description = _("Switch the mouse wheel between speed-controlled ratcheting and always freespin.") feature = _F.SMART_SHIFT choices_universe = common.NamedInts(**{_("Freespinning"): 1, _("Ratcheted"): 2}) - validator_class = settings.ChoicesValidator + validator_class = settings_validator.ChoicesValidator validator_options = {"choices": choices_universe} @@ -683,7 +684,7 @@ class SmartShift(settings.Setting): min_value = rw_class.MIN_VALUE max_value = rw_class.MAX_VALUE - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator class SmartShiftEnhanced(SmartShift): @@ -730,7 +731,7 @@ class ReprogrammableKeys(settings.Settings): key_struct.remap(special_keys.CONTROL[common.bytes2int(data_bytes)]) return True - class validator_class(settings.ChoicesMapValidator): + class validator_class(settings_validator.ChoicesMapValidator): @classmethod def build(cls, setting_class, device): choices = {} @@ -907,7 +908,7 @@ class DivertKeys(settings.Settings): key_struct.set_diverted(common.bytes2int(data_bytes) != 0) # not regular return True - class validator_class(settings.ChoicesMapValidator): + class validator_class(settings_validator.ChoicesMapValidator): def __init__(self, choices, key_byte_count=2, byte_count=1, mask=0x01): super().__init__(choices, key_byte_count, byte_count, mask) @@ -983,7 +984,7 @@ class AdjustableDpi(settings.Setting): rw_options = {"read_fnid": 0x20, "write_fnid": 0x30} choices_universe = common.NamedInts.range(100, 4000, str, 50) - class validator_class(settings.ChoicesValidator): + class validator_class(settings_validator.ChoicesValidator): @classmethod def build(cls, setting_class, device): dpilist = produce_dpi_list(setting_class.feature, 0x10, 1, device, 0) @@ -1024,7 +1025,7 @@ class ExtendedAdjustableDpi(settings.Setting): result = self.write(self._value, save) return result[key] if isinstance(result, dict) else result - class validator_class(settings.ChoicesMapValidator): + class validator_class(settings_validator.ChoicesMapValidator): @classmethod def build(cls, setting_class, device): reply = device.feature_request(setting_class.feature, 0x10, 0x00) @@ -1107,7 +1108,7 @@ class SpeedChange(settings.Setting): if self.device.persister: self.device.persister["_speed-change"] = currentSpeed - class validator_class(settings.ChoicesValidator): + class validator_class(settings_validator.ChoicesValidator): @classmethod def build(cls, setting_class, device): key_index = device.keys.index(special_keys.CONTROL.DPI_Change) @@ -1126,7 +1127,7 @@ class DisableKeyboardKeys(settings.BitFieldSetting): _labels = {k: (None, _("Disables the %s key.") % k) for k in special_keys.DISABLE} choices_universe = special_keys.DISABLE - class validator_class(settings.BitFieldValidator): + class validator_class(settings_validator.BitFieldValidator): @classmethod def build(cls, setting_class, device): mask = device.feature_request(_F.KEYBOARD_DISABLE_KEYS, 0x00)[0] @@ -1158,7 +1159,7 @@ class Multiplatform(settings.Setting): # the problem here is how to construct the right values for the rules Set GUI, # as, for example, the integer value for 'Windows' can be different on different devices - class validator_class(settings.ChoicesValidator): + class validator_class(settings_validator.ChoicesValidator): @classmethod def build(cls, setting_class, device): def _str_os_versions(low, high): @@ -1200,7 +1201,7 @@ class DualPlatform(settings.Setting): choices_universe[0x01] = "Android, Windows" feature = _F.DUALPLATFORM rw_options = {"read_fnid": 0x00, "write_fnid": 0x20} - validator_class = settings.ChoicesValidator + validator_class = settings_validator.ChoicesValidator validator_options = {"choices": choices_universe} @@ -1213,7 +1214,7 @@ class ChangeHost(settings.Setting): rw_options = {"read_fnid": 0x00, "write_fnid": 0x10, "no_reply": True} choices_universe = common.NamedInts(**{"Host " + str(i + 1): i for i in range(3)}) - class validator_class(settings.ChoicesValidator): + class validator_class(settings_validator.ChoicesValidator): @classmethod def build(cls, setting_class, device): infos = device.feature_request(_F.CHANGE_HOST) @@ -1325,7 +1326,7 @@ class Gesture2Gestures(settings.BitFieldWithOffsetAndMaskSetting): choices_universe = hidpp20_constants.GestureId _labels = _GESTURE2_GESTURES_LABELS - class validator_class(settings.BitFieldWithOffsetAndMaskValidator): + class validator_class(settings_validator.BitFieldWithOffsetAndMaskValidator): @classmethod def build(cls, setting_class, device, om_method=None): options = [g for g in device.gestures.gestures.values() if g.can_be_enabled or g.default_enabled] @@ -1342,7 +1343,7 @@ class Gesture2Divert(settings.BitFieldWithOffsetAndMaskSetting): choices_universe = hidpp20_constants.GestureId _labels = _GESTURE2_GESTURES_LABELS - class validator_class(settings.BitFieldWithOffsetAndMaskValidator): + class validator_class(settings_validator.BitFieldWithOffsetAndMaskValidator): @classmethod def build(cls, setting_class, device, om_method=None): options = [g for g in device.gestures.gestures.values() if g.can_be_diverted] @@ -1363,7 +1364,7 @@ class Gesture2Params(settings.LongSettings): _labels = _GESTURE2_PARAMS_LABELS _labels_sub = _GESTURE2_PARAMS_LABELS_SUB - class validator_class(settings.MultipleRangeValidator): + class validator_class(settings_validator.MultipleRangeValidator): @classmethod def build(cls, setting_class, device): params = _hidpp20.get_gestures(device).params.values() @@ -1397,7 +1398,7 @@ class MKeyLEDs(settings.BitFieldSetting): def read(self, device): # no way to read, so just assume off return b"\x00" - class validator_class(settings.BitFieldValidator): + class validator_class(settings_validator.BitFieldValidator): @classmethod def build(cls, setting_class, device): number = device.feature_request(setting_class.feature, 0x00)[0] @@ -1455,7 +1456,7 @@ class PersistentRemappableAction(settings.Settings): v = ks.remap(data_bytes) return v - class validator_class(settings.ChoicesMapValidator): + class validator_class(settings_validator.ChoicesMapValidator): @classmethod def build(cls, setting_class, device): remap_keys = device.remap_keys @@ -1494,7 +1495,7 @@ class Sidetone(settings.Setting): label = _("Sidetone") description = _("Set sidetone level.") feature = _F.SIDETONE - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator min_value = 0 max_value = 100 @@ -1507,7 +1508,7 @@ class Equalizer(settings.RangeFieldSetting): rw_options = {"read_fnid": 0x20, "write_fnid": 0x30, "read_prefix": b"\x00"} keys_universe = [] - class validator_class(settings.PackedRangeValidator): + class validator_class(settings_validator.PackedRangeValidator): @classmethod def build(cls, setting_class, device): data = device.feature_request(_F.EQUALIZER, 0x00) @@ -1534,7 +1535,7 @@ class ADCPower(settings.Setting): description = _("Power off in minutes (0 for never).") feature = _F.ADC_MEASUREMENT rw_options = {"read_fnid": 0x10, "write_fnid": 0x20} - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator min_value = 0x00 max_value = 0xFF validator_options = {"byte_count": 1} @@ -1546,7 +1547,7 @@ class BrightnessControl(settings.Setting): description = _("Control overall brightness") feature = _F.BRIGHTNESS_CONTROL rw_options = {"read_fnid": 0x10, "write_fnid": 0x20} - validator_class = settings.RangeValidator + validator_class = settings_validator.RangeValidator def __init__(self, device, rw, validator): super().__init__(device, rw, validator) @@ -1570,7 +1571,7 @@ class BrightnessControl(settings.Setting): return reply return super().write(device, data_bytes) - class validator_class(settings.RangeValidator): + class validator_class(settings_validator.RangeValidator): @classmethod def build(cls, setting_class, device): reply = device.feature_request(_F.BRIGHTNESS_CONTROL) @@ -1591,7 +1592,7 @@ class LEDControl(settings.Setting): feature = _F.COLOR_LED_EFFECTS rw_options = {"read_fnid": 0x70, "write_fnid": 0x80} choices_universe = common.NamedInts(Device=0, Solaar=1) - validator_class = settings.ChoicesValidator + validator_class = settings_validator.ChoicesValidator validator_options = {"choices": choices_universe} @@ -1605,12 +1606,13 @@ class LEDZoneSetting(settings.Setting): label = _("LED Zone Effects") description = _("Set effect for LED Zone") + "\n" + _("LED Control needs to be set to Solaar to be effective.") feature = _F.COLOR_LED_EFFECTS - color_field = {"name": _LEDP.color, "kind": settings.KIND.choice, "label": None, "choices": colors} - speed_field = {"name": _LEDP.speed, "kind": settings.KIND.range, "label": _("Speed"), "min": 0, "max": 255} - period_field = {"name": _LEDP.period, "kind": settings.KIND.range, "label": _("Period"), "min": 100, "max": 5000} - intensity_field = {"name": _LEDP.intensity, "kind": settings.KIND.range, "label": _("Intensity"), "min": 0, "max": 100} - ramp_field = {"name": _LEDP.ramp, "kind": settings.KIND.choice, "label": _("Ramp"), "choices": hidpp20.LEDRampChoices} - # form_field = {"name": _LEDP.form, "kind": settings.KIND.choice, "label": _("Form"), "choices": _hidpp20.LEDFormChoices} + color_field = {"name": _LEDP.color, "kind": settings.Kind.CHOICE, "label": None, "choices": colors} + speed_field = {"name": _LEDP.speed, "kind": settings.Kind.RANGE, "label": _("Speed"), "min": 0, "max": 255} + period_field = {"name": _LEDP.period, "kind": settings.Kind.RANGE, "label": _("Period"), "min": 100, "max": 5000} + intensity_field = {"name": _LEDP.intensity, "kind": settings.Kind.RANGE, "label": _("Intensity"), "min": 0, "max": 100} + ramp_field = {"name": _LEDP.ramp, "kind": settings.Kind.CHOICE, "label": _("Ramp"), "choices": hidpp20.LEDRampChoices} + # form_field = {"name": _LEDP.form, "kind": settings.Kind.CHOICE, "label": _("Form"), "choices": + # _hidpp20.LEDFormChoices} possible_fields = [color_field, speed_field, period_field, intensity_field, ramp_field] @classmethod @@ -1620,14 +1622,14 @@ class LEDZoneSetting(settings.Setting): for zone in infos.zones: prefix = common.int2bytes(zone.index, 1) rw = settings.FeatureRW(cls.feature, read_fnid, write_fnid, prefix=prefix, suffix=suffix) - validator = settings.HeteroValidator( + validator = settings_validator.HeteroValidator( data_class=hidpp20.LEDEffectSetting, options=zone.effects, readable=infos.readable ) setting = cls(device, rw, validator) setting.name = cls.name + str(int(zone.location)) setting.label = _("LEDs") + " " + str(hidpp20.LEDZoneLocations[zone.location]) choices = [hidpp20.LEDEffects[e.ID][0] for e in zone.effects if e.ID in hidpp20.LEDEffects] - ID_field = {"name": "ID", "kind": settings.KIND.choice, "label": None, "choices": choices} + ID_field = {"name": "ID", "kind": settings.Kind.CHOICE, "label": None, "choices": choices} setting.possible_fields = [ID_field] + cls.possible_fields setting.fields_map = hidpp20.LEDEffects settings_.append(setting) @@ -1645,7 +1647,7 @@ class RGBControl(settings.Setting): feature = _F.RGB_EFFECTS rw_options = {"read_fnid": 0x50, "write_fnid": 0x50} choices_universe = common.NamedInts(Device=0, Solaar=1) - validator_class = settings.ChoicesValidator + validator_class = settings_validator.ChoicesValidator validator_options = {"choices": choices_universe, "write_prefix_bytes": b"\x01", "read_skip_byte_count": 1} @@ -1723,7 +1725,7 @@ class PerKeyLighting(settings.Settings): class rw_class(settings.FeatureRWMap): pass - class validator_class(settings.ChoicesMapValidator): + class validator_class(settings_validator.ChoicesMapValidator): @classmethod def build(cls, setting_class, device): choices_map = {} @@ -1742,7 +1744,7 @@ class PerKeyLighting(settings.Settings): return result -SETTINGS = [ +SETTINGS: list[settings.Setting] = [ RegisterHandDetection, # simple RegisterSmoothScroll, # simple RegisterSideScroll, # simple diff --git a/lib/logitech_receiver/settings_validator.py b/lib/logitech_receiver/settings_validator.py new file mode 100644 index 00000000..b2102d04 --- /dev/null +++ b/lib/logitech_receiver/settings_validator.py @@ -0,0 +1,744 @@ +from __future__ import annotations + +import logging +import math + +from enum import IntEnum + +from logitech_receiver import common +from logitech_receiver.common import NamedInt +from logitech_receiver.common import NamedInts + +logger = logging.getLogger(__name__) + + +def bool_or_toggle(current: bool | str, new: bool | str) -> bool: + if isinstance(new, bool): + return new + + try: + return bool(int(new)) + except (TypeError, ValueError): + new = str(new).lower() + + if new in ("true", "yes", "on", "t", "y"): + return True + if new in ("false", "no", "off", "f", "n"): + return False + if new in ("~", "toggle"): + return not current + return None + + +class Kind(IntEnum): + TOGGLE = 0x01 + CHOICE = 0x02 + RANGE = 0x04 + MAP_CHOICE = 0x0A + MULTIPLE_TOGGLE = 0x10 + PACKED_RANGE = 0x20 + MULTIPLE_RANGE = 0x40 + HETERO = 0x80 + + +class Validator: + @classmethod + def build(cls, setting_class, device, **kwargs): + return cls(**kwargs) + + @classmethod + def to_string(cls, value): + return str(value) + + def compare(self, args, current): + if len(args) != 1: + return False + return args[0] == current + + +class BooleanValidator(Validator): + __slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value") + + kind = Kind.TOGGLE + default_true = 0x01 + default_false = 0x00 + # mask specifies all the affected bits in the value + default_mask = 0xFF + + def __init__( + self, + true_value=default_true, + false_value=default_false, + mask=default_mask, + read_skip_byte_count=0, + write_prefix_bytes=b"", + ): + if isinstance(true_value, int): + assert isinstance(false_value, int) + if mask is None: + mask = self.default_mask + else: + assert isinstance(mask, int) + assert true_value & false_value == 0 + assert true_value & mask == true_value + assert false_value & mask == false_value + self.needs_current_value = mask != self.default_mask + elif isinstance(true_value, bytes): + if false_value is None or false_value == self.default_false: + false_value = b"\x00" * len(true_value) + else: + assert isinstance(false_value, bytes) + if mask is None or mask == self.default_mask: + mask = b"\xff" * len(true_value) + else: + assert isinstance(mask, bytes) + assert len(mask) == len(true_value) == len(false_value) + tv = common.bytes2int(true_value) + fv = common.bytes2int(false_value) + mv = common.bytes2int(mask) + assert tv != fv # true and false might be something other than bit values + assert tv & mv == tv + assert fv & mv == fv + self.needs_current_value = any(m != 0xFF for m in mask) + else: + raise Exception(f"invalid mask '{mask!r}', type {type(mask)}") + + self.true_value = true_value + self.false_value = false_value + self.mask = mask + self.read_skip_byte_count = read_skip_byte_count + self.write_prefix_bytes = write_prefix_bytes + + def validate_read(self, reply_bytes): + reply_bytes = reply_bytes[self.read_skip_byte_count :] + if isinstance(self.mask, int): + reply_value = ord(reply_bytes[:1]) & self.mask + if logger.isEnabledFor(logging.DEBUG): + logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value) + if reply_value == self.true_value: + return True + if reply_value == self.false_value: + return False + logger.warning( + "BooleanValidator: reply %02X mismatched %02X/%02X/%02X", + reply_value, + self.true_value, + self.false_value, + self.mask, + ) + return False + + count = len(self.mask) + mask = common.bytes2int(self.mask) + reply_value = common.bytes2int(reply_bytes[:count]) & mask + + true_value = common.bytes2int(self.true_value) + if reply_value == true_value: + return True + + false_value = common.bytes2int(self.false_value) + if reply_value == false_value: + return False + + logger.warning( + "BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask + ) + return False + + def prepare_write(self, new_value, current_value=None): + if new_value is None: + new_value = False + else: + assert isinstance(new_value, bool), f"New value {new_value} for boolean setting is not a boolean" + + to_write = self.true_value if new_value else self.false_value + + if isinstance(self.mask, int): + if current_value is not None and self.needs_current_value: + to_write |= ord(current_value[:1]) & (0xFF ^ self.mask) + if current_value is not None and to_write == ord(current_value[:1]): + return None + to_write = bytes([to_write]) + else: + to_write = bytearray(to_write) + count = len(self.mask) + for i in range(0, count): + b = ord(to_write[i : i + 1]) + m = ord(self.mask[i : i + 1]) + assert b & m == b + # b &= m + if current_value is not None and self.needs_current_value: + b |= ord(current_value[i : i + 1]) & (0xFF ^ m) + to_write[i] = b + to_write = bytes(to_write) + + if current_value is not None and to_write == current_value[: len(to_write)]: + return None + + if logger.isEnabledFor(logging.DEBUG): + logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write) + + return self.write_prefix_bytes + to_write + + def acceptable(self, args, current): + if len(args) != 1: + return None + val = bool_or_toggle(current, args[0]) + return [val] if val is not None else None + + +class BitFieldValidator(Validator): + __slots__ = ("byte_count", "options") + + kind = Kind.MULTIPLE_TOGGLE + + def __init__(self, options, byte_count=None): + assert isinstance(options, list) + self.options = options + self.byte_count = (max(x.bit_length() for x in options) + 7) // 8 + if byte_count: + assert isinstance(byte_count, int) and byte_count >= self.byte_count + self.byte_count = byte_count + + def to_string(self, value): + def element_to_string(key, val): + k = next((k for k in self.options if int(key) == k), None) + return str(k) + ":" + str(val) if k is not None else "?" + + return "{" + ", ".join([element_to_string(k, value[k]) for k in value]) + "}" + + def validate_read(self, reply_bytes): + r = common.bytes2int(reply_bytes[: self.byte_count]) + value = {int(k): False for k in self.options} + m = 1 + for _ignore in range(8 * self.byte_count): + if m in self.options: + value[int(m)] = bool(r & m) + m <<= 1 + return value + + def prepare_write(self, new_value): + assert isinstance(new_value, dict) + w = 0 + for k, v in new_value.items(): + if v: + w |= int(k) + return common.int2bytes(w, self.byte_count) + + def get_options(self): + return self.options + + def acceptable(self, args, current): + if len(args) != 2: + return None + key = next((key for key in self.options if key == args[0]), None) + if key is None: + return None + val = bool_or_toggle(current[int(key)], args[1]) + return None if val is None else [int(key), val] + + def compare(self, args, current): + if len(args) != 2: + return False + key = next((key for key in self.options if key == args[0]), None) + if key is None: + return False + return args[1] == current[int(key)] + + +class BitFieldWithOffsetAndMaskValidator(Validator): + __slots__ = ("byte_count", "options", "_option_from_key", "_mask_from_offset", "_option_from_offset_mask") + + kind = Kind.MULTIPLE_TOGGLE + sep = 0x01 + + def __init__(self, options, om_method=None, byte_count=None): + assert isinstance(options, list) + # each element of options is an instance of a class + # that has an id (which is used as an index in other dictionaries) + # and where om_method is a method that returns a byte offset and byte mask + # that says how to access and modify the bit toggle for the option + self.options = options + self.om_method = om_method + # to retrieve the options efficiently: + self._option_from_key = {} + self._mask_from_offset = {} + self._option_from_offset_mask = {} + for opt in options: + offset, mask = om_method(opt) + self._option_from_key[int(opt)] = opt + try: + self._mask_from_offset[offset] |= mask + except KeyError: + self._mask_from_offset[offset] = mask + try: + mask_to_opt = self._option_from_offset_mask[offset] + except KeyError: + mask_to_opt = {} + self._option_from_offset_mask[offset] = mask_to_opt + mask_to_opt[mask] = opt + self.byte_count = (max(om_method(x)[1].bit_length() for x in options) + 7) // 8 # is this correct?? + if byte_count: + assert isinstance(byte_count, int) and byte_count >= self.byte_count + self.byte_count = byte_count + + def prepare_read(self): + r = [] + for offset, mask in self._mask_from_offset.items(): + b = offset << (8 * (self.byte_count + 1)) + b |= (self.sep << (8 * self.byte_count)) | mask + r.append(common.int2bytes(b, self.byte_count + 2)) + return r + + def prepare_read_key(self, key): + option = self._option_from_key.get(key, None) + if option is None: + return None + offset, mask = option.om_method(option) + b = offset << (8 * (self.byte_count + 1)) + b |= (self.sep << (8 * self.byte_count)) | mask + return common.int2bytes(b, self.byte_count + 2) + + def validate_read(self, reply_bytes_dict): + values = {int(k): False for k in self.options} + for query, b in reply_bytes_dict.items(): + offset = common.bytes2int(query[0:1]) + b += (self.byte_count - len(b)) * b"\x00" + value = common.bytes2int(b[: self.byte_count]) + mask_to_opt = self._option_from_offset_mask.get(offset, {}) + m = 1 + for _ignore in range(8 * self.byte_count): + if m in mask_to_opt: + values[int(mask_to_opt[m])] = bool(value & m) + m <<= 1 + return values + + def prepare_write(self, new_value): + assert isinstance(new_value, dict) + w = {} + for k, v in new_value.items(): + option = self._option_from_key[int(k)] + offset, mask = self.om_method(option) + if offset not in w: + w[offset] = 0 + if v: + w[offset] |= mask + return [ + common.int2bytes( + (offset << (8 * (2 * self.byte_count + 1))) + | (self.sep << (16 * self.byte_count)) + | (self._mask_from_offset[offset] << (8 * self.byte_count)) + | value, + 2 * self.byte_count + 2, + ) + for offset, value in w.items() + ] + + def get_options(self): + return [int(opt) if isinstance(opt, int) else opt.as_int() for opt in self.options] + + def acceptable(self, args, current): + if len(args) != 2: + return None + key = next((option.id for option in self.options if option.as_int() == args[0]), None) + if key is None: + return None + val = bool_or_toggle(current[int(key)], args[1]) + return None if val is None else [int(key), val] + + def compare(self, args, current): + if len(args) != 2: + return False + key = next((option.id for option in self.options if option.as_int() == args[0]), None) + if key is None: + return False + return args[1] == current[int(key)] + + +class ChoicesValidator(Validator): + """Translates between NamedInts and a byte sequence. + :param choices: a list of NamedInts + :param byte_count: the size of the derived byte sequence. If None, it + will be calculated from the choices.""" + + kind = Kind.CHOICE + + def __init__(self, choices=None, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b""): + assert choices is not None + assert isinstance(choices, NamedInts) + assert len(choices) > 1 + self.choices = choices + self.needs_current_value = False + + max_bits = max(x.bit_length() for x in choices) + self._byte_count = (max_bits // 8) + (1 if max_bits % 8 else 0) + if byte_count: + assert self._byte_count <= byte_count + self._byte_count = byte_count + assert self._byte_count < 8 + self._read_skip_byte_count = read_skip_byte_count + self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b"" + assert self._byte_count + self._read_skip_byte_count <= 14 + assert self._byte_count + len(self._write_prefix_bytes) <= 14 + + def to_string(self, value): + return str(self.choices[value]) if isinstance(value, int) else str(value) + + def validate_read(self, reply_bytes): + reply_value = common.bytes2int(reply_bytes[self._read_skip_byte_count : self._read_skip_byte_count + self._byte_count]) + valid_value = self.choices[reply_value] + assert valid_value is not None, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" + return valid_value + + def prepare_write(self, new_value, current_value=None): + if new_value is None: + value = self.choices[:][0] + else: + value = self.choice(new_value) + if value is None: + raise ValueError(f"invalid choice {new_value!r}") + assert isinstance(value, NamedInt) + return self._write_prefix_bytes + value.bytes(self._byte_count) + + def choice(self, value): + if isinstance(value, int): + return self.choices[value] + try: + int(value) + if int(value) in self.choices: + return self.choices[int(value)] + except Exception: + pass + if value in self.choices: + return self.choices[value] + else: + return None + + def acceptable(self, args, current): + choice = self.choice(args[0]) if len(args) == 1 else None + return None if choice is None else [choice] + + +class ChoicesMapValidator(ChoicesValidator): + kind = Kind.MAP_CHOICE + + def __init__( + self, + choices_map, + key_byte_count=0, + key_postfix_bytes=b"", + byte_count=0, + read_skip_byte_count=0, + write_prefix_bytes=b"", + extra_default=None, + mask=-1, + activate=0, + ): + assert choices_map is not None + assert isinstance(choices_map, dict) + max_key_bits = 0 + max_value_bits = 0 + for key, choices in choices_map.items(): + assert isinstance(key, NamedInt) + assert isinstance(choices, NamedInts) + max_key_bits = max(max_key_bits, key.bit_length()) + for key_value in choices: + assert isinstance(key_value, NamedInt) + max_value_bits = max(max_value_bits, key_value.bit_length()) + self._key_byte_count = (max_key_bits + 7) // 8 + if key_byte_count: + assert self._key_byte_count <= key_byte_count + self._key_byte_count = key_byte_count + self._byte_count = (max_value_bits + 7) // 8 + if byte_count: + assert self._byte_count <= byte_count + self._byte_count = byte_count + + self.choices = choices_map + self.needs_current_value = False + self.extra_default = extra_default + self._key_postfix_bytes = key_postfix_bytes + self._read_skip_byte_count = read_skip_byte_count if read_skip_byte_count else 0 + self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b"" + self.activate = activate + self.mask = mask + assert self._byte_count + self._read_skip_byte_count + self._key_byte_count <= 14 + assert self._byte_count + len(self._write_prefix_bytes) + self._key_byte_count <= 14 + + def to_string(self, value): + def element_to_string(key, val): + k, c = next(((k, c) for k, c in self.choices.items() if int(key) == k), (None, None)) + return str(k) + ":" + str(c[val]) if k is not None else "?" + + return "{" + ", ".join([element_to_string(k, value[k]) for k in sorted(value)]) + "}" + + def validate_read(self, reply_bytes, key): + start = self._key_byte_count + self._read_skip_byte_count + end = start + self._byte_count + reply_value = common.bytes2int(reply_bytes[start:end]) & self.mask + # reprogrammable keys starts out as 0, which is not a choice, so don't use assert here + if self.extra_default is not None and self.extra_default == reply_value: + return int(self.choices[key][0]) + if reply_value not in self.choices[key]: + assert reply_value in self.choices[key], "%s: failed to validate read value %02X" % ( + self.__class__.__name__, + reply_value, + ) + return reply_value + + def prepare_key(self, key): + return key.to_bytes(self._key_byte_count, "big") + self._key_postfix_bytes + + def prepare_write(self, key, new_value): + choices = self.choices.get(key) + if choices is None or (new_value not in choices and new_value != self.extra_default): + logger.error("invalid choice %r for %s", new_value, key) + return None + new_value = new_value | self.activate + return self._write_prefix_bytes + new_value.to_bytes(self._byte_count, "big") + + def acceptable(self, args, current): + if len(args) != 2: + return None + key, choices = next(((key, item) for key, item in self.choices.items() if key == args[0]), (None, None)) + if choices is None or args[1] not in choices: + return None + choice = next((item for item in choices if item == args[1]), None) + return [int(key), int(choice)] if choice is not None else None + + def compare(self, args, current): + if len(args) != 2: + return False + key = next((key for key in self.choices if key == int(args[0])), None) + if key is None: + return False + return args[1] == current[int(key)] + + +class RangeValidator(Validator): + kind = Kind.RANGE + """Translates between integers and a byte sequence. + :param min_value: minimum accepted value (inclusive) + :param max_value: maximum accepted value (inclusive) + :param byte_count: the size of the derived byte sequence. If None, it + will be calculated from the range.""" + min_value = 0 + max_value = 255 + + @classmethod + def build(cls, setting_class, device, **kwargs): + kwargs["min_value"] = setting_class.min_value + kwargs["max_value"] = setting_class.max_value + return cls(**kwargs) + + def __init__(self, min_value=0, max_value=255, byte_count=1): + assert max_value > min_value + self.min_value = min_value + self.max_value = max_value + self.needs_current_value = True # read and check before write (needed for ADC power and probably a good idea anyway) + + self._byte_count = math.ceil(math.log(max_value + 1, 256)) + if byte_count: + assert self._byte_count <= byte_count + self._byte_count = byte_count + assert self._byte_count < 8 + + def validate_read(self, reply_bytes): + reply_value = common.bytes2int(reply_bytes[: self._byte_count]) + assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" + assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}" + return reply_value + + def prepare_write(self, new_value, current_value=None): + if new_value < self.min_value or new_value > self.max_value: + raise ValueError(f"invalid choice {new_value!r}") + current_value = self.validate_read(current_value) if current_value is not None else None + to_write = common.int2bytes(new_value, self._byte_count) + # current value is known and same as value to be written return None to signal not to write it + return None if current_value is not None and current_value == new_value else to_write + + def acceptable(self, args, current): + arg = args[0] + # None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args) + return None if len(args) != 1 or isinstance(arg, int) or arg < self.min_value or arg > self.max_value else args + + def compare(self, args, current): + if len(args) == 1: + return args[0] == current + elif len(args) == 2: + return args[0] <= current <= args[1] + else: + return False + + +class HeteroValidator(Validator): + kind = Kind.HETERO + + @classmethod + def build(cls, setting_class, device, **kwargs): + return cls(**kwargs) + + def __init__(self, data_class=None, options=None, readable=True): + assert data_class is not None and options is not None + self.data_class = data_class + self.options = options + self.readable = readable + self.needs_current_value = False + + def validate_read(self, reply_bytes): + if self.readable: + reply_value = self.data_class.from_bytes(reply_bytes, options=self.options) + return reply_value + + def prepare_write(self, new_value, current_value=None): + to_write = new_value.to_bytes(options=self.options) + return to_write + + def acceptable(self, args, current): # should this actually do some checking? + return True + + +class PackedRangeValidator(Validator): + kind = Kind.PACKED_RANGE + """Several range values, all the same size, all the same min and max""" + min_value = 0 + max_value = 255 + count = 1 + rsbc = 0 + write_prefix_bytes = b"" + + def __init__( + self, keys, min_value=0, max_value=255, count=1, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b"" + ): + assert max_value > min_value + self.needs_current_value = True + self.keys = keys + self.min_value = min_value + self.max_value = max_value + self.count = count + self.bc = math.ceil(math.log(max_value + 1 - min(0, min_value), 256)) + if byte_count: + assert self.bc <= byte_count + self.bc = byte_count + assert self.bc * self.count + self.rsbc = read_skip_byte_count + self.write_prefix_bytes = write_prefix_bytes + + def validate_read(self, reply_bytes): + rvs = { + n: common.bytes2int(reply_bytes[self.rsbc + n * self.bc : self.rsbc + (n + 1) * self.bc], signed=True) + for n in range(self.count) + } + for n in range(self.count): + assert rvs[n] >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}" + assert rvs[n] <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}" + return rvs + + def prepare_write(self, new_values): + if len(new_values) != self.count: + raise ValueError(f"wrong number of values {new_values!r}") + for new_value in new_values.values(): + if new_value < self.min_value or new_value > self.max_value: + raise ValueError(f"invalid value {new_value!r}") + bytes = self.write_prefix_bytes + b"".join( + common.int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count) + ) + return bytes + + def acceptable(self, args, current): + if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count: + return None + return None if not isinstance(args[1], int) or args[1] < self.min_value or args[1] > self.max_value else args + + def compare(self, args, current): + logger.warning("compare not implemented for packed range settings") + return False + + +class MultipleRangeValidator(Validator): + kind = Kind.MULTIPLE_RANGE + + def __init__(self, items, sub_items): + assert isinstance(items, list) # each element must have .index and its __int__ must return its id (not its index) + assert isinstance(sub_items, dict) + # sub_items: items -> class with .minimum, .maximum, .length (in bytes), .id (a string) and .widget (e.g. 'Scale') + self.items = items + self.keys = NamedInts(**{str(item): int(item) for item in items}) + self._item_from_id = {int(k): k for k in items} + self.sub_items = sub_items + + def prepare_read_item(self, item): + return common.int2bytes((self._item_from_id[int(item)].index << 1) | 0xFF, 2) + + def validate_read_item(self, reply_bytes, item): + item = self._item_from_id[int(item)] + start = 0 + value = {} + for sub_item in self.sub_items[item]: + r = reply_bytes[start : start + sub_item.length] + if len(r) < sub_item.length: + r += b"\x00" * (sub_item.length - len(value)) + v = common.bytes2int(r) + if not (sub_item.minimum < v < sub_item.maximum): + logger.warning( + f"{self.__class__.__name__}: failed to validate read value for {item}.{sub_item}: " + + f"{v} not in [{sub_item.minimum}..{sub_item.maximum}]" + ) + value[str(sub_item)] = v + start += sub_item.length + return value + + def prepare_write(self, value): + seq = [] + w = b"" + for item in value.keys(): + _item = self._item_from_id[int(item)] + b = common.int2bytes(_item.index, 1) + for sub_item in self.sub_items[_item]: + try: + v = value[int(item)][str(sub_item)] + except KeyError: + return None + if not (sub_item.minimum <= v <= sub_item.maximum): + raise ValueError( + f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]" + ) + b += common.int2bytes(v, sub_item.length) + if len(w) + len(b) > 15: + seq.append(b + b"\xff") + w = b"" + w += b + seq.append(w + b"\xff") + return seq + + def prepare_write_item(self, item, value): + _item = self._item_from_id[int(item)] + w = common.int2bytes(_item.index, 1) + for sub_item in self.sub_items[_item]: + try: + v = value[str(sub_item)] + except KeyError: + return None + if not (sub_item.minimum <= v <= sub_item.maximum): + raise ValueError(f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]") + w += common.int2bytes(v, sub_item.length) + return w + b"\xff" + + def acceptable(self, args, current): + # just one item, with at least one sub-item + if not isinstance(args, list) or len(args) != 2 or not isinstance(args[1], dict): + return None + item = next((p for p in self.items if p.id == args[0] or str(p) == args[0]), None) + if not item: + return None + for sub_key, value in args[1].items(): + sub_item = next((it for it in self.sub_items[item] if it.id == sub_key), None) + if not sub_item: + return None + if not isinstance(value, int) or not (sub_item.minimum <= value <= sub_item.maximum): + return None + return [int(item), {**args[1]}] + + def compare(self, args, current): + logger.warning("compare not implemented for multiple range settings") + return False diff --git a/lib/solaar/cli/config.py b/lib/solaar/cli/config.py index 1e26a43e..914a2aa7 100644 --- a/lib/solaar/cli/config.py +++ b/lib/solaar/cli/config.py @@ -30,9 +30,9 @@ def _print_setting(s, verbose=True): if verbose: if s.description: print("#", s.description.replace("\n", " ")) - if s.kind == settings.KIND.toggle: + if s.kind == settings.Kind.TOGGLE: print("# possible values: on/true/t/yes/y/1 or off/false/f/no/n/0 or Toggle/~") - elif s.kind == settings.KIND.choice: + elif s.kind == settings.Kind.CHOICE: print( "# possible values: one of [", ", ".join(str(v) for v in s.choices), @@ -53,7 +53,7 @@ def _print_setting_keyed(s, key, verbose=True): if verbose: if s.description: print("#", s.description.replace("\n", " ")) - if s.kind == settings.KIND.multiple_toggle: + if s.kind == settings.Kind.MULTIPLE_TOGGLE: k = next((k for k in s._labels if key == k), None) if k is None: print(s.name, "=? (key not found)") @@ -64,7 +64,7 @@ def _print_setting_keyed(s, key, verbose=True): print(s.name, "= ? (failed to read from device)") else: print(s.name, s.val_to_string({k: value[str(int(k))]})) - elif s.kind == settings.KIND.map_choice: + elif s.kind == settings.Kind.MAP_CHOICE: k = next((k for k in s.choices.keys() if key == k), None) if k is None: print(s.name, "=? (key not found)") @@ -215,26 +215,26 @@ def run(receivers, args, _find_receiver, find_device): dev.persister[setting.name] = setting._value -def set(dev, setting, args, save): - if setting.kind == settings.KIND.toggle: +def set(dev, setting: settings.Setting, args, save): + if setting.kind == settings.Kind.TOGGLE: value = select_toggle(args.value_key, setting) args.value_key = value message = f"Setting {setting.name} of {dev.name} to {value}" result = setting.write(value, save=save) - elif setting.kind == settings.KIND.range: + elif setting.kind == settings.Kind.RANGE: value = select_range(args.value_key, setting) args.value_key = value message = f"Setting {setting.name} of {dev.name} to {value}" result = setting.write(value, save=save) - elif setting.kind == settings.KIND.choice: + elif setting.kind == settings.Kind.CHOICE: value = select_choice(args.value_key, setting.choices, setting, None) args.value_key = int(value) message = f"Setting {setting.name} of {dev.name} to {value}" result = setting.write(value, save=save) - elif setting.kind == settings.KIND.map_choice: + elif setting.kind == settings.Kind.MAP_CHOICE: if args.extra_subkey is None: _print_setting_keyed(setting, args.value_key) return None, None, None @@ -252,7 +252,7 @@ def set(dev, setting, args, save): message = f"Setting {setting.name} of {dev.name} key {k!r} to {value!r}" result = setting.write_key_value(int(k), value, save=save) - elif setting.kind == settings.KIND.multiple_toggle: + elif setting.kind == settings.Kind.MULTIPLE_TOGGLE: if args.extra_subkey is None: _print_setting_keyed(setting, args.value_key) return None, None, None @@ -271,7 +271,7 @@ def set(dev, setting, args, save): message = f"Setting {setting.name} key {k!r} to {value!r}" result = setting.write_key_value(str(int(k)), value, save=save) - elif setting.kind == settings.KIND.multiple_range: + elif setting.kind == settings.Kind.MULTIPLE_RANGE: if args.extra_subkey is None: raise Exception(f"{setting.name}: setting needs both key and value to set") key = args.value_key diff --git a/lib/solaar/ui/config_panel.py b/lib/solaar/ui/config_panel.py index e8f99d56..b5450b7f 100644 --- a/lib/solaar/ui/config_panel.py +++ b/lib/solaar/ui/config_panel.py @@ -666,21 +666,21 @@ def _create_sbox(s, _device): change.set_sensitive(True) change.connect("clicked", _change_click, sbox) - if s.kind == settings.KIND.toggle: + if s.kind == settings.Kind.TOGGLE: control = ToggleControl(sbox) - elif s.kind == settings.KIND.range: + elif s.kind == settings.Kind.RANGE: control = SliderControl(sbox) - elif s.kind == settings.KIND.choice: + elif s.kind == settings.Kind.CHOICE: control = _create_choice_control(sbox) - elif s.kind == settings.KIND.map_choice: + elif s.kind == settings.Kind.MAP_CHOICE: control = MapChoiceControl(sbox) - elif s.kind == settings.KIND.multiple_toggle: + elif s.kind == settings.Kind.MULTIPLE_TOGGLE: control = MultipleToggleControl(sbox, change) - elif s.kind == settings.KIND.multiple_range: + elif s.kind == settings.Kind.MULTIPLE_RANGE: control = MultipleRangeControl(sbox, change) - elif s.kind == settings.KIND.packed_range: + elif s.kind == settings.Kind.PACKED_RANGE: control = PackedRangeControl(sbox, change) - elif s.kind == settings.KIND.hetero: + elif s.kind == settings.Kind.HETERO: control = HeteroKeyControl(sbox, change) else: if logger.isEnabledFor(logging.WARNING): diff --git a/lib/solaar/ui/diversion_rules.py b/lib/solaar/ui/diversion_rules.py index f502d7ea..669b182d 100644 --- a/lib/solaar/ui/diversion_rules.py +++ b/lib/solaar/ui/diversion_rules.py @@ -39,7 +39,7 @@ from logitech_receiver import diversion as _DIV from logitech_receiver.common import NamedInt from logitech_receiver.common import NamedInts from logitech_receiver.common import UnsortedNamedInts -from logitech_receiver.settings import KIND as _SKIND +from logitech_receiver.settings import Kind from logitech_receiver.settings import Setting from logitech_receiver.settings_templates import SETTINGS @@ -1455,7 +1455,7 @@ class HostUI(ConditionUI): class _SettingWithValueUI: ALL_SETTINGS = _all_settings() - MULTIPLE = [_SKIND.multiple_toggle, _SKIND.map_choice, _SKIND.multiple_range] + MULTIPLE = [Kind.MULTIPLE_TOGGLE, Kind.MAP_CHOICE, Kind.MULTIPLE_RANGE] ACCEPT_TOGGLE = True label_text = "" @@ -1569,7 +1569,7 @@ class _SettingWithValueUI: if kind in cls.MULTIPLE: keys = UnsortedNamedInts() for s in settings: - universe = getattr(s, "keys_universe" if kind == _SKIND.map_choice else "choices_universe", None) + universe = getattr(s, "keys_universe" if kind == Kind.MAP_CHOICE else "choices_universe", None) if universe: keys |= universe # only one key per number is used @@ -1641,12 +1641,12 @@ class _SettingWithValueUI: supported_keys = None if device_setting: val = device_setting._validator - if device_setting.kind == _SKIND.multiple_toggle: + if device_setting.kind == Kind.MULTIPLE_TOGGLE: supported_keys = val.get_options() or None - elif device_setting.kind == _SKIND.map_choice: + elif device_setting.kind == Kind.MAP_CHOICE: choices = val.choices or None supported_keys = choices.keys() if choices else None - elif device_setting.kind == _SKIND.multiple_range: + elif device_setting.kind == Kind.MULTIPLE_RANGE: supported_keys = val.keys self.key_field.show_only(supported_keys, include_new=True) self._update_validation() @@ -1655,24 +1655,24 @@ class _SettingWithValueUI: setting, val_class, kind, keys = self._setting_attributes(setting_name, device) ds = device.settings if device else {} device_setting = ds.get(setting_name, None) - if kind in (_SKIND.toggle, _SKIND.multiple_toggle): + if kind in (Kind.TOGGLE, Kind.MULTIPLE_TOGGLE): self.value_field.make_toggle() - elif kind in (_SKIND.choice, _SKIND.map_choice): + elif kind in (Kind.CHOICE, Kind.MAP_CHOICE): all_values, extra = self._all_choices(device_setting or setting_name) self.value_field.make_choice(all_values, extra) supported_values = None if device_setting: val = device_setting._validator choices = getattr(val, "choices", None) or None - if kind == _SKIND.choice: + if kind == Kind.CHOICE: supported_values = choices - elif kind == _SKIND.map_choice and isinstance(choices, dict): + elif kind == Kind.MAP_CHOICE and isinstance(choices, dict): supported_values = choices.get(key, None) or None self.value_field.choice_widget.show_only(supported_values, include_new=True) self._update_validation() - elif kind == _SKIND.range: + elif kind == Kind.RANGE: self.value_field.make_range(val_class.min_value, val_class.max_value) - elif kind == _SKIND.multiple_range: + elif kind == Kind.MULTIPLE_RANGE: self.value_field.make_range_with_key( getattr(setting, "sub_items_universe", {}).get(key, {}) if setting else {}, getattr(setting, "_labels_sub", None) if setting else None, @@ -1703,7 +1703,7 @@ class _SettingWithValueUI: key = self.key_field.get_value(invalid_as_str=False, accept_hidden=False) icon = "dialog-warning" if key is None else "" self.key_field.get_child().set_icon_from_icon_name(Gtk.EntryIconPosition.SECONDARY, icon) - if kind in (_SKIND.choice, _SKIND.map_choice): + if kind in (Kind.CHOICE, Kind.MAP_CHOICE): value = self.value_field.choice_widget.get_value(invalid_as_str=False, accept_hidden=False) icon = "dialog-warning" if value is None else "" self.value_field.choice_widget.get_child().set_icon_from_icon_name(Gtk.EntryIconPosition.SECONDARY, icon) @@ -1758,26 +1758,26 @@ class _SettingWithValueUI: key_label = getattr(setting, "_labels", {}).get(key, [None])[0] if setting else None disp.append(key_label or key) value = next(a, None) - if setting and (kind in (_SKIND.choice, _SKIND.map_choice)): + if setting and (kind in (Kind.CHOICE, Kind.MAP_CHOICE)): all_values = cls._all_choices(setting or setting_name)[0] supported_values = None if device_setting: val = device_setting._validator choices = getattr(val, "choices", None) or None - if kind == _SKIND.choice: + if kind == Kind.CHOICE: supported_values = choices - elif kind == _SKIND.map_choice and isinstance(choices, dict): + elif kind == Kind.MAP_CHOICE and isinstance(choices, dict): supported_values = choices.get(key, None) or None if supported_values and isinstance(supported_values, NamedInts): value = supported_values[value] if not supported_values and all_values and isinstance(all_values, NamedInts): value = all_values[value] disp.append(value) - elif kind == _SKIND.multiple_range and isinstance(value, dict) and len(value) == 1: + elif kind == Kind.MULTIPLE_RANGE and isinstance(value, dict) and len(value) == 1: k, v = next(iter(value.items())) k = (getattr(setting, "_labels_sub", {}).get(k, (None,))[0] if setting else None) or k disp.append(f"{k}={v}") - elif kind in (_SKIND.toggle, _SKIND.multiple_toggle): + elif kind in (Kind.TOGGLE, Kind.MULTIPLE_TOGGLE): disp.append(_(str(value))) else: disp.append(value) diff --git a/tests/logitech_receiver/test_settings.py b/tests/logitech_receiver/test_settings_validator.py similarity index 82% rename from tests/logitech_receiver/test_settings.py rename to tests/logitech_receiver/test_settings_validator.py index 5d50a3a4..383fae22 100644 --- a/tests/logitech_receiver/test_settings.py +++ b/tests/logitech_receiver/test_settings_validator.py @@ -1,6 +1,6 @@ import pytest -from logitech_receiver import settings +from logitech_receiver import settings_validator @pytest.mark.parametrize( @@ -20,6 +20,6 @@ from logitech_receiver import settings ], ) def test_bool_or_toggle(current, new, expected): - result = settings.bool_or_toggle(current=current, new=new) + result = settings_validator.bool_or_toggle(current=current, new=new) assert result == expected