diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index 278ad0b6..9fe8ae8d 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -64,153 +64,6 @@ def bool_or_toggle(current, new): return None -# moved first for dependency reasons -class Validator: - @classmethod - def build(cls, setting_class, device, **kwargs): - return cls(**kwargs) - - @classmethod - def to_string(cls, value): - return str(value) - - def compare(self, args, current): - if len(args) != 1: - return False - return args[0] == current - - -class BooleanValidator(Validator): - __slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value") - - kind = KIND.toggle - default_true = 0x01 - default_false = 0x00 - # mask specifies all the affected bits in the value - default_mask = 0xFF - - def __init__( - self, - true_value=default_true, - false_value=default_false, - mask=default_mask, - read_skip_byte_count=0, - write_prefix_bytes=b"", - ): - if isinstance(true_value, int): - assert isinstance(false_value, int) - if mask is None: - mask = self.default_mask - else: - assert isinstance(mask, int) - assert true_value & false_value == 0 - assert true_value & mask == true_value - assert false_value & mask == false_value - self.needs_current_value = mask != self.default_mask - elif isinstance(true_value, bytes): - if false_value is None or false_value == self.default_false: - false_value = b"\x00" * len(true_value) - else: - assert isinstance(false_value, bytes) - if mask is None or mask == self.default_mask: - mask = b"\xFF" * len(true_value) - else: - assert isinstance(mask, bytes) - assert len(mask) == len(true_value) == len(false_value) - tv = _bytes2int(true_value) - fv = _bytes2int(false_value) - mv = _bytes2int(mask) - assert tv != fv # true and false might be something other than bit values - assert tv & mv == tv - assert fv & mv == fv - self.needs_current_value = any(m != 0xFF for m in mask) - else: - raise Exception("invalid mask '%r', type %s" % (mask, type(mask))) - - self.true_value = true_value - self.false_value = false_value - self.mask = mask - self.read_skip_byte_count = read_skip_byte_count - self.write_prefix_bytes = write_prefix_bytes - - def validate_read(self, reply_bytes): - reply_bytes = reply_bytes[self.read_skip_byte_count :] - if isinstance(self.mask, int): - reply_value = ord(reply_bytes[:1]) & self.mask - if logger.isEnabledFor(logging.DEBUG): - logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value) - if reply_value == self.true_value: - return True - if reply_value == self.false_value: - return False - logger.warning( - "BooleanValidator: reply %02X mismatched %02X/%02X/%02X", - reply_value, - self.true_value, - self.false_value, - self.mask, - ) - return False - - count = len(self.mask) - mask = _bytes2int(self.mask) - reply_value = _bytes2int(reply_bytes[:count]) & mask - - true_value = _bytes2int(self.true_value) - if reply_value == true_value: - return True - - false_value = _bytes2int(self.false_value) - if reply_value == false_value: - return False - - logger.warning( - "BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask - ) - return False - - def prepare_write(self, new_value, current_value=None): - if new_value is None: - new_value = False - else: - assert isinstance(new_value, bool), "New value %s for boolean setting is not a boolean" % new_value - - to_write = self.true_value if new_value else self.false_value - - if isinstance(self.mask, int): - if current_value is not None and self.needs_current_value: - to_write |= ord(current_value[:1]) & (0xFF ^ self.mask) - if current_value is not None and to_write == ord(current_value[:1]): - return None - to_write = bytes([to_write]) - else: - to_write = bytearray(to_write) - count = len(self.mask) - for i in range(0, count): - b = ord(to_write[i : i + 1]) - m = ord(self.mask[i : i + 1]) - assert b & m == b - # b &= m - if current_value is not None and self.needs_current_value: - b |= ord(current_value[i : i + 1]) & (0xFF ^ m) - to_write[i] = b - to_write = bytes(to_write) - - if current_value is not None and to_write == current_value[: len(to_write)]: - return None - - if logger.isEnabledFor(logging.DEBUG): - logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write) - - return self.write_prefix_bytes + to_write - - def acceptable(self, args, current): - if len(args) != 1: - return None - val = bool_or_toggle(current, args[0]) - return [val] if val is not None else None - - class Setting: """A setting descriptor. Needs to be instantiated for each specific device.""" @@ -219,7 +72,7 @@ class Setting: min_version = 0 persist = True rw_options = {} - validator_class = BooleanValidator + validator_class = None validator_options = {} def __init__(self, device, rw, validator): @@ -836,6 +689,152 @@ class FeatureRWMap(FeatureRW): return reply if not self.no_reply else True +class Validator: + @classmethod + def build(cls, setting_class, device, **kwargs): + return cls(**kwargs) + + @classmethod + def to_string(cls, value): + return str(value) + + def compare(self, args, current): + if len(args) != 1: + return False + return args[0] == current + + +class BooleanValidator(Validator): + __slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value") + + kind = KIND.toggle + default_true = 0x01 + default_false = 0x00 + # mask specifies all the affected bits in the value + default_mask = 0xFF + + def __init__( + self, + true_value=default_true, + false_value=default_false, + mask=default_mask, + read_skip_byte_count=0, + write_prefix_bytes=b"", + ): + if isinstance(true_value, int): + assert isinstance(false_value, int) + if mask is None: + mask = self.default_mask + else: + assert isinstance(mask, int) + assert true_value & false_value == 0 + assert true_value & mask == true_value + assert false_value & mask == false_value + self.needs_current_value = mask != self.default_mask + elif isinstance(true_value, bytes): + if false_value is None or false_value == self.default_false: + false_value = b"\x00" * len(true_value) + else: + assert isinstance(false_value, bytes) + if mask is None or mask == self.default_mask: + mask = b"\xFF" * len(true_value) + else: + assert isinstance(mask, bytes) + assert len(mask) == len(true_value) == len(false_value) + tv = _bytes2int(true_value) + fv = _bytes2int(false_value) + mv = _bytes2int(mask) + assert tv != fv # true and false might be something other than bit values + assert tv & mv == tv + assert fv & mv == fv + self.needs_current_value = any(m != 0xFF for m in mask) + else: + raise Exception("invalid mask '%r', type %s" % (mask, type(mask))) + + self.true_value = true_value + self.false_value = false_value + self.mask = mask + self.read_skip_byte_count = read_skip_byte_count + self.write_prefix_bytes = write_prefix_bytes + + def validate_read(self, reply_bytes): + reply_bytes = reply_bytes[self.read_skip_byte_count :] + if isinstance(self.mask, int): + reply_value = ord(reply_bytes[:1]) & self.mask + if logger.isEnabledFor(logging.DEBUG): + logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value) + if reply_value == self.true_value: + return True + if reply_value == self.false_value: + return False + logger.warning( + "BooleanValidator: reply %02X mismatched %02X/%02X/%02X", + reply_value, + self.true_value, + self.false_value, + self.mask, + ) + return False + + count = len(self.mask) + mask = _bytes2int(self.mask) + reply_value = _bytes2int(reply_bytes[:count]) & mask + + true_value = _bytes2int(self.true_value) + if reply_value == true_value: + return True + + false_value = _bytes2int(self.false_value) + if reply_value == false_value: + return False + + logger.warning( + "BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask + ) + return False + + def prepare_write(self, new_value, current_value=None): + if new_value is None: + new_value = False + else: + assert isinstance(new_value, bool), "New value %s for boolean setting is not a boolean" % new_value + + to_write = self.true_value if new_value else self.false_value + + if isinstance(self.mask, int): + if current_value is not None and self.needs_current_value: + to_write |= ord(current_value[:1]) & (0xFF ^ self.mask) + if current_value is not None and to_write == ord(current_value[:1]): + return None + to_write = bytes([to_write]) + else: + to_write = bytearray(to_write) + count = len(self.mask) + for i in range(0, count): + b = ord(to_write[i : i + 1]) + m = ord(self.mask[i : i + 1]) + assert b & m == b + # b &= m + if current_value is not None and self.needs_current_value: + b |= ord(current_value[i : i + 1]) & (0xFF ^ m) + to_write[i] = b + to_write = bytes(to_write) + + if current_value is not None and to_write == current_value[: len(to_write)]: + return None + + if logger.isEnabledFor(logging.DEBUG): + logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write) + + return self.write_prefix_bytes + to_write + + def acceptable(self, args, current): + if len(args) != 1: + return None + val = bool_or_toggle(current, args[0]) + return [val] if val is not None else None + + class BitFieldValidator(Validator): __slots__ = ("byte_count", "options") @@ -1569,3 +1568,6 @@ def apply_all_settings(device): ignore = sensitives.get(s.name, False) if ignore != SENSITIVITY_IGNORE: s.apply() + + +Setting.validator_class = BooleanValidator