745 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			745 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
| from __future__ import annotations
 | |
| 
 | |
| import logging
 | |
| import math
 | |
| 
 | |
| from enum import IntEnum
 | |
| 
 | |
| from logitech_receiver import common
 | |
| from logitech_receiver.common import NamedInt
 | |
| from logitech_receiver.common import NamedInts
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| def bool_or_toggle(current: bool | str, new: bool | str) -> bool:
 | |
|     if isinstance(new, bool):
 | |
|         return new
 | |
| 
 | |
|     try:
 | |
|         return bool(int(new))
 | |
|     except (TypeError, ValueError):
 | |
|         new = str(new).lower()
 | |
| 
 | |
|     if new in ("true", "yes", "on", "t", "y"):
 | |
|         return True
 | |
|     if new in ("false", "no", "off", "f", "n"):
 | |
|         return False
 | |
|     if new in ("~", "toggle"):
 | |
|         return not current
 | |
|     return None
 | |
| 
 | |
| 
 | |
| class Kind(IntEnum):
 | |
|     TOGGLE = 0x01
 | |
|     CHOICE = 0x02
 | |
|     RANGE = 0x04
 | |
|     MAP_CHOICE = 0x0A
 | |
|     MULTIPLE_TOGGLE = 0x10
 | |
|     PACKED_RANGE = 0x20
 | |
|     MULTIPLE_RANGE = 0x40
 | |
|     HETERO = 0x80
 | |
| 
 | |
| 
 | |
| class Validator:
 | |
|     @classmethod
 | |
|     def build(cls, setting_class, device, **kwargs) -> Validator:
 | |
|         return cls(**kwargs)
 | |
| 
 | |
|     @classmethod
 | |
|     def to_string(cls, value) -> str:
 | |
|         return str(value)
 | |
| 
 | |
|     def compare(self, args, current):
 | |
|         if len(args) != 1:
 | |
|             return False
 | |
|         return args[0] == current
 | |
| 
 | |
| 
 | |
| class BooleanValidator(Validator):
 | |
|     __slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value")
 | |
| 
 | |
|     kind = Kind.TOGGLE
 | |
|     default_true = 0x01
 | |
|     default_false = 0x00
 | |
|     # mask specifies all the affected bits in the value
 | |
|     default_mask = 0xFF
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         true_value=default_true,
 | |
|         false_value=default_false,
 | |
|         mask=default_mask,
 | |
|         read_skip_byte_count=0,
 | |
|         write_prefix_bytes=b"",
 | |
|     ):
 | |
|         if isinstance(true_value, int):
 | |
|             assert isinstance(false_value, int)
 | |
|             if mask is None:
 | |
|                 mask = self.default_mask
 | |
|             else:
 | |
|                 assert isinstance(mask, int)
 | |
|             assert true_value & false_value == 0
 | |
|             assert true_value & mask == true_value
 | |
|             assert false_value & mask == false_value
 | |
|             self.needs_current_value = mask != self.default_mask
 | |
|         elif isinstance(true_value, bytes):
 | |
|             if false_value is None or false_value == self.default_false:
 | |
|                 false_value = b"\x00" * len(true_value)
 | |
|             else:
 | |
|                 assert isinstance(false_value, bytes)
 | |
|             if mask is None or mask == self.default_mask:
 | |
|                 mask = b"\xff" * len(true_value)
 | |
|             else:
 | |
|                 assert isinstance(mask, bytes)
 | |
|             assert len(mask) == len(true_value) == len(false_value)
 | |
|             tv = common.bytes2int(true_value)
 | |
|             fv = common.bytes2int(false_value)
 | |
|             mv = common.bytes2int(mask)
 | |
|             assert tv != fv  # true and false might be something other than bit values
 | |
|             assert tv & mv == tv
 | |
|             assert fv & mv == fv
 | |
|             self.needs_current_value = any(m != 0xFF for m in mask)
 | |
|         else:
 | |
|             raise Exception(f"invalid mask '{mask!r}', type {type(mask)}")
 | |
| 
 | |
|         self.true_value = true_value
 | |
|         self.false_value = false_value
 | |
|         self.mask = mask
 | |
