diff --git a/lib/logitech_receiver/hidpp10.py b/lib/logitech_receiver/hidpp10.py index 21b8c006..8d341953 100644 --- a/lib/logitech_receiver/hidpp10.py +++ b/lib/logitech_receiver/hidpp10.py @@ -117,7 +117,7 @@ REGISTERS = _NamedInts( receiver_info=0x2B5, # only apply to devices - mouse_smooth_scroll=0x01, + mouse_button_flags=0x01, keyboard_hand_detection=0x01, battery_status=0x07, keyboard_fn_swap=0x09, diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index cb454c58..2a4396b2 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -19,16 +19,24 @@ from __future__ import absolute_import, division, print_function, unicode_literals +from logging import getLogger, DEBUG as _DEBUG +_log = getLogger(__name__) +del getLogger + from copy import copy as _copy -from .common import NamedInt as _NamedInt, NamedInts as _NamedInts +from .common import ( + NamedInt as _NamedInt, + NamedInts as _NamedInts, + bytes2int as _bytes2int, + ) # # # -KIND = _NamedInts(toggle=0x1, choice=0x02, range=0x12) +KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x12) class Setting(object): """A setting descriptor. @@ -102,22 +110,46 @@ class Setting(object): def write(self, value): assert hasattr(self, '_value') assert hasattr(self, '_device') + assert value is not None - if self._device: - data_bytes = self._validator.prepare_write(value) - reply = self._rw.write(self._device, data_bytes) - if reply: - self._value = self._validator.validate_write(value, reply) - if self.persister and self._value is not None: - self.persister[self.name] = self._value - return self._value + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: write %r to %s", self.name, value, self._device) + + if self._device.online: + # Remember the value we're trying to set, even if the write fails. + # This way even if the device is offline or some other error occurs, + # the last value we've tried to write is remembered in the configuration. + self._value = value + if self.persister: + self.persister[self.name] = value + + current_value = None + if self._validator.needs_current_value: + # the validator needs the current value, possibly to merge flag values + current_value = self._rw.read(self._device) + + data_bytes = self._validator.prepare_write(value, current_value) + if data_bytes is not None: + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: prepare write(%s) => %r", self.name, value, data_bytes) + + reply = self._rw.write(self._device, data_bytes) + if not reply: + # tell whomever is calling that the write failed + return None + + return value def apply(self): assert hasattr(self, '_value') assert hasattr(self, '_device') - if self._value is not None: - self.write(self._value) + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: apply %s (%s)", self.name, self._value, self._device) + + value = self.read() + if value is not None: + self.write(value) def __str__(self): if hasattr(self, '_value'): @@ -173,7 +205,7 @@ class FeatureRW(object): # class BooleanValidator(object): - __slots__ = ['true_value', 'false_value', 'mask', 'write_returns_value'] + __slots__ = ['true_value', 'false_value', 'mask', 'needs_current_value'] kind = KIND.toggle default_true = 0x01 @@ -181,72 +213,139 @@ class BooleanValidator(object): # mask specifies all the affected bits in the value default_mask = 0xFF - def __init__(self, true_value=default_true, false_value=default_false, mask=default_mask, write_returns_value=False): + def __init__(self, true_value=default_true, false_value=default_false, mask=default_mask): + if isinstance(true_value, int): + assert isinstance(false_value, int) + if mask is None: + mask = self.default_mask + else: + assert isinstance(mask, int) + assert true_value & false_value == 0 + assert true_value & mask == true_value + assert false_value & mask == false_value + self.needs_current_value = (mask != self.default_mask) + elif isinstance(true_value, bytes): + if false_value is None or false_value == self.default_false: + false_value = b'\x00' * len(true_value) + else: + assert isinstance(false_value, bytes) + if mask is None or mask == self.default_mask: + mask = b'\xFF' * len(true_value) + else: + assert isinstance(mask, bytes) + assert len(mask) == len(true_value) == len(false_value) + tv = _bytes2int(true_value) + fv = _bytes2int(false_value) + mv = _bytes2int(mask) + assert tv & fv == 0 + assert tv & mv == tv + assert fv & mv == fv + self.needs_current_value = any(m != b'\xFF' for m in mask) + else: + raise Exception("invalid mask '%r', type %s" % (mask, type(mask))) + self.true_value = true_value self.false_value = false_value self.mask = mask - self.write_returns_value = write_returns_value - - def _validate_value(self, reply_bytes, expected_value): - if isinstance(expected_value, int): - return ord(reply_bytes[:1]) & self.mask == expected_value - else: - for i in range(0, len(self.mask)): - masked_value = ord(reply_bytes[i:i+1]) & ord(self.mask[i:i+1]) - if masked_value != ord(expected_value[i:i+1]): - return False - return True def validate_read(self, reply_bytes): - return self._validate_value(reply_bytes, self.true_value) + if isinstance(self.mask, int): + reply_value = ord(reply_bytes[:1]) & self.mask + if _log.isEnabledFor(_DEBUG): + _log.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value) + if reply_value == self.true_value: + return True + if reply_value == self.false_value: + return False + _log.warn("BooleanValidator: reply %02X mismatched %02X/%02X/%02X", + reply_value, self.true_value, self.false_value, self.mask) + return False - def prepare_write(self, value): - # FIXME: this does not work right when there is more than one flag in - # the same register! - return self.true_value if value else self.false_value + count = len(self.mask) + mask = _bytes2int(self.mask) + reply_value = _bytes2int(reply_bytes[:count]) & mask - def validate_write(self, value, reply_bytes): - if self.write_returns_value: - return self._validate_value(reply_bytes, self.true_value) + true_value = _bytes2int(self.true_value) + if reply_value == true_value: + return True - # just assume the value was written correctly, otherwise there would not - # be any reply_bytes to check - return bool(value) + false_value = _bytes2int(self.false_value) + if reply_value == false_value: + return False + + _log.warn("BooleanValidator: reply %r mismatched %r/%r/%r", + reply_bytes, self.true_value, self.false_value, self.mask) + return False + + def prepare_write(self, new_value, current_value=None): + if new_value is None: + new_value = False + else: + assert isinstance(new_value, bool) + + to_write = self.true_value if new_value else self.false_value + + if isinstance(self.mask, int): + if current_value is not None and self.needs_current_value: + to_write |= ord(current_value[:1]) & (0xFF ^ self.mask) + if current_value is not None and to_write == ord(current_value[:1]): + return None + else: + to_write = list(to_write) + count = len(self.mask) + for i in range(0, count): + b = ord(to_write[i]) + m = ord(self.mask[i : i + 1]) + assert b & m == b + # b &= m + if current_value is not None and self.needs_current_value: + b |= ord(current_value[i : i + 1]) & (0xFF ^ m) + to_write[i] = chr(b) + to_write = b''.join(to_write) + + if current_value is not None and to_write == current_value[:len(to_write)]: + return None + + if _log.isEnabledFor(_DEBUG): + _log.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write) + + return to_write class ChoicesValidator(object): - __slots__ = ['choices', 'write_returns_value'] + __slots__ = ['choices', 'flag', '_bytes_count', 'needs_current_value'] kind = KIND.choice - def __init__(self, choices, write_returns_value=False): + def __init__(self, choices): + assert choices is not None assert isinstance(choices, _NamedInts) + assert len(choices) > 2 self.choices = choices - self.write_returns_value = write_returns_value + self.needs_current_value = False + + max_bits = max(x.bit_length() for x in choices) + self._bytes_count = (max_bits // 8) + (1 if max_bits % 8 else 0) + assert self._bytes_count < 8 def validate_read(self, reply_bytes): - assert self.choices is not None - reply_value = ord(reply_bytes[:1]) + reply_value = _bytes2int(reply_bytes[:self._bytes_count]) valid_value = self.choices[reply_value] - assert valid_value is not None, "%: failed to validate read value %02X" % (self.__class__.__name__, reply_value) + assert valid_value is not None, "%s: failed to validate read value %02X" % (self.__class__.__name__, reply_value) return valid_value - def prepare_write(self, value): - assert self.choices is not None - choice = self.choices[value] + def prepare_write(self, new_value, current_value=None): + if new_value is None: + choice = self.choices[:][0] + else: + if isinstance(new_value, int): + choice = self.choices[new_value] + elif new_value in self.choices: + choice = self.choices[new_value] + else: + raise ValueError(new_value) + if choice is None: - raise ValueError("invalid choice " + repr(value)) + raise ValueError("invalid choice %r" % new_value) assert isinstance(choice, _NamedInt) - return choice.bytes(1) - - def validate_write(self, value, reply_bytes): - assert self.choices is not None - if self.write_returns_value: - reply_value = ord(reply_bytes[:1]) - choice = self.choices[reply_value] - assert choice is not None, "failed to validate write reply %02X" % reply_value - return choice - - # just assume the value was written correctly, otherwise there would not - # be any reply_bytes to check - return self.choices[value] + return choice.bytes(self._bytes_count) diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py index a5f7af2d..68c43857 100644 --- a/lib/logitech_receiver/settings_templates.py +++ b/lib/logitech_receiver/settings_templates.py @@ -41,30 +41,33 @@ _F = _hidpp20.FEATURE # def register_toggle(name, register, - true_value=_BooleanV.default_true, false_value=_BooleanV.default_false, - mask=_BooleanV.default_mask, write_returns_value=False, + true_value=_BooleanV.default_true, + false_value=_BooleanV.default_false, + mask=_BooleanV.default_mask, label=None, description=None, device_kind=None): + validator = _BooleanV(true_value=true_value, false_value=false_value, mask=mask) rw = _RegisterRW(register) - validator = _BooleanV(true_value=true_value, false_value=false_value, mask=mask, write_returns_value=write_returns_value) return _Setting(name, rw, validator, label=label, description=description, device_kind=device_kind) def register_choices(name, register, choices, - kind=_KIND.choice, write_returns_value=False, + kind=_KIND.choice, label=None, description=None, device_kind=None): assert choices + validator = _ChoicesV(choices) rw = _RegisterRW(register) - validator = _ChoicesV(choices, write_returns_value=write_returns_value) return _Setting(name, rw, validator, kind=kind, label=label, description=description, device_kind=device_kind) def feature_toggle(name, feature, - read_function_id=_FeatureRW.default_read_fnid, write_function_id=_FeatureRW.default_write_fnid, - true_value=_BooleanV.default_true, false_value=_BooleanV.default_false, - mask=_BooleanV.default_mask, write_returns_value=False, + read_function_id=_FeatureRW.default_read_fnid, + write_function_id=_FeatureRW.default_write_fnid, + true_value=_BooleanV.default_true, + false_value=_BooleanV.default_false, + mask=_BooleanV.default_mask, label=None, description=None, device_kind=None): + validator = _BooleanV(true_value=true_value, false_value=false_value, mask=mask) rw = _FeatureRW(feature, read_function_id, write_function_id) - validator = _BooleanV(true_value=true_value, false_value=false_value, mask=mask, write_returns_value=write_returns_value) return _Setting(name, rw, validator, label=label, description=description, device_kind=device_kind) # @@ -90,7 +93,7 @@ def _register_fn_swap(register=_R.keyboard_fn_swap, true_value=b'\x00\x01', mask label=_FN_SWAP[1], description=_FN_SWAP[2], device_kind=_DK.keyboard) -def _register_smooth_scroll(register=_R.mouse_smooth_scroll, true_value=0x40, mask=0x40): +def _register_smooth_scroll(register=_R.mouse_button_flags, true_value=0x40, mask=0x40): return register_toggle(_SMOOTH_SCROLL[0], register, true_value=true_value, mask=mask, label=_SMOOTH_SCROLL[1], description=_SMOOTH_SCROLL[2], device_kind=_DK.mouse) @@ -103,7 +106,6 @@ def _register_dpi(register=_R.mouse_dpi, choices=None): def _feature_fn_swap(): return feature_toggle(_FN_SWAP[0], _F.FN_INVERSION, - write_returns_value=True, label=_FN_SWAP[1], description=_FN_SWAP[2], device_kind=_DK.keyboard)