Solaar/lib/solaar/configuration.py

237 lines
8.6 KiB
Python

# -*- python-mode -*-
## Copyright (C) 2012-2013 Daniel Pavel
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import json as _json
import os as _os
import os.path as _path
from logging import DEBUG as _DEBUG
from logging import INFO as _INFO
from logging import getLogger
from threading import Lock as _Lock
from threading import Timer as _Timer
import yaml as _yaml
from gi.repository import GLib
from logitech_receiver.common import NamedInt as _NamedInt
from solaar import __version__
_log = getLogger(__name__)
del getLogger
_XDG_CONFIG_HOME = _os.environ.get('XDG_CONFIG_HOME') or _path.expanduser(_path.join('~', '.config'))
_file_path = _path.join(_XDG_CONFIG_HOME, 'solaar', 'config.json')
_yaml_file_path = _path.join(_XDG_CONFIG_HOME, 'solaar', 'config.yaml')
_KEY_VERSION = '_version'
_KEY_NAME = '_NAME'
_KEY_WPID = '_wpid'
_KEY_SERIAL = '_serial'
_KEY_MODEL_ID = '_modelId'
_KEY_UNIT_ID = '_unitId'
_KEY_ABSENT = '_absent'
_KEY_SENSITIVE = '_sensitive'
_config = []
def _load():
global _config
loaded_config = []
if _path.isfile(_yaml_file_path):
try:
with open(_yaml_file_path) as config_file:
loaded_config = _yaml.safe_load(config_file)
except Exception as e:
_log.error('failed to load from %s: %s', _yaml_file_path, e)
elif _path.isfile(_file_path):
try:
with open(_file_path) as config_file:
loaded_config = _json.load(config_file)
except Exception as e:
_log.error('failed to load from %s: %s', _file_path, e)
loaded_config = _convert_json(loaded_config)
if _log.isEnabledFor(_DEBUG):
_log.debug('load => %s', loaded_config)
_config = _cleanup_load(loaded_config)
save_timer = None
save_lock = _Lock()
def save(defer=False):
global save_timer
if not _config:
return
dirname = _os.path.dirname(_yaml_file_path)
if not _path.isdir(dirname):
try:
_os.makedirs(dirname)
except Exception:
_log.error('failed to create %s', dirname)
return
if not defer:
do_save()
else:
with save_lock:
if not save_timer:
save_timer = _Timer(5.0, lambda: GLib.idle_add(do_save))
save_timer.start()
def do_save():
global save_timer
with save_lock:
if save_timer:
save_timer.cancel()
save_timer = None
try:
with open(_yaml_file_path, 'w') as config_file:
_yaml.dump(_config, config_file, default_flow_style=None, width=150)
if _log.isEnabledFor(_INFO):
_log.info('saved %s to %s', _config, _yaml_file_path)
except Exception as e:
_log.error('failed to save to %s: %s', _yaml_file_path, e)
def _convert_json(json_dict):
config = [json_dict.get(_KEY_VERSION)]
for key, dev in json_dict.items():
key = key.split(':')
if len(key) == 2:
dev[_KEY_WPID] = dev.get(_KEY_WPID) if dev.get(_KEY_WPID) else key[0]
dev[_KEY_SERIAL] = dev.get(_KEY_SERIAL) if dev.get(_KEY_SERIAL) else key[1]
for k, v in dev.items():
if type(k) == str and not k.startswith('_') and type(v) == dict: # convert string keys to ints
v = {int(dk) if type(dk) == str else dk: dv for dk, dv in v.items()}
dev[k] = v
for k in ['mouse-gestures', 'dpi-sliding']:
v = dev.get(k, None)
if v is True or v is False:
dev.pop(k)
if '_name' in dev:
dev[_KEY_NAME] = dev['_name']
dev.pop('_name')
config.append(dev)
return config
def _cleanup_load(c):
_config = [__version__]
for element in c:
if isinstance(element, dict):
divert = element.get('divert-keys')
if divert:
sliding = element.get('dpi-sliding')
if sliding: # convert old-style dpi-sliding setting to divert-keys entry
divert[int(sliding)] = 3
element.pop('dpi-sliding', None)
gestures = element.get('mouse-gestures')
if gestures: # convert old-style mouse-gestures setting to divert-keys entry
divert[int(gestures)] = 2
element.pop('mouse-gestures', None)
# remove any string entries (from bad conversions)
element['divert-keys'] = {k: v for k, v in divert.items() if isinstance(k, int)}
# convert to device entries
element = _DeviceEntry(**element)
_config.append(element)
return _config
class _DeviceEntry(dict):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __setitem__(self, key, value):
super().__setitem__(key, value)
save(defer=True)
def update(self, device, modelId):
if device.name and device.name != self.get(_KEY_NAME):
super().__setitem__(_KEY_NAME, device.name)
if device.wpid and device.wpid != self.get(_KEY_WPID):
super().__setitem__(_KEY_WPID, device.wpid)
if device.serial and device.serial != '?' and device.serial != self.get(_KEY_SERIAL):
super().__setitem__(_KEY_SERIAL, device.serial)
if modelId and modelId != self.get(_KEY_MODEL_ID):
super().__setitem__(_KEY_MODEL_ID, modelId)
if device.unitId and device.unitId != self.get(_KEY_UNIT_ID):
super().__setitem__(_KEY_UNIT_ID, device.unitId)
def get_sensitivity(self, name):
return self.get(_KEY_SENSITIVE, {}).get(name, False)
def set_sensitivity(self, name, value):
sensitives = self.get(_KEY_SENSITIVE, {})
if sensitives.get(name) != value:
sensitives[name] = value
self.__setitem__(_KEY_SENSITIVE, sensitives)
def device_representer(dumper, data):
return dumper.represent_mapping('tag:yaml.org,2002:map', data)
_yaml.add_representer(_DeviceEntry, device_representer)
def named_int_representer(dumper, data):
return dumper.represent_scalar('tag:yaml.org,2002:int', str(int(data)))
_yaml.add_representer(_NamedInt, named_int_representer)
# A device can be identified by a combination of WPID and serial number (for receiver-connected devices)
# or a combination of modelId and unitId (for direct-connected devices).
# But some devices have empty (all zero) modelIds and unitIds. Use the device name as a backup for the modelId.
# The worst situation is a receiver-connected device that Solaar has never seen on-line
# that is directly connected. Here there is no way to realize that the two devices are the same.
# So new entries are not created for unseen off-line receiver-connected devices except for those with protocol 1.0
def persister(device):
def match(wpid, serial, modelId, unitId, c):
return ((wpid and wpid == c.get(_KEY_WPID) and serial and serial == c.get(_KEY_SERIAL)) or (
modelId and modelId != '000000000000' and modelId == c.get(_KEY_MODEL_ID) and unitId
and unitId == c.get(_KEY_UNIT_ID)
))
if not _config:
_load()
entry = None
modelId = device.modelId if device.modelId != '000000000000' else device.name if device.modelId else None
for c in _config:
if isinstance(c, _DeviceEntry) and match(device.wpid, device.serial, modelId, device.unitId, c):
entry = c
break
if not entry:
if not device.online and not device.serial: # don't create entry for offline devices without serial number
if _log.isEnabledFor(_INFO):
_log.info('not setting up persister for offline device %s with missing serial number', device.name)
return
if _log.isEnabledFor(_INFO):
_log.info('setting up persister for device %s', device.name)
entry = _DeviceEntry()
_config.append(entry)
entry.update(device, modelId)
return entry
def attach_to(device):
pass