|         self.read_skip_byte_count = read_skip_byte_count
 | |
|         self.write_prefix_bytes = write_prefix_bytes
 | |
| 
 | |
|     def validate_read(self, reply_bytes):
 | |
|         reply_bytes = reply_bytes[self.read_skip_byte_count :]
 | |
|         if isinstance(self.mask, int):
 | |
|             reply_value = ord(reply_bytes[:1]) & self.mask
 | |
|             if logger.isEnabledFor(logging.DEBUG):
 | |
|                 logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value)
 | |
|             if reply_value == self.true_value:
 | |
|                 return True
 | |
|             if reply_value == self.false_value:
 | |
|                 return False
 | |
|             logger.warning(
 | |
|                 "BooleanValidator: reply %02X mismatched %02X/%02X/%02X",
 | |
|                 reply_value,
 | |
|                 self.true_value,
 | |
|                 self.false_value,
 | |
|                 self.mask,
 | |
|             )
 | |
|             return False
 | |
| 
 | |
|         count = len(self.mask)
 | |
|         mask = common.bytes2int(self.mask)
 | |
|         reply_value = common.bytes2int(reply_bytes[:count]) & mask
 | |
| 
 | |
|         true_value = common.bytes2int(self.true_value)
 | |
|         if reply_value == true_value:
 | |
|             return True
 | |
| 
 | |
|         false_value = common.bytes2int(self.false_value)
 | |
|         if reply_value == false_value:
 | |
|             return False
 | |
| 
 | |
|         logger.warning(
 | |
|             "BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask
 | |
|         )
 | |
|         return False
 | |
| 
 | |
|     def prepare_write(self, new_value, current_value=None):
 | |
|         if new_value is None:
 | |
|             new_value = False
 | |
|         else:
 | |
|             assert isinstance(new_value, bool), f"New value {new_value} for boolean setting is not a boolean"
 | |
| 
 | |
|         to_write = self.true_value if new_value else self.false_value
 | |
| 
 | |
|         if isinstance(self.mask, int):
 | |
|             if current_value is not None and self.needs_current_value:
 | |
|                 to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
 | |
|             if current_value is not None and to_write == ord(current_value[:1]):
 | |
|                 return None
 | |
|             to_write = bytes([to_write])
 | |
|         else:
 | |
|             to_write = bytearray(to_write)
 | |
|             count = len(self.mask)
 | |
|             for i in range(0, count):
 | |
|                 b = ord(to_write[i : i + 1])
 | |
|                 m = ord(self.mask[i : i + 1])
 | |
|                 assert b & m == b
 | |
|                 # b &= m
 | |
|                 if current_value is not None and self.needs_current_value:
 | |
|                     b |= ord(current_value[i : i + 1]) & (0xFF ^ m)
 | |
|                 to_write[i] = b
 | |
|             to_write = bytes(to_write)
 | |
| 
 | |
|             if current_value is not None and to_write == current_value[: len(to_write)]:
 | |
|                 return None
 | |
| 
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write)
 | |
| 
 | |
|         return self.write_prefix_bytes + to_write
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         if len(args) != 1:
 | |
|             return None
 | |
|         val = bool_or_toggle(current, args[0])
 | |
|         return [val] if val is not None else None
 | |
| 
 | |
| 
 | |
| class BitFieldValidator(Validator):
 | |
|     __slots__ = ("byte_count", "options")
 | |
| 
 | |
|     kind = Kind.MULTIPLE_TOGGLE
 | |
| 
 | |
|     def __init__(self, options, byte_count=None):
 | |
|         assert isinstance(options, list)
 | |
|         self.options = options
 | |
|         self.byte_count = (max(x.bit_length() for x in options) + 7) // 8
 | |
|         if byte_count:
 | |
|             assert isinstance(byte_count, int) and byte_count >= self.byte_count
 | |
|             self.byte_count = byte_count
 | |
| 
 | |
|     def to_string(self, value) -> str:
 | |
|         def element_to_string(key, val):
 | |
|             k = next((k for k in self.options if int(key) == k), None)
 | |
|             return str(k) + ":" + str(val) if k is not None else "?"
 | |
