settings: add packed ranges setting

This commit is contained in:
Peter F. Patel-Schneider 2022-06-07 07:34:14 -04:00
parent f1e2a0c449
commit 777a7138c1
1 changed files with 122 additions and 6 deletions

View File

@ -38,7 +38,9 @@ del getLogger
#
SENSITIVITY_IGNORE = 'ignore'
KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x04, map_choice=0x0A, multiple_toggle=0x10, multiple_range=0x40)
KIND = _NamedInts(
toggle=0x01, choice=0x02, range=0x04, map_choice=0x0A, multiple_toggle=0x10, packed_range=0x20, multiple_range=0x40
)
def bool_or_toggle(current, new):
@ -658,6 +660,67 @@ class BitFieldWithOffsetAndMaskSetting(BitFieldSetting):
return {r: self._rw.read(self._device, r)}
class RangeFieldSetting(Setting):
"""A setting descriptor for a set of choices represented by one field each, with map from option names to range(0,n).
Needs to be instantiated for each specific device."""
def read(self, cached=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings read %r from %s', self.name, self._value, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value
if self._device.online:
reply_map = {}
reply = self._do_read()
if reply:
reply_map = self._validator.validate_read(reply)
self._value = reply_map
if getattr(self._device, 'persister', None) and self.name not in self._device.persister:
# Don't update the persister if it already has a value,
# otherwise the first read might overwrite the value we wanted.
self._device.persister[self.name] = self._value if self.persist else None
return self._value
def _do_read(self):
return self._rw.read(self._device)
def read_key(self, key, cached=True):
return self.read(cached)[int(key)]
def write(self, map, save=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
assert map is not None
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings write %r to %s', self.name, map, self._device)
if self._device.online:
self._value = map
self._pre_write(save)
data_bytes = self._validator.prepare_write(self._value)
if data_bytes is not None:
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings prepare map write(%s) => %r', self.name, self._value, data_bytes)
reply = self._rw.write(self._device, data_bytes)
if not reply:
return None
return map
def write_key_value(self, key, value):
assert key is not None
assert value is not None
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings write key %r value %r to %s', self.name, key, value, self._device)
if self._device.online:
if not self._value:
self.read()
map = self._value
map[int(key)] = value
self.write(map)
return value
#
# read/write low-level operators
#
@ -684,9 +747,7 @@ class FeatureRW:
default_read_fnid = 0x00
default_write_fnid = 0x10
def __init__(
self, feature, read_fnid=default_read_fnid, write_fnid=default_write_fnid, prefix=b'', suffix=b'', no_reply=False
):
def __init__(self, feature, read_fnid=0x00, write_fnid=0x10, prefix=b'', suffix=b'', read_prefix=b'', no_reply=False):
assert isinstance(feature, _NamedInt)
self.feature = feature
self.read_fnid = read_fnid
@ -694,10 +755,11 @@ class FeatureRW:
self.no_reply = no_reply
self.prefix = prefix
self.suffix = suffix
self.read_prefix = read_prefix
def read(self, device, data_bytes=b''):
assert self.feature is not None
return device.feature_request(self.feature, self.read_fnid, self.prefix, data_bytes)
return device.feature_request(self.feature, self.read_fnid, self.prefix, self.read_prefix, data_bytes)
def write(self, device, data_bytes):
assert self.feature is not None
@ -1118,8 +1180,62 @@ class RangeValidator(Validator):
return False
class MultipleRangeValidator(Validator):
class PackedRangeValidator(Validator):
kind = KIND.packed_range
"""Several range values, all the same size, all the same min and max"""
min_value = 0
max_value = 255
count = 1
rsbc = 0
write_prefix_bytes = b''
def __init__(
self, keys, min_value=0, max_value=255, count=1, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b''
):
assert max_value > min_value
self.needs_current_value = True
self.keys = keys
self.min_value = min_value
self.max_value = max_value
self.count = count
self.bc = math.ceil(math.log(max_value + 1 - min(0, min_value), 256))
if byte_count:
assert self.bc <= byte_count
self.bc = byte_count
assert self.bc * self.count
self.rsbc = read_skip_byte_count
self.write_prefix_bytes = write_prefix_bytes
def validate_read(self, reply_bytes):
rvs = {
n: _bytes2int(reply_bytes[self.rsbc + n * self.bc:self.rsbc + (n + 1) * self.bc], signed=True)
for n in range(self.count)
}
for n in range(self.count):
assert rvs[n] >= self.min_value, '%s: failed to validate read value %02X' % (self.__class__.__name__, rvs[n])
assert rvs[n] <= self.max_value, '%s: failed to validate read value %02X' % (self.__class__.__name__, rvs[n])
return rvs
def prepare_write(self, new_values):
if len(new_values) != self.count:
raise ValueError('wrong number of values %r' % new_values)
for new_value in new_values:
if new_value < self.min_value or new_value > self.max_value:
raise ValueError('invalid value %r' % new_value)
bytes = self.write_prefix_bytes + b''.join(_int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count))
return bytes
def acceptable(self, args, current):
if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count:
return None
return None if type(args[1]) != int or args[1] < self.min_value or args[1] > self.max_value else args
def compare(self, args, current):
_log.warn('compare not implemented for packed range settings')
return False
class MultipleRangeValidator(Validator):
kind = KIND.multiple_range
def __init__(self, items, sub_items):