From 777a7138c17b0b633b0e1d22ec2c200e603fb501 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Tue, 7 Jun 2022 07:34:14 -0400 Subject: [PATCH] settings: add packed ranges setting --- lib/logitech_receiver/settings.py | 128 ++++++++++++++++++++++++++++-- 1 file changed, 122 insertions(+), 6 deletions(-) diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index 7d1d19ca..e3125dab 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -38,7 +38,9 @@ del getLogger # SENSITIVITY_IGNORE = 'ignore' -KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x04, map_choice=0x0A, multiple_toggle=0x10, multiple_range=0x40) +KIND = _NamedInts( + toggle=0x01, choice=0x02, range=0x04, map_choice=0x0A, multiple_toggle=0x10, packed_range=0x20, multiple_range=0x40 +) def bool_or_toggle(current, new): @@ -658,6 +660,67 @@ class BitFieldWithOffsetAndMaskSetting(BitFieldSetting): return {r: self._rw.read(self._device, r)} +class RangeFieldSetting(Setting): + """A setting descriptor for a set of choices represented by one field each, with map from option names to range(0,n). + Needs to be instantiated for each specific device.""" + def read(self, cached=True): + assert hasattr(self, '_value') + assert hasattr(self, '_device') + if _log.isEnabledFor(_DEBUG): + _log.debug('%s: settings read %r from %s', self.name, self._value, self._device) + self._pre_read(cached) + if cached and self._value is not None: + return self._value + if self._device.online: + reply_map = {} + reply = self._do_read() + if reply: + reply_map = self._validator.validate_read(reply) + self._value = reply_map + if getattr(self._device, 'persister', None) and self.name not in self._device.persister: + # Don't update the persister if it already has a value, + # otherwise the first read might overwrite the value we wanted. + self._device.persister[self.name] = self._value if self.persist else None + return self._value + + def _do_read(self): + return self._rw.read(self._device) + + def read_key(self, key, cached=True): + return self.read(cached)[int(key)] + + def write(self, map, save=True): + assert hasattr(self, '_value') + assert hasattr(self, '_device') + assert map is not None + if _log.isEnabledFor(_DEBUG): + _log.debug('%s: settings write %r to %s', self.name, map, self._device) + if self._device.online: + self._value = map + self._pre_write(save) + data_bytes = self._validator.prepare_write(self._value) + if data_bytes is not None: + if _log.isEnabledFor(_DEBUG): + _log.debug('%s: settings prepare map write(%s) => %r', self.name, self._value, data_bytes) + reply = self._rw.write(self._device, data_bytes) + if not reply: + return None + return map + + def write_key_value(self, key, value): + assert key is not None + assert value is not None + if _log.isEnabledFor(_DEBUG): + _log.debug('%s: settings write key %r value %r to %s', self.name, key, value, self._device) + if self._device.online: + if not self._value: + self.read() + map = self._value + map[int(key)] = value + self.write(map) + return value + + # # read/write low-level operators # @@ -684,9 +747,7 @@ class FeatureRW: default_read_fnid = 0x00 default_write_fnid = 0x10 - def __init__( - self, feature, read_fnid=default_read_fnid, write_fnid=default_write_fnid, prefix=b'', suffix=b'', no_reply=False - ): + def __init__(self, feature, read_fnid=0x00, write_fnid=0x10, prefix=b'', suffix=b'', read_prefix=b'', no_reply=False): assert isinstance(feature, _NamedInt) self.feature = feature self.read_fnid = read_fnid @@ -694,10 +755,11 @@ class FeatureRW: self.no_reply = no_reply self.prefix = prefix self.suffix = suffix + self.read_prefix = read_prefix def read(self, device, data_bytes=b''): assert self.feature is not None - return device.feature_request(self.feature, self.read_fnid, self.prefix, data_bytes) + return device.feature_request(self.feature, self.read_fnid, self.prefix, self.read_prefix, data_bytes) def write(self, device, data_bytes): assert self.feature is not None @@ -1118,8 +1180,62 @@ class RangeValidator(Validator): return False -class MultipleRangeValidator(Validator): +class PackedRangeValidator(Validator): + kind = KIND.packed_range + """Several range values, all the same size, all the same min and max""" + min_value = 0 + max_value = 255 + count = 1 + rsbc = 0 + write_prefix_bytes = b'' + def __init__( + self, keys, min_value=0, max_value=255, count=1, byte_count=1, read_skip_byte_count=0, write_prefix_bytes=b'' + ): + assert max_value > min_value + self.needs_current_value = True + self.keys = keys + self.min_value = min_value + self.max_value = max_value + self.count = count + self.bc = math.ceil(math.log(max_value + 1 - min(0, min_value), 256)) + if byte_count: + assert self.bc <= byte_count + self.bc = byte_count + assert self.bc * self.count + self.rsbc = read_skip_byte_count + self.write_prefix_bytes = write_prefix_bytes + + def validate_read(self, reply_bytes): + rvs = { + n: _bytes2int(reply_bytes[self.rsbc + n * self.bc:self.rsbc + (n + 1) * self.bc], signed=True) + for n in range(self.count) + } + for n in range(self.count): + assert rvs[n] >= self.min_value, '%s: failed to validate read value %02X' % (self.__class__.__name__, rvs[n]) + assert rvs[n] <= self.max_value, '%s: failed to validate read value %02X' % (self.__class__.__name__, rvs[n]) + return rvs + + def prepare_write(self, new_values): + if len(new_values) != self.count: + raise ValueError('wrong number of values %r' % new_values) + for new_value in new_values: + if new_value < self.min_value or new_value > self.max_value: + raise ValueError('invalid value %r' % new_value) + bytes = self.write_prefix_bytes + b''.join(_int2bytes(new_values[n], self.bc, signed=True) for n in range(self.count)) + return bytes + + def acceptable(self, args, current): + if len(args) != 2 or int(args[0]) < 0 or int(args[0]) >= self.count: + return None + return None if type(args[1]) != int or args[1] < self.min_value or args[1] > self.max_value else args + + def compare(self, args, current): + _log.warn('compare not implemented for packed range settings') + return False + + +class MultipleRangeValidator(Validator): kind = KIND.multiple_range def __init__(self, items, sub_items):