| 
 | |
|         return "{" + ", ".join([element_to_string(k, value[k]) for k in value]) + "}"
 | |
| 
 | |
|     def validate_read(self, reply_bytes):
 | |
|         r = common.bytes2int(reply_bytes[: self.byte_count])
 | |
|         value = {int(k): False for k in self.options}
 | |
|         m = 1
 | |
|         for _ignore in range(8 * self.byte_count):
 | |
|             if m in self.options:
 | |
|                 value[int(m)] = bool(r & m)
 | |
|             m <<= 1
 | |
|         return value
 | |
| 
 | |
|     def prepare_write(self, new_value):
 | |
|         assert isinstance(new_value, dict)
 | |
|         w = 0
 | |
|         for k, v in new_value.items():
 | |
|             if v:
 | |
|                 w |= int(k)
 | |
|         return common.int2bytes(w, self.byte_count)
 | |
| 
 | |
|     def get_options(self):
 | |
|         return self.options
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         if len(args) != 2:
 | |
|             return None
 | |
|         key = next((key for key in self.options if key == args[0]), None)
 | |
|         if key is None:
 | |
|             return None
 | |
|         val = bool_or_toggle(current[int(key)], args[1])
 | |
|         return None if val is None else [int(key), val]
 | |
| 
 | |
|     def compare(self, args, current):
 | |
|         if len(args) != 2:
 | |
|             return False
 | |
|         key = next((key for key in self.options if key == args[0]), None)
 | |
|         if key is None:
 | |
|             return False
 | |
|         return args[1] == current[int(key)]
 | |
| 
 | |
| 
 | |
| class BitFieldWithOffsetAndMaskValidator(Validator):
 | |
|     __slots__ = ("byte_count", "options", "_option_from_key", "_mask_from_offset", "_option_from_offset_mask")
 | |
| 
 | |
|     kind = Kind.MULTIPLE_TOGGLE
 | |
|     sep = 0x01
 | |
| 
 | |
|     def __init__(self, options, om_method=None, byte_count=None):
 | |
|         assert isinstance(options, list)
 | |
|         # each element of options is an instance of a class
 | |
|         # that has an id (which is used as an index in other dictionaries)
 | |
|         # and where om_method is a method that returns a byte offset and byte mask
 | |
|         # that says how to access and modify the bit toggle for the option
 | |
|         self.options = options
 | |
|         self.om_method = om_method
 | |
|         # to retrieve the options efficiently:
 | |
|         self._option_from_key = {}
 | |
|         self._mask_from_offset = {}
 | |
|         self._option_from_offset_mask = {}
 | |
|         for opt in options:
 | |
|             offset, mask = om_method(opt)
 | |
|             self._option_from_key[int(opt)] = opt
 | |
|             try:
 | |
|                 self._mask_from_offset[offset] |= mask
 | |
|             except KeyError:
 | |
|                 self._mask_from_offset[offset] = mask
 | |
|             try:
 | |
|                 mask_to_opt = self._option_from_offset_mask[offset]
 | |
|             except KeyError:
 | |
|                 mask_to_opt = {}
 | |
|                 self._option_from_offset_mask[offset] = mask_to_opt
 | |
|             mask_to_opt[mask] = opt
 | |
|         self.byte_count = (max(om_method(x)[1].bit_length() for x in options) + 7) // 8  # is this correct??
 | |
|         if byte_count:
 | |
|             assert isinstance(byte_count, int) and byte_count >= self.byte_count
 | |
|             self.byte_count = byte_count
 | |
| 
 | |
|     def prepare_read(self):
 | |
|         r = []
 | |
|         for offset, mask in self._mask_from_offset.items():
 | |
|             b = offset << (8 * (self.byte_count + 1))
 | |
|             b |= (self.sep << (8 * self.byte_count)) | mask
 | |
|             r.append(common.int2bytes(b, self.byte_count + 2))
 | |
|         return r
 | |
| 
 | |
|     def prepare_read_key(self, key):
 | |
|         option = self._option_from_key.get(key, None)
 | |
|         if option is None:
 | |
|             return None
 | |
|         offset, mask = option.om_method(option)
 | |
