properly mask flags when writing device settings; fixes #86
This commit is contained in:
parent
4074fb7750
commit
f7159e9338
|
@ -117,7 +117,7 @@ REGISTERS = _NamedInts(
|
||||||
receiver_info=0x2B5,
|
receiver_info=0x2B5,
|
||||||
|
|
||||||
# only apply to devices
|
# only apply to devices
|
||||||
mouse_smooth_scroll=0x01,
|
mouse_button_flags=0x01,
|
||||||
keyboard_hand_detection=0x01,
|
keyboard_hand_detection=0x01,
|
||||||
battery_status=0x07,
|
battery_status=0x07,
|
||||||
keyboard_fn_swap=0x09,
|
keyboard_fn_swap=0x09,
|
||||||
|
|
|
@ -19,16 +19,24 @@
|
||||||
|
|
||||||
from __future__ import absolute_import, division, print_function, unicode_literals
|
from __future__ import absolute_import, division, print_function, unicode_literals
|
||||||
|
|
||||||
|
from logging import getLogger, DEBUG as _DEBUG
|
||||||
|
_log = getLogger(__name__)
|
||||||
|
del getLogger
|
||||||
|
|
||||||
from copy import copy as _copy
|
from copy import copy as _copy
|
||||||
|
|
||||||
|
|
||||||
from .common import NamedInt as _NamedInt, NamedInts as _NamedInts
|
from .common import (
|
||||||
|
NamedInt as _NamedInt,
|
||||||
|
NamedInts as _NamedInts,
|
||||||
|
bytes2int as _bytes2int,
|
||||||
|
)
|
||||||
|
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
|
||||||
KIND = _NamedInts(toggle=0x1, choice=0x02, range=0x12)
|
KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x12)
|
||||||
|
|
||||||
class Setting(object):
|
class Setting(object):
|
||||||
"""A setting descriptor.
|
"""A setting descriptor.
|
||||||
|
@ -102,22 +110,46 @@ class Setting(object):
|
||||||
def write(self, value):
|
def write(self, value):
|
||||||
assert hasattr(self, '_value')
|
assert hasattr(self, '_value')
|
||||||
assert hasattr(self, '_device')
|
assert hasattr(self, '_device')
|
||||||
|
assert value is not None
|
||||||
|
|
||||||
if self._device:
|
if _log.isEnabledFor(_DEBUG):
|
||||||
data_bytes = self._validator.prepare_write(value)
|
_log.debug("%s: write %r to %s", self.name, value, self._device)
|
||||||
reply = self._rw.write(self._device, data_bytes)
|
|
||||||
if reply:
|
if self._device.online:
|
||||||
self._value = self._validator.validate_write(value, reply)
|
# Remember the value we're trying to set, even if the write fails.
|
||||||
if self.persister and self._value is not None:
|
# This way even if the device is offline or some other error occurs,
|
||||||
self.persister[self.name] = self._value
|
# the last value we've tried to write is remembered in the configuration.
|
||||||
return self._value
|
self._value = value
|
||||||
|
if self.persister:
|
||||||
|
self.persister[self.name] = value
|
||||||
|
|
||||||
|
current_value = None
|
||||||
|
if self._validator.needs_current_value:
|
||||||
|
# the validator needs the current value, possibly to merge flag values
|
||||||
|
current_value = self._rw.read(self._device)
|
||||||
|
|
||||||
|
data_bytes = self._validator.prepare_write(value, current_value)
|
||||||
|
if data_bytes is not None:
|
||||||
|
if _log.isEnabledFor(_DEBUG):
|
||||||
|
_log.debug("%s: prepare write(%s) => %r", self.name, value, data_bytes)
|
||||||
|
|
||||||
|
reply = self._rw.write(self._device, data_bytes)
|
||||||
|
if not reply:
|
||||||
|
# tell whomever is calling that the write failed
|
||||||
|
return None
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
def apply(self):
|
def apply(self):
|
||||||
assert hasattr(self, '_value')
|
assert hasattr(self, '_value')
|
||||||
assert hasattr(self, '_device')
|
assert hasattr(self, '_device')
|
||||||
|
|
||||||
if self._value is not None:
|
if _log.isEnabledFor(_DEBUG):
|
||||||
self.write(self._value)
|
_log.debug("%s: apply %s (%s)", self.name, self._value, self._device)
|
||||||
|
|
||||||
|
value = self.read()
|
||||||
|
if value is not None:
|
||||||
|
self.write(value)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
if hasattr(self, '_value'):
|
if hasattr(self, '_value'):
|
||||||
|
@ -173,7 +205,7 @@ class FeatureRW(object):
|
||||||
#
|
#
|
||||||
|
|
||||||
class BooleanValidator(object):
|
class BooleanValidator(object):
|
||||||
__slots__ = ['true_value', 'false_value', 'mask', 'write_returns_value']
|
__slots__ = ['true_value', 'false_value', 'mask', 'needs_current_value']
|
||||||
|
|
||||||
kind = KIND.toggle
|
kind = KIND.toggle
|
||||||
default_true = 0x01
|
default_true = 0x01
|
||||||
|
@ -181,72 +213,139 @@ class BooleanValidator(object):
|
||||||
# mask specifies all the affected bits in the value
|
# mask specifies all the affected bits in the value
|
||||||
default_mask = 0xFF
|
default_mask = 0xFF
|
||||||
|
|
||||||
def __init__(self, true_value=default_true, false_value=default_false, mask=default_mask, write_returns_value=False):
|
def __init__(self, true_value=default_true, false_value=default_false, mask=default_mask):
|
||||||
|
if isinstance(true_value, int):
|
||||||
|
assert isinstance(false_value, int)
|
||||||
|
if mask is None:
|
||||||
|
mask = self.default_mask
|
||||||
|
else:
|
||||||
|
assert isinstance(mask, int)
|
||||||
|
assert true_value & false_value == 0
|
||||||
|
assert true_value & mask == true_value
|
||||||
|
assert false_value & mask == false_value
|
||||||
|
self.needs_current_value = (mask != self.default_mask)
|
||||||
|
elif isinstance(true_value, bytes):
|
||||||
|
if false_value is None or false_value == self.default_false:
|
||||||
|
false_value = b'\x00' * len(true_value)
|
||||||
|
else:
|
||||||
|
assert isinstance(false_value, bytes)
|
||||||
|
if mask is None or mask == self.default_mask:
|
||||||
|
mask = b'\xFF' * len(true_value)
|
||||||
|
else:
|
||||||
|
assert isinstance(mask, bytes)
|
||||||
|
assert len(mask) == len(true_value) == len(false_value)
|
||||||
|
tv = _bytes2int(true_value)
|
||||||
|
fv = _bytes2int(false_value)
|
||||||
|
mv = _bytes2int(mask)
|
||||||
|
assert tv & fv == 0
|
||||||
|
assert tv & mv == tv
|
||||||
|
assert fv & mv == fv
|
||||||
|
self.needs_current_value = any(m != b'\xFF' for m in mask)
|
||||||
|
else:
|
||||||
|
raise Exception("invalid mask '%r', type %s" % (mask, type(mask)))
|
||||||
|
|
||||||
self.true_value = true_value
|
self.true_value = true_value
|
||||||
self.false_value = false_value
|
self.false_value = false_value
|
||||||
self.mask = mask
|
self.mask = mask
|
||||||
self.write_returns_value = write_returns_value
|
|
||||||
|
|
||||||
def _validate_value(self, reply_bytes, expected_value):
|
|
||||||
if isinstance(expected_value, int):
|
|
||||||
return ord(reply_bytes[:1]) & self.mask == expected_value
|
|
||||||
else:
|
|
||||||
for i in range(0, len(self.mask)):
|
|
||||||
masked_value = ord(reply_bytes[i:i+1]) & ord(self.mask[i:i+1])
|
|
||||||
if masked_value != ord(expected_value[i:i+1]):
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def validate_read(self, reply_bytes):
|
def validate_read(self, reply_bytes):
|
||||||
return self._validate_value(reply_bytes, self.true_value)
|
if isinstance(self.mask, int):
|
||||||
|
reply_value = ord(reply_bytes[:1]) & self.mask
|
||||||
|
if _log.isEnabledFor(_DEBUG):
|
||||||
|
_log.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value)
|
||||||
|
if reply_value == self.true_value:
|
||||||
|
return True
|
||||||
|
if reply_value == self.false_value:
|
||||||
|
return False
|
||||||
|
_log.warn("BooleanValidator: reply %02X mismatched %02X/%02X/%02X",
|
||||||
|
reply_value, self.true_value, self.false_value, self.mask)
|
||||||
|
return False
|
||||||
|
|
||||||
def prepare_write(self, value):
|
count = len(self.mask)
|
||||||
# FIXME: this does not work right when there is more than one flag in
|
mask = _bytes2int(self.mask)
|
||||||
# the same register!
|
reply_value = _bytes2int(reply_bytes[:count]) & mask
|
||||||
return self.true_value if value else self.false_value
|
|
||||||
|
|
||||||
def validate_write(self, value, reply_bytes):
|
true_value = _bytes2int(self.true_value)
|
||||||
if self.write_returns_value:
|
if reply_value == true_value:
|
||||||
return self._validate_value(reply_bytes, self.true_value)
|
return True
|
||||||
|
|
||||||
# just assume the value was written correctly, otherwise there would not
|
false_value = _bytes2int(self.false_value)
|
||||||
# be any reply_bytes to check
|
if reply_value == false_value:
|
||||||
return bool(value)
|
return False
|
||||||
|
|
||||||
|
_log.warn("BooleanValidator: reply %r mismatched %r/%r/%r",
|
||||||
|
reply_bytes, self.true_value, self.false_value, self.mask)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def prepare_write(self, new_value, current_value=None):
|
||||||
|
if new_value is None:
|
||||||
|
new_value = False
|
||||||
|
else:
|
||||||
|
assert isinstance(new_value, bool)
|
||||||
|
|
||||||
|
to_write = self.true_value if new_value else self.false_value
|
||||||
|
|
||||||
|
if isinstance(self.mask, int):
|
||||||
|
if current_value is not None and self.needs_current_value:
|
||||||
|
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
|
||||||
|
if current_value is not None and to_write == ord(current_value[:1]):
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
to_write = list(to_write)
|
||||||
|
count = len(self.mask)
|
||||||
|
for i in range(0, count):
|
||||||
|
b = ord(to_write[i])
|
||||||
|
m = ord(self.mask[i : i + 1])
|
||||||
|
assert b & m == b
|
||||||
|
# b &= m
|
||||||
|
if current_value is not None and self.needs_current_value:
|
||||||
|
b |= ord(current_value[i : i + 1]) & (0xFF ^ m)
|
||||||
|
to_write[i] = chr(b)
|
||||||
|
to_write = b''.join(to_write)
|
||||||
|
|
||||||
|
if current_value is not None and to_write == current_value[:len(to_write)]:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if _log.isEnabledFor(_DEBUG):
|
||||||
|
_log.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write)
|
||||||
|
|
||||||
|
return to_write
|
||||||
|
|
||||||
|
|
||||||
class ChoicesValidator(object):
|
class ChoicesValidator(object):
|
||||||
__slots__ = ['choices', 'write_returns_value']
|
__slots__ = ['choices', 'flag', '_bytes_count', 'needs_current_value']
|
||||||
|
|
||||||
kind = KIND.choice
|
kind = KIND.choice
|
||||||
|
|
||||||
def __init__(self, choices, write_returns_value=False):
|
def __init__(self, choices):
|
||||||
|
assert choices is not None
|
||||||
assert isinstance(choices, _NamedInts)
|
assert isinstance(choices, _NamedInts)
|
||||||
|
assert len(choices) > 2
|
||||||
self.choices = choices
|
self.choices = choices
|
||||||
self.write_returns_value = write_returns_value
|
self.needs_current_value = False
|
||||||
|
|
||||||
|
max_bits = max(x.bit_length() for x in choices)
|
||||||
|
self._bytes_count = (max_bits // 8) + (1 if max_bits % 8 else 0)
|
||||||
|
assert self._bytes_count < 8
|
||||||
|
|
||||||
def validate_read(self, reply_bytes):
|
def validate_read(self, reply_bytes):
|
||||||
assert self.choices is not None
|
reply_value = _bytes2int(reply_bytes[:self._bytes_count])
|
||||||
reply_value = ord(reply_bytes[:1])
|
|
||||||
valid_value = self.choices[reply_value]
|
valid_value = self.choices[reply_value]
|
||||||
assert valid_value is not None, "%: failed to validate read value %02X" % (self.__class__.__name__, reply_value)
|
assert valid_value is not None, "%s: failed to validate read value %02X" % (self.__class__.__name__, reply_value)
|
||||||
return valid_value
|
return valid_value
|
||||||
|
|
||||||
def prepare_write(self, value):
|
def prepare_write(self, new_value, current_value=None):
|
||||||
assert self.choices is not None
|
if new_value is None:
|
||||||
choice = self.choices[value]
|
choice = self.choices[:][0]
|
||||||
|
else:
|
||||||
|
if isinstance(new_value, int):
|
||||||
|
choice = self.choices[new_value]
|
||||||
|
elif new_value in self.choices:
|
||||||
|
choice = self.choices[new_value]
|
||||||
|
else:
|
||||||
|
raise ValueError(new_value)
|
||||||
|
|
||||||
if choice is None:
|
if choice is None:
|
||||||
raise ValueError("invalid choice " + repr(value))
|
raise ValueError("invalid choice %r" % new_value)
|
||||||
assert isinstance(choice, _NamedInt)
|
assert isinstance(choice, _NamedInt)
|
||||||
return choice.bytes(1)
|
return choice.bytes(self._bytes_count)
|
||||||
|
|
||||||
def validate_write(self, value, reply_bytes):
|
|
||||||
assert self.choices is not None
|
|
||||||
if self.write_returns_value:
|
|
||||||
reply_value = ord(reply_bytes[:1])
|
|
||||||
choice = self.choices[reply_value]
|
|
||||||
assert choice is not None, "failed to validate write reply %02X" % reply_value
|
|
||||||
return choice
|
|
||||||
|
|
||||||
# just assume the value was written correctly, otherwise there would not
|
|
||||||
# be any reply_bytes to check
|
|
||||||
return self.choices[value]
|
|
||||||
|
|
|
@ -41,30 +41,33 @@ _F = _hidpp20.FEATURE
|
||||||
#
|
#
|
||||||
|
|
||||||
def register_toggle(name, register,
|
def register_toggle(name, register,
|
||||||
true_value=_BooleanV.default_true, false_value=_BooleanV.default_false,
|
true_value=_BooleanV.default_true,
|
||||||
mask=_BooleanV.default_mask, write_returns_value=False,
|
false_value=_BooleanV.default_false,
|
||||||
|
mask=_BooleanV.default_mask,
|
||||||
label=None, description=None, device_kind=None):
|
label=None, description=None, device_kind=None):
|
||||||
|
validator = _BooleanV(true_value=true_value, false_value=false_value, mask=mask)
|
||||||
rw = _RegisterRW(register)
|
rw = _RegisterRW(register)
|
||||||
validator = _BooleanV(true_value=true_value, false_value=false_value, mask=mask, write_returns_value=write_returns_value)
|
|
||||||
return _Setting(name, rw, validator, label=label, description=description, device_kind=device_kind)
|
return _Setting(name, rw, validator, label=label, description=description, device_kind=device_kind)
|
||||||
|
|
||||||
|
|
||||||
def register_choices(name, register, choices,
|
def register_choices(name, register, choices,
|
||||||
kind=_KIND.choice, write_returns_value=False,
|
kind=_KIND.choice,
|
||||||
label=None, description=None, device_kind=None):
|
label=None, description=None, device_kind=None):
|
||||||
assert choices
|
assert choices
|
||||||
|
validator = _ChoicesV(choices)
|
||||||
rw = _RegisterRW(register)
|
rw = _RegisterRW(register)
|
||||||
validator = _ChoicesV(choices, write_returns_value=write_returns_value)
|
|
||||||
return _Setting(name, rw, validator, kind=kind, label=label, description=description, device_kind=device_kind)
|
return _Setting(name, rw, validator, kind=kind, label=label, description=description, device_kind=device_kind)
|
||||||
|
|
||||||
|
|
||||||
def feature_toggle(name, feature,
|
def feature_toggle(name, feature,
|
||||||
read_function_id=_FeatureRW.default_read_fnid, write_function_id=_FeatureRW.default_write_fnid,
|
read_function_id=_FeatureRW.default_read_fnid,
|
||||||
true_value=_BooleanV.default_true, false_value=_BooleanV.default_false,
|
write_function_id=_FeatureRW.default_write_fnid,
|
||||||
mask=_BooleanV.default_mask, write_returns_value=False,
|
true_value=_BooleanV.default_true,
|
||||||
|
false_value=_BooleanV.default_false,
|
||||||
|
mask=_BooleanV.default_mask,
|
||||||
label=None, description=None, device_kind=None):
|
label=None, description=None, device_kind=None):
|
||||||
|
validator = _BooleanV(true_value=true_value, false_value=false_value, mask=mask)
|
||||||
rw = _FeatureRW(feature, read_function_id, write_function_id)
|
rw = _FeatureRW(feature, read_function_id, write_function_id)
|
||||||
validator = _BooleanV(true_value=true_value, false_value=false_value, mask=mask, write_returns_value=write_returns_value)
|
|
||||||
return _Setting(name, rw, validator, label=label, description=description, device_kind=device_kind)
|
return _Setting(name, rw, validator, label=label, description=description, device_kind=device_kind)
|
||||||
|
|
||||||
#
|
#
|
||||||
|
@ -90,7 +93,7 @@ def _register_fn_swap(register=_R.keyboard_fn_swap, true_value=b'\x00\x01', mask
|
||||||
label=_FN_SWAP[1], description=_FN_SWAP[2],
|
label=_FN_SWAP[1], description=_FN_SWAP[2],
|
||||||
device_kind=_DK.keyboard)
|
device_kind=_DK.keyboard)
|
||||||
|
|
||||||
def _register_smooth_scroll(register=_R.mouse_smooth_scroll, true_value=0x40, mask=0x40):
|
def _register_smooth_scroll(register=_R.mouse_button_flags, true_value=0x40, mask=0x40):
|
||||||
return register_toggle(_SMOOTH_SCROLL[0], register, true_value=true_value, mask=mask,
|
return register_toggle(_SMOOTH_SCROLL[0], register, true_value=true_value, mask=mask,
|
||||||
label=_SMOOTH_SCROLL[1], description=_SMOOTH_SCROLL[2],
|
label=_SMOOTH_SCROLL[1], description=_SMOOTH_SCROLL[2],
|
||||||
device_kind=_DK.mouse)
|
device_kind=_DK.mouse)
|
||||||
|
@ -103,7 +106,6 @@ def _register_dpi(register=_R.mouse_dpi, choices=None):
|
||||||
|
|
||||||
def _feature_fn_swap():
|
def _feature_fn_swap():
|
||||||
return feature_toggle(_FN_SWAP[0], _F.FN_INVERSION,
|
return feature_toggle(_FN_SWAP[0], _F.FN_INVERSION,
|
||||||
write_returns_value=True,
|
|
||||||
label=_FN_SWAP[1], description=_FN_SWAP[2],
|
label=_FN_SWAP[1], description=_FN_SWAP[2],
|
||||||
device_kind=_DK.keyboard)
|
device_kind=_DK.keyboard)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue