745 lines
28 KiB
Python
745 lines
28 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
|
|
from enum import IntEnum
|
|
|
|
from logitech_receiver import common
|
|
from logitech_receiver.common import NamedInt
|
|
from logitech_receiver.common import NamedInts
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def bool_or_toggle(current: bool | str, new: bool | str) -> bool:
|
|
if isinstance(new, bool):
|
|
return new
|
|
|
|
try:
|
|
return bool(int(new))
|
|
except (TypeError, ValueError):
|
|
new = str(new).lower()
|
|
|
|
if new in ("true", "yes", "on", "t", "y"):
|
|
return True
|
|
if new in ("false", "no", "off", "f", "n"):
|
|
return False
|
|
if new in ("~", "toggle"):
|
|
return not current
|
|
return None
|
|
|
|
|
|
class Kind(IntEnum):
|
|
TOGGLE = 0x01
|
|
CHOICE = 0x02
|
|
RANGE = 0x04
|
|
MAP_CHOICE = 0x0A
|
|
MULTIPLE_TOGGLE = 0x10
|
|
PACKED_RANGE = 0x20
|
|
MULTIPLE_RANGE = 0x40
|
|
HETERO = 0x80
|
|
|
|
|
|
class Validator:
|
|
@classmethod
|
|
def build(cls, setting_class, device, **kwargs) -> Validator:
|
|
return cls(**kwargs)
|
|
|
|
@classmethod
|
|
def to_string(cls, value) -> str:
|
|
return str(value)
|
|
|
|
def compare(self, args, current):
|
|
if len(args) != 1:
|
|
return False
|
|
return args[0] == current
|
|
|
|
|
|
class BooleanValidator(Validator):
|
|
__slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value")
|
|
|
|
kind = Kind.TOGGLE
|
|
default_true = 0x01
|
|
default_false = 0x00
|
|
# mask specifies all the affected bits in the value
|
|
default_mask = 0xFF
|
|
|
|
def __init__(
|
|
self,
|
|
true_value=default_true,
|
|
false_value=default_false,
|
|
mask=default_mask,
|
|
read_skip_byte_count=0,
|
|
write_prefix_bytes=b"",
|
|
):
|
|
if isinstance(true_value, int):
|
|
assert isinstance(false_value, int)
|
|
if mask is None:
|
|
mask = self.default_mask
|
|
else:
|
|
assert isinstance(mask, int)
|
|
assert true_value & false_value == 0
|
|
assert true_value & mask == true_value
|
|
assert false_value & mask == false_value
|
|
self.needs_current_value = mask != self.default_mask
|
|
elif isinstance(true_value, bytes):
|
|
if false_value is None or false_value == self.default_false:
|
|
false_value = b"\x00" * len(true_value)
|
|
else:
|
|
assert isinstance(false_value, bytes)
|
|
if mask is None or mask == self.default_mask:
|
|
mask = b"\xff" * len(true_value)
|
|
else:
|
|
assert isinstance(mask, bytes)
|
|
assert len(mask) == len(true_value) == len(false_value)
|
|
tv = common.bytes2int(true_value)
|
|
fv = common.bytes2int(false_value)
|
|
mv = common.bytes2int(mask)
|
|
assert tv != fv # true and false might be something other than bit values
|
|
assert tv & mv == tv
|
|
assert fv & mv == fv
|
|
self.needs_current_value = any(m != 0xFF for m in mask)
|
|
else:
|
|
raise Exception(f"invalid mask '{mask!r}', type {type(mask)}")
|
|
|
|
self.true_value = true_value
|
|
self.false_value = false_value
|
|
self.mask = mask
|
|
self.read_skip_byte_count = read_skip_byte_count
|
|
self.write_prefix_bytes = write_prefix_bytes
|
|
|
|
def validate_read(self, reply_bytes):
|
|
reply_bytes = reply_bytes[self.read_skip_byte_count :]
|
|
if isinstance(self.mask, int):
|
|
reply_value = ord(reply_bytes[:1]) & self.mask
|
|
if logger.isEnabledFor(logging.DEBUG):
|
|
logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value)
|
|
if reply_value == self.true_value:
|
|
return True
|
|
if reply_value == self.false_value:
|
|
return False
|
|
logger.warning(
|
|
"BooleanValidator: reply %02X mismatched %02X/%02X/%02X",
|
|
reply_value,
|
|
self.true_value,
|
|
self.false_value,
|
|
self.mask,
|
|
)
|
|
return False
|
|
|
|
count = len(self.mask)
|
|
mask = common.bytes2int(self.mask)
|
|
reply_value = common.bytes2int(reply_bytes[:count]) & mask
|
|
|
|
true_value = common.bytes2int(self.true_value)
|
|
if reply_value == true_value:
|
|
return True
|
|
|
|
false_value = common.bytes2int(self.false_value)
|
|
if reply_value == false_value:
|
|
return False
|
|
|
|
logger.warning(
|
|
"BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask
|
|
)
|
|
return False
|
|
|
|
def prepare_write(self, new_value, current_value=None):
|
|
if new_value is None:
|
|
new_value = False
|
|
else:
|
|
assert isinstance(new_value, bool), f"New value {new_value} for boolean setting is not a boolean"
|
|
|
|
to_write = self.true_value if new_value else self.false_value
|
|
|
|
if isinstance(self.mask, int):
|
|
if current_value is not None and self.needs_current_value:
|
|
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
|
|
if current_value is not None and to_write == ord(current_value[:1]):
|
|
return None
|
|
to_write = bytes([to_write])
|
|
else:
|
|
to_write = bytearray(to_write)
|
|
count = len(self.mask)
|
|
for i in range(0, count):
|
|
b = ord(to_write[i : i + 1])
|
|
m = ord(self.mask[i : i + 1])
|
|
assert b & m == b
|
|
# b &= m
|
|
if current_value is not None and self.needs_current_value:
|
|
b |= ord(current_value[i : i + 1]) & (0xFF ^ m)
|
|
to_write[i] = b
|
|
to_write = bytes(to_write)
|
|
|
|
if current_value is not None and to_write == current_value[: len(to_write)]:
|
|
return None
|
|
|
|
if logger.isEnabledFor(logging.DEBUG):
|
|
logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write)
|
|
|
|
return self.write_prefix_bytes + to_write
|
|
|
|
def acceptable(self, args, current):
|
|
if len(args) != 1:
|
|
return None
|
|
val = bool_or_toggle(current, args[0])
|
|
return [val] if val is not None else None
|
|
|
|
|
|
class BitFieldValidator(Validator):
|
|
__slots__ = ("byte_count", "options")
|
|
|
|
kind = Kind.MULTIPLE_TOGGLE
|
|
|
|
def __init__(self, options, byte_count=None):
|
|
assert isinstance(options, list)
|
|
self.options = options
|
|
self.byte_count = (max(x.bit_length() for x in options) + 7) // 8
|
|
if byte_count:
|
|
assert isinstance(byte_count, int) and byte_count >= self.byte_count
|
|
self.byte_count = byte_count
|
|
|
|
def to_string(self, value) -> str:
|
|
def element_to_string(key, val):
|
|
k = next((k for k in self.options if int(key) == k), None)
|
|
return str(k) + ":" + str(val) if k is not None else "?"
|
|
|
|
return "{" + ", ".join([element_to_string(k, value[k]) for k in value]) + "}"
|
|
|
|
def validate_read(self, reply_bytes):
|
|
r = common.bytes2int(reply_bytes[: self.byte_count])
|
|
value = {int(k): False for k in self.options}
|
|
m = 1
|
|
for _ignore in range(8 * self.byte_count):
|
|
if m in self.options:
|
|
value[int(m)] = bool(r & m)
|
|
m <<= 1
|
|
return value
|
|
|
|
def prepare_write(self, new_value):
|
|
assert isinstance(new_value, dict)
|
|
w = 0
|
|
for k, v in new_value.items():
|
|
if v:
|
|
w |= int(k)
|
|
return common.int2bytes(w, self.byte_count)
|
|
|
|
def get_options(self):
|
|
return self.options
|
|
|
|
def acceptable(self, args, current):
|
|
if len(args) != 2:
|
|
return None
|
|
key = next((key for key in self.options if key == args[0]), None)
|
|
if key is None:
|
|
return None
|
|
val = bool_or_toggle(current[int(key)], args[1])
|
|
return None if val is None else [int(key), val]
|
|
|
|
def compare(self, args, current):
|
|
if len(args) != 2:
|
|
return False
|
|
key = next((key for key in self.options if key == args[0]), None)
|
|
if key is None:
|
|
return False
|
|
return args[1] == current[int(key)]
|
|
|
|
|
|
class BitFieldWithOffsetAndMaskValidator(Validator):
|
|
__slots__ = ("byte_count", "options", "_option_from_key", "_mask_from_offset", "_option_from_offset_mask")
|
|
|
|
kind = Kind.MULTIPLE_TOGGLE
|
|
sep = 0x01
|
|
|
|
def __init__(self, options, om_method=None, byte_count=None):
|
|
assert isinstance(options, list)
|
|
# each element of options is an instance of a class
|
|
# that has an id (which is used as an index in other dictionaries)
|
|
# and where om_method is a method that returns a byte offset and byte mask
|
|
# that says how to access and modify the bit toggle for the option
|
|
self.options = options
|
|
self.om_method = om_method
|
|
# to retrieve the options efficiently:
|
|
self._option_from_key = {}
|
|
self._mask_from_offset = {}
|
|
self._option_from_offset_mask = {}
|
|
for opt in options:
|
|
offset, mask = om_method(opt)
|
|
self._option_from_key[int(opt)] = opt
|
|
try:
|
|
self._mask_from_offset[offset] |= mask
|
|
except KeyError:
|
|
self._mask_from_offset[offset] = mask
|
|
try:
|
|
mask_to_opt = self._option_from_offset_mask[offset]
|
|
except KeyError:
|
|
mask_to_opt = {}
|
|
self._option_from_offset_mask[offset] = mask_to_opt
|
|
mask_to_opt[mask] = opt
|
|
self.byte_count = (max(om_method(x)[1].bit_length() for x in options) + 7) // 8 # is this correct??
|
|
if byte_count:
|
|
assert isinstance(byte_count, int) and byte_count >= self.byte_count
|
|
self.byte_count = byte_count
|
|
|
|
def prepare_read(self):
|
|
r = []
|
|
for offset, mask in self._mask_from_offset.items():
|
|
b = offset << (8 * (self.byte_count + 1))
|
|
b |= (self.sep << (8 * self.byte_count)) | mask
|
|
r.append(common.int2bytes(b, self.byte_count + 2))
|
|
return r
|
|
|
|
def prepare_read_key(self, key):
|
|
option = self._option_from_key.get(key, None)
|
|
if option is None:
|
|
return None
|
|
offset, mask = option.om_method(option)
|
|
b = offset << (8 * (self.byte_count + 1))
|
|
b |= (self.sep << (8 * self.byte_count)) | mask
|
|
return common.int2bytes(b, self.byte_count + 2)
|
|
|
|
def validate_read(self, reply_bytes_dict):
|
|
values = {int(k): False for k in self.options}
|
|
for query, b in reply_bytes_dict.items():
|
|
offset = common.bytes2int(query[0:1])
|
|
b += (self.byte_count - len(b)) * b"\x00"
|
|
value = common.bytes2int(b[: self.byte_count])
|
|
mask_to_opt = self._option_from_offset_mask.get(offset, {})
|
|
m = 1
|
|
for _ignore in range(8 * self.byte_count):
|
|
if m in mask_to_opt:
|
|
values[int(mask_to_opt[m])] = bool(value & m)
|
|
m <<= 1
|
|
return values
|
|
|
|
def prepare_write(self, new_value):
|
|
assert isinstance(new_value, dict)
|
|
w = {}
|
|
for k, v in new_value.items():
|
|
option = self._option_from_key[int(k)]
|
|
offset, mask = self.om_method(option)
|
|
if offset not in w:
|
|
w[offset] = 0
|
|
if v:
|
|
w[offset] |= mask
|
|
return [
|
|
common.int2bytes(
|
|
(offset << (8 * (2 * self.byte_count + 1)))
|
|
| (self.sep << (16 * self.byte_count))
|
|
| (self._mask_from_offset[offset] << (8 * self.byte_count))
|
|
| value,
|
|
2 * self.byte_count + 2,
|
|
)
|
|
for offset, value in w.items()
|
|
]
|
|
|
|
def get_options(self):
|
|
return [int(opt) if isinstance(opt, int) else opt.as_int() for opt in self.options]
|
|
|
|
def acceptable(self, args, current):
|
|
if len(args) != 2:
|
|
return None
|
|
key = next((option.id for option in self.options if option.as_int() == args[0]), None)
|
|
if key is None:
|
|
return None
|
|
val = bool_or_toggle(current[int(key)], args[1])
|
|
return None if val is None else [int(key), val]
|
|
|
|
def compare(self, args, current):
|
|
if len(args) != 2:
|
|
return False
|
|
key = next((option.id for option in self.options if option.as_int() == args[0]), None)
|
|
if key is None:
|
|
return False
|
|
return args[1] == current[int(key)]
|
|
|
|
|
|
class ChoicesValidator(Validator):
|
|
"""Translates between NamedInts and a byte sequence.
|
|
:param choices: a list of NamedInts
|
|
:param byte_count: the size of the derived byte sequence. If None, it
|
|
will be calculated from the choices."""
|
|
|
|
kind = Kind.CHOICE
|
|
|
|
def __init__(self, choices=None, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b""):
|
|
assert choices is not None
|
|
assert isinstance(choices, NamedInts)
|
|
assert len(choices) > 1
|
|
self.choices = choices
|
|
self.needs_current_value = False
|
|
|
|
max_bits = max(x.bit_length() for x in choices)
|
|
self._byte_count = (max_bits // 8) + (1 if max_bits % 8 else 0)
|
|
if byte_count:
|
|
assert self._byte_count <= byte_count
|
|
self._byte_count = byte_count
|
|
assert self._byte_count < 8
|
|
self._read_skip_byte_count = read_skip_byte_count
|
|
self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b""
|
|
assert self._byte_count + self._read_skip_byte_count <= 14
|
|
assert self._byte_count + len(self._write_prefix_bytes) <= 14
|
|
|
|
def to_string(self, value) -> str:
|
|
return str(self.choices[value]) if isinstance(value, int) else str(value)
|
|
|
|
def validate_read(self, reply_bytes):
|
|
reply_value = common.bytes2int(reply_bytes[self._read_skip_byte_count : self._read_skip_byte_count + self._byte_count])
|
|
valid_value = self.choices[reply_value]
|
|
assert valid_value is not None, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
|
|
return valid_value
|
|
|
|
def prepare_write(self, new_value, current_value=None):
|
|
if new_value is None:
|
|
value = self.choices[:][0]
|
|
else:
|
|
value = self.choice(new_value)
|
|
if value is None:
|
|
raise ValueError(f"invalid choice {new_value!r}")
|
|
assert isinstance(value, NamedInt)
|
|
return self._write_prefix_bytes + value.bytes(self._byte_count)
|
|
|
|
def choice(self, value):
|
|
if isinstance(value, int):
|
|
return self.choices[value]
|
|
try:
|
|
int(value)
|
|
if int(value) in self.choices:
|
|
return self.choices[int(value)]
|
|
except Exception:
|
|
pass
|
|
if value in self.choices:
|
|
return self.choices[value]
|
|
else:
|
|
return None
|
|
|
|
def acceptable(self, args, current):
|
|
choice = self.choice(args[0]) if len(args) == 1 else None
|
|
return None if choice is None else [choice]
|
|
|
|
|
|
class ChoicesMapValidator(ChoicesValidator):
|
|
kind = Kind.MAP_CHOICE
|
|
|
|
def __init__(
|
|
self,
|
|
choices_map,
|
|
key_byte_count=0,
|
|
key_postfix_bytes=b"",
|
|
byte_count=0,
|
|
read_skip_byte_count=0,
|
|
write_prefix_bytes=b"",
|
|
extra_default=None,
|
|
mask=-1,
|
|
activate=0,
|
|
):
|
|
assert choices_map is not None
|
|
assert isinstance(choices_map, dict)
|
|
max_key_bits = 0
|
|
max_value_bits = 0
|
|
for key, choices in choices_map.items():
|
|
assert isinstance(key, NamedInt)
|
|
assert isinstance(choices, NamedInts)
|
|
max_key_bits = max(max_key_bits, key.bit_length())
|
|
for key_value in choices:
|
|
assert isinstance(key_value, NamedInt)
|
|
max_value_bits = max(max_value_bits, key_value.bit_length())
|
|
self._key_byte_count = (max_key_bits + 7) // 8
|
|
if key_byte_count:
|
|
assert self._key_byte_count <= key_byte_count
|
|
self._key_byte_count = key_byte_count
|
|
self._byte_count = (max_value_bits + 7) // 8
|
|
if byte_count:
|
|
assert self._byte_count <= byte_count
|
|
self._byte_count = byte_count
|
|
|
|
self.choices = choices_map
|
|
self.needs_current_value = False
|
|
self.extra_default = extra_default
|
|
self._key_postfix_bytes = key_postfix_bytes
|
|
self._read_skip_byte_count = read_skip_byte_count if read_skip_byte_count else 0
|
|
self._write_prefix_bytes = write_prefix_bytes if write_prefix_bytes else b""
|
|
self.activate = activate
|
|
self.mask = mask
|
|
assert self._byte_count + self._read_skip_byte_count + self._key_byte_count <= 14
|
|
assert self._byte_count + len(self._write_prefix_bytes) + self._key_byte_count <= 14
|
|
|
|
def to_string(self, value) -> str:
|
|
def element_to_string(key, val):
|
|
k, c = next(((k, c) for k, c in self.choices.items() if int(key) == k), (None, None))
|
|
return str(k) + ":" + str(c[val]) if k is not None else "?"
|
|
|
|
return "{" + ", ".join([element_to_string(k, value[k]) for k in sorted(value)]) + "}"
|
|
|
|
def validate_read(self, reply_bytes, key):
|
|
start = self._key_byte_count + self._read_skip_byte_count
|
|
end = start + self._byte_count
|
|
reply_value = common.bytes2int(reply_bytes[start:end]) & self.mask
|
|
# reprogrammable keys starts out as 0, which is not a choice, so don't use assert here
|
|
if self.extra_default is not None and self.extra_default == reply_value:
|
|
return int(self.choices[key][0])
|
|
if reply_value not in self.choices[key]:
|
|
assert reply_value in self.choices[key], "%s: failed to validate read value %02X" % (
|
|
self.__class__.__name__,
|
|
reply_value,
|
|
)
|
|
return reply_value
|
|
|
|
def prepare_key(self, key):
|
|
return key.to_bytes(self._key_byte_count, "big") + self._key_postfix_bytes
|
|
|
|
def prepare_write(self, key, new_value):
|
|
choices = self.choices.get(key)
|
|
if choices is None or (new_value not in choices and new_value != self.extra_default):
|
|
logger.error("invalid choice %r for %s", new_value, key)
|
|
return None
|
|
new_value = new_value | self.activate
|
|
return self._write_prefix_bytes + new_value.to_bytes(self._byte_count, "big")
|
|
|
|
def acceptable(self, args, current):
|
|
if len(args) != 2:
|
|
return None
|
|
key, choices = next(((key, item) for key, item in self.choices.items() if key == args[0]), (None, None))
|
|
if choices is None or args[1] not in choices:
|
|
return None
|
|
choice = next((item for item in choices if item == args[1]), None)
|
|
return [int(key), int(choice)] if choice is not None else None
|
|
|
|
def compare(self, args, current):
|
|
if len(args) != 2:
|
|
return False
|
|
key = next((key for key in self.choices if key == int(args[0])), None)
|
|
if key is None:
|
|
return False
|
|
return args[1] == current[int(key)]
|
|
|
|
|
|
class RangeValidator(Validator):
|
|
kind = Kind.RANGE
|
|
"""Translates between integers and a byte sequence.
|
|
:param min_value: minimum accepted value (inclusive)
|
|
:param max_value: maximum accepted value (inclusive)
|
|
:param byte_count: the size of the derived byte sequence. If None, it
|
|
will be calculated from the range."""
|
|
min_value = 0
|
|
max_value = 255
|
|
|
|
@classmethod
|
|
def build(cls, setting_class, device, **kwargs):
|
|
kwargs["min_value"] = setting_class.min_value
|
|
kwargs["max_value"] = setting_class.max_value
|
|
return cls(**kwargs)
|
|
|
|
def __init__(self, min_value=0, max_value=255, byte_count=1):
|
|
assert max_value > min_value
|
|
self.min_value = min_value
|
|
self.max_value = max_value
|
|
self.needs_current_value = True # read and check before write (needed for ADC power and probably a good idea anyway)
|
|
|
|
self._byte_count = math.ceil(math.log(max_value + 1, 256))
|
|
if byte_count:
|
|
assert self._byte_count <= byte_count
|
|
self._byte_count = byte_count
|
|
assert self._byte_count < 8
|
|
|
|
def validate_read(self, reply_bytes):
|
|
reply_value = common.bytes2int(reply_bytes[: self._byte_count])
|
|
assert reply_value >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
|
|
assert reply_value <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {reply_value:02X}"
|
|
return reply_value
|
|
|
|
def prepare_write(self, new_value, current_value=None):
|
|
if new_value < self.min_value or new_value > self.max_value:
|
|
raise ValueError(f"invalid choice {new_value!r}")
|
|
current_value = self.validate_read(current_value) if current_value is not None else None
|
|
to_write = common.int2bytes(new_value, self._byte_count)
|
|
# current value is known and same as value to be written return None to signal not to write it
|
|
return None if current_value is not None and current_value == new_value else to_write
|
|
|
|
def acceptable(self, args, current):
|
|
arg = args[0]
|
|
# None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args)
|
|
return None if len(args) != 1 or isinstance(arg, int) or arg < self.min_value or arg > self.max_value else args
|
|
|
|
def compare(self, args, current):
|
|
if len(args) == 1:
|
|
return args[0] == current
|
|
elif len(args) == 2:
|
|
return args[0] <= current <= args[1]
|
|
else:
|
|
return False
|
|
|
|
|
|
class HeteroValidator(Validator):
|
|
kind = Kind.HETERO
|
|
|
|
@classmethod
|
|
def build(cls, setting_class, device, **kwargs):
|
|
return cls(**kwargs)
|
|
|
|
def __init__(self, data_class=None, options=None, readable=True):
|
|
assert data_class is not None and options is not None
|
|
self.data_class = data_class
|
|
self.options = options
|
|
self.readable = readable
|
|
self.needs_current_value = False
|
|
|
|
def validate_read(self, reply_bytes):
|
|
if self.readable:
|
|
reply_value = self.data_class.from_bytes(reply_bytes, options=self.options)
|
|
return reply_value
|
|
|
|
def prepare_write(self, new_value, current_value=None):
|
|
to_write = new_value.to_bytes(options=self.options)
|
|
return to_write
|
|
|
|
def acceptable(self, args, current): # should this actually do some checking?
|
|
return True
|
|
|
|
|
|
class PackedRangeValidator(Validator):
|
|
kind = Kind.PACKED_RANGE
|
|
"""Several range values, all the same size, all the same min and max"""
|
|
min_value = 0
|
|
max_value = 255
|
|
count = 1
|
|
rsbc = 0
|
|
write_prefix_bytes = b""
|
|
|
|
def __init__(
|
|
self, keys, min_value=0, max_value=255, count=1, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b""
|
|
):
|
|
assert max_value > min_value
|
|
self.needs_current_value = True
|
|
self.keys = keys
|
|
self.min_value = min_value
|
|
self.max_value = max_value
|
|
self.count = count
|
|
self.bc = math.ceil(math.log(max_value + 1 - min(0, min_value), 256))
|
|
if byte_count:
|
|
assert self.bc <= byte_count
|
|
self.bc = byte_count
|
|
assert self.bc * self.count
|
|
self.rsbc = read_skip_byte_count
|
|
self.write_prefix_bytes = write_prefix_bytes
|
|
|
|
def validate_read(self, reply_bytes):
|
|
rvs = {
|
|
n: common.bytes2int(reply_bytes[self.rsbc + n * self.bc : self.rsbc + (n + 1) * self.bc], signed=True)
|
|
for n in range(self.count)
|
|
}
|
|
for n in range(self.count):
|
|
assert rvs[n] >= self.min_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}"
|
|
assert rvs[n] <= self.max_value, f"{self.__class__.__name__}: failed to validate read value {rvs[n]:02X}"
|
|
return rvs
|
|
|
|
def prepare_write(self, new_values):
|
|
if len(new_values) != self.count:
|
|
raise ValueError(f"wrong number of values {new_values!r}")
|
|
for new_value in new_values.values():
|
|
if new_value < self.min_value or new_value > self.max_value:
|
|
raise ValueError(f"invalid value {new_value!r}")
|
|
bytes = self.write_prefix_bytes + b"".join(
|
|
common.int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count)
|
|
)
|
|
return bytes
|
|
|
|
def acceptable(self, args, current):
|
|
if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count:
|
|
return None
|
|
return None if not isinstance(args[1], int) or args[1] < self.min_value or args[1] > self.max_value else args
|
|
|
|
def compare(self, args, current):
|
|
logger.warning("compare not implemented for packed range settings")
|
|
return False
|
|
|
|
|
|
class MultipleRangeValidator(Validator):
|
|
kind = Kind.MULTIPLE_RANGE
|
|
|
|
def __init__(self, items, sub_items):
|
|
assert isinstance(items, list) # each element must have .index and its __int__ must return its id (not its index)
|
|
assert isinstance(sub_items, dict)
|
|
# sub_items: items -> class with .minimum, .maximum, .length (in bytes), .id (a string) and .widget (e.g. 'Scale')
|
|
self.items = items
|
|
self.keys = NamedInts(**{str(item): int(item) for item in items})
|
|
self._item_from_id = {int(k): k for k in items}
|
|
self.sub_items = sub_items
|
|
|
|
def prepare_read_item(self, item):
|
|
return common.int2bytes((self._item_from_id[int(item)].index << 1) | 0xFF, 2)
|
|
|
|
def validate_read_item(self, reply_bytes, item):
|
|
item = self._item_from_id[int(item)]
|
|
start = 0
|
|
value = {}
|
|
for sub_item in self.sub_items[item]:
|
|
r = reply_bytes[start : start + sub_item.length]
|
|
if len(r) < sub_item.length:
|
|
r += b"\x00" * (sub_item.length - len(value))
|
|
v = common.bytes2int(r)
|
|
if not (sub_item.minimum < v < sub_item.maximum):
|
|
logger.warning(
|
|
f"{self.__class__.__name__}: failed to validate read value for {item}.{sub_item}: "
|
|
+ f"{v} not in [{sub_item.minimum}..{sub_item.maximum}]"
|
|
)
|
|
value[str(sub_item)] = v
|
|
start += sub_item.length
|
|
return value
|
|
|
|
def prepare_write(self, value):
|
|
seq = []
|
|
w = b""
|
|
for item in value.keys():
|
|
_item = self._item_from_id[int(item)]
|
|
b = common.int2bytes(_item.index, 1)
|
|
for sub_item in self.sub_items[_item]:
|
|
try:
|
|
v = value[int(item)][str(sub_item)]
|
|
except KeyError:
|
|
return None
|
|
if not (sub_item.minimum <= v <= sub_item.maximum):
|
|
raise ValueError(
|
|
f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]"
|
|
)
|
|
b += common.int2bytes(v, sub_item.length)
|
|
if len(w) + len(b) > 15:
|
|
seq.append(b + b"\xff")
|
|
w = b""
|
|
w += b
|
|
seq.append(w + b"\xff")
|
|
return seq
|
|
|
|
def prepare_write_item(self, item, value):
|
|
_item = self._item_from_id[int(item)]
|
|
w = common.int2bytes(_item.index, 1)
|
|
for sub_item in self.sub_items[_item]:
|
|
try:
|
|
v = value[str(sub_item)]
|
|
except KeyError:
|
|
return None
|
|
if not (sub_item.minimum <= v <= sub_item.maximum):
|
|
raise ValueError(f"invalid choice for {item}.{sub_item}: {v} not in [{sub_item.minimum}..{sub_item.maximum}]")
|
|
w += common.int2bytes(v, sub_item.length)
|
|
return w + b"\xff"
|
|
|
|
def acceptable(self, args, current):
|
|
# just one item, with at least one sub-item
|
|
if not isinstance(args, list) or len(args) != 2 or not isinstance(args[1], dict):
|
|
return None
|
|
item = next((p for p in self.items if p.id == args[0] or str(p) == args[0]), None)
|
|
if not item:
|
|
return None
|
|
for sub_key, value in args[1].items():
|
|
sub_item = next((it for it in self.sub_items[item] if it.id == sub_key), None)
|
|
if not sub_item:
|
|
return None
|
|
if not isinstance(value, int) or not (sub_item.minimum <= value <= sub_item.maximum):
|
|
return None
|
|
return [int(item), {**args[1]}]
|
|
|
|
def compare(self, args, current):
|
|
logger.warning("compare not implemented for multiple range settings")
|
|
return False
|