|         b = offset << (8 * (self.byte_count + 1))
 | |
|         b |= (self.sep << (8 * self.byte_count)) | mask
 | |
|         return common.int2bytes(b, self.byte_count + 2)
 | |
| 
 | |
|     def validate_read(self, reply_bytes_dict):
 | |
|         values = {int(k): False for k in self.options}
 | |
|         for query, b in reply_bytes_dict.items():
 | |
|             offset = common.bytes2int(query[0:1])
 | |
|             b += (self.byte_count - len(b)) * b"\x00"
 | |
|             value = common.bytes2int(b[: self.byte_count])
 | |
|             mask_to_opt = self._option_from_offset_mask.get(offset, {})
 | |
|             m = 1
 | |
|             for _ignore in range(8 * self.byte_count):
 | |
|                 if m in mask_to_opt:
 | |
|                     values[int(mask_to_opt[m])] = bool(value & m)
 | |
|                 m <<= 1
 | |
|         return values
 | |
| 
 | |
|     def prepare_write(self, new_value):
 | |
|         assert isinstance(new_value, dict)
 | |
|         w = {}
 | |
|         for k, v in new_value.items():
 | |
|             option = self._option_from_key[int(k)]
 | |
|             offset, mask = self.om_method(option)
 | |
|             if offset not in w:
 | |
|                 w[offset] = 0
 | |
|             if v:
 | |
|                 w[offset] |= mask
 | |
|         return [
 | |
|             common.int2bytes(
 | |
|                 (offset << (8 * (2 * self.byte_count + 1)))
 | |
|                 | (self.sep << (16 * self.byte_count))
 | |
|                 | (self._mask_from_offset[offset] << (8 * self.byte_count))
 | |
|                 | value,
 | |
|                 2 * self.byte_count + 2,
 | |
|             )
 | |
|             for offset, value in w.items()
 | |
|         ]
 | |
| 
 | |
|     def get_options(self):
 | |
|         return [int(opt) if isinstance(opt, int) else opt.as_int() for opt in self.options]
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         if len(args) != 2:
 | |
|             return None
 | |
|         key = next((option.id for option in self.options if option.as_int() == args[0]), None)
 | |
|         if key is None:
 | |
|             return None
 | |
|         val = bool_or_toggle(current[int(key)], args[1])
 | |
|         return None if val is None else [int(key), val]
 | |
| 
 | |
|     def compare(self, args, current):
 | |
|         if len(args) != 2:
 | |
|             return False
 | |
|         key = next((option.id for option in self.options if option.as_int() == args[0]), None)
 | |
|         if key is None:
 | |
|             return False
 | |
|         return args[1] == current[int(key)]
 | |
| 
 | |
| 
 | |
| class ChoicesValidator(Validator):
 | |
|     """Translates between NamedInts and a byte sequence.
 | |
|     :param choices: a list of NamedInts
 | |
|     :param byte_count: the size of the derived byte sequence. If None, it
 | |
|     will be calculated from the choices."""
 | |
| 
 | |
|     kind = Kind.CHOICE
 | |
| 
 | |
|     def __init__(self, choices=None, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b""):
 | |
|         assert choices is not None
 | |
|         assert isinstance(choices, NamedInts)
 | |
|         assert len(choices) > 1
 | |
|         self.choices = choices
 | |
|         self.needs_current_value = False
 | |
| 
 | |
|         max_bits = max(x.bit_length() for x in choices)
 | |
