settings: use classes for settings
This commit is contained in:
parent
de5878d34e
commit
11fa025f1d
|
@ -18,11 +18,10 @@
|
|||
|
||||
from collections import namedtuple
|
||||
|
||||
from . import settings_templates as _ST
|
||||
from .common import NamedInts as _NamedInts
|
||||
from .hidpp10 import DEVICE_KIND as _DK
|
||||
from .hidpp10 import REGISTERS as _R
|
||||
from .settings_templates import FeatureSettings as _FS
|
||||
from .settings_templates import RegisterSettings as _RS
|
||||
|
||||
#
|
||||
#
|
||||
|
@ -66,13 +65,6 @@ def _D(
|
|||
assert codename is not None, 'descriptor for %s does not have codename set' % name
|
||||
|
||||
if protocol is not None:
|
||||
# ? 2.0 devices should not have any registers
|
||||
_kind = lambda s: s._rw.kind if hasattr(s, '_rw') else s._rw_kind
|
||||
if protocol < 2.0:
|
||||
assert settings is None or all(_kind(s) == 1 for s in settings)
|
||||
else:
|
||||
assert registers is None
|
||||
assert settings is None or all(_kind(s) == 2 for s in settings)
|
||||
|
||||
if wpid:
|
||||
for w in wpid if isinstance(wpid, tuple) else (wpid, ):
|
||||
|
@ -136,12 +128,6 @@ def get_btid(btid):
|
|||
return found
|
||||
|
||||
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
_PERFORMANCE_MX_DPIS = _NamedInts.range(0x81, 0x8F, lambda x: str((x - 0x80) * 100))
|
||||
|
||||
#
|
||||
#
|
||||
#
|
||||
|
@ -201,7 +187,7 @@ _D('Wireless Keyboard EX100', codename='EX100', protocol=1.0, wpid='0065', regis
|
|||
_D('Wireless Keyboard MK300', protocol=1.0, wpid='0068', registers=(_R.battery_status, ))
|
||||
_D('Number Pad N545', protocol=1.0, wpid='2006', registers=(_R.battery_status, ))
|
||||
_D('Wireless Compact Keyboard K340', protocol=1.0, wpid='2007', registers=(_R.battery_status, ))
|
||||
_D('Wireless Keyboard MK700', protocol=1.0, wpid='2008', registers=(_R.battery_status, ), settings=[_RS.fn_swap()])
|
||||
_D('Wireless Keyboard MK700', protocol=1.0, wpid='2008', registers=(_R.battery_status, ), settings=[_ST.RegisterFnSwap])
|
||||
_D('Wireless Wave Keyboard K350', protocol=1.0, wpid='200A', registers=(_R.battery_status, ))
|
||||
_D('Wireless Keyboard MK320', protocol=1.0, wpid='200F', registers=(_R.battery_status, ))
|
||||
_D(
|
||||
|
@ -213,23 +199,23 @@ _D(
|
|||
_R.three_leds,
|
||||
),
|
||||
settings=[
|
||||
_RS.fn_swap(),
|
||||
_RS.hand_detection(),
|
||||
_ST.RegisterFnSwap,
|
||||
_ST.RegisterHandDetection,
|
||||
],
|
||||
)
|
||||
_D('Wireless Keyboard K520', protocol=1.0, wpid='2011', registers=(_R.battery_status, ), settings=[_RS.fn_swap()])
|
||||
_D('Wireless Solar Keyboard K750', protocol=2.0, wpid='4002', settings=[_FS.fn_swap()])
|
||||
_D('Wireless Keyboard K520', protocol=1.0, wpid='2011', registers=(_R.battery_status, ), settings=[_ST.RegisterFnSwap])
|
||||
_D('Wireless Solar Keyboard K750', protocol=2.0, wpid='4002', settings=[_ST.FnSwap])
|
||||
_D('Wireless Keyboard K270 (unifying)', protocol=2.0, wpid='4003')
|
||||
_D('Wireless Keyboard K360', protocol=2.0, wpid='4004', settings=[_FS.fn_swap()])
|
||||
_D('Wireless Keyboard K360', protocol=2.0, wpid='4004', settings=[_ST.FnSwap])
|
||||
_D('Wireless Keyboard K230', protocol=2.0, wpid='400D')
|
||||
_D('Wireless Touch Keyboard K400', protocol=2.0, wpid=('400E', '4024'), settings=[_FS.fn_swap()])
|
||||
_D('Wireless Keyboard MK270', protocol=2.0, wpid='4023', settings=[_FS.fn_swap()])
|
||||
_D('Illuminated Living-Room Keyboard K830', protocol=2.0, wpid='4032', settings=[_FS.new_fn_swap()])
|
||||
_D('Wireless Touch Keyboard K400', protocol=2.0, wpid=('400E', '4024'), settings=[_ST.FnSwap])
|
||||
_D('Wireless Keyboard MK270', protocol=2.0, wpid='4023', settings=[_ST.FnSwap])
|
||||
_D('Illuminated Living-Room Keyboard K830', protocol=2.0, wpid='4032', settings=[_ST.NewFnSwap])
|
||||
_D('Wireless Touch Keyboard K400 Plus', codename='K400 Plus', protocol=2.0, wpid='404D')
|
||||
_D('Wireless Multi-Device Keyboard K780', protocol=4.5, wpid='405B', settings=[_FS.new_fn_swap()])
|
||||
_D('Wireless Keyboard K375s', protocol=2.0, wpid='4061', settings=[_FS.k375s_fn_swap()])
|
||||
_D('Wireless Multi-Device Keyboard K780', protocol=4.5, wpid='405B', settings=[_ST.NewFnSwap])
|
||||
_D('Wireless Keyboard K375s', protocol=2.0, wpid='4061', settings=[_ST.K375sFnSwap])
|
||||
_D('Craft Advanced Keyboard', codename='Craft', protocol=4.5, wpid='4066', btid=0xB350)
|
||||
_D('Wireless Illuminated Keyboard K800 new', codename='K800 new', protocol=4.5, wpid='406E', settings=[_FS.fn_swap()])
|
||||
_D('Wireless Illuminated Keyboard K800 new', codename='K800 new', protocol=4.5, wpid='406E', settings=[_ST.FnSwap])
|
||||
_D('MX Keys Keyboard', codename='MX Keys', protocol=4.5, wpid='408A', btid=0xB35B)
|
||||
_D('G915 TKL LIGHTSPEED Wireless RGB Mechanical Gaming Keyboard', codename='G915 TKL', protocol=4.2, wpid='408E', usbid=0xC343)
|
||||
_D('G512 RGB Mechanical Gaming Keyboard', codename='G512', usbid=0xc33c, interface=1)
|
||||
|
@ -268,7 +254,7 @@ _D(
|
|||
protocol=1.0,
|
||||
wpid=('100B', '100F'),
|
||||
registers=(_R.battery_charge, ),
|
||||
settings=[_RS.smooth_scroll(), _RS.side_scroll()]
|
||||
settings=[_ST.RegisterSmoothScroll, _ST.RegisterSideScroll]
|
||||
)
|
||||
_D('V450 Nano Cordless Laser Mouse', codename='V450 Nano', protocol=1.0, wpid='1011', registers=(_R.battery_charge, ))
|
||||
_D(
|
||||
|
@ -278,8 +264,8 @@ _D(
|
|||
wpid='1013',
|
||||
registers=(_R.battery_charge, ),
|
||||
settings=[
|
||||
_RS.smooth_scroll(),
|
||||
_RS.side_scroll(),
|
||||
_ST.RegisterSmoothScroll,
|
||||
_ST.RegisterSideScroll,
|
||||
],
|
||||
)
|
||||
_D(
|
||||
|
@ -290,8 +276,8 @@ _D(
|
|||
wpid='1014',
|
||||
registers=(_R.battery_charge, ),
|
||||
settings=[
|
||||
_RS.smooth_scroll(),
|
||||
_RS.side_scroll(),
|
||||
_ST.RegisterSmoothScroll,
|
||||
_ST.RegisterSideScroll,
|
||||
],
|
||||
)
|
||||
_D(
|
||||
|
@ -301,10 +287,17 @@ _D(
|
|||
wpid='1017',
|
||||
registers=(_R.battery_charge, ),
|
||||
settings=[
|
||||
_RS.smooth_scroll(),
|
||||
_RS.side_scroll(),
|
||||
_ST.RegisterSmoothScroll,
|
||||
_ST.RegisterSideScroll,
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class _PerformanceMXDpi(_ST.RegisterDpi):
|
||||
choices_universe = _NamedInts.range(0x81, 0x8F, lambda x: str((x - 0x80) * 100))
|
||||
validator_options = {'choices': choices_universe}
|
||||
|
||||
|
||||
_D(
|
||||
'Performance Mouse MX',
|
||||
codename='Performance MX',
|
||||
|
@ -315,9 +308,9 @@ _D(
|
|||
_R.three_leds,
|
||||
),
|
||||
settings=[
|
||||
_RS.dpi(choices=_PERFORMANCE_MX_DPIS),
|
||||
_RS.smooth_scroll(),
|
||||
_RS.side_scroll(),
|
||||
_PerformanceMXDpi,
|
||||
_ST.RegisterSmoothScroll,
|
||||
_ST.RegisterSideScroll,
|
||||
],
|
||||
)
|
||||
_D(
|
||||
|
@ -327,8 +320,8 @@ _D(
|
|||
wpid='101B',
|
||||
registers=(_R.battery_charge, ),
|
||||
settings=[
|
||||
_RS.smooth_scroll(),
|
||||
_RS.side_scroll(),
|
||||
_ST.RegisterSmoothScroll,
|
||||
_ST.RegisterSideScroll,
|
||||
],
|
||||
)
|
||||
_D('Wireless Mouse M350', protocol=1.0, wpid='101C', registers=(_R.battery_charge, ))
|
||||
|
@ -339,11 +332,11 @@ _D(
|
|||
wpid='101D',
|
||||
registers=(_R.battery_charge, ),
|
||||
settings=[
|
||||
_RS.smooth_scroll(),
|
||||
_RS.side_scroll(),
|
||||
_ST.RegisterSmoothScroll,
|
||||
_ST.RegisterSideScroll,
|
||||
],
|
||||
)
|
||||
_D('Wireless Mouse M305', protocol=1.0, wpid='101F', registers=(_R.battery_status, ), settings=[_RS.side_scroll()])
|
||||
_D('Wireless Mouse M305', protocol=1.0, wpid='101F', registers=(_R.battery_status, ), settings=[_ST.RegisterSideScroll])
|
||||
_D('Wireless Mouse M215', protocol=1.0, wpid='1020')
|
||||
_D(
|
||||
'G700 Gaming Mouse',
|
||||
|
@ -357,12 +350,12 @@ _D(
|
|||
_R.three_leds,
|
||||
),
|
||||
settings=[
|
||||
_RS.smooth_scroll(),
|
||||
_RS.side_scroll(),
|
||||
_ST.RegisterSmoothScroll,
|
||||
_ST.RegisterSideScroll,
|
||||
],
|
||||
)
|
||||
_D('Wireless Mouse M310', protocol=1.0, wpid='1024', registers=(_R.battery_status, ))
|
||||
_D('Wireless Mouse M510', protocol=1.0, wpid='1025', registers=(_R.battery_status, ), settings=[_RS.side_scroll()])
|
||||
_D('Wireless Mouse M510', protocol=1.0, wpid='1025', registers=(_R.battery_status, ), settings=[_ST.RegisterSideScroll])
|
||||
_D('Fujitsu Sonic Mouse', codename='Sonic', protocol=1.0, wpid='1029')
|
||||
_D(
|
||||
'G700s Gaming Mouse',
|
||||
|
@ -376,14 +369,14 @@ _D(
|
|||
_R.three_leds,
|
||||
),
|
||||
settings=[
|
||||
_RS.smooth_scroll(),
|
||||
_RS.side_scroll(),
|
||||
_ST.RegisterSmoothScroll,
|
||||
_ST.RegisterSideScroll,
|
||||
],
|
||||
)
|
||||
|
||||
_D('Couch Mouse M515', protocol=2.0, wpid='4007')
|
||||
_D('Wireless Mouse M175', protocol=2.0, wpid='4008')
|
||||
_D('Wireless Mouse M325', protocol=2.0, wpid='400A', settings=[_FS.hi_res_scroll()])
|
||||
_D('Wireless Mouse M325', protocol=2.0, wpid='400A', settings=[_ST.HiResScroll])
|
||||
_D('Wireless Mouse M525', protocol=2.0, wpid='4013')
|
||||
_D('Wireless Mouse M345', protocol=2.0, wpid='4017')
|
||||
_D('Wireless Mouse M187', protocol=2.0, wpid='4019')
|
||||
|
@ -391,16 +384,16 @@ _D('Touch Mouse M600', protocol=2.0, wpid='401A')
|
|||
_D('Wireless Mouse M150', protocol=2.0, wpid='4022')
|
||||
_D('Wireless Mouse M185', protocol=2.0, wpid='4038')
|
||||
_D('Wireless Mouse MX Master', codename='MX Master', protocol=4.5, wpid='4041', btid=0xb012)
|
||||
_D('Anywhere Mouse MX 2', codename='Anywhere MX 2', protocol=4.5, wpid='404A', settings=[_FS.hires_smooth_invert()])
|
||||
_D('Wireless Mouse M510', protocol=2.0, wpid='4051', codename='M510v2', settings=[_FS.lowres_smooth_scroll()])
|
||||
_D('Anywhere Mouse MX 2', codename='Anywhere MX 2', protocol=4.5, wpid='404A', settings=[_ST.HiresSmoothInvert])
|
||||
_D('Wireless Mouse M510', protocol=2.0, wpid='4051', codename='M510v2', settings=[_ST.LowresSmoothScroll])
|
||||
_D(
|
||||
'Wireless Mouse M185 new',
|
||||
codename='M185n',
|
||||
protocol=4.5,
|
||||
wpid='4054',
|
||||
settings=[
|
||||
_FS.lowres_smooth_scroll(),
|
||||
_FS.pointer_speed(),
|
||||
_ST.LowresSmoothScroll,
|
||||
_ST.PointerSpeed,
|
||||
]
|
||||
)
|
||||
_D(
|
||||
|
@ -409,8 +402,8 @@ _D(
|
|||
protocol=4.5,
|
||||
wpid='4055',
|
||||
settings=[
|
||||
_FS.lowres_smooth_scroll(),
|
||||
_FS.pointer_speed(),
|
||||
_ST.LowresSmoothScroll,
|
||||
_ST.PointerSpeed,
|
||||
]
|
||||
)
|
||||
_D(
|
||||
|
@ -420,8 +413,7 @@ _D(
|
|||
wpid='4069',
|
||||
btid=0xb019,
|
||||
settings=[
|
||||
_FS.hires_smooth_invert(),
|
||||
_FS.gesture2_gestures(),
|
||||
_ST.HiresSmoothInvert,
|
||||
],
|
||||
)
|
||||
_D(
|
||||
|
@ -430,8 +422,8 @@ _D(
|
|||
protocol=4.5,
|
||||
wpid='406B',
|
||||
settings=[
|
||||
_FS.lowres_smooth_scroll(),
|
||||
_FS.pointer_speed(),
|
||||
_ST.LowresSmoothScroll,
|
||||
_ST.PointerSpeed,
|
||||
],
|
||||
)
|
||||
_D(
|
||||
|
@ -440,8 +432,8 @@ _D(
|
|||
protocol=4.5,
|
||||
wpid='406D',
|
||||
settings=[
|
||||
_FS.hires_smooth_invert(),
|
||||
_FS.pointer_speed(),
|
||||
_ST.HiresSmoothInvert,
|
||||
_ST.PointerSpeed,
|
||||
]
|
||||
)
|
||||
_D('MX Vertical Wireless Mouse', codename='MX Vertical', protocol=4.5, wpid='407B', btid=0xb020, usbid=0xc08a)
|
||||
|
|
|
@ -306,9 +306,9 @@ class Device:
|
|||
if not self._settings:
|
||||
self._settings = []
|
||||
if self.persister and self.descriptor and self.descriptor.settings:
|
||||
for s in self.descriptor.settings:
|
||||
for sclass in self.descriptor.settings:
|
||||
try:
|
||||
setting = s(self)
|
||||
setting = sclass.build(self)
|
||||
except Exception as e: # Do nothing if the device is offline
|
||||
setting = None
|
||||
if self.online:
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
import math
|
||||
|
||||
from copy import copy as _copy
|
||||
from logging import DEBUG as _DEBUG
|
||||
from logging import WARNING as _WARNING
|
||||
from logging import getLogger
|
||||
|
@ -58,51 +57,170 @@ def bool_or_toggle(current, new):
|
|||
return None
|
||||
|
||||
|
||||
class Setting:
|
||||
"""A setting descriptor.
|
||||
Needs to be instantiated for each specific device."""
|
||||
__slots__ = (
|
||||
'name', 'label', 'description', 'kind', 'device_kind', 'feature', 'persist', '_rw', '_validator', '_callback',
|
||||
'_device', '_value'
|
||||
# moved first for dependency reasons
|
||||
class Validator:
|
||||
@classmethod
|
||||
def build(cls, setting_class, device, **kwargs):
|
||||
return cls(**kwargs)
|
||||
|
||||
|
||||
class BooleanValidator(Validator):
|
||||
__slots__ = ('true_value', 'false_value', 'read_skip_byte_count', 'write_prefix_bytes', 'mask', 'needs_current_value')
|
||||
|
||||
kind = KIND.toggle
|
||||
default_true = 0x01
|
||||
default_false = 0x00
|
||||
# mask specifies all the affected bits in the value
|
||||
default_mask = 0xFF
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
true_value=default_true,
|
||||
false_value=default_false,
|
||||
mask=default_mask,
|
||||
read_skip_byte_count=0,
|
||||
write_prefix_bytes=b''
|
||||
):
|
||||
if isinstance(true_value, int):
|
||||
assert isinstance(false_value, int)
|
||||
if mask is None:
|
||||
mask = self.default_mask
|
||||
else:
|
||||
assert isinstance(mask, int)
|
||||
assert true_value & false_value == 0
|
||||
assert true_value & mask == true_value
|
||||
assert false_value & mask == false_value
|
||||
self.needs_current_value = (mask != self.default_mask)
|
||||
elif isinstance(true_value, bytes):
|
||||
if false_value is None or false_value == self.default_false:
|
||||
false_value = b'\x00' * len(true_value)
|
||||
else:
|
||||
assert isinstance(false_value, bytes)
|
||||
if mask is None or mask == self.default_mask:
|
||||
mask = b'\xFF' * len(true_value)
|
||||
else:
|
||||
assert isinstance(mask, bytes)
|
||||
assert len(mask) == len(true_value) == len(false_value)
|
||||
tv = _bytes2int(true_value)
|
||||
fv = _bytes2int(false_value)
|
||||
mv = _bytes2int(mask)
|
||||
assert tv != fv # true and false might be something other than bit values
|
||||
assert tv & mv == tv
|
||||
assert fv & mv == fv
|
||||
self.needs_current_value = any(m != b'\xFF' for m in mask)
|
||||
else:
|
||||
raise Exception("invalid mask '%r', type %s" % (mask, type(mask)))
|
||||
|
||||
self.true_value = true_value
|
||||
self.false_value = false_value
|
||||
self.mask = mask
|
||||
self.read_skip_byte_count = read_skip_byte_count
|
||||
self.write_prefix_bytes = write_prefix_bytes
|
||||
|
||||
def validate_read(self, reply_bytes):
|
||||
reply_bytes = reply_bytes[self.read_skip_byte_count:]
|
||||
if isinstance(self.mask, int):
|
||||
reply_value = ord(reply_bytes[:1]) & self.mask
|
||||
if _log.isEnabledFor(_DEBUG):
|
||||
_log.debug('BooleanValidator: validate read %r => %02X', reply_bytes, reply_value)
|
||||
if reply_value == self.true_value:
|
||||
return True
|
||||
if reply_value == self.false_value:
|
||||
return False
|
||||
_log.warn(
|
||||
'BooleanValidator: reply %02X mismatched %02X/%02X/%02X', reply_value, self.true_value, self.false_value,
|
||||
self.mask
|
||||
)
|
||||
return False
|
||||
|
||||
def __init__(self, name, rw, validator=None, callback=None, kind=None, device_kind=None, feature=None, persist=True):
|
||||
assert name
|
||||
self.name = name[0]
|
||||
self.label = name[1]
|
||||
self.description = name[2]
|
||||
self.device_kind = device_kind
|
||||
self.feature = getattr(rw, 'feature', None)
|
||||
self.persist = persist
|
||||
self._rw = rw
|
||||
assert (validator and not callback) or (not validator and callback)
|
||||
self._validator = validator
|
||||
self._callback = callback
|
||||
count = len(self.mask)
|
||||
mask = _bytes2int(self.mask)
|
||||
reply_value = _bytes2int(reply_bytes[:count]) & mask
|
||||
|
||||
assert kind is None or validator is None or kind & validator.kind != 0
|
||||
self.kind = kind or getattr(validator, 'kind', None)
|
||||
true_value = _bytes2int(self.true_value)
|
||||
if reply_value == true_value:
|
||||
return True
|
||||
|
||||
def __call__(self, device):
|
||||
assert not hasattr(self, '_value')
|
||||
# combined keyboards and touchpads (e.g., K400) break this assertion so don't use it
|
||||
# assert self.device_kind is None or device.kind in self.device_kind
|
||||
p = device.protocol
|
||||
if p == 1.0:
|
||||
# HID++ 1.0 devices do not support features
|
||||
assert self._rw.kind == RegisterRW.kind
|
||||
elif p >= 2.0:
|
||||
# HID++ 2.0 devices do not support registers
|
||||
assert self._rw.kind == FeatureRW.kind
|
||||
o = _copy(self)
|
||||
if o._callback:
|
||||
o._validator = o._callback(device)
|
||||
if o._validator is None:
|
||||
false_value = _bytes2int(self.false_value)
|
||||
if reply_value == false_value:
|
||||
return False
|
||||
|
||||
_log.warn('BooleanValidator: reply %r mismatched %r/%r/%r', reply_bytes, self.true_value, self.false_value, self.mask)
|
||||
return False
|
||||
|
||||
def prepare_write(self, new_value, current_value=None):
|
||||
if new_value is None:
|
||||
new_value = False
|
||||
else:
|
||||
assert isinstance(new_value, bool), 'New value %s for boolean setting is not a boolean' % new_value
|
||||
|
||||
to_write = self.true_value if new_value else self.false_value
|
||||
|
||||
if isinstance(self.mask, int):
|
||||
if current_value is not None and self.needs_current_value:
|
||||
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
|
||||
if current_value is not None and to_write == ord(current_value[:1]):
|
||||
return None
|
||||
assert o.kind is None or o.kind & o._validator.kind != 0
|
||||
o.kind = o.kind or o._validator.kind
|
||||
o._value = None
|
||||
o._device = device
|
||||
return o
|
||||
to_write = bytes([to_write])
|
||||
else:
|
||||
to_write = bytearray(to_write)
|
||||
count = len(self.mask)
|
||||
for i in range(0, count):
|
||||
b = ord(to_write[i:i + 1])
|
||||
m = ord(self.mask[i:i + 1])
|
||||
assert b & m == b
|
||||
# b &= m
|
||||
if current_value is not None and self.needs_current_value:
|
||||
b |= ord(current_value[i:i + 1]) & (0xFF ^ m)
|
||||
to_write[i] = b
|
||||
to_write = bytes(to_write)
|
||||
|
||||
if current_value is not None and to_write == current_value[:len(to_write)]:
|
||||
return None
|
||||
|
||||
if _log.isEnabledFor(_DEBUG):
|
||||
_log.debug('BooleanValidator: prepare_write(%s, %s) => %r', new_value, current_value, to_write)
|
||||
|
||||
return self.write_prefix_bytes + to_write
|
||||
|
||||
def acceptable(self, args, current):
|
||||
if len(args) != 1:
|
||||
return None
|
||||
val = bool_or_toggle(current, args[0])
|
||||
return [val] if val is not None else None
|
||||
|
||||
|
||||
class Setting:
|
||||
"""A setting descriptor. Needs to be instantiated for each specific device."""
|
||||
name = label = description = ''
|
||||
feature = register = kind = None
|
||||
persist = True
|
||||
rw_options = {}
|
||||
validator_class = BooleanValidator
|
||||
validator_options = {}
|
||||
|
||||
def __init__(self, device, rw, validator):
|
||||
self._device = device
|
||||
self._rw = rw
|
||||
self._validator = validator
|
||||
self.kind = getattr(self._validator, 'kind', None)
|
||||
self._value = None
|
||||
|
||||
@classmethod
|
||||
def build(cls, device):
|
||||
assert cls.feature or cls.register, 'Settings require either a feature or a register'
|
||||
rw_class = cls.rw_class if hasattr(cls, 'rw_class') else FeatureRW if cls.feature else RegisterRW
|
||||
rw = rw_class(cls.feature if cls.feature else cls.register, **cls.rw_options)
|
||||
p = device.protocol
|
||||
if p == 1.0: # HID++ 1.0 devices do not support features
|
||||
assert rw.kind == RegisterRW.kind
|
||||
elif p >= 2.0: # HID++ 2.0 devices do not support registers
|
||||
assert rw.kind == FeatureRW.kind
|
||||
validator_class = cls.validator_class
|
||||
validator = validator_class.build(cls, device, **cls.validator_options)
|
||||
if validator:
|
||||
assert cls.kind is None or cls.kind & validator.kind != 0
|
||||
return cls(device, rw, validator)
|
||||
|
||||
@property
|
||||
def choices(self):
|
||||
|
@ -173,7 +291,7 @@ class Setting:
|
|||
|
||||
current_value = None
|
||||
if self._validator.needs_current_value:
|
||||
# the validator needs the current value, possibly to merge flag values
|
||||
# the _validator needs the current value, possibly to merge flag values
|
||||
current_value = self._rw.read(self._device)
|
||||
|
||||
data_bytes = self._validator.prepare_write(value, current_value)
|
||||
|
@ -614,140 +732,7 @@ class FeatureRWMap(FeatureRW):
|
|||
return reply if not self.no_reply else True
|
||||
|
||||
|
||||
#
|
||||
# value validators
|
||||
# handle the conversion from read bytes, to setting value, and back
|
||||
#
|
||||
|
||||
|
||||
class BooleanValidator:
|
||||
__slots__ = ('true_value', 'false_value', 'read_skip_byte_count', 'write_prefix_bytes', 'mask', 'needs_current_value')
|
||||
|
||||
kind = KIND.toggle
|
||||
default_true = 0x01
|
||||
default_false = 0x00
|
||||
# mask specifies all the affected bits in the value
|
||||
default_mask = 0xFF
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
true_value=default_true,
|
||||
false_value=default_false,
|
||||
mask=default_mask,
|
||||
read_skip_byte_count=0,
|
||||
write_prefix_bytes=b''
|
||||
):
|
||||
if isinstance(true_value, int):
|
||||
assert isinstance(false_value, int)
|
||||
if mask is None:
|
||||
mask = self.default_mask
|
||||
else:
|
||||
assert isinstance(mask, int)
|
||||
assert true_value & false_value == 0
|
||||
assert true_value & mask == true_value
|
||||
assert false_value & mask == false_value
|
||||
self.needs_current_value = (mask != self.default_mask)
|
||||
elif isinstance(true_value, bytes):
|
||||
if false_value is None or false_value == self.default_false:
|
||||
false_value = b'\x00' * len(true_value)
|
||||
else:
|
||||
assert isinstance(false_value, bytes)
|
||||
if mask is None or mask == self.default_mask:
|
||||
mask = b'\xFF' * len(true_value)
|
||||
else:
|
||||
assert isinstance(mask, bytes)
|
||||
assert len(mask) == len(true_value) == len(false_value)
|
||||
tv = _bytes2int(true_value)
|
||||
fv = _bytes2int(false_value)
|
||||
mv = _bytes2int(mask)
|
||||
assert tv != fv # true and false might be something other than bit values
|
||||
assert tv & mv == tv
|
||||
assert fv & mv == fv
|
||||
self.needs_current_value = any(m != b'\xFF' for m in mask)
|
||||
else:
|
||||
raise Exception("invalid mask '%r', type %s" % (mask, type(mask)))
|
||||
|
||||
self.true_value = true_value
|
||||
self.false_value = false_value
|
||||
self.mask = mask
|
||||
self.read_skip_byte_count = read_skip_byte_count
|
||||
self.write_prefix_bytes = write_prefix_bytes
|
||||
|
||||
def validate_read(self, reply_bytes):
|
||||
reply_bytes = reply_bytes[self.read_skip_byte_count:]
|
||||
if isinstance(self.mask, int):
|
||||
reply_value = ord(reply_bytes[:1]) & self.mask
|
||||
if _log.isEnabledFor(_DEBUG):
|
||||
_log.debug('BooleanValidator: validate read %r => %02X', reply_bytes, reply_value)
|
||||
if reply_value == self.true_value:
|
||||
return True
|
||||
if reply_value == self.false_value:
|
||||
return False
|
||||
_log.warn(
|
||||
'BooleanValidator: reply %02X mismatched %02X/%02X/%02X', reply_value, self.true_value, self.false_value,
|
||||
self.mask
|
||||
)
|
||||
return False
|
||||
|
||||
count = len(self.mask)
|
||||
mask = _bytes2int(self.mask)
|
||||
reply_value = _bytes2int(reply_bytes[:count]) & mask
|
||||
|
||||
true_value = _bytes2int(self.true_value)
|
||||
if reply_value == true_value:
|
||||
return True
|
||||
|
||||
false_value = _bytes2int(self.false_value)
|
||||
if reply_value == false_value:
|
||||
return False
|
||||
|
||||
_log.warn('BooleanValidator: reply %r mismatched %r/%r/%r', reply_bytes, self.true_value, self.false_value, self.mask)
|
||||
return False
|
||||
|
||||
def prepare_write(self, new_value, current_value=None):
|
||||
if new_value is None:
|
||||
new_value = False
|
||||
else:
|
||||
assert isinstance(new_value, bool), 'New value %s for boolean setting is not a boolean' % new_value
|
||||
|
||||
to_write = self.true_value if new_value else self.false_value
|
||||
|
||||
if isinstance(self.mask, int):
|
||||
if current_value is not None and self.needs_current_value:
|
||||
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
|
||||
if current_value is not None and to_write == ord(current_value[:1]):
|
||||
return None
|
||||
to_write = bytes([to_write])
|
||||
else:
|
||||
to_write = bytearray(to_write)
|
||||
count = len(self.mask)
|
||||
for i in range(0, count):
|
||||
b = ord(to_write[i:i + 1])
|
||||
m = ord(self.mask[i:i + 1])
|
||||
assert b & m == b
|
||||
# b &= m
|
||||
if current_value is not None and self.needs_current_value:
|
||||
b |= ord(current_value[i:i + 1]) & (0xFF ^ m)
|
||||
to_write[i] = b
|
||||
to_write = bytes(to_write)
|
||||
|
||||
if current_value is not None and to_write == current_value[:len(to_write)]:
|
||||
return None
|
||||
|
||||
if _log.isEnabledFor(_DEBUG):
|
||||
_log.debug('BooleanValidator: prepare_write(%s, %s) => %r', new_value, current_value, to_write)
|
||||
|
||||
return self.write_prefix_bytes + to_write
|
||||
|
||||
def acceptable(self, args, current):
|
||||
if len(args) != 1:
|
||||
return None
|
||||
|
||||
val = bool_or_toggle(current, args[0])
|
||||
return [val] if val is not None else None
|
||||
|
||||
|
||||
class BitFieldValidator:
|
||||
class BitFieldValidator(Validator):
|
||||
__slots__ = ('byte_count', 'options')
|
||||
|
||||
kind = KIND.multiple_toggle
|
||||
|
@ -791,7 +776,7 @@ class BitFieldValidator:
|
|||
return None if val is None else [str(int(key)), val]
|
||||
|
||||
|
||||
class BitFieldWithOffsetAndMaskValidator:
|
||||
class BitFieldWithOffsetAndMaskValidator(Validator):
|
||||
__slots__ = ('byte_count', 'options', '_option_from_key', '_mask_from_offset', '_option_from_offset_mask')
|
||||
|
||||
kind = KIND.multiple_toggle
|
||||
|
@ -882,13 +867,16 @@ class BitFieldWithOffsetAndMaskValidator:
|
|||
return None if val is None else [str(key), val]
|
||||
|
||||
|
||||
class ChoicesValidator:
|
||||
kind = KIND.choice
|
||||
class ChoicesValidator(Validator):
|
||||
"""Translates between NamedInts and a byte sequence.
|
||||
:param choices: a list of NamedInts
|
||||
:param byte_count: the size of the derived byte sequence. If None, it
|
||||
will be calculated from the choices."""
|
||||
def __init__(self, choices, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b''):
|
||||
kind = KIND.choice
|
||||
choices_universe = None # the possible choices, or an empty sequence for anything
|
||||
choices_extra = None # an extra choice, so as not to require extending a large NamedInts
|
||||
|
||||
def __init__(self, choices=None, byte_count=None, read_skip_byte_count=0, write_prefix_bytes=b''):
|
||||
assert choices is not None
|
||||
assert isinstance(choices, _NamedInts)
|
||||
assert len(choices) > 1
|
||||
|
@ -943,6 +931,8 @@ class ChoicesValidator:
|
|||
|
||||
class ChoicesMapValidator(ChoicesValidator):
|
||||
kind = KIND.map_choice
|
||||
keys_universe = None # the possible keys, or an empty sequence for anything
|
||||
choices_universe = None # the possible choices, or an empty sequence for anything
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -1013,16 +1003,23 @@ class ChoicesMapValidator(ChoicesValidator):
|
|||
return [str(int(key)), int(choice)] if choice is not None else None
|
||||
|
||||
|
||||
class RangeValidator:
|
||||
__slots__ = ('min_value', 'max_value', 'flag', '_byte_count', 'needs_current_value')
|
||||
|
||||
class RangeValidator(Validator):
|
||||
kind = KIND.range
|
||||
"""Translates between integers and a byte sequence.
|
||||
:param min_value: minimum accepted value (inclusive)
|
||||
:param max_value: maximum accepted value (inclusive)
|
||||
:param byte_count: the size of the derived byte sequence. If None, it
|
||||
will be calculated from the range."""
|
||||
def __init__(self, min_value, max_value, byte_count=None):
|
||||
min_value = 0
|
||||
max_value = 255
|
||||
|
||||
@classmethod
|
||||
def build(cls, setting_class, device, **kwargs):
|
||||
kwargs['min_value'] = setting_class.min_value
|
||||
kwargs['max_value'] = setting_class.max_value
|
||||
return cls(**kwargs)
|
||||
|
||||
def __init__(self, min_value=0, max_value=255, byte_count=1):
|
||||
assert max_value > min_value
|
||||
self.min_value = min_value
|
||||
self.max_value = max_value
|
||||
|
@ -1051,7 +1048,7 @@ class RangeValidator:
|
|||
return None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args
|
||||
|
||||
|
||||
class MultipleRangeValidator:
|
||||
class MultipleRangeValidator(Validator):
|
||||
|
||||
kind = KIND.multiple_range
|
||||
|
||||
|
@ -1126,7 +1123,8 @@ class MultipleRangeValidator:
|
|||
|
||||
class ActionSettingRW:
|
||||
"""Special RW class for settings that turn on and off special processing when a key or button is depressed"""
|
||||
def __init__(self, name, divert_setting_name):
|
||||
def __init__(self, feature, name='', divert_setting_name='divert-keys'):
|
||||
self.feature = feature # not used?
|
||||
self.name = name
|
||||
self.divert_setting_name = divert_setting_name
|
||||
self.kind = FeatureRW.kind # pretend to be FeatureRW as required for HID++ 2.0 devices
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue