820 lines
31 KiB
Python
820 lines
31 KiB
Python
# -*- python-mode -*-
|
|
# -*- coding: UTF-8 -*-
|
|
|
|
## Copyright (C) 2012-2013 Daniel Pavel
|
|
##
|
|
## This program is free software; you can redistribute it and/or modify
|
|
## it under the terms of the GNU General Public License as published by
|
|
## the Free Software Foundation; either version 2 of the License, or
|
|
## (at your option) any later version.
|
|
##
|
|
## This program is distributed in the hope that it will be useful,
|
|
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
## GNU General Public License for more details.
|
|
##
|
|
## You should have received a copy of the GNU General Public License along
|
|
## with this program; if not, write to the Free Software Foundation, Inc.,
|
|
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
|
|
from __future__ import absolute_import, division, print_function, unicode_literals
|
|
|
|
import math
|
|
|
|
from copy import copy as _copy
|
|
from logging import DEBUG as _DEBUG
|
|
from logging import getLogger
|
|
|
|
from .common import NamedInt as _NamedInt
|
|
from .common import NamedInts as _NamedInts
|
|
from .common import bytes2int as _bytes2int
|
|
from .common import int2bytes as _int2bytes
|
|
|
|
_log = getLogger(__name__)
|
|
del getLogger
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
KIND = _NamedInts(toggle=0x01,
|
|
choice=0x02,
|
|
range=0x04,
|
|
map_choice=0x0A,
|
|
multiple_toggle=0x10)
|
|
|
|
|
|
class Setting(object):
|
|
"""A setting descriptor.
|
|
Needs to be instantiated for each specific device."""
|
|
__slots__ = ('name', 'label', 'description', 'kind', 'device_kind',
|
|
'feature', '_rw', '_validator', '_device', '_value')
|
|
|
|
def __init__(self,
|
|
name,
|
|
rw,
|
|
validator,
|
|
kind=None,
|
|
label=None,
|
|
description=None,
|
|
device_kind=None,
|
|
feature=None):
|
|
assert name
|
|
self.name = name
|
|
self.label = label or name
|
|
self.description = description
|
|
self.device_kind = device_kind
|
|
self.feature = feature
|
|
|
|
self._rw = rw
|
|
self._validator = validator
|
|
|
|
assert kind is None or kind & validator.kind != 0
|
|
self.kind = kind or validator.kind
|
|
|
|
def __call__(self, device):
|
|
assert not hasattr(self, '_value')
|
|
# combined keyboards and touchpads (e.g., K400) break this assertion so don't use it
|
|
# assert self.device_kind is None or device.kind in self.device_kind
|
|
p = device.protocol
|
|
if p == 1.0:
|
|
# HID++ 1.0 devices do not support features
|
|
assert self._rw.kind == RegisterRW.kind
|
|
elif p >= 2.0:
|
|
# HID++ 2.0 devices do not support registers
|
|
assert self._rw.kind == FeatureRW.kind
|
|
|
|
o = _copy(self)
|
|
o._value = None
|
|
o._device = device
|
|
return o
|
|
|
|
@property
|
|
def choices(self):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
|
|
return self._validator.choices if self._validator.kind & KIND.choice else None
|
|
|
|
@property
|
|
def range(self):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
|
|
if self._validator.kind == KIND.range:
|
|
return (self._validator.min_value, self._validator.max_value)
|
|
|
|
def read(self, cached=True):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings read %r from %s', self.name, self._value,
|
|
self._device)
|
|
|
|
if self._value is None and self._device.persister:
|
|
# We haven't read a value from the device yet,
|
|
# maybe we have something in the configuration.
|
|
self._value = self._device.persister.get(self.name)
|
|
|
|
if cached and self._value is not None:
|
|
if self._device.persister and self.name not in self._device.persister:
|
|
# If this is a new device (or a new setting for an old device),
|
|
# make sure to save its current value for the next time.
|
|
self._device.persister[self.name] = self._value
|
|
return self._value
|
|
|
|
if self._device.online:
|
|
reply = self._rw.read(self._device)
|
|
if reply:
|
|
self._value = self._validator.validate_read(reply)
|
|
if self._device.persister and self.name not in self._device.persister:
|
|
# Don't update the persister if it already has a value,
|
|
# otherwise the first read might overwrite the value we wanted.
|
|
self._device.persister[self.name] = self._value
|
|
return self._value
|
|
|
|
def write(self, value):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
assert value is not None
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings write %r to %s', self.name, value,
|
|
self._device)
|
|
|
|
if self._device.online:
|
|
# Remember the value we're trying to set, even if the write fails.
|
|
# This way even if the device is offline or some other error occurs,
|
|
# the last value we've tried to write is remembered in the configuration.
|
|
self._value = value
|
|
if self._device.persister:
|
|
self._device.persister[self.name] = value
|
|
|
|
current_value = None
|
|
if self._validator.needs_current_value:
|
|
# the validator needs the current value, possibly to merge flag values
|
|
current_value = self._rw.read(self._device)
|
|
|
|
data_bytes = self._validator.prepare_write(value, current_value)
|
|
if data_bytes is not None:
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings prepare write(%s) => %r',
|
|
self.name, value, data_bytes)
|
|
|
|
reply = self._rw.write(self._device, data_bytes)
|
|
if not reply:
|
|
# tell whomever is calling that the write failed
|
|
return None
|
|
|
|
return value
|
|
|
|
def apply(self):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: apply %s (%s)', self.name, self._value,
|
|
self._device)
|
|
|
|
value = self.read()
|
|
if value is not None:
|
|
self.write(value)
|
|
|
|
def __str__(self):
|
|
if hasattr(self, '_value'):
|
|
assert hasattr(self, '_device')
|
|
return '<Setting([%s:%s] %s:%s=%s)>' % (
|
|
self._rw.kind, self._validator.kind, self._device.codename,
|
|
self.name, self._value)
|
|
return '<Setting([%s:%s] %s)>' % (self._rw.kind, self._validator.kind,
|
|
self.name)
|
|
|
|
__unicode__ = __repr__ = __str__
|
|
|
|
|
|
class Settings(Setting):
|
|
"""A setting descriptor for multiple choices, being a map from keys to values.
|
|
Needs to be instantiated for each specific device."""
|
|
def read(self, cached=True):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings read %r from %s', self.name, self._value,
|
|
self._device)
|
|
|
|
if self._value is None and getattr(self._device, 'persister', None):
|
|
# We haven't read a value from the device yet,
|
|
# maybe we have something in the configuration.
|
|
self._value = self._device.persister.get(self.name)
|
|
|
|
if cached and self._value is not None:
|
|
if getattr(self._device, 'persister',
|
|
None) and self.name not in self._device.persister:
|
|
# If this is a new device (or a new setting for an old device),
|
|
# make sure to save its current value for the next time.
|
|
self._device.persister[self.name] = self._value
|
|
return self._value
|
|
|
|
if self._device.online:
|
|
reply_map = {}
|
|
for key in self._validator.choices:
|
|
reply = self._rw.read(self._device, key)
|
|
if reply:
|
|
# keys are ints, because that is what the device uses,
|
|
# encoded into strings because JSON requires strings as keys
|
|
reply_map[str(int(key))] = self._validator.validate_read(
|
|
reply, key)
|
|
self._value = reply_map
|
|
if getattr(self._device, 'persister',
|
|
None) and self.name not in self._device.persister:
|
|
# Don't update the persister if it already has a value,
|
|
# otherwise the first read might overwrite the value we wanted.
|
|
self._device.persister[self.name] = self._value
|
|
return self._value
|
|
|
|
def read_key(self, key, cached=True):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
assert key is not None
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings read %r key %r from %s', self.name,
|
|
self._value, key, self._device)
|
|
|
|
if self._value is None and getattr(self._device, 'persister', None):
|
|
self._value = self._device.persister.get(self.name)
|
|
|
|
if cached and self._value is not None:
|
|
if getattr(self._device, 'persister',
|
|
None) and self.name not in self._device.persister:
|
|
self._device.persister[self.name] = self._value
|
|
return self._value[str(int(key))]
|
|
|
|
if self._device.online:
|
|
reply = self._rw.read(self._device, key)
|
|
if reply:
|
|
self._value[str(int(key))] = self._validator.validate_read(
|
|
reply, key)
|
|
if getattr(self._device, 'persister',
|
|
None) and self.name not in self._device.persister:
|
|
self._device.persister[self.name] = self._value
|
|
return self._value[str(int(key))]
|
|
|
|
def write(self, map):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
assert map is not None
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings write %r to %s', self.name, map,
|
|
self._device)
|
|
|
|
if self._device.online:
|
|
# Remember the value we're trying to set, even if the write fails.
|
|
# This way even if the device is offline or some other error occurs,
|
|
# the last value we've tried to write is remembered in the configuration.
|
|
self._value = map
|
|
if self._device.persister:
|
|
self._device.persister[self.name] = map
|
|
|
|
for key, value in map.items():
|
|
data_bytes = self._validator.prepare_write(int(key), value)
|
|
if data_bytes is not None:
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug(
|
|
'%s: settings prepare map write(%s,%s) => %r',
|
|
self.name, key, value, data_bytes)
|
|
reply = self._rw.write(self._device, int(key), data_bytes)
|
|
if not reply:
|
|
return None
|
|
|
|
return map
|
|
|
|
def write_key_value(self, key, value):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
assert key is not None
|
|
assert value is not None
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings write key %r value %r to %s', self.name,
|
|
key, value, self._device)
|
|
|
|
if self._device.online:
|
|
# Remember the value we're trying to set, even if the write fails.
|
|
# This way even if the device is offline or some other error occurs,
|
|
# the last value we've tried to write is remembered in the configuration.
|
|
self._value[str(key)] = value
|
|
if self._device.persister:
|
|
self._device.persister[self.name] = self._value
|
|
|
|
data_bytes = self._validator.prepare_write(int(key), value)
|
|
if data_bytes is not None:
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug(
|
|
'%s: settings prepare key value write(%s,%s) => %r',
|
|
self.name, key, value, data_bytes)
|
|
reply = self._rw.write(self._device, int(key), data_bytes)
|
|
if not reply:
|
|
# tell whomever is calling that the write failed
|
|
return None
|
|
|
|
return value
|
|
|
|
|
|
class BitFieldSetting(Setting):
|
|
"""A setting descriptor for a set of choices represented by one bit each, being a map from options to booleans.
|
|
Needs to be instantiated for each specific device."""
|
|
def read(self, cached=True):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings read %r from %s', self.name, self._value,
|
|
self._device)
|
|
|
|
if self._value is None and getattr(self._device, 'persister', None):
|
|
# We haven't read a value from the device yet,
|
|
# maybe we have something in the configuration.
|
|
self._value = self._device.persister.get(self.name)
|
|
|
|
if cached and self._value is not None:
|
|
if getattr(self._device, 'persister',
|
|
None) and self.name not in self._device.persister:
|
|
# If this is a new device (or a new setting for an old device),
|
|
# make sure to save its current value for the next time.
|
|
self._device.persister[self.name] = self._value
|
|
return self._value
|
|
|
|
if self._device.online:
|
|
reply_map = {}
|
|
reply = self._rw.read(self._device)
|
|
if reply:
|
|
# keys are ints, because that is what the device uses,
|
|
# encoded into strings because JSON requires strings as keys
|
|
reply_map = self._validator.validate_read(reply)
|
|
self._value = reply_map
|
|
if getattr(self._device, 'persister',
|
|
None) and self.name not in self._device.persister:
|
|
# Don't update the persister if it already has a value,
|
|
# otherwise the first read might overwrite the value we wanted.
|
|
self._device.persister[self.name] = self._value
|
|
return self._value
|
|
|
|
def read_key(self, key, cached=True):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
assert key is not None
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings read %r key %r from %s', self.name,
|
|
self._value, key, self._device)
|
|
|
|
if self._value is None and getattr(self._device, 'persister', None):
|
|
self._value = self._device.persister.get(self.name)
|
|
|
|
if cached and self._value is not None:
|
|
if getattr(self._device, 'persister',
|
|
None) and self.name not in self._device.persister:
|
|
self._device.persister[self.name] = self._value
|
|
return self._value[str(int(key))]
|
|
|
|
if self._device.online:
|
|
reply = self._rw.read(self._device, key)
|
|
if reply:
|
|
self._value = self._validator.validate_read(reply)
|
|
if getattr(self._device, 'persister',
|
|
None) and self.name not in self._device.persister:
|
|
self._device.persister[self.name] = self._value
|
|
return self._value[str(int(key))]
|
|
|
|
def write(self, map):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
assert map is not None
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings write %r to %s', self.name, map,
|
|
self._device)
|
|
|
|
if self._device.online:
|
|
# Remember the value we're trying to set, even if the write fails.
|
|
# This way even if the device is offline or some other error occurs,
|
|
# the last value we've tried to write is remembered in the configuration.
|
|
self._value = map
|
|
if self._device.persister:
|
|
self._device.persister[self.name] = map
|
|
data_bytes = self._validator.prepare_write(self._value)
|
|
if data_bytes is not None:
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings prepare map write(%s) => %r',
|
|
self.name, self._value, data_bytes)
|
|
reply = self._rw.write(self._device, data_bytes)
|
|
if not reply:
|
|
return None
|
|
return map
|
|
|
|
def write_key_value(self, key, value):
|
|
assert hasattr(self, '_value')
|
|
assert hasattr(self, '_device')
|
|
assert key is not None
|
|
assert value is not None
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('%s: settings write key %r value %r to %s', self.name,
|
|
key, value, self._device)
|
|
|
|
if self._device.online:
|
|
# Remember the value we're trying to set, even if the write fails.
|
|
# This way even if the device is offline or some other error occurs,
|
|
# the last value we've tried to write is remembered in the configuration.
|
|
value = bool(value)
|
|
self._value[str(key)] = value
|
|
if self._device.persister:
|
|
self._device.persister[self.name] = self._value
|
|
|
|
data_bytes = self._validator.prepare_write(self._value)
|
|
if data_bytes is not None:
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug(
|
|
'%s: settings prepare key value write(%s,%s) => %r',
|
|
self.name, key, str(value), data_bytes)
|
|
reply = self._rw.write(self._device, data_bytes)
|
|
if not reply:
|
|
# tell whomever is calling that the write failed
|
|
return None
|
|
|
|
return value
|
|
|
|
|
|
#
|
|
# read/write low-level operators
|
|
#
|
|
|
|
|
|
class RegisterRW(object):
|
|
__slots__ = ('register', )
|
|
|
|
kind = _NamedInt(0x01, 'register')
|
|
|
|
def __init__(self, register):
|
|
assert isinstance(register, int)
|
|
self.register = register
|
|
|
|
def read(self, device):
|
|
return device.read_register(self.register)
|
|
|
|
def write(self, device, data_bytes):
|
|
return device.write_register(self.register, data_bytes)
|
|
|
|
|
|
class FeatureRW(object):
|
|
__slots__ = ('feature', 'read_fnid', 'write_fnid')
|
|
|
|
kind = _NamedInt(0x02, 'feature')
|
|
default_read_fnid = 0x00
|
|
default_write_fnid = 0x10
|
|
|
|
def __init__(self,
|
|
feature,
|
|
read_fnid=default_read_fnid,
|
|
write_fnid=default_write_fnid):
|
|
assert isinstance(feature, _NamedInt)
|
|
self.feature = feature
|
|
self.read_fnid = read_fnid
|
|
self.write_fnid = write_fnid
|
|
|
|
def read(self, device):
|
|
assert self.feature is not None
|
|
return device.feature_request(self.feature, self.read_fnid)
|
|
|
|
def write(self, device, data_bytes):
|
|
assert self.feature is not None
|
|
return device.feature_request(self.feature, self.write_fnid,
|
|
data_bytes)
|
|
|
|
|
|
class FeatureRWMap(FeatureRW):
|
|
kind = _NamedInt(0x02, 'feature')
|
|
default_read_fnid = 0x00
|
|
default_write_fnid = 0x10
|
|
default_key_bytes = 1
|
|
|
|
def __init__(self,
|
|
feature,
|
|
read_fnid=default_read_fnid,
|
|
write_fnid=default_write_fnid,
|
|
key_bytes=default_key_bytes):
|
|
assert isinstance(feature, _NamedInt)
|
|
self.feature = feature
|
|
self.read_fnid = read_fnid
|
|
self.write_fnid = write_fnid
|
|
self.key_bytes = key_bytes
|
|
|
|
def read(self, device, key):
|
|
assert self.feature is not None
|
|
key_bytes = _int2bytes(key, self.key_bytes)
|
|
return device.feature_request(self.feature, self.read_fnid, key_bytes)
|
|
|
|
def write(self, device, key, data_bytes):
|
|
assert self.feature is not None
|
|
key_bytes = _int2bytes(key, self.key_bytes)
|
|
return device.feature_request(self.feature, self.write_fnid, key_bytes,
|
|
data_bytes)
|
|
|
|
|
|
#
|
|
# value validators
|
|
# handle the conversion from read bytes, to setting value, and back
|
|
#
|
|
|
|
|
|
class BooleanValidator(object):
|
|
__slots__ = ('true_value', 'false_value', 'mask', 'needs_current_value')
|
|
|
|
kind = KIND.toggle
|
|
default_true = 0x01
|
|
default_false = 0x00
|
|
# mask specifies all the affected bits in the value
|
|
default_mask = 0xFF
|
|
|
|
def __init__(self,
|
|
true_value=default_true,
|
|
false_value=default_false,
|
|
mask=default_mask):
|
|
if isinstance(true_value, int):
|
|
assert isinstance(false_value, int)
|
|
if mask is None:
|
|
mask = self.default_mask
|
|
else:
|
|
assert isinstance(mask, int)
|
|
assert true_value & false_value == 0
|
|
assert true_value & mask == true_value
|
|
assert false_value & mask == false_value
|
|
self.needs_current_value = (mask != self.default_mask)
|
|
elif isinstance(true_value, bytes):
|
|
if false_value is None or false_value == self.default_false:
|
|
false_value = b'\x00' * len(true_value)
|
|
else:
|
|
assert isinstance(false_value, bytes)
|
|
if mask is None or mask == self.default_mask:
|
|
mask = b'\xFF' * len(true_value)
|
|
else:
|
|
assert isinstance(mask, bytes)
|
|
assert len(mask) == len(true_value) == len(false_value)
|
|
tv = _bytes2int(true_value)
|
|
fv = _bytes2int(false_value)
|
|
mv = _bytes2int(mask)
|
|
assert tv != fv # true and false might be something other than bit values
|
|
assert tv & mv == tv
|
|
assert fv & mv == fv
|
|
self.needs_current_value = any(m != b'\xFF' for m in mask)
|
|
else:
|
|
raise Exception("invalid mask '%r', type %s" % (mask, type(mask)))
|
|
|
|
self.true_value = true_value
|
|
self.false_value = false_value
|
|
self.mask = mask
|
|
|
|
def validate_read(self, reply_bytes):
|
|
if isinstance(self.mask, int):
|
|
reply_value = ord(reply_bytes[:1]) & self.mask
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('BooleanValidator: validate read %r => %02X',
|
|
reply_bytes, reply_value)
|
|
if reply_value == self.true_value:
|
|
return True
|
|
if reply_value == self.false_value:
|
|
return False
|
|
_log.warn('BooleanValidator: reply %02X mismatched %02X/%02X/%02X',
|
|
reply_value, self.true_value, self.false_value,
|
|
self.mask)
|
|
return False
|
|
|
|
count = len(self.mask)
|
|
mask = _bytes2int(self.mask)
|
|
reply_value = _bytes2int(reply_bytes[:count]) & mask
|
|
|
|
true_value = _bytes2int(self.true_value)
|
|
if reply_value == true_value:
|
|
return True
|
|
|
|
false_value = _bytes2int(self.false_value)
|
|
if reply_value == false_value:
|
|
return False
|
|
|
|
_log.warn('BooleanValidator: reply %r mismatched %r/%r/%r',
|
|
reply_bytes, self.true_value, self.false_value, self.mask)
|
|
return False
|
|
|
|
def prepare_write(self, new_value, current_value=None):
|
|
if new_value is None:
|
|
new_value = False
|
|
else:
|
|
assert isinstance(new_value, bool)
|
|
|
|
to_write = self.true_value if new_value else self.false_value
|
|
|
|
if isinstance(self.mask, int):
|
|
if current_value is not None and self.needs_current_value:
|
|
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
|
|
if current_value is not None and to_write == ord(
|
|
current_value[:1]):
|
|
return None
|
|
else:
|
|
to_write = bytearray(to_write)
|
|
count = len(self.mask)
|
|
for i in range(0, count):
|
|
b = ord(to_write[i:i + 1])
|
|
m = ord(self.mask[i:i + 1])
|
|
assert b & m == b
|
|
# b &= m
|
|
if current_value is not None and self.needs_current_value:
|
|
b |= ord(current_value[i:i + 1]) & (0xFF ^ m)
|
|
to_write[i] = b
|
|
to_write = bytes(to_write)
|
|
|
|
if current_value is not None and to_write == current_value[:len(
|
|
to_write)]:
|
|
return None
|
|
|
|
if _log.isEnabledFor(_DEBUG):
|
|
_log.debug('BooleanValidator: prepare_write(%s, %s) => %r',
|
|
new_value, current_value, to_write)
|
|
|
|
return to_write
|
|
|
|
|
|
class BitFieldValidator(object):
|
|
__slots__ = ('byte_count', 'options')
|
|
|
|
kind = KIND.multiple_toggle
|
|
|
|
def __init__(self, options, byte_count=None):
|
|
assert (isinstance(options, list))
|
|
self.options = options
|
|
self.byte_count = (max(x.bit_length() for x in options) + 7) // 8
|
|
if byte_count:
|
|
assert (isinstance(byte_count, int)
|
|
and byte_count >= self.byte_count)
|
|
self.byte_count = byte_count
|
|
|
|
def validate_read(self, reply_bytes):
|
|
r = _bytes2int(reply_bytes[:self.byte_count])
|
|
value = {str(int(k)): False for k in self.options}
|
|
m = 1
|
|
for _ in range(8 * self.byte_count):
|
|
if m in self.options:
|
|
value[str(int(m))] = bool(r & m)
|
|
m <<= 1
|
|
return value
|
|
|
|
def prepare_write(self, new_value):
|
|
assert (isinstance(new_value, dict))
|
|
w = 0
|
|
for k, v in new_value.items():
|
|
if v:
|
|
w |= int(k)
|
|
return _int2bytes(w, self.byte_count)
|
|
|
|
|
|
class ChoicesValidator(object):
|
|
__slots__ = ('choices', 'flag', '_bytes_count', 'needs_current_value')
|
|
|
|
kind = KIND.choice
|
|
"""Translates between NamedInts and a byte sequence.
|
|
:param choices: a list of NamedInts
|
|
:param bytes_count: the size of the derived byte sequence. If None, it
|
|
will be calculated from the choices."""
|
|
def __init__(self, choices, bytes_count=None):
|
|
assert choices is not None
|
|
assert isinstance(choices, _NamedInts)
|
|
assert len(choices) > 2
|
|
self.choices = choices
|
|
self.needs_current_value = False
|
|
|
|
max_bits = max(x.bit_length() for x in choices)
|
|
self._bytes_count = (max_bits // 8) + (1 if max_bits % 8 else 0)
|
|
if bytes_count:
|
|
assert self._bytes_count <= bytes_count
|
|
self._bytes_count = bytes_count
|
|
assert self._bytes_count < 8
|
|
|
|
def validate_read(self, reply_bytes):
|
|
reply_value = _bytes2int(reply_bytes[:self._bytes_count])
|
|
valid_value = self.choices[reply_value]
|
|
assert valid_value is not None, '%s: failed to validate read value %02X' % (
|
|
self.__class__.__name__, reply_value)
|
|
return valid_value
|
|
|
|
def prepare_write(self, new_value, current_value=None):
|
|
if new_value is None:
|
|
choice = self.choices[:][0]
|
|
else:
|
|
if isinstance(new_value, int):
|
|
choice = self.choices[new_value]
|
|
elif int(new_value) in self.choices:
|
|
choice = self.choices[int(new_value)]
|
|
elif new_value in self.choices:
|
|
choice = self.choices[new_value]
|
|
else:
|
|
raise ValueError(new_value)
|
|
|
|
if choice is None:
|
|
raise ValueError('invalid choice %r' % new_value)
|
|
assert isinstance(choice, _NamedInt)
|
|
return choice.bytes(self._bytes_count)
|
|
|
|
|
|
class ChoicesMapValidator(ChoicesValidator):
|
|
kind = KIND.map_choice
|
|
|
|
def __init__(self,
|
|
choices_map,
|
|
key_bytes_count=None,
|
|
skip_bytes_count=None,
|
|
value_bytes_count=None,
|
|
extra_default=None):
|
|
assert choices_map is not None
|
|
assert isinstance(choices_map, dict)
|
|
max_key_bits = 0
|
|
max_value_bits = 0
|
|
for key, choices in choices_map.items():
|
|
assert isinstance(key, _NamedInt)
|
|
assert isinstance(choices, list)
|
|
max_key_bits = max(max_key_bits, key.bit_length())
|
|
for key_value in choices:
|
|
assert isinstance(key_value, _NamedInt)
|
|
max_value_bits = max(max_value_bits, key_value.bit_length())
|
|
self.choices = choices_map
|
|
self.needs_current_value = False
|
|
self.extra_default = extra_default
|
|
|
|
self._key_bytes_count = (max_key_bits + 7) // 8
|
|
if key_bytes_count:
|
|
assert self._key_bytes_count <= key_bytes_count
|
|
self._key_bytes_count = key_bytes_count
|
|
self._value_bytes_count = (max_value_bits + 7) // 8
|
|
if value_bytes_count:
|
|
assert self._value_bytes_count <= value_bytes_count
|
|
self._value_bytes_count = value_bytes_count
|
|
self._skip_bytes_count = skip_bytes_count if skip_bytes_count is not None else 0
|
|
self._bytes_count = self._key_bytes_count + self._skip_bytes_count + self._value_bytes_count
|
|
|
|
def validate_read(self, reply_bytes, key):
|
|
start = self._key_bytes_count + self._skip_bytes_count
|
|
end = start + self._value_bytes_count
|
|
reply_value = _bytes2int(reply_bytes[start:end])
|
|
# reprogrammable keys starts out as 0, which is not a choice, so don't use assert here
|
|
if self.extra_default is not None and self.extra_default == reply_value:
|
|
return int(self.choices[key][0])
|
|
assert reply_value in self.choices[
|
|
key], '%s: failed to validate read value %02X' % (
|
|
self.__class__.__name__, reply_value)
|
|
return reply_value
|
|
|
|
def prepare_write(self, key, new_value):
|
|
choices = self.choices[key]
|
|
if new_value not in choices and new_value != self.extra_default:
|
|
raise ValueError('invalid choice %r' % new_value)
|
|
return _int2bytes(new_value,
|
|
self._skip_bytes_count + self._value_bytes_count)
|
|
|
|
|
|
class RangeValidator(object):
|
|
__slots__ = ('min_value', 'max_value', 'flag', '_bytes_count',
|
|
'needs_current_value')
|
|
|
|
kind = KIND.range
|
|
"""Translates between integers and a byte sequence.
|
|
:param min_value: minimum accepted value (inclusive)
|
|
:param max_value: maximum accepted value (inclusive)
|
|
:param bytes_count: the size of the derived byte sequence. If None, it
|
|
will be calculated from the range."""
|
|
def __init__(self, min_value, max_value, bytes_count=None):
|
|
assert max_value > min_value
|
|
self.min_value = min_value
|
|
self.max_value = max_value
|
|
self.needs_current_value = False
|
|
|
|
self._bytes_count = math.ceil(math.log(max_value + 1, 256))
|
|
if bytes_count:
|
|
assert self._bytes_count <= bytes_count
|
|
self._bytes_count = bytes_count
|
|
assert self._bytes_count < 8
|
|
|
|
def validate_read(self, reply_bytes):
|
|
reply_value = _bytes2int(reply_bytes[:self._bytes_count])
|
|
assert reply_value >= self.min_value, '%s: failed to validate read value %02X' % (
|
|
self.__class__.__name__, reply_value)
|
|
assert reply_value <= self.max_value, '%s: failed to validate read value %02X' % (
|
|
self.__class__.__name__, reply_value)
|
|
return reply_value
|
|
|
|
def prepare_write(self, new_value, current_value=None):
|
|
if new_value < self.min_value or new_value > self.max_value:
|
|
raise ValueError('invalid choice %r' % new_value)
|
|
return _int2bytes(new_value, self._bytes_count)
|