|         self._byte_count = (max_bits // 8) + (1 if max_bits % 8 else 0)
 | |
|         if byte_count:
 | |
|             assert self._byte_count <= byte_count
 | |
|             self._byte_count = byte_count
 | |
|         assert self._byte_count < 8
 | |
|         self._read_skip_byte_count = read_skip_byte_count
 | |
|         self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b""
 | |
|         assert self._byte_count + self._read_skip_byte_count <= 14
 | |
|         assert self._byte_count + len(self._write_prefix_bytes) <= 14
 | |
| 
 | |
|     def to_string(self, value) -> str:
 | |
|         return str(self.choices[value]) if isinstance(value, int) else str(value)
 | |
| 
 | |
|     def validate_read(self, reply_bytes):
 | |
|         reply_value = common.bytes2int(reply_bytes[self._read_skip_byte_count : self._read_skip_byte_count + self._byte_count])
 | |
|         valid_value = self.choices[reply_value]
 | |
|         assert valid_value is not None, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
 | |
|         return valid_value
 | |
| 
 | |
|     def prepare_write(self, new_value, current_value=None):
 | |
|         if new_value is None:
 | |
|             value = self.choices[:][0]
 | |
|         else:
 | |
|             value = self.choice(new_value)
 | |
|         if value is None:
 | |
|             raise ValueError(f"invalid choice {new_value!r}")
 | |
|         assert isinstance(value, NamedInt)
 | |
|         return self._write_prefix_bytes + value.bytes(self._byte_count)
 | |
| 
 | |
|     def choice(self, value):
 | |
|         if isinstance(value, int):
 | |
|             return self.choices[value]
 | |
|         try:
 | |
|             int(value)
 | |
|             if int(value) in self.choices:
 | |
|                 return self.choices[int(value)]
 | |
|         except Exception:
 | |
|             pass
 | |
|         if value in self.choices:
 | |
|             return self.choices[value]
 | |
|         else:
 | |
|             return None
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         choice = self.choice(args[0]) if len(args) == 1 else None
 | |
|         return None if choice is None else [choice]
 | |
| 
 | |
| 
 | |
| class ChoicesMapValidator(ChoicesValidator):
 | |
|     kind = Kind.MAP_CHOICE
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         choices_map,
 | |
|         key_byte_count=0,
 | |
|         key_postfix_bytes=b"",
 | |
|         byte_count=0,
 | |
|         read_skip_byte_count=0,
 | |
|         write_prefix_bytes=b"",
 | |
|         extra_default=None,
 | |
|         mask=-1,
 | |
|         activate=0,
 | |
|     ):
 | |
|         assert choices_map is not None
 | |
|         assert isinstance(choices_map, dict)
 | |
|         max_key_bits = 0
 | |
|         max_value_bits = 0
 | |
|         for key, choices in choices_map.items():
 | |
|             assert isinstance(key, NamedInt)
 | |
|             assert isinstance(choices, NamedInts)
 | |
|             max_key_bits = max(max_key_bits, key.bit_length())
 | |
|             for key_value in choices:
 | |
|                 assert isinstance(key_value, NamedInt)
 | |
|                 max_value_bits = max(max_value_bits, key_value.bit_length())
 | |
|         self._key_byte_count = (max_key_bits + 7) // 8
 | |
|         if key_byte_count:
 | |
|             assert self._key_byte_count <= key_byte_count
 | |
|             self._key_byte_count = key_byte_count
 | |
|         self._byte_count = (max_value_bits + 7) // 8
 | |
|         if byte_count:
 | |
|             assert self._byte_count <= byte_count
 | |
|             self._byte_count = byte_count
 | |
| 
 | |
|         self.choices = choices_map
 | |
|         self.needs_current_value = False
 | |
|         self.extra_default = extra_default
 | |
|         self._key_postfix_bytes = key_postfix_bytes
 | |
|         self._read_skip_byte_count = read_skip_byte_count if read_skip_byte_count else 0
 | |
|         self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b""
 | |
|         self.activate = activate
 | |
|         self.mask = mask
 | |
|         assert self._byte_count + self._read_skip_byte_count + self._key_byte_count <= 14
 | |
|         assert self._byte_count + len(self._write_prefix_bytes) + self._key_byte_count <= 14
 | |
| 
 | |
|     def to_string(self, value) -> str:
 | |
|         def element_to_string(key, val):
 | |
|             k, c = next(((k, c) for k, c in self.choices.items() if int(key) == k), (None, None))
 | |
|             return str(k) + ":" + str(c[val]) if k is not None else "?"
 | |
| 
 | |
|         return "{" + ", ".join([element_to_string(k, value[k]) for k in sorted(value)]) + "}"
 | |
| 
 | |
|     def validate_read(self, reply_bytes, key):
 | |
|         start = self._key_byte_count + self._read_skip_byte_count
 | |
