863 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			863 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Python
		
	
	
	
| ## Copyright (C) 2012-2013  Daniel Pavel
 | |
| ##
 | |
| ## This program is free software; you can redistribute it and/or modify
 | |
| ## it under the terms of the GNU General Public License as published by
 | |
| ## the Free Software Foundation; either version 2 of the License, or
 | |
| ## (at your option) any later version.
 | |
| ##
 | |
| ## This program is distributed in the hope that it will be useful,
 | |
| ## but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| ## GNU General Public License for more details.
 | |
| ##
 | |
| ## You should have received a copy of the GNU General Public License along
 | |
| ## with this program; if not, write to the Free Software Foundation, Inc.,
 | |
| ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 | |
| from __future__ import annotations
 | |
| 
 | |
| import logging
 | |
| import struct
 | |
| import time
 | |
| 
 | |
| from enum import IntEnum
 | |
| from typing import Any
 | |
| 
 | |
| from solaar.i18n import _
 | |
| 
 | |
| from . import common
 | |
| from . import hidpp20_constants
 | |
| from . import settings_validator
 | |
| from .common import NamedInt
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| SENSITIVITY_IGNORE = "ignore"
 | |
| 
 | |
| 
 | |
| class Kind(IntEnum):
 | |
|     TOGGLE = 0x01
 | |
|     CHOICE = 0x02
 | |
|     RANGE = 0x04
 | |
|     MAP_CHOICE = 0x0A
 | |
|     MULTIPLE_TOGGLE = 0x10
 | |
|     PACKED_RANGE = 0x20
 | |
|     MULTIPLE_RANGE = 0x40
 | |
|     HETERO = 0x80
 | |
| 
 | |
| 
 | |
| class Setting:
 | |
|     """A setting descriptor. Needs to be instantiated for each specific device."""
 | |
| 
 | |
|     name = label = description = ""
 | |
|     feature = register = kind = None
 | |
|     min_version = 0
 | |
|     persist = True
 | |
|     rw_options = {}
 | |
|     validator_class = None
 | |
|     validator_options = {}
 | |
| 
 | |
|     def __init__(self, device, rw, validator):
 | |
|         self._device = device
 | |
|         self._rw = rw
 | |
|         self._validator = validator
 | |
|         self.kind = getattr(self._validator, "kind", None)
 | |
|         self._value = None
 | |
| 
 | |
|     @classmethod
 | |
|     def build(cls, device):
 | |
|         assert cls.feature or cls.register, "Settings require either a feature or a register"
 | |
|         rw_class = cls.rw_class if hasattr(cls, "rw_class") else FeatureRW if cls.feature else RegisterRW
 | |
|         rw = rw_class(cls.feature if cls.feature else cls.register, **cls.rw_options)
 | |
|         p = device.protocol
 | |
|         if p == 1.0:  # HID++ 1.0 devices do not support features
 | |
|             assert rw.kind == RegisterRW.kind
 | |
|         elif p >= 2.0:  # HID++ 2.0 devices do not support registers
 | |
|             assert rw.kind == FeatureRW.kind
 | |
|         validator_class = cls.validator_class
 | |
|         validator = validator_class.build(cls, device, **cls.validator_options)
 | |
|         if validator:
 | |
|             assert cls.kind is None or cls.kind & validator.kind != 0
 | |
|             return cls(device, rw, validator)
 | |
| 
 | |
|     def val_to_string(self, value):
 | |
|         return self._validator.to_string(value)
 | |
| 
 | |
|     @property
 | |
|     def choices(self):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
| 
 | |
|         return self._validator.choices if self._validator and self._validator.kind & Kind.CHOICE else None
 | |
| 
 | |
|     @property
 | |
|     def range(self):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
| 
 | |
|         if self._validator.kind == Kind.RANGE:
 | |
|             return self._validator.min_value, self._validator.max_value
 | |
| 
 | |
|     def _pre_read(self, cached, key=None):
 | |
|         if self.persist and self._value is None and getattr(self._device, "persister", None):
 | |
|             # We haven't read a value from the device yet,
 | |
|             # maybe we have something in the configuration.
 | |
|             self._value = self._device.persister.get(self.name)
 | |
|         if cached and self._value is not None:
 | |
|             if getattr(self._device, "persister", None) and self.name not in self._device.persister:
 | |
|                 # If this is a new device (or a new setting for an old device),
 | |
|                 # make sure to save its current value for the next time.
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
| 
 | |
