device: reorder code in settings
This commit is contained in:
parent
e8dadcd5c2
commit
67be689866
|
@ -64,153 +64,6 @@ def bool_or_toggle(current, new):
|
|||
return None
|
||||
|
||||
|
||||
# moved first for dependency reasons
|
||||
class Validator:
|
||||
@classmethod
|
||||
def build(cls, setting_class, device, **kwargs):
|
||||
return cls(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def to_string(cls, value):
|
||||
return str(value)
|
||||
|
||||
def compare(self, args, current):
|
||||
if len(args) != 1:
|
||||
return False
|
||||
return args[0] == current
|
||||
|
||||
|
||||
class BooleanValidator(Validator):
|
||||
__slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value")
|
||||
|
||||
kind = KIND.toggle
|
||||
default_true = 0x01
|
||||
default_false = 0x00
|
||||
# mask specifies all the affected bits in the value
|
||||
default_mask = 0xFF
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
true_value=default_true,
|
||||
false_value=default_false,
|
||||
mask=default_mask,
|
||||
read_skip_byte_count=0,
|
||||
write_prefix_bytes=b"",
|
||||
):
|
||||
if isinstance(true_value, int):
|
||||
assert isinstance(false_value, int)
|
||||
if mask is None:
|
||||
mask = self.default_mask
|
||||
else:
|
||||
assert isinstance(mask, int)
|
||||
assert true_value & false_value == 0
|
||||
assert true_value & mask == true_value
|
||||
assert false_value & mask == false_value
|
||||
self.needs_current_value = mask != self.default_mask
|
||||
elif isinstance(true_value, bytes):
|
||||
if false_value is None or false_value == self.default_false:
|
||||
false_value = b"\x00" * len(true_value)
|
||||
else:
|
||||
assert isinstance(false_value, bytes)
|
||||
if mask is None or mask == self.default_mask:
|
||||
mask = b"\xFF" * len(true_value)
|
||||
else:
|
||||
assert isinstance(mask, bytes)
|
||||
assert len(mask) == len(true_value) == len(false_value)
|
||||
tv = _bytes2int(true_value)
|
||||
fv = _bytes2int(false_value)
|
||||
mv = _bytes2int(mask)
|
||||
assert tv != fv # true and false might be something other than bit values
|
||||
assert tv & mv == tv
|
||||
assert fv & mv == fv
|
||||
self.needs_current_value = any(m != 0xFF for m in mask)
|
||||
else:
|
||||
raise Exception("invalid mask '%r', type %s" % (mask, type(mask)))
|
||||
|
||||
self.true_value = true_value
|
||||
self.false_value = false_value
|
||||
self.mask = mask
|
||||
self.read_skip_byte_count = read_skip_byte_count
|
||||
self.write_prefix_bytes = write_prefix_bytes
|
||||
|
||||
def validate_read(self, reply_bytes):
|
||||
reply_bytes = reply_bytes[self.read_skip_byte_count :]
|
||||
if isinstance(self.mask, int):
|
||||
reply_value = ord(reply_bytes[:1]) & self.mask
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value)
|
||||
if reply_value == self.true_value:
|
||||
return True
|
||||
if reply_value == self.false_value:
|
||||
return False
|
||||
logger.warning(
|
||||
"BooleanValidator: reply %02X mismatched %02X/%02X/%02X",
|
||||
reply_value,
|
||||
self.true_value,
|
||||
self.false_value,
|
||||
self.mask,
|
||||
)
|
||||
return False
|
||||
|
||||
count = len(self.mask)
|
||||
mask = _bytes2int(self.mask)
|
||||
reply_value = _bytes2int(reply_bytes[:count]) & mask
|
||||
|
||||
true_value = _bytes2int(self.true_value)
|
||||
if reply_value == true_value:
|
||||
return True
|
||||
|
||||
false_value = _bytes2int(self.false_value)
|
||||
if reply_value == false_value:
|
||||
return False
|
||||
|
||||
logger.warning(
|
||||
"BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask
|
||||
)
|
||||
return False
|
||||
|
||||
def prepare_write(self, new_value, current_value=None):
|
||||
if new_value is None:
|
||||
new_value = False
|
||||
else:
|
||||
assert isinstance(new_value, bool), "New value %s for boolean setting is not a boolean" % new_value
|
||||
|
||||
to_write = self.true_value if new_value else self.false_value
|
||||
|
||||
if isinstance(self.mask, int):
|
||||
if current_value is not None and self.needs_current_value:
|
||||
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
|
||||
if current_value is not None and to_write == ord(current_value[:1]):
|
||||
return None
|
||||
to_write = bytes([to_write])
|
||||
else:
|
||||
to_write = bytearray(to_write)
|
||||
count = len(self.mask)
|
||||
for i in range(0, count):
|
||||
b = ord(to_write[i : i + 1])
|
||||
m = ord(self.mask[i : i + 1])
|
||||
assert b & m == b
|
||||
# b &= m
|
||||
if current_value is not None and self.needs_current_value:
|
||||
b |= ord(current_value[i : i + 1]) & (0xFF ^ m)
|
||||
to_write[i] = b
|
||||
to_write = bytes(to_write)
|
||||
|
||||
if current_value is not None and to_write == current_value[: len(to_write)]:
|
||||
return None
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write)
|
||||
|
||||
return self.write_prefix_bytes + to_write
|
||||
|
||||
def acceptable(self, args, current):
|
||||
if len(args) != 1:
|
||||
return None
|
||||
val = bool_or_toggle(current, args[0])
|
||||
return [val] if val is not None else None
|
||||
|
||||
|
||||
class Setting:
|
||||
"""A setting descriptor. Needs to be instantiated for each specific device."""
|
||||
|
||||
|
@ -219,7 +72,7 @@ class Setting:
|
|||
min_version = 0
|
||||
persist = True
|
||||
rw_options = {}
|
||||
validator_class = BooleanValidator
|
||||
validator_class = None
|
||||
validator_options = {}
|
||||
|
||||
def __init__(self, device, rw, validator):
|
||||
|
@ -836,6 +689,152 @@ class FeatureRWMap(FeatureRW):
|
|||
return reply if not self.no_reply else True
|
||||
|
||||
|
||||
class Validator:
|
||||
@classmethod
|
||||
def build(cls, setting_class, device, **kwargs):
|
||||
return cls(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def to_string(cls, value):
|
||||
return str(value)
|
||||
|
||||
def compare(self, args, current):
|
||||
if len(args) != 1:
|
||||
return False
|
||||
return args[0] == current
|
||||
|
||||
|
||||
class BooleanValidator(Validator):
|
||||
__slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value")
|
||||
|
||||
kind = KIND.toggle
|
||||
default_true = 0x01
|
||||
default_false = 0x00
|
||||
# mask specifies all the affected bits in the value
|
||||
default_mask = 0xFF
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
true_value=default_true,
|
||||
false_value=default_false,
|
||||
mask=default_mask,
|
||||
read_skip_byte_count=0,
|
||||
write_prefix_bytes=b"",
|
||||
):
|
||||
if isinstance(true_value, int):
|
||||
assert isinstance(false_value, int)
|
||||
if mask is None:
|
||||
mask = self.default_mask
|
||||
else:
|
||||
assert isinstance(mask, int)
|
||||
assert true_value & false_value == 0
|
||||
assert true_value & mask == true_value
|
||||
assert false_value & mask == false_value
|
||||
self.needs_current_value = mask != self.default_mask
|
||||
elif isinstance(true_value, bytes):
|
||||
if false_value is None or false_value == self.default_false:
|
||||
false_value = b"\x00" * len(true_value)
|
||||
else:
|
||||
assert isinstance(false_value, bytes)
|
||||
if mask is None or mask == self.default_mask:
|
||||
mask = b"\xFF" * len(true_value)
|
||||
else:
|
||||
assert isinstance(mask, bytes)
|
||||
assert len(mask) == len(true_value) == len(false_value)
|
||||
tv = _bytes2int(true_value)
|
||||
fv = _bytes2int(false_value)
|
||||
mv = _bytes2int(mask)
|
||||
assert tv != fv # true and false might be something other than bit values
|
||||
assert tv & mv == tv
|
||||
assert fv & mv == fv
|
||||
self.needs_current_value = any(m != 0xFF for m in mask)
|
||||
else:
|
||||
raise Exception("invalid mask '%r', type %s" % (mask, type(mask)))
|
||||
|
||||
self.true_value = true_value
|
||||
self.false_value = false_value
|
||||
self.mask = mask
|
||||
self.read_skip_byte_count = read_skip_byte_count
|
||||
self.write_prefix_bytes = write_prefix_bytes
|
||||
|
||||
def validate_read(self, reply_bytes):
|
||||
reply_bytes = reply_bytes[self.read_skip_byte_count :]
|
||||
if isinstance(self.mask, int):
|
||||
reply_value = ord(reply_bytes[:1]) & self.mask
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value)
|
||||
if reply_value == self.true_value:
|
||||
return True
|
||||
if reply_value == self.false_value:
|
||||
return False
|
||||
logger.warning(
|
||||
"BooleanValidator: reply %02X mismatched %02X/%02X/%02X",
|
||||
reply_value,
|
||||
self.true_value,
|
||||
self.false_value,
|
||||
self.mask,
|
||||
)
|
||||
return False
|
||||
|
||||
count = len(self.mask)
|
||||
mask = _bytes2int(self.mask)
|
||||
reply_value = _bytes2int(reply_bytes[:count]) & mask
|
||||
|
||||
true_value = _bytes2int(self.true_value)
|
||||
if reply_value == true_value:
|
||||
return True
|
||||
|
||||
false_value = _bytes2int(self.false_value)
|
||||
if reply_value == false_value:
|
||||
return False
|
||||
|
||||
logger.warning(
|
||||
"BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask
|
||||
)
|
||||
return False
|
||||
|
||||
def prepare_write(self, new_value, current_value=None):
|
||||
if new_value is None:
|
||||
new_value = False
|
||||
else:
|
||||
assert isinstance(new_value, bool), "New value %s for boolean setting is not a boolean" % new_value
|
||||
|
||||
to_write = self.true_value if new_value else self.false_value
|
||||
|
||||
if isinstance(self.mask, int):
|
||||
if current_value is not None and self.needs_current_value:
|
||||
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
|
||||
if current_value is not None and to_write == ord(current_value[:1]):
|
||||
return None
|
||||
to_write = bytes([to_write])
|
||||
else:
|
||||
to_write = bytearray(to_write)
|
||||
count = len(self.mask)
|
||||
for i in range(0, count):
|
||||
b = ord(to_write[i : i + 1])
|
||||
m = ord(self.mask[i : i + 1])
|
||||
assert b & m == b
|
||||
# b &= m
|
||||
if current_value is not None and self.needs_current_value:
|
||||
b |= ord(current_value[i : i + 1]) & (0xFF ^ m)
|
||||
to_write[i] = b
|
||||
to_write = bytes(to_write)
|
||||
|
||||
if current_value is not None and to_write == current_value[: len(to_write)]:
|
||||
return None
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write)
|
||||
|
||||
return self.write_prefix_bytes + to_write
|
||||
|
||||
def acceptable(self, args, current):
|
||||
if len(args) != 1:
|
||||
return None
|
||||
val = bool_or_toggle(current, args[0])
|
||||
return [val] if val is not None else None
|
||||
|
||||
|
||||
class BitFieldValidator(Validator):
|
||||
__slots__ = ("byte_count", "options")
|
||||
|
||||
|
@ -1569,3 +1568,6 @@ def apply_all_settings(device):
|
|||
ignore = sensitives.get(s.name, False)
|
||||
if ignore != SENSITIVITY_IGNORE:
|
||||
s.apply()
|
||||
|
||||
|
||||
Setting.validator_class = BooleanValidator
|
||||
|
|
Loading…
Reference in New Issue