|         end = start + self._byte_count
 | |
|         reply_value = common.bytes2int(reply_bytes[start:end]) & self.mask
 | |
|         # reprogrammable keys starts out as 0, which is not a choice, so don't use assert here
 | |
|         if self.extra_default is not None and self.extra_default == reply_value:
 | |
|             return int(self.choices[key][0])
 | |
|         if reply_value not in self.choices[key]:
 | |
|             assert reply_value in self.choices[key], "%s: failed to validate read value %02X" % (
 | |
|                 self.__class__.__name__,
 | |
|                 reply_value,
 | |
|             )
 | |
|         return reply_value
 | |
| 
 | |
|     def prepare_key(self, key):
 | |
|         return key.to_bytes(self._key_byte_count, "big") + self._key_postfix_bytes
 | |
| 
 | |
|     def prepare_write(self, key, new_value):
 | |
|         choices = self.choices.get(key)
 | |
|         if choices is None or (new_value not in choices and new_value != self.extra_default):
 | |
|             logger.error("invalid choice %r for %s", new_value, key)
 | |
|             return None
 | |
|         new_value = new_value | self.activate
 | |
|         return self._write_prefix_bytes + new_value.to_bytes(self._byte_count, "big")
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         if len(args) != 2:
 | |
|             return None
 | |
|         key, choices = next(((key, item) for key, item in self.choices.items() if key == args[0]), (None, None))
 | |
|         if choices is None or args[1] not in choices:
 | |
|             return None
 | |
|         choice = next((item for item in choices if item == args[1]), None)
 | |
|         return [int(key), int(choice)] if choice is not None else None
 | |
| 
 | |
|     def compare(self, args, current):
 | |
|         if len(args) != 2:
 | |
|             return False
 | |
|         key = next((key for key in self.choices if key == int(args[0])), None)
 | |
|         if key is None:
 | |
|             return False
 | |
|         return args[1] == current[int(key)]
 | |
| 
 | |
| 
 | |
| class RangeValidator(Validator):
 | |
|     kind = Kind.RANGE
 | |
|     """Translates between integers and a byte sequence.
 | |
|     :param min_value: minimum accepted value (inclusive)
 | |
|     :param max_value: maximum accepted value (inclusive)
 | |
|     :param byte_count: the size of the derived byte sequence. If None, it
 | |
|     will be calculated from the range."""
 | |
|     min_value = 0
 | |
|     max_value = 255
 | |
| 
 | |
|     @classmethod
 | |
|     def build(cls, setting_class, device, **kwargs):
 | |
|         kwargs["min_value"] = setting_class.min_value
 | |
|         kwargs["max_value"] = setting_class.max_value
 | |
|         return cls(**kwargs)
 | |
| 
 | |
|     def __init__(self, min_value=0, max_value=255, byte_count=1):
 | |
|         assert max_value > min_value
 | |
|         self.min_value = min_value
 | |
|         self.max_value = max_value
 | |
|         self.needs_current_value = True  # read and check before write (needed for ADC power and probably a good idea anyway)
 | |
| 
 | |
|         self._byte_count = math.ceil(math.log(max_value + 1, 256))
 | |
|         if byte_count:
 | |
|             assert self._byte_count <= byte_count
 | |
|             self._byte_count = byte_count
 | |
|         assert self._byte_count < 8
 | |
| 
 | |
|     def validate_read(self, reply_bytes):
 | |
|         reply_value = common.bytes2int(reply_bytes[: self._byte_count])
 | |
|         assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
 | |
|         assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
 | |
|         return reply_value
 | |
| 
 | |
|     def prepare_write(self, new_value, current_value=None):
 | |
|         if new_value < self.min_value or new_value > self.max_value:
 | |
|             raise ValueError(f"invalid choice {new_value!r}")
 | |
|         current_value = self.validate_read(current_value) if current_value is not None else None
 | |
|         to_write = common.int2bytes(new_value, self._byte_count)
 | |
|         # current value is known and same as value to be written return None to signal not to write it
 | |
|         return None if current_value is not None and current_value == new_value else to_write
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         arg = args[0]
 | |
|         #  None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args)
 | |
