solaar: use list for configurations and write in yaml

This commit is contained in:
Peter F. Patel-Schneider 2022-04-01 16:39:31 -04:00
parent b47cfbf024
commit e5b11ca2f9
1 changed files with 99 additions and 93 deletions

View File

@ -16,15 +16,17 @@
## with this program; if not, write to the Free Software Foundation, Inc., ## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import json as _json
import os as _os import os as _os
import os.path as _path import os.path as _path
from json import dump as _json_save
from json import load as _json_load
from logging import DEBUG as _DEBUG from logging import DEBUG as _DEBUG
from logging import INFO as _INFO from logging import INFO as _INFO
from logging import getLogger from logging import getLogger
import yaml as _yaml
from logitech_receiver.common import NamedInt as _NamedInt
from solaar import __version__ from solaar import __version__
_log = getLogger(__name__) _log = getLogger(__name__)
@ -32,43 +34,45 @@ del getLogger
_XDG_CONFIG_HOME = _os.environ.get('XDG_CONFIG_HOME') or _path.expanduser(_path.join('~', '.config')) _XDG_CONFIG_HOME = _os.environ.get('XDG_CONFIG_HOME') or _path.expanduser(_path.join('~', '.config'))
_file_path = _path.join(_XDG_CONFIG_HOME, 'solaar', 'config.json') _file_path = _path.join(_XDG_CONFIG_HOME, 'solaar', 'config.json')
_yaml_file_path = _path.join(_XDG_CONFIG_HOME, 'solaar', 'config.yaml')
_KEY_VERSION = '_version' _KEY_VERSION = '_version'
_KEY_NAME = '_name' _KEY_NAME = '_NAME'
_KEY_WPID = '_wpid'
_KEY_SERIAL = '_serial' _KEY_SERIAL = '_serial'
_KEY_MODEL_ID = '_modelId' _KEY_MODEL_ID = '_modelId'
_KEY_UNIT_ID = '_unitId' _KEY_UNIT_ID = '_unitId'
_configuration = {} _KEY_ABSENT = '_absent'
_KEY_SENSITIVE = '_sensitive'
_config = []
def _load(): def _load():
if _path.isfile(_file_path): global _config
loaded_configuration = {} loaded_config = []
if _path.isfile(_yaml_file_path):
try:
with open(_yaml_file_path) as config_file:
loaded_config = _yaml.safe_load(config_file)
except Exception as e:
_log.error('failed to load from %s: %s', _yaml_file_path, e)
elif _path.isfile(_file_path):
try: try:
with open(_file_path) as config_file: with open(_file_path) as config_file:
loaded_configuration = _json_load(config_file) loaded_config = _json.load(config_file)
except Exception: loaded_config = _convert_json(loaded_config)
_log.error('failed to load from %s', _file_path) except Exception as e:
_log.error('failed to load from %s: %s', _file_path, e)
# loaded_configuration.update(_configuration)
_configuration.clear()
_configuration.update(loaded_configuration)
if _log.isEnabledFor(_DEBUG): if _log.isEnabledFor(_DEBUG):
_log.debug('load => %s', _configuration) _log.debug('load => %s', loaded_config)
_config = _cleanup_load(loaded_config)
_cleanup(_configuration)
_cleanup_load(_configuration)
_configuration[_KEY_VERSION] = __version__
return _configuration
def save(): def save():
# don't save if the configuration hasn't been loaded if not _config:
if _KEY_VERSION not in _configuration:
return return
dirname = _os.path.dirname(_yaml_file_path)
dirname = _os.path.dirname(_file_path)
if not _path.isdir(dirname): if not _path.isdir(dirname):
try: try:
_os.makedirs(dirname) _os.makedirs(dirname)
@ -76,108 +80,110 @@ def save():
_log.error('failed to create %s', dirname) _log.error('failed to create %s', dirname)
return False return False
_cleanup(_configuration)
try: try:
with open(_file_path, 'w') as config_file: with open(_yaml_file_path, 'w') as config_file:
_json_save(_configuration, config_file, skipkeys=True, indent=2, sort_keys=True) _yaml.dump(_config, config_file)
if _log.isEnabledFor(_INFO): if _log.isEnabledFor(_INFO):
_log.info('saved %s to %s', _configuration, _file_path) _log.info('saved %s to %s', _config, _yaml_file_path)
return True return True
except Exception as e: except Exception as e:
_log.error('failed to save to %s: %s', _file_path, e) _log.error('failed to save to %s: %s', _yaml_file_path, e)
def _cleanup(d): def _convert_json(json_dict):
# remove None values from the dict config = [json_dict.get(_KEY_VERSION)]
for key in list(d.keys()): for key, dev in json_dict.items():
value = d.get(key) key = key.split(':')
if value is None: if len(key) == 2:
del d[key] dev[_KEY_WPID] = dev.get(_KEY_WPID) if dev.get(_KEY_WPID) else key[0]
elif isinstance(value, dict): dev[_KEY_SERIAL] = dev.get(_KEY_SERIAL) if dev.get(_KEY_SERIAL) else key[1]
_cleanup(value) config.append(dev)
return config
def _cleanup_load(d): def _cleanup_load(c):
# remove boolean values for mouse-gesture and dpi-sliding _config = [__version__]
for device in d.values(): for element in c:
if isinstance(device, dict): if isinstance(element, dict):
# remove boolean values for mouse-gesture and dpi-sliding
for setting in ['mouse-gestures', 'dpi-sliding']: for setting in ['mouse-gestures', 'dpi-sliding']:
mg = device.get(setting, None) mg = element.get(setting, None)
if mg is True or mg is False: if mg is True or mg is False:
del device[setting] del element[setting]
# convert to device entries
element = _DeviceEntry(**element)
_config.append(element)
return _config
class _DeviceEntry(dict): class _DeviceEntry(dict):
def __init__(self, device, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
if self.get(_KEY_NAME) != device.name:
self[_KEY_NAME] = device.name
self.update(device)
def __setitem__(self, key, value): def __setitem__(self, key, value):
super().__setitem__(key, value) super().__setitem__(key, value)
save() save()
def update(self, device): def update(self, device):
if device.modelId and device.modelId != self.get(_KEY_MODEL_ID): if device.name and device.name != self.get(_KEY_NAME):
self[_KEY_MODEL_ID] = device.modelId super().__setitem__(_KEY_NAME, device.name)
if device.unitId and device.unitId != self.get(_KEY_UNIT_ID): if device.wpid and device.wpid != self.get(_KEY_WPID):
self[_KEY_UNIT_ID] = device.unitId super().__setitem__(_KEY_WPID, device.wpid)
if device.serial and device.serial != '?' and device.serial != self.get(_KEY_SERIAL): if device.serial and device.serial != '?' and device.serial != self.get(_KEY_SERIAL):
self[_KEY_SERIAL] = device.serial super().__setitem__(_KEY_SERIAL, device.serial)
if device.modelId and device.modelId != self.get(_KEY_MODEL_ID):
super().__setitem__(_KEY_MODEL_ID, device.modelId)
if device.unitId and device.unitId != self.get(_KEY_UNIT_ID):
super().__setitem__(_KEY_UNIT_ID, device.unitId)
def get_sensitivity(self, name): def get_sensitivity(self, name):
return self.get('_sensitive', {}).get(name, False) return self.get(_KEY_SENSITIVE, {}).get(name, False)
def set_sensitivity(self, name, value): def set_sensitivity(self, name, value):
sensitives = self.get('_sensitive', {}) sensitives = self.get(_KEY_SENSITIVE, {})
if sensitives.get(name) != value: if sensitives.get(name) != value:
sensitives[name] = value sensitives[name] = value
self.__setitem__('_sensitive', sensitives) self.__setitem__(_KEY_SENSITIVE, sensitives)
# This is neccessarily complicate because the same device can be attached in several different ways. def device_representer(dumper, data):
# All HID++ 2.0 devices have a modelId and unitId, which can be accessed when they are connected. return dumper.represent_mapping('tag:yaml.org,2002:map', data)
# When paired via a receiver the receiver provides a WPID and a serial number.
# The unitId and serial number are supposed to be the same, but for some models they are not
# so even though the modelId includes the WPID it is not always possible to determine the identity of a _yaml.add_representer(_DeviceEntry, device_representer)
# paired but not receiver-connected device for which the unitId is not known.
# This only happens is Solaar has never seen the device while it is paired and connected through a receiver.
def named_int_representer(dumper, data):
return dumper.represent_scalar('tag:yaml.org,2002:int', str(int(data)))
_yaml.add_representer(_NamedInt, named_int_representer)
# A device can be identified by a combination of WPID and serial number (for receiver-connected devices)
# or a combination of modelId and unitId (for direct-connected devices).
# The worst situation is a receiver-connected device that Solaar has never seen on-line
# that is directly connected. Here there is no way to realize that the two devices are the same.
# So new entries are not created for unseen off-line receiver-connected devices except for those with protocol 1.0
def persister(device): def persister(device):
if not _configuration: def match(wpid, serial, modelId, unitId, c):
return ((wpid and wpid == c.get(_KEY_WPID) and serial and serial == c.get(_KEY_SERIAL))
or (modelId and modelId == c.get(_KEY_MODEL_ID) and unitId and unitId == c.get(_KEY_UNIT_ID)))
if not _config:
_load() _load()
entry = None
entry = {} for c in _config:
key = None if isinstance(c, _DeviceEntry) and match(device.wpid, device.serial, device.modelId, device.unitId, c):
if device.wpid: # connected via receiver entry = c
entry = _configuration.get('%s:%s' % (device.wpid, device.serial), {}) break
if entry or device.protocol == 1.0: # found entry or create entry for old-style devices if not entry:
key = '%s:%s' % (device.wpid, device.serial) if not device.online and device.protocol > 1.0: # don't create entry for unseen offline modern devices
elif not entry and device.modelId: # online new-style device so look for modelId and unitId return
for k, c in _configuration.items(): entry = _DeviceEntry()
if isinstance(c, dict) and c.get(_KEY_MODEL_ID) == device.modelId and c.get(_KEY_UNIT_ID) == device.unitId: _config.append(entry)
entry = c # use the entry that matches modelId and unitId entry.update(device)
key = k
break
if device.wpid and entry: # move entry to wpid:serial
del _configuration[key]
key = '%s:%s' % (device.wpid, device.serial)
_configuration[key] = entry
elif device.wpid and not entry: # create now with wpid:serial
key = '%s:%s' % (device.wpid, device.serial)
elif not entry: # create now with modelId:unitId
key = '%s:%s' % (device.modelId, device.unitId)
else: # defer until more is known (i.e., device comes on line)
return
if key and not isinstance(entry, _DeviceEntry):
entry = _DeviceEntry(device, **entry)
_configuration[key] = entry
if isinstance(entry, _DeviceEntry):
entry.update(device)
return entry return entry