|     def read(self, cached=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
| 
 | |
|         self._pre_read(cached)
 | |
|         if cached and self._value is not None:
 | |
|             if logger.isEnabledFor(logging.DEBUG):
 | |
|                 logger.debug("%s: cached value %r on %s", self.name, self._value, self._device)
 | |
|             return self._value
 | |
| 
 | |
|         if self._device.online:
 | |
|             reply = self._rw.read(self._device)
 | |
|             if reply:
 | |
|                 self._value = self._validator.validate_read(reply)
 | |
|             if self._value is not None and self._device.persister and self.name not in self._device.persister:
 | |
|                 # Don't update the persister if it already has a value,
 | |
|                 # otherwise the first read might overwrite the value we wanted.
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
|             if logger.isEnabledFor(logging.DEBUG):
 | |
|                 logger.debug("%s: read value %r on %s", self.name, self._value, self._device)
 | |
|             return self._value
 | |
| 
 | |
|     def _pre_write(self, save=True):
 | |
|         # Remember the value we're trying to set, even if the write fails.
 | |
|         # This way even if the device is offline or some other error occurs,
 | |
|         # the last value we've tried to write is remembered in the configuration.
 | |
|         if self._device.persister and save:
 | |
|             self._device.persister[self.name] = self._value if self.persist else None
 | |
| 
 | |
|     def update(self, value, save=True):
 | |
|         self._value = value
 | |
|         self._pre_write(save)
 | |
| 
 | |
|     def write(self, value, save=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert value is not None
 | |
| 
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: write %r to %s", self.name, value, self._device)
 | |
| 
 | |
|         if self._device.online:
 | |
|             if self._value != value:
 | |
|                 self.update(value, save)
 | |
| 
 | |
|             current_value = None
 | |
|             if self._validator.needs_current_value:
 | |
|                 # the _validator needs the current value, possibly to merge flag values
 | |
|                 current_value = self._rw.read(self._device)
 | |
|                 if logger.isEnabledFor(logging.DEBUG):
 | |
|                     logger.debug("%s: current value %r on %s", self.name, current_value, self._device)
 | |
| 
 | |
|             data_bytes = self._validator.prepare_write(value, current_value)
 | |
|             if data_bytes is not None:
 | |
|                 if logger.isEnabledFor(logging.DEBUG):
 | |
|                     logger.debug("%s: prepare write(%s) => %r", self.name, value, data_bytes)
 | |
| 
 | |
|                 reply = self._rw.write(self._device, data_bytes)
 | |
|                 if not reply:
 | |
|                     # tell whomever is calling that the write failed
 | |
|                     return None
 | |
| 
 | |
|             return value
 | |
| 
 | |
|     def acceptable(self, args, current):
 | |
|         return self._validator.acceptable(args, current) if self._validator else None
 | |
| 
 | |
|     def compare(self, args, current):
 | |
|         return self._validator.compare(args, current) if self._validator else None
 | |
| 
 | |
|     def apply(self):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: apply (%s)", self.name, self._device)
 | |
|         try:
 | |
|             value = self.read(self.persist)  # Don't use persisted value if setting doesn't persist
 | |
|             if self.persist and value is not None:  # If setting doesn't persist no need to write value just read
 | |
|                 self.write(value, save=False)
 | |
|         except Exception as e:
 | |
|             if logger.isEnabledFor(logging.WARNING):
 | |
|                 logger.warning("%s: error applying %s so ignore it (%s): %s", self.name, self._value, self._device, repr(e))
 | |
| 
 | |
|     def __str__(self):
 | |
|         if hasattr(self, "_value"):
 | |
|             assert hasattr(self, "_device")
 | |
|             return "<Setting([%s:%s] %s:%s=%s)>" % (
 | |
|                 self._rw.kind,
 | |
|                 self._validator.kind if self._validator else None,
 | |
|                 self._device.codename,
 | |
|                 self.name,
 | |
|                 self._value,
 | |
|             )
 | |
|         return f"<Setting([{self._rw.kind}:{self._validator.kind if self._validator else None}] {self.name})>"
 | |
| 
 | |
|     __repr__ = __str__
 | |
| 
 | |
| 
 | |
| class Settings(Setting):
 | |
|     """A setting descriptor for multiple choices, being a map from keys to values.
 | |
|     Needs to be instantiated for each specific device."""
 | |
| 
 | |
|     def read(self, cached=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings read %r from %s", self.name, self._value, self._device)
 | |
| 
 | |
|         self._pre_read(cached)
 | |
| 
 | |
|         if cached and self._value is not None:
 | |
|             return self._value
 | |
| 
 | |
|         if self._device.online:
 | |
|             reply_map = {}
 | |
|             for key in self._validator.choices:
 | |
|                 reply = self._rw.read(self._device, key)
 | |
|                 if reply:
 | |
|                     reply_map[int(key)] = self._validator.validate_read(reply, key)
 | |
|             self._value = reply_map
 | |
|             if getattr(self._device, "persister", None) and self.name not in self._device.persister:
 | |
|                 # Don't update the persister if it already has a value,
 | |
|                 # otherwise the first read might overwrite the value we wanted.
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
|             return self._value
 | |
| 
 | |
|     def read_key(self, key, cached=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert key is not None
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings read %r key %r from %s", self.name, self._value, key, self._device)
 | |
| 
 | |
|         self._pre_read(cached)
 | |
|         if cached and self._value is not None:
 | |
|             return self._value[int(key)]
 | |
| 
 | |
|         if self._device.online:
 | |
|             reply = self._rw.read(self._device, key)
 | |
|             if reply:
 | |
|                 self._value[int(key)] = self._validator.validate_read(reply, key)
 | |
|             if getattr(self._device, "persister", None) and self.name not in self._device.persister:
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
|             return self._value[int(key)]
 | |
| 
 | |
|     def write(self, map, save=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert map is not None
 | |
| 
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings write %r to %s", self.name, map, self._device)
 | |
| 
 | |
|         if self._device.online:
 | |
|             self.update(map, save)
 | |
|             for key, value in map.items():
 | |
|                 data_bytes = self._validator.prepare_write(int(key), value)
 | |
|                 if data_bytes is not None:
 | |
|                     if logger.isEnabledFor(logging.DEBUG):
 | |
|                         logger.debug("%s: settings prepare map write(%s,%s) => %r", self.name, key, value, data_bytes)
 | |
|                     reply = self._rw.write(self._device, int(key), data_bytes)
 | |
|                     if not reply:
 | |
|                         return None
 | |
|             return map
 | |
| 
 | |
|     def update_key_value(self, key, value, save=True):
 | |
|         self._value[int(key)] = value
 | |
|         self._pre_write(save)
 | |
| 
 | |
|     def write_key_value(self, key, value, save=True) -> Any | None:
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert key is not None
 | |
|         assert value is not None
 | |
| 
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings write key %r value %r to %s", self.name, key, value, self._device)
 | |
| 
 | |
|         if self._device.online:
 | |
|             if not self._value:
 | |
|                 self.read()
 | |
|             try:
 | |
|                 data_bytes = self._validator.prepare_write(int(key), value)
 | |
|                 # always need to write to configuration because dictionary is shared and could have changed
 | |
|                 self.update_key_value(key, value, save)
 | |
|             except ValueError:
 | |
|                 data_bytes = value = None
 | |
|             if data_bytes is not None:
 | |
|                 if logger.isEnabledFor(logging.DEBUG):
 | |
|                     logger.debug("%s: settings prepare key value write(%s,%s) => %r", self.name, key, value, data_bytes)
 | |
|                 reply = self._rw.write(self._device, int(key), data_bytes)
 | |
|                 if not reply:
 | |
|                     return None
 | |
|             return value
 | |
| 
 | |
| 
 | |
| class LongSettings(Setting):
 | |
|     """A setting descriptor for multiple choices, being a map from keys to values.
 | |
|     Allows multiple write requests, if the options don't fit in 16 bytes.
 | |
|     The validator must return a list.
 | |
|     Needs to be instantiated for each specific device."""
 | |
| 
 | |
|     def read(self, cached=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings read %r from %s", self.name, self._value, self._device)
 | |
| 
 | |
|         self._pre_read(cached)
 | |
| 
 | |
|         if cached and self._value is not None:
 | |
|             return self._value
 | |
| 
 | |
|         if self._device.online:
 | |
|             reply_map = {}
 | |
|             # Reading one item at a time. This can probably be optimised
 | |
|             for item in self._validator.items:
 | |
|                 r = self._validator.prepare_read_item(item)
 | |
|                 reply = self._rw.read(self._device, r)
 | |
|                 if reply:
 | |
|                     reply_map[int(item)] = self._validator.validate_read_item(reply, item)
 | |
|             self._value = reply_map
 | |
|             if getattr(self._device, "persister", None) and self.name not in self._device.persister:
 | |
|                 # Don't update the persister if it already has a value,
 | |
|                 # otherwise the first read might overwrite the value we wanted.
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
|             return self._value
 | |
| 
 | |
|     def read_item(self, item, cached=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert item is not None
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings read %r item %r from %s", self.name, self._value, item, self._device)
 | |
| 
 | |
|         self._pre_read(cached)
 | |
|         if cached and self._value is not None:
 | |
|             return self._value[int(item)]
 | |
| 
 | |
|         if self._device.online:
 | |
|             r = self._validator.prepare_read_item(item)
 | |
|             reply = self._rw.read(self._device, r)
 | |
|             if reply:
 | |
|                 self._value[int(item)] = self._validator.validate_read_item(reply, item)
 | |
|             if getattr(self._device, "persister", None) and self.name not in self._device.persister:
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
|             return self._value[int(item)]
 | |
| 
 | |
|     def write(self, map, save=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert map is not None
 | |
| 
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: long settings write %r to %s", self.name, map, self._device)
 | |
|         if self._device.online:
 | |
|             self.update(map, save)
 | |
|             for item, value in map.items():
 | |
|                 data_bytes_list = self._validator.prepare_write(self._value)
 | |
|                 if data_bytes_list is not None:
 | |
|                     for data_bytes in data_bytes_list:
 | |
|                         if data_bytes is not None:
 | |
|                             if logger.isEnabledFor(logging.DEBUG):
 | |
|                                 logger.debug("%s: settings prepare map write(%s,%s) => %r", self.name, item, value, data_bytes)
 | |
|                             reply = self._rw.write(self._device, data_bytes)
 | |
|                             if not reply:
 | |
|                                 return None
 | |
|             return map
 | |
| 
 | |
|     def update_key_value(self, key, value, save=True):
 | |
|         self._value[int(key)] = value
 | |
|         self._pre_write(save)
 | |
| 
 | |
|     def write_key_value(self, item, value, save=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert item is not None
 | |
|         assert value is not None
 | |
| 
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: long settings write item %r value %r to %s", self.name, item, value, self._device)
 | |
| 
 | |
|         if self._device.online:
 | |
|             if not self._value:
 | |
|                 self.read()
 | |
|             data_bytes = self._validator.prepare_write_item(item, value)
 | |
|             self.update_key_value(item, value, save)
 | |
|             if data_bytes is not None:
 | |
|                 if logger.isEnabledFor(logging.DEBUG):
 | |
|                     logger.debug("%s: settings prepare item value write(%s,%s) => %r", self.name, item, value, data_bytes)
 | |
|                 reply = self._rw.write(self._device, data_bytes)
 | |
|                 if not reply:
 | |
|                     return None
 | |
|             return value
 | |
| 
 | |
| 
 | |
| class BitFieldSetting(Setting):
 | |
|     """A setting descriptor for a set of choices represented by one bit each, being a map from options to booleans.
 | |
|     Needs to be instantiated for each specific device."""
 | |
| 
 | |
|     def read(self, cached=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings read %r from %s", self.name, self._value, self._device)
 | |
| 
 | |
|         self._pre_read(cached)
 | |
| 
 | |
|         if cached and self._value is not None:
 | |
|             return self._value
 | |
| 
 | |
|         if self._device.online:
 | |
|             reply_map = {}
 | |
|             reply = self._do_read()
 | |
|             if reply:
 | |
|                 reply_map = self._validator.validate_read(reply)
 | |
|             self._value = reply_map
 | |
|             if getattr(self._device, "persister", None) and self.name not in self._device.persister:
 | |
|                 # Don't update the persister if it already has a value,
 | |
|                 # otherwise the first read might overwrite the value we wanted.
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
|             return self._value
 | |
| 
 | |
|     def _do_read(self):
 | |
|         return self._rw.read(self._device)
 | |
| 
 | |
|     def read_key(self, key, cached=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert key is not None
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings read %r key %r from %s", self.name, self._value, key, self._device)
 | |
| 
 | |
|         self._pre_read(cached)
 | |
| 
 | |
|         if cached and self._value is not None:
 | |
|             return self._value[int(key)]
 | |
| 
 | |
|         if self._device.online:
 | |
|             reply = self._do_read_key(key)
 | |
|             if reply:
 | |
|                 self._value = self._validator.validate_read(reply)
 | |
|             if getattr(self._device, "persister", None) and self.name not in self._device.persister:
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
|             return self._value[int(key)]
 | |
| 
 | |
|     def _do_read_key(self, key):
 | |
|         return self._rw.read(self._device, key)
 | |
| 
 | |
|     def write(self, map, save=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert map is not None
 | |
| 
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: bit field settings write %r to %s", self.name, map, self._device)
 | |
|         if self._device.online:
 | |
|             self.update(map, save)
 | |
|             data_bytes = self._validator.prepare_write(self._value)
 | |
|             if data_bytes is not None:
 | |
|                 if logger.isEnabledFor(logging.DEBUG):
 | |
|                     logger.debug("%s: settings prepare map write(%s) => %r", self.name, self._value, data_bytes)
 | |
|                 # if prepare_write returns a list, write one item at a time
 | |
|                 seq = data_bytes if isinstance(data_bytes, list) else [data_bytes]
 | |
|                 for b in seq:
 | |
|                     reply = self._rw.write(self._device, b)
 | |
|                     if not reply:
 | |
|                         return None
 | |
|             return map
 | |
| 
 | |
|     def update_key_value(self, key, value, save=True):
 | |
|         self._value[int(key)] = value
 | |
|         self._pre_write(save)
 | |
| 
 | |
|     def write_key_value(self, key, value, save=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert key is not None
 | |
|         assert value is not None
 | |
| 
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: bit field settings write key %r value %r to %s", self.name, key, value, self._device)
 | |
| 
 | |
|         if self._device.online:
 | |
|             if not self._value:
 | |
|                 self.read()
 | |
|             value = bool(value)
 | |
|             self.update_key_value(key, value, save)
 | |
| 
 | |
|             data_bytes = self._validator.prepare_write(self._value)
 | |
|             if data_bytes is not None:
 | |
|                 if logger.isEnabledFor(logging.DEBUG):
 | |
|                     logger.debug("%s: settings prepare key value write(%s,%s) => %r", self.name, key, str(value), data_bytes)
 | |
|                 # if prepare_write returns a list, write one item at a time
 | |
|                 seq = data_bytes if isinstance(data_bytes, list) else [data_bytes]
 | |
|                 for b in seq:
 | |
|                     reply = self._rw.write(self._device, b)
 | |
|                     if not reply:
 | |
|                         return None
 | |
| 
 | |
|             return value
 | |
| 
 | |
| 
 | |
| class BitFieldWithOffsetAndMaskSetting(BitFieldSetting):
 | |
|     """A setting descriptor for a set of choices represented by one bit each,
 | |
|     each one having an offset, being a map from options to booleans.
 | |
|     Needs to be instantiated for each specific device."""
 | |
| 
 | |
|     def _do_read(self):
 | |
|         return {r: self._rw.read(self._device, r) for r in self._validator.prepare_read()}
 | |
| 
 | |
|     def _do_read_key(self, key):
 | |
|         r = self._validator.prepare_read_key(key)
 | |
|         return {r: self._rw.read(self._device, r)}
 | |
| 
 | |
| 
 | |
| class RangeFieldSetting(Setting):
 | |
|     """A setting descriptor for a set of choices represented by one field each, with map from option names to range(0,n).
 | |
|     Needs to be instantiated for each specific device."""
 | |
| 
 | |
|     def read(self, cached=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: settings read %r from %s", self.name, self._value, self._device)
 | |
|         self._pre_read(cached)
 | |
|         if cached and self._value is not None:
 | |
|             return self._value
 | |
|         if self._device.online:
 | |
|             reply_map = {}
 | |
|             reply = self._do_read()
 | |
|             if reply:
 | |
|                 reply_map = self._validator.validate_read(reply)
 | |
|             self._value = reply_map
 | |
|             if getattr(self._device, "persister", None) and self.name not in self._device.persister:
 | |
|                 # Don't update the persister if it already has a value,
 | |
|                 # otherwise the first read might overwrite the value we wanted.
 | |
|                 self._device.persister[self.name] = self._value if self.persist else None
 | |
|             return self._value
 | |
| 
 | |
|     def _do_read(self):
 | |
|         return self._rw.read(self._device)
 | |
| 
 | |
|     def read_key(self, key, cached=True):
 | |
|         return self.read(cached)[int(key)]
 | |
| 
 | |
|     def write(self, map, save=True):
 | |
|         assert hasattr(self, "_value")
 | |
|         assert hasattr(self, "_device")
 | |
|         assert map is not None
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: range field setting write %r to %s", self.name, map, self._device)
 | |
|         if self._device.online:
 | |
|             self.update(map, save)
 | |
|             data_bytes = self._validator.prepare_write(self._value)
 | |
|             if data_bytes is not None:
 | |
|                 if logger.isEnabledFor(logging.DEBUG):
 | |
|                     logger.debug("%s: range field setting prepare map write(%s) => %r", self.name, self._value, data_bytes)
 | |
|                 reply = self._rw.write(self._device, data_bytes)
 | |
|                 if not reply:
 | |
|                     return None
 | |
|             elif logger.isEnabledFor(logging.WARNING):
 | |
|                 logger.warning("%s: range field setting no data to write", self.name)
 | |
|             return map
 | |
| 
 | |
|     def write_key_value(self, key, value, save=True):
 | |
|         assert key is not None
 | |
|         assert value is not None
 | |
|         if logger.isEnabledFor(logging.DEBUG):
 | |
|             logger.debug("%s: range field setting write key %r value %r to %s", self.name, key, value, self._device)
 | |
|         if self._device.online:
 | |
|             if not self._value:
 | |
|                 self.read()
 | |
|             map = self._value
 | |
|             map[int(key)] = value
 | |
|             self.write(map, save)
 | |
|             return value
 | |
| 
 | |
| 
 | |
| #
 | |
| # read/write low-level operators
 | |
| #
 | |
| 
 | |
| 
 | |
| class RegisterRW:
 | |
|     __slots__ = ("register",)
 | |
| 
 | |
|     kind = NamedInt(0x01, _("register"))
 | |
| 
 | |
|     def __init__(self, register: int):
 | |
|         assert isinstance(register, int)
 | |
|         self.register = register
 | |
| 
 | |
|     def read(self, device):
 | |
|         return device.read_register(self.register)
 | |
| 
 | |
|     def write(self, device, data_bytes):
 | |
|         return device.write_register(self.register, data_bytes)
 | |
| 
 | |
| 
 | |
| class FeatureRW:
 | |
|     kind = NamedInt(0x02, _("feature"))
 | |
|     default_read_fnid = 0x00
 | |
|     default_write_fnid = 0x10
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         feature: hidpp20_constants.SupportedFeature,
 | |
|         read_fnid=0x00,
 | |
|         write_fnid=0x10,
 | |
|         prefix=b"",
 | |
|         suffix=b"",
 | |
|         read_prefix=b"",
 | |
|         no_reply=False,
 | |
|     ):
 | |
|         assert isinstance(feature, hidpp20_constants.SupportedFeature)
 | |
|         self.feature = feature
 | |
|         self.read_fnid = read_fnid
 | |
|         self.write_fnid = write_fnid
 | |
|         self.no_reply = no_reply
 | |
|         self.prefix = prefix
 | |
|         self.suffix = suffix
 | |
|         self.read_prefix = read_prefix
 | |
| 
 | |
|     def read(self, device, data_bytes=b""):
 | |
|         assert self.feature is not None
 | |
|         return device.feature_request(self.feature, self.read_fnid, self.prefix, self.read_prefix, data_bytes)
 | |
| 
 | |
|     def write(self, device, data_bytes):
 | |
|         assert self.feature is not None
 | |
|         write_bytes = self.prefix + (data_bytes.to_bytes(1) if isinstance(data_bytes, int) else data_bytes) + self.suffix
 | |
|         reply = device.feature_request(self.feature, self.write_fnid, write_bytes, no_reply=self.no_reply)
 | |
|         return reply if not self.no_reply else True
 | |
| 
 | |
| 
 | |
| class FeatureRWMap(FeatureRW):
 | |
|     kind = NamedInt(0x02, _("feature"))
 | |
|     default_read_fnid = 0x00
 | |
|     default_write_fnid = 0x10
 | |
|     default_key_byte_count = 1
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         feature: hidpp20_constants.SupportedFeature,
 | |
|         read_fnid=default_read_fnid,
 | |
|         write_fnid=default_write_fnid,
 | |
|         key_byte_count=default_key_byte_count,
 | |
|         no_reply=False,
 | |
|     ):
 | |
|         assert isinstance(feature, hidpp20_constants.SupportedFeature)
 | |
|         self.feature = feature
 | |
|         self.read_fnid = read_fnid
 | |
|         self.write_fnid = write_fnid
 | |
|         self.key_byte_count = key_byte_count
 | |
|         self.no_reply = no_reply
 | |
| 
 | |
|     def read(self, device, key):
 | |
|         assert self.feature is not None
 | |
|         key_bytes = common.int2bytes(key, self.key_byte_count)
 | |
|         return device.feature_request(self.feature, self.read_fnid, key_bytes)
 | |
| 
 | |
|     def write(self, device, key, data_bytes):
 | |
|         assert self.feature is not None
 | |
|         key_bytes = common.int2bytes(key, self.key_byte_count)
 | |
|         reply = device.feature_request(self.feature, self.write_fnid, key_bytes, data_bytes, no_reply=self.no_reply)
 | |
|         return reply if not self.no_reply else True
 | |
| 
 | |
| 
 | |
| class ActionSettingRW:
 | |
|     """Special RW class for settings that turn on and off special processing when a key or button is depressed"""
 | |
| 
 | |
|     def __init__(self, feature, name="", divert_setting_name="divert-keys"):
 | |
|         self.feature = feature  # not used?
 | |
|         self.name = name
 | |
|         self.divert_setting_name = divert_setting_name
 | |
|         self.kind = FeatureRW.kind  # pretend to be FeatureRW as required for HID++ 2.0 devices
 | |
|         self.device = None
 | |
|         self.key = None
 | |
|         self.active = False
 | |
|         self.pressed = False
 | |
| 
 | |
|     def activate_action(self):  # action to take when setting is activated (write non-false)
 | |
|         pass
 | |
| 
 | |
|     def deactivate_action(self):  # action to take when setting is deactivated (write false)
 | |
|         pass
 | |
| 
 | |
|     def press_action(self):  # action to take when key is pressed
 | |
|         pass
 | |
| 
 | |
|     def release_action(self):  # action to take when key is released
 | |
|         pass
 | |
| 
 | |
|     def move_action(self, dx, dy):  # action to take when mouse is moved while key is down
 | |
|         pass
 | |
| 
 | |
|     def key_action(self, key):  # acction to take when some other diverted key is pressed
 | |
|         pass
 | |
| 
 | |
|     def read(self, device):  # need to return bytes, as if read from device
 | |
|         return common.int2bytes(self.key.key, 2) if self.active and self.key else b"\x00\x00"
 | |
| 
 | |
|     def write(self, device, data_bytes):
 | |
|         def handler(device, n):  # Called on notification events from the device
 | |
|             if (
 | |
|                 n.sub_id < 0x40
 | |
|                 and device.features.get_feature(n.sub_id) == hidpp20_constants.SupportedFeature.REPROG_CONTROLS_V4
 | |
|             ):
 | |
|                 if n.address == 0x00:
 | |
|                     cids = struct.unpack("!HHHH", n.data[:8])
 | |
|                     if not self.pressed and int(self.key.key) in cids:  # trigger key pressed
 | |
|                         self.pressed = True
 | |
|                         self.press_action()
 | |
|                     elif self.pressed:
 | |
|                         if int(self.key.key) not in cids:  # trigger key released
 | |
|                             self.pressed = False
 | |
|                             self.release_action()
 | |
|                         else:
 | |
|                             for key in cids:
 | |
|                                 if key and not key == self.key.key:  # some other diverted key pressed
 | |
|                                     self.key_action(key)
 | |
|                 elif n.address == 0x10:
 | |
|                     if self.pressed:
 | |
|                         dx, dy = struct.unpack("!hh", n.data[:4])
 | |
|                         self.move_action(dx, dy)
 | |
| 
 | |
|         divertSetting = next(filter(lambda s: s.name == self.divert_setting_name, device.settings), None)
 | |
|         if divertSetting is None:
 | |
|             logger.warning("setting %s not found on %s", self.divert_setting_name, device.name)
 | |
|             return None
 | |
|         self.device = device
 | |
|         key = common.bytes2int(data_bytes)
 | |
|         if key:  # Enable
 | |
|             self.key = next((k for k in device.keys if k.key == key), None)
 | |
|             if self.key:
 | |
|                 self.active = True
 | |
|                 if divertSetting:
 | |
|                     divertSetting.write_key_value(int(self.key.key), 1)
 | |
|                     if self.device.setting_callback:
 | |
|                         self.device.setting_callback(device, type(divertSetting), [self.key.key, 1])
 | |
|                 device.add_notification_handler(self.name, handler)
 | |
|                 self.activate_action()
 | |
|             else:
 | |
|                 logger.error("cannot enable %s on %s for key %s", self.name, device, key)
 | |
|         else:  # Disable
 | |
|             if self.active:
 | |
|                 self.active = False
 | |
|                 if divertSetting:
 | |
|                     divertSetting.write_key_value(int(self.key.key), 0)
 | |
|                     if self.device.setting_callback:
 | |
|                         self.device.setting_callback(device, type(divertSetting), [self.key.key, 0])
 | |
|                 try:
 | |
|                     device.remove_notification_handler(self.name)
 | |
|                 except Exception:
 | |
|                     if logger.isEnabledFor(logging.WARNING):
 | |
|                         logger.warning("cannot disable %s on %s", self.name, device)
 | |
|                 self.deactivate_action()
 | |
|         return data_bytes
 | |
| 
 | |
| 
 | |
| class RawXYProcessing:
 | |
|     """Special class for processing RawXY action messages initiated by pressing a key with rawXY diversion capability"""
 | |
| 
 | |
|     def __init__(self, device, name=""):
 | |
|         self.device = device
 | |
|         self.name = name
 | |
|         self.keys = []  # the keys that can initiate processing
 | |
|         self.initiating_key = None  # the key that did initiate processing
 | |
|         self.active = False
 | |
|         self.feature_offset = device.features[hidpp20_constants.SupportedFeature.REPROG_CONTROLS_V4]
 | |
|         assert self.feature_offset is not False
 | |
| 
 | |
|     def handler(self, device, n):  # Called on notification events from the device
 | |
|         if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == hidpp20_constants.SupportedFeature.REPROG_CONTROLS_V4:
 | |
|             if n.address == 0x00:
 | |
|                 cids = struct.unpack("!HHHH", n.data[:8])
 | |
|                 ## generalize to list of keys
 | |
|                 if not self.initiating_key:  # no initiating key pressed
 | |
|                     for k in self.keys:
 | |
|                         if int(k.key) in cids:  # initiating key that was pressed
 | |
|                             self.initiating_key = k
 | |
|                     if self.initiating_key:
 | |
|                         self.press_action(self.initiating_key)
 | |
|                 else:
 | |
|                     if int(self.initiating_key.key) not in cids:  # initiating key released
 | |
|                         self.initiating_key = None
 | |
|                         self.release_action()
 | |
|                     else:
 | |
|                         for key in cids:
 | |
|                             if key and key != self.initiating_key.key:
 | |
|                                 self.key_action(key)
 | |
|             elif n.address == 0x10:
 | |
|                 if self.initiating_key:
 | |
|                     dx, dy = struct.unpack("!hh", n.data[:4])
 | |
|                     self.move_action(dx, dy)
 | |
| 
 | |
|     def start(self, key):
 | |
|         device_key = next((k for k in self.device.keys if k.key == key), None)
 | |
|         if device_key:
 | |
|             self.keys.append(device_key)
 | |
|             if not self.active:
 | |
|                 self.active = True
 | |
|                 self.activate_action()
 | |
|                 self.device.add_notification_handler(self.name, self.handler)
 | |
|             device_key.set_rawXY_reporting(True)
 | |
| 
 | |
|     def stop(self, key):  # only stop if this is the active key
 | |
|         if self.active:
 | |
|             processing_key = next((k for k in self.keys if k.key == key), None)
 | |
|             if processing_key:
 | |
|                 processing_key.set_rawXY_reporting(False)
 | |
|                 self.keys.remove(processing_key)
 | |
|             if not self.keys:
 | |
|                 try:
 | |
|                     self.device.remove_notification_handler(self.name)
 | |
|                 except Exception:
 | |
|                     if logger.isEnabledFor(logging.WARNING):
 | |
|                         logger.warning("cannot disable %s on %s", self.name, self.device)
 | |
|                 self.deactivate_action()
 | |
|                 self.active = False
 | |
| 
 | |
|     def activate_action(self):  # action to take when processing is activated
 | |
|         pass
 | |
| 
 | |
|     def deactivate_action(self):  # action to take when processing is deactivated
 | |
|         pass
 | |
| 
 | |
|     def press_action(self, key):  # action to take when an initiating key is pressed
 | |
|         pass
 | |
| 
 | |
|     def release_action(self):  # action to take when key is released
 | |
|         pass
 | |
| 
 | |
|     def move_action(self, dx, dy):  # action to take when mouse is moved while key is down
 | |
|         pass
 | |
| 
 | |
|     def key_action(self, key):  # acction to take when some other diverted key is pressed
 | |
|         pass
 | |
| 
 | |
| 
 | |
| def apply_all_settings(device):
 | |
|     if device.features and hidpp20_constants.SupportedFeature.HIRES_WHEEL in device.features:
 | |
|         time.sleep(0.2)  # delay to try to get out of race condition with Linux HID++ driver
 | |
|     persister = getattr(device, "persister", None)
 | |
|     sensitives = persister.get("_sensitive", {}) if persister else {}
 | |
|     for s in device.settings:
 | |
|         ignore = sensitives.get(s.name, False)
 | |
|         if ignore != SENSITIVITY_IGNORE:
 | |
|             s.apply()
 | |
| 
 | |
| 
 | |
| Setting.validator_class = settings_validator.BooleanValidator
 |