|         return None if len(args) != 1 or isinstance(arg, int) or arg < self.min_value or arg > self.max_value else args
 | |
| 
 | |
|     def compare(self, args, current):
 | |
|         if len(args) == 1:
 | |
|             return args[0] == current
 | |
|         elif len(args) == 2:
 | |
|             return args[0] <= current <= args[1]
 | |
|         else:
 | |
|             return False
 | |
| 
 | |
| 
 | |
| class HeteroValidator(Validator):
 | |
|     kind = Kind.HETERO
 | |
| 
 | |
|     @classmethod
 | |
|     def build(cls, setting_class, device, **kwargs):
 | |
|         return cls(**kwargs)
 | |
| 
 | |
|     def __init__(self, data_class=None, options=None, readable=True):
 | |
|         assert data_class is not None and options is not None
 | |
|         self.data_class = data_class
 | |
|         self.options = options
 | |
|         self.readable = readable
 | |
|         self.needs_current_value = False
 | |
| 
 | |
|     def validate_read(self, reply_bytes):
 | |
|         if self.readable:
 | |
|             reply_value = self.data_class.from_bytes(reply_bytes, options=self.options)
 | |
|             return reply_value
 | |
| 
 | |
|     def prepare_write(self, new_value, current_value=None):
 | |
|         to_write = new_value.to_bytes(options=self.options)
 | |
|         return to_write
 | |
| 
 | |
|     def acceptable(self, args, current):  # should this actually do some checking?
 | |
|         return True
 | |
| 
 | |
| 
 | |
| class PackedRangeValidator(Validator):
 | |
|     kind = Kind.PACKED_RANGE
 | |
|     """Several range values, all the same size, all the same min and max"""
 | |
|     min_value = 0
 | |
|     max_value = 255
 | |
|     count = 1
 | |
|     rsbc = 0
 | |
|     write_prefix_bytes = b""
 | |
| 
 | |
|     def __init__(
 | |
|         self, keys, min_value=0, max_value=255, count=1, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b""
 | |
|     ):
 | |
|         assert max_value > min_value
 | |
|         self.needs_current_value = True
 | |
|         self.keys = keys
 | |
|         self.min_value = min_value
 | |
|         self.max_value = max_value
 | |
|         self.count = count
 | |
|         self.bc = math.ceil(math.log(max_value + 1 - min(0, min_value), 256))
 | |
|         if byte_count:
 | |
|             assert self.bc <= byte_count
 | |
|             self.bc = byte_count
 | |
|         assert self.bc * self.count
 | |
|         self.rsbc = read_skip_byte_count
 | |
|         self.write_prefix_bytes = write_prefix_bytes
 | |
| 
 | |
|     def validate_read(self, reply_bytes):
 | |
|         rvs = {
 | |
|             n: common.bytes2int(reply_bytes[self.rsbc + n * self.bc : self.rsbc + (n + 1) * self.bc], signed=True)
 | |
|             for n in range(self.count)
 | |
|         }
 | |
|         for n in range(self.count):
 | |
|             assert rvs[n] >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}"
 | |
|             assert rvs[n] <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}"
 | |
|         return rvs
 | |
| 
 | |
|     def prepare_write(self, new_values):
 | |
|         if len(new_values) != self.count:
 | |
|             raise ValueError(f"wrong number of values {new_values!r}")
 | |
|         for new_value in new_values.values():
 | |
|             if new_value < self.min_value or new_value > self.max_value:
 | |
|                 raise ValueError(f"invalid value {new_value!r}")
 | |
|         bytes = self.write_prefix_bytes + b"".join(
 | |
|             common.int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count)
 | |
|         )
 | |
|         return bytes
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count:
 | |
|             return None
 | |
|         return None if not isinstance(args[1], int) or args[1] < self.min_value or args[1] > self.max_value else args
 | |
| 
 | |
|     def compare(self, args, current):
 | |
|         logger.warning("compare not implemented for packed range settings")
 | |
|         return False
 | |
| 
 | |
| 
 | |
| class MultipleRangeValidator(Validator):
 | |
|     kind = Kind.MULTIPLE_RANGE
 | |
| 
 | |
