# -*- python-mode -*- # -*- coding: UTF-8 -*- ## Copyright (C) 2012-2013 Daniel Pavel ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from __future__ import absolute_import, division, print_function, unicode_literals import math from copy import copy as _copy from logging import DEBUG as _DEBUG from logging import getLogger from .common import NamedInt as _NamedInt from .common import NamedInts as _NamedInts from .common import bytes2int as _bytes2int from .common import int2bytes as _int2bytes _log = getLogger(__name__) del getLogger # # # KIND = _NamedInts(toggle=0x01, choice=0x02, range=0x04, map_choice=0x0A, multiple_toggle=0x10) class Setting(object): """A setting descriptor. Needs to be instantiated for each specific device.""" __slots__ = ('name', 'label', 'description', 'kind', 'device_kind', 'feature', '_rw', '_validator', '_device', '_value') def __init__(self, name, rw, validator, kind=None, label=None, description=None, device_kind=None, feature=None): assert name self.name = name self.label = label or name self.description = description self.device_kind = device_kind self.feature = feature self._rw = rw self._validator = validator assert kind is None or kind & validator.kind != 0 self.kind = kind or validator.kind def __call__(self, device): assert not hasattr(self, '_value') # combined keyboards and touchpads (e.g., K400) break this assertion so don't use it # assert self.device_kind is None or device.kind in self.device_kind p = device.protocol if p == 1.0: # HID++ 1.0 devices do not support features assert self._rw.kind == RegisterRW.kind elif p >= 2.0: # HID++ 2.0 devices do not support registers assert self._rw.kind == FeatureRW.kind o = _copy(self) o._value = None o._device = device return o @property def choices(self): assert hasattr(self, '_value') assert hasattr(self, '_device') return self._validator.choices if self._validator.kind & KIND.choice else None @property def range(self): assert hasattr(self, '_value') assert hasattr(self, '_device') if self._validator.kind == KIND.range: return (self._validator.min_value, self._validator.max_value) def read(self, cached=True): assert hasattr(self, '_value') assert hasattr(self, '_device') if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings read %r from %s', self.name, self._value, self._device) if self._value is None and self._device.persister: # We haven't read a value from the device yet, # maybe we have something in the configuration. self._value = self._device.persister.get(self.name) if cached and self._value is not None: if self._device.persister and self.name not in self._device.persister: # If this is a new device (or a new setting for an old device), # make sure to save its current value for the next time. self._device.persister[self.name] = self._value return self._value if self._device.online: reply = self._rw.read(self._device) if reply: self._value = self._validator.validate_read(reply) if self._device.persister and self.name not in self._device.persister: # Don't update the persister if it already has a value, # otherwise the first read might overwrite the value we wanted. self._device.persister[self.name] = self._value return self._value def write(self, value): assert hasattr(self, '_value') assert hasattr(self, '_device') assert value is not None if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings write %r to %s', self.name, value, self._device) if self._device.online: # Remember the value we're trying to set, even if the write fails. # This way even if the device is offline or some other error occurs, # the last value we've tried to write is remembered in the configuration. self._value = value if self._device.persister: self._device.persister[self.name] = value current_value = None if self._validator.needs_current_value: # the validator needs the current value, possibly to merge flag values current_value = self._rw.read(self._device) data_bytes = self._validator.prepare_write(value, current_value) if data_bytes is not None: if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings prepare write(%s) => %r', self.name, value, data_bytes) reply = self._rw.write(self._device, data_bytes) if not reply: # tell whomever is calling that the write failed return None return value def apply(self): assert hasattr(self, '_value') assert hasattr(self, '_device') if _log.isEnabledFor(_DEBUG): _log.debug('%s: apply %s (%s)', self.name, self._value, self._device) value = self.read() if value is not None: self.write(value) def __str__(self): if hasattr(self, '_value'): assert hasattr(self, '_device') return '' % ( self._rw.kind, self._validator.kind, self._device.codename, self.name, self._value ) return '' % (self._rw.kind, self._validator.kind, self.name) __unicode__ = __repr__ = __str__ class Settings(Setting): """A setting descriptor for multiple choices, being a map from keys to values. Needs to be instantiated for each specific device.""" def read(self, cached=True): assert hasattr(self, '_value') assert hasattr(self, '_device') if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings read %r from %s', self.name, self._value, self._device) if self._value is None and getattr(self._device, 'persister', None): # We haven't read a value from the device yet, # maybe we have something in the configuration. self._value = self._device.persister.get(self.name) if cached and self._value is not None: if getattr(self._device, 'persister', None) and self.name not in self._device.persister: # If this is a new device (or a new setting for an old device), # make sure to save its current value for the next time. self._device.persister[self.name] = self._value return self._value if self._device.online: reply_map = {} for key in self._validator.choices: reply = self._rw.read(self._device, key) if reply: # keys are ints, because that is what the device uses, # encoded into strings because JSON requires strings as keys reply_map[str(int(key))] = self._validator.validate_read(reply, key) self._value = reply_map if getattr(self._device, 'persister', None) and self.name not in self._device.persister: # Don't update the persister if it already has a value, # otherwise the first read might overwrite the value we wanted. self._device.persister[self.name] = self._value return self._value def read_key(self, key, cached=True): assert hasattr(self, '_value') assert hasattr(self, '_device') assert key is not None if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings read %r key %r from %s', self.name, self._value, key, self._device) if self._value is None and getattr(self._device, 'persister', None): self._value = self._device.persister.get(self.name) if cached and self._value is not None: if getattr(self._device, 'persister', None) and self.name not in self._device.persister: self._device.persister[self.name] = self._value return self._value[str(int(key))] if self._device.online: reply = self._rw.read(self._device, key) if reply: self._value[str(int(key))] = self._validator.validate_read(reply, key) if getattr(self._device, 'persister', None) and self.name not in self._device.persister: self._device.persister[self.name] = self._value return self._value[str(int(key))] def write(self, map): assert hasattr(self, '_value') assert hasattr(self, '_device') assert map is not None if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings write %r to %s', self.name, map, self._device) if self._device.online: # Remember the value we're trying to set, even if the write fails. # This way even if the device is offline or some other error occurs, # the last value we've tried to write is remembered in the configuration. self._value = map if self._device.persister: self._device.persister[self.name] = map for key, value in map.items(): data_bytes = self._validator.prepare_write(int(key), value) if data_bytes is not None: if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings prepare map write(%s,%s) => %r', self.name, key, value, data_bytes) reply = self._rw.write(self._device, int(key), data_bytes) if not reply: return None return map def write_key_value(self, key, value): assert hasattr(self, '_value') assert hasattr(self, '_device') assert key is not None assert value is not None if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings write key %r value %r to %s', self.name, key, value, self._device) if self._device.online: # Remember the value we're trying to set, even if the write fails. # This way even if the device is offline or some other error occurs, # the last value we've tried to write is remembered in the configuration. self._value[str(key)] = value if self._device.persister: self._device.persister[self.name] = self._value data_bytes = self._validator.prepare_write(int(key), value) if data_bytes is not None: if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings prepare key value write(%s,%s) => %r', self.name, key, value, data_bytes) reply = self._rw.write(self._device, int(key), data_bytes) if not reply: # tell whomever is calling that the write failed return None return value class BitFieldSetting(Setting): """A setting descriptor for a set of choices represented by one bit each, being a map from options to booleans. Needs to be instantiated for each specific device.""" def read(self, cached=True): assert hasattr(self, '_value') assert hasattr(self, '_device') if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings read %r from %s', self.name, self._value, self._device) if self._value is None and getattr(self._device, 'persister', None): # We haven't read a value from the device yet, # maybe we have something in the configuration. self._value = self._device.persister.get(self.name) if cached and self._value is not None: if getattr(self._device, 'persister', None) and self.name not in self._device.persister: # If this is a new device (or a new setting for an old device), # make sure to save its current value for the next time. self._device.persister[self.name] = self._value return self._value if self._device.online: reply_map = {} reply = self._rw.read(self._device) if reply: # keys are ints, because that is what the device uses, # encoded into strings because JSON requires strings as keys reply_map = self._validator.validate_read(reply) self._value = reply_map if getattr(self._device, 'persister', None) and self.name not in self._device.persister: # Don't update the persister if it already has a value, # otherwise the first read might overwrite the value we wanted. self._device.persister[self.name] = self._value return self._value def read_key(self, key, cached=True): assert hasattr(self, '_value') assert hasattr(self, '_device') assert key is not None if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings read %r key %r from %s', self.name, self._value, key, self._device) if self._value is None and getattr(self._device, 'persister', None): self._value = self._device.persister.get(self.name) if cached and self._value is not None: if getattr(self._device, 'persister', None) and self.name not in self._device.persister: self._device.persister[self.name] = self._value return self._value[str(int(key))] if self._device.online: reply = self._rw.read(self._device, key) if reply: self._value = self._validator.validate_read(reply) if getattr(self._device, 'persister', None) and self.name not in self._device.persister: self._device.persister[self.name] = self._value return self._value[str(int(key))] def write(self, map): assert hasattr(self, '_value') assert hasattr(self, '_device') assert map is not None if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings write %r to %s', self.name, map, self._device) if self._device.online: # Remember the value we're trying to set, even if the write fails. # This way even if the device is offline or some other error occurs, # the last value we've tried to write is remembered in the configuration. self._value = map if self._device.persister: self._device.persister[self.name] = map data_bytes = self._validator.prepare_write(self._value) if data_bytes is not None: if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings prepare map write(%s) => %r', self.name, self._value, data_bytes) reply = self._rw.write(self._device, data_bytes) if not reply: return None return map def write_key_value(self, key, value): assert hasattr(self, '_value') assert hasattr(self, '_device') assert key is not None assert value is not None if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings write key %r value %r to %s', self.name, key, value, self._device) if self._device.online: # Remember the value we're trying to set, even if the write fails. # This way even if the device is offline or some other error occurs, # the last value we've tried to write is remembered in the configuration. value = bool(value) self._value[str(key)] = value if self._device.persister: self._device.persister[self.name] = self._value data_bytes = self._validator.prepare_write(self._value) if data_bytes is not None: if _log.isEnabledFor(_DEBUG): _log.debug('%s: settings prepare key value write(%s,%s) => %r', self.name, key, str(value), data_bytes) reply = self._rw.write(self._device, data_bytes) if not reply: # tell whomever is calling that the write failed return None return value # # read/write low-level operators # class RegisterRW(object): __slots__ = ('register', ) kind = _NamedInt(0x01, 'register') def __init__(self, register): assert isinstance(register, int) self.register = register def read(self, device): return device.read_register(self.register) def write(self, device, data_bytes): return device.write_register(self.register, data_bytes) class FeatureRW(object): __slots__ = ('feature', 'read_fnid', 'write_fnid') kind = _NamedInt(0x02, 'feature') default_read_fnid = 0x00 default_write_fnid = 0x10 def __init__(self, feature, read_fnid=default_read_fnid, write_fnid=default_write_fnid): assert isinstance(feature, _NamedInt) self.feature = feature self.read_fnid = read_fnid self.write_fnid = write_fnid def read(self, device): assert self.feature is not None return device.feature_request(self.feature, self.read_fnid) def write(self, device, data_bytes): assert self.feature is not None return device.feature_request(self.feature, self.write_fnid, data_bytes) class FeatureRWMap(FeatureRW): kind = _NamedInt(0x02, 'feature') default_read_fnid = 0x00 default_write_fnid = 0x10 default_key_bytes = 1 def __init__(self, feature, read_fnid=default_read_fnid, write_fnid=default_write_fnid, key_bytes=default_key_bytes): assert isinstance(feature, _NamedInt) self.feature = feature self.read_fnid = read_fnid self.write_fnid = write_fnid self.key_bytes = key_bytes def read(self, device, key): assert self.feature is not None key_bytes = _int2bytes(key, self.key_bytes) return device.feature_request(self.feature, self.read_fnid, key_bytes) def write(self, device, key, data_bytes): assert self.feature is not None key_bytes = _int2bytes(key, self.key_bytes) return device.feature_request(self.feature, self.write_fnid, key_bytes, data_bytes) # # value validators # handle the conversion from read bytes, to setting value, and back # class BooleanValidator(object): __slots__ = ('true_value', 'false_value', 'mask', 'needs_current_value') kind = KIND.toggle default_true = 0x01 default_false = 0x00 # mask specifies all the affected bits in the value default_mask = 0xFF def __init__(self, true_value=default_true, false_value=default_false, mask=default_mask): if isinstance(true_value, int): assert isinstance(false_value, int) if mask is None: mask = self.default_mask else: assert isinstance(mask, int) assert true_value & false_value == 0 assert true_value & mask == true_value assert false_value & mask == false_value self.needs_current_value = (mask != self.default_mask) elif isinstance(true_value, bytes): if false_value is None or false_value == self.default_false: false_value = b'\x00' * len(true_value) else: assert isinstance(false_value, bytes) if mask is None or mask == self.default_mask: mask = b'\xFF' * len(true_value) else: assert isinstance(mask, bytes) assert len(mask) == len(true_value) == len(false_value) tv = _bytes2int(true_value) fv = _bytes2int(false_value) mv = _bytes2int(mask) assert tv != fv # true and false might be something other than bit values assert tv & mv == tv assert fv & mv == fv self.needs_current_value = any(m != b'\xFF' for m in mask) else: raise Exception("invalid mask '%r', type %s" % (mask, type(mask))) self.true_value = true_value self.false_value = false_value self.mask = mask def validate_read(self, reply_bytes): if isinstance(self.mask, int): reply_value = ord(reply_bytes[:1]) & self.mask if _log.isEnabledFor(_DEBUG): _log.debug('BooleanValidator: validate read %r => %02X', reply_bytes, reply_value) if reply_value == self.true_value: return True if reply_value == self.false_value: return False _log.warn( 'BooleanValidator: reply %02X mismatched %02X/%02X/%02X', reply_value, self.true_value, self.false_value, self.mask ) return False count = len(self.mask) mask = _bytes2int(self.mask) reply_value = _bytes2int(reply_bytes[:count]) & mask true_value = _bytes2int(self.true_value) if reply_value == true_value: return True false_value = _bytes2int(self.false_value) if reply_value == false_value: return False _log.warn('BooleanValidator: reply %r mismatched %r/%r/%r', reply_bytes, self.true_value, self.false_value, self.mask) return False def prepare_write(self, new_value, current_value=None): if new_value is None: new_value = False else: assert isinstance(new_value, bool) to_write = self.true_value if new_value else self.false_value if isinstance(self.mask, int): if current_value is not None and self.needs_current_value: to_write |= ord(current_value[:1]) & (0xFF ^ self.mask) if current_value is not None and to_write == ord(current_value[:1]): return None else: to_write = bytearray(to_write) count = len(self.mask) for i in range(0, count): b = ord(to_write[i:i + 1]) m = ord(self.mask[i:i + 1]) assert b & m == b # b &= m if current_value is not None and self.needs_current_value: b |= ord(current_value[i:i + 1]) & (0xFF ^ m) to_write[i] = b to_write = bytes(to_write) if current_value is not None and to_write == current_value[:len(to_write)]: return None if _log.isEnabledFor(_DEBUG): _log.debug('BooleanValidator: prepare_write(%s, %s) => %r', new_value, current_value, to_write) return to_write class BitFieldValidator(object): __slots__ = ('byte_count', 'options') kind = KIND.multiple_toggle def __init__(self, options, byte_count=None): assert (isinstance(options, list)) self.options = options self.byte_count = (max(x.bit_length() for x in options) + 7) // 8 if byte_count: assert (isinstance(byte_count, int) and byte_count >= self.byte_count) self.byte_count = byte_count def validate_read(self, reply_bytes): r = _bytes2int(reply_bytes[:self.byte_count]) value = {str(int(k)): False for k in self.options} m = 1 for _ in range(8 * self.byte_count): if m in self.options: value[str(int(m))] = bool(r & m) m <<= 1 return value def prepare_write(self, new_value): assert (isinstance(new_value, dict)) w = 0 for k, v in new_value.items(): if v: w |= int(k) return _int2bytes(w, self.byte_count) class ChoicesValidator(object): __slots__ = ('choices', 'flag', '_bytes_count', 'needs_current_value') kind = KIND.choice """Translates between NamedInts and a byte sequence. :param choices: a list of NamedInts :param bytes_count: the size of the derived byte sequence. If None, it will be calculated from the choices.""" def __init__(self, choices, bytes_count=None): assert choices is not None assert isinstance(choices, _NamedInts) assert len(choices) > 2 self.choices = choices self.needs_current_value = False max_bits = max(x.bit_length() for x in choices) self._bytes_count = (max_bits // 8) + (1 if max_bits % 8 else 0) if bytes_count: assert self._bytes_count <= bytes_count self._bytes_count = bytes_count assert self._bytes_count < 8 def validate_read(self, reply_bytes): reply_value = _bytes2int(reply_bytes[:self._bytes_count]) valid_value = self.choices[reply_value] assert valid_value is not None, '%s: failed to validate read value %02X' % (self.__class__.__name__, reply_value) return valid_value def prepare_write(self, new_value, current_value=None): if new_value is None: choice = self.choices[:][0] else: if isinstance(new_value, int): choice = self.choices[new_value] elif int(new_value) in self.choices: choice = self.choices[int(new_value)] elif new_value in self.choices: choice = self.choices[new_value] else: raise ValueError(new_value) if choice is None: raise ValueError('invalid choice %r' % new_value) assert isinstance(choice, _NamedInt) return choice.bytes(self._bytes_count) class ChoicesMapValidator(ChoicesValidator): kind = KIND.map_choice def __init__(self, choices_map, key_bytes_count=None, skip_bytes_count=None, value_bytes_count=None, extra_default=None): assert choices_map is not None assert isinstance(choices_map, dict) max_key_bits = 0 max_value_bits = 0 for key, choices in choices_map.items(): assert isinstance(key, _NamedInt) assert isinstance(choices, list) max_key_bits = max(max_key_bits, key.bit_length()) for key_value in choices: assert isinstance(key_value, _NamedInt) max_value_bits = max(max_value_bits, key_value.bit_length()) self.choices = choices_map self.needs_current_value = False self.extra_default = extra_default self._key_bytes_count = (max_key_bits + 7) // 8 if key_bytes_count: assert self._key_bytes_count <= key_bytes_count self._key_bytes_count = key_bytes_count self._value_bytes_count = (max_value_bits + 7) // 8 if value_bytes_count: assert self._value_bytes_count <= value_bytes_count self._value_bytes_count = value_bytes_count self._skip_bytes_count = skip_bytes_count if skip_bytes_count is not None else 0 self._bytes_count = self._key_bytes_count + self._skip_bytes_count + self._value_bytes_count def validate_read(self, reply_bytes, key): start = self._key_bytes_count + self._skip_bytes_count end = start + self._value_bytes_count reply_value = _bytes2int(reply_bytes[start:end]) # reprogrammable keys starts out as 0, which is not a choice, so don't use assert here if self.extra_default is not None and self.extra_default == reply_value: return int(self.choices[key][0]) assert reply_value in self.choices[ key], '%s: failed to validate read value %02X' % (self.__class__.__name__, reply_value) return reply_value def prepare_write(self, key, new_value): choices = self.choices[key] if new_value not in choices and new_value != self.extra_default: raise ValueError('invalid choice %r' % new_value) return _int2bytes(new_value, self._skip_bytes_count + self._value_bytes_count) class RangeValidator(object): __slots__ = ('min_value', 'max_value', 'flag', '_bytes_count', 'needs_current_value') kind = KIND.range """Translates between integers and a byte sequence. :param min_value: minimum accepted value (inclusive) :param max_value: maximum accepted value (inclusive) :param bytes_count: the size of the derived byte sequence. If None, it will be calculated from the range.""" def __init__(self, min_value, max_value, bytes_count=None): assert max_value > min_value self.min_value = min_value self.max_value = max_value self.needs_current_value = False self._bytes_count = math.ceil(math.log(max_value + 1, 256)) if bytes_count: assert self._bytes_count <= bytes_count self._bytes_count = bytes_count assert self._bytes_count < 8 def validate_read(self, reply_bytes): reply_value = _bytes2int(reply_bytes[:self._bytes_count]) assert reply_value >= self.min_value, '%s: failed to validate read value %02X' % (self.__class__.__name__, reply_value) assert reply_value <= self.max_value, '%s: failed to validate read value %02X' % (self.__class__.__name__, reply_value) return reply_value def prepare_write(self, new_value, current_value=None): if new_value < self.min_value or new_value > self.max_value: raise ValueError('invalid choice %r' % new_value) return _int2bytes(new_value, self._bytes_count)