|     def __init__(self, items, sub_items):
 | |
|         assert isinstance(items, list)  # each element must have .index and its __int__ must return its id (not its index)
 | |
|         assert isinstance(sub_items, dict)
 | |
|         # sub_items: items -> class with .minimum, .maximum, .length (in bytes), .id (a string) and .widget (e.g. 'Scale')
 | |
|         self.items = items
 | |
|         self.keys = NamedInts(**{str(item): int(item) for item in items})
 | |
|         self._item_from_id = {int(k): k for k in items}
 | |
|         self.sub_items = sub_items
 | |
| 
 | |
|     def prepare_read_item(self, item):
 | |
|         return common.int2bytes((self._item_from_id[int(item)].index << 1) | 0xFF, 2)
 | |
| 
 | |
|     def validate_read_item(self, reply_bytes, item):
 | |
|         item = self._item_from_id[int(item)]
 | |
|         start = 0
 | |
|         value = {}
 | |
|         for sub_item in self.sub_items[item]:
 | |
|             r = reply_bytes[start : start + sub_item.length]
 | |
|             if len(r) < sub_item.length:
 | |
|                 r += b"\x00" * (sub_item.length - len(value))
 | |
|             v = common.bytes2int(r)
 | |
|             if not (sub_item.minimum < v < sub_item.maximum):
 | |
|                 logger.warning(
 | |
|                     f"{self.__class__.__name__}: failed to validate read value for {item}.{sub_item}: "
 | |
|                     + f"{v} not in [{sub_item.minimum}..{sub_item.maximum}]"
 | |
|                 )
 | |
|             value[str(sub_item)] = v
 | |
|             start += sub_item.length
 | |
|         return value
 | |
| 
 | |
|     def prepare_write(self, value):
 | |
|         seq = []
 | |
|         w = b""
 | |
|         for item in value.keys():
 | |
|             _item = self._item_from_id[int(item)]
 | |
|             b = common.int2bytes(_item.index, 1)
 | |
|             for sub_item in self.sub_items[_item]:
 | |
|                 try:
 | |
|                     v = value[int(item)][str(sub_item)]
 | |
|                 except KeyError:
 | |
|                     return None
 | |
|                 if not (sub_item.minimum <= v <= sub_item.maximum):
 | |
|                     raise ValueError(
 | |
|                         f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]"
 | |
|                     )
 | |
|                 b += common.int2bytes(v, sub_item.length)
 | |
|             if len(w) + len(b) > 15:
 | |
|                 seq.append(b + b"\xff")
 | |
|                 w = b""
 | |
|             w += b
 | |
|         seq.append(w + b"\xff")
 | |
|         return seq
 | |
| 
 | |
|     def prepare_write_item(self, item, value):
 | |
|         _item = self._item_from_id[int(item)]
 | |
|         w = common.int2bytes(_item.index, 1)
 | |
|         for sub_item in self.sub_items[_item]:
 | |
|             try:
 | |
|                 v = value[str(sub_item)]
 | |
|             except KeyError:
 | |
|                 return None
 | |
|             if not (sub_item.minimum <= v <= sub_item.maximum):
 | |
|                 raise ValueError(f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]")
 | |
|             w += common.int2bytes(v, sub_item.length)
 | |
|         return w + b"\xff"
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         # just one item, with at least one sub-item
 | |
|         if not isinstance(args, list) or len(args) != 2 or not isinstance(args[1], dict):
 | |
|             return None
 | |
|         item = next((p for p in self.items if p.id == args[0] or str(p) == args[0]), None)
 | |
|         if not item:
 | |
|             return None
 | |
|         for sub_key, value in args[1].items():
 | |
|             sub_item = next((it for it in self.sub_items[item] if it.id == sub_key), None)
 | |
|             if not sub_item:
 | |
|                 return None
 | |
|             if not isinstance(value, int) or not (sub_item.minimum <= value <= sub_item.maximum):
 | |
|                 return None
 | |
|         return [int(item), {**args[1]}]
 | |
| 
 | |
|     def compare(self, args, current):
 | |
|         logger.warning("compare not implemented for multiple range settings")
 | |
|         return False
 |