# -*- python-mode -*- ## Copyright (C) 2020 Peter Patel-Schneider ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import os as _os import os.path as _path import sys as _sys from logging import DEBUG as _DEBUG from logging import INFO as _INFO from logging import getLogger from math import sqrt as _sqrt import keysyms.keysymdef as _keysymdef import psutil from gi.repository import Gdk, GLib from solaar.ui.config_panel import change_setting as _change_setting from yaml import add_representer as _yaml_add_representer from yaml import dump_all as _yaml_dump_all from yaml import safe_load_all as _yaml_safe_load_all from .common import NamedInt from .common import unpack as _unpack from .hidpp20 import FEATURE as _F from .special_keys import CONTROL as _CONTROL _log = getLogger(__name__) del getLogger # # See docs/rules.md for documentation # # Several capabilities of rules depend on aspects of GDK, X11, or XKB # As the Solaar GUI uses GTK, Glib and GDK are always available and are obtained from gi.repository # # Process condition depends on X11 from python-xlib, and is probably not possible at all in Wayland # MouseProcess condition depends on X11 from python-xlib, and is probably not possible at all in Wayland # Modifiers condition depends only on GDK # KeyPress action currently only works in X11, and is not currently available under Wayland # KeyPress action determines whether a keysym is a currently-down modifier using get_modifier_mapping from python-xlib; # under Wayland no modifier keys are considered down so all modifier keys are pressed, potentially leading to problems # KeyPress action translates key names to keysysms using the local file described for GUI keyname determination # KeyPress action gets the current keyboard group using XkbGetState from libX11.so using ctypes definitions # under Wayland the keyboard group is None resulting in using the first keyboard group # KeyPress action translates keysyms to keycodes using the GDK keymap # KeyPress action simulates keyboard input with X11 XTest from python-xlib # MouseScroll and MouseClick actions currently only work in X11, and are not currently available under Wayland # MouseScroll and MouseClick actions simulate mouse input with X11 XTest from python-xlib # # Rule GUI keyname determination uses a local file generated # from http://cgit.freedesktop.org/xorg/proto/x11proto/plain/keysymdef.h # and http://cgit.freedesktop.org/xorg/proto/x11proto/plain/XF86keysym.h # because there does not seem to be a non-X11 file for this set of key names XK_KEYS = _keysymdef.keysymdef try: import Xlib from Xlib import X from Xlib.display import Display xdisplay = Display() modifier_keycodes = xdisplay.get_modifier_mapping() # there should be a way to do this in Gdk x11 = True NET_ACTIVE_WINDOW = xdisplay.intern_atom('_NET_ACTIVE_WINDOW') NET_WM_PID = xdisplay.intern_atom('_NET_WM_PID') WM_CLASS = xdisplay.intern_atom('WM_CLASS') # set up to get keyboard state using ctypes interface to libx11 import ctypes class X11Display(ctypes.Structure): """ opaque struct """ class X11XkbStateRec(ctypes.Structure): _fields_ = [('group', ctypes.c_ubyte), ('locked_group', ctypes.c_ubyte), ('base_group', ctypes.c_ushort), ('latched_group', ctypes.c_ushort), ('mods', ctypes.c_ubyte), ('base_mods', ctypes.c_ubyte), ('latched_mods', ctypes.c_ubyte), ('locked_mods', ctypes.c_ubyte), ('compat_state', ctypes.c_ubyte), ('grab_mods', ctypes.c_ubyte), ('compat_grab_mods', ctypes.c_ubyte), ('lookup_mods', ctypes.c_ubyte), ('compat_lookup_mods', ctypes.c_ubyte), ('ptr_buttons', ctypes.c_ushort)] # something strange is happening here but it is not being used X11Lib = ctypes.cdll.LoadLibrary('libX11.so') X11Lib.XOpenDisplay.restype = ctypes.POINTER(X11Display) X11Lib.XkbGetState.argtypes = [ctypes.POINTER(X11Display), ctypes.c_uint, ctypes.POINTER(X11XkbStateRec)] display = X11Lib.XOpenDisplay(None) except Exception: _log.warn( 'X11 not available - rules cannot access current process or keyboard group and cannot simulate input. %s', exc_info=_sys.exc_info() ) modifier_keycodes = [] x11 = False def kbdgroup(): if x11: state = X11XkbStateRec() X11Lib.XkbGetState(display, 0x100, ctypes.pointer(state)) # 0x100 is core device FIXME return state.group else: return None def modifier_code(keycode): if keycode == 0: return None for m in range(0, len(modifier_keycodes)): if keycode in modifier_keycodes[m]: return m gdisplay = Gdk.Display.get_default() gkeymap = Gdk.Keymap.get_for_display(gdisplay) key_down = None key_up = None def signed(bytes): return int.from_bytes(bytes, 'big', signed=True) def xy_direction(_x, _y): # normalize x and y m = _sqrt((_x * _x) + (_y * _y)) if m == 0: return 'noop' x = round(_x / m) y = round(_y / m) if x < 0 and y < 0: return 'Mouse Up-left' elif x > 0 and y < 0: return 'Mouse Up-right' elif x < 0 and y > 0: return 'Mouse Down-left' elif x > 0 and y > 0: return 'Mouse Down-right' elif x > 0: return 'Mouse Right' elif x < 0: return 'Mouse Left' elif y > 0: return 'Mouse Down' elif y < 0: return 'Mouse Up' else: return 'noop' TESTS = { 'crown_right': lambda f, r, d: f == _F.CROWN and r == 0 and d[1] < 128 and d[1], 'crown_left': lambda f, r, d: f == _F.CROWN and r == 0 and d[1] >= 128 and 256 - d[1], 'crown_right_ratchet': lambda f, r, d: f == _F.CROWN and r == 0 and d[2] < 128 and d[2], 'crown_left_ratchet': lambda f, r, d: f == _F.CROWN and r == 0 and d[2] >= 128 and 256 - d[2], 'crown_tap': lambda f, r, d: f == _F.CROWN and r == 0 and d[5] == 0x01 and d[5], 'crown_start_press': lambda f, r, d: f == _F.CROWN and r == 0 and d[6] == 0x01 and d[6], 'crown_end_press': lambda f, r, d: f == _F.CROWN and r == 0 and d[6] == 0x05 and d[6], 'crown_pressed': lambda f, r, d: f == _F.CROWN and r == 0 and d[6] >= 0x01 and d[6] <= 0x04 and d[6], 'thumb_wheel_up': lambda f, r, d: f == _F.THUMB_WHEEL and r == 0 and signed(d[0:2]) < 0 and signed(d[0:2]), 'thumb_wheel_down': lambda f, r, d: f == _F.THUMB_WHEEL and r == 0 and signed(d[0:2]) > 0 and signed(d[0:2]), 'lowres_wheel_up': lambda f, r, d: f == _F.LOWRES_WHEEL and r == 0 and signed(d[0:1]) > 0 and signed(d[0:1]), 'lowres_wheel_down': lambda f, r, d: f == _F.LOWRES_WHEEL and r == 0 and signed(d[0:1]) < 0 and signed(d[0:1]), 'hires_wheel_up': lambda f, r, d: f == _F.HIRES_WHEEL and r == 0 and signed(d[1:3]) > 0 and signed(d[1:3]), 'hires_wheel_down': lambda f, r, d: f == _F.HIRES_WHEEL and r == 0 and signed(d[1:3]) < 0 and signed(d[1:3]), 'False': lambda f, r, d: False, 'True': lambda f, r, d: True, } MOUSE_GESTURE_TESTS = { 'mouse-down': ['Mouse Down'], 'mouse-up': ['Mouse Up'], 'mouse-left': ['Mouse Left'], 'mouse-right': ['Mouse Right'], 'mouse-noop': [], } COMPONENTS = {} if x11: displayt = Display() else: displayt = None class RuleComponent: def compile(self, c): if isinstance(c, RuleComponent): return c elif isinstance(c, dict) and len(c) == 1: k, v = next(iter(c.items())) if k in COMPONENTS: return COMPONENTS[k](v) _log.warn('illegal component in rule: %s', c) return Condition() class Rule(RuleComponent): def __init__(self, args, source=None): self.components = [self.compile(a) for a in args] self.source = source def __str__(self): source = '(' + self.source + ')' if self.source else '' return 'Rule%s[%s]' % (source, ', '.join([c.__str__() for c in self.components])) def evaluate(self, feature, notification, device, status, last_result): result = True for component in self.components: result = component.evaluate(feature, notification, device, status, result) if not isinstance(component, Action) and result is None: return None if isinstance(component, Condition) and not result: return result return result def data(self): return {'Rule': [c.data() for c in self.components]} class Condition(RuleComponent): def __init__(self, *args): pass def __str__(self): return 'CONDITION' def evaluate(self, feature, notification, device, status, last_result): return False class Not(Condition): def __init__(self, op): if isinstance(op, list) and len(op) == 1: op = op[0] self.op = op self.component = self.compile(op) def __str__(self): return 'Not: ' + str(self.component) def evaluate(self, feature, notification, device, status, last_result): result = self.component.evaluate(feature, notification, device, status, last_result) return None if result is None else not result def data(self): return {'Not': self.component.data()} class Or(Condition): def __init__(self, args): self.components = [self.compile(a) for a in args] def __str__(self): return 'Or: [' + ', '.join(str(c) for c in self.components) + ']' def evaluate(self, feature, notification, device, status, last_result): result = False for component in self.components: result = component.evaluate(feature, notification, device, status, last_result) if not isinstance(component, Action) and result is None: return None if isinstance(component, Condition) and result: return result return result def data(self): return {'Or': [c.data() for c in self.components]} class And(Condition): def __init__(self, args): self.components = [self.compile(a) for a in args] def __str__(self): return 'And: [' + ', '.join(str(c) for c in self.components) + ']' def evaluate(self, feature, notification, device, status, last_result): result = True for component in self.components: result = component.evaluate(feature, notification, device, status, last_result) if not isinstance(component, Action) and result is None: return None if isinstance(component, Condition) and not result: return result return result def data(self): return {'And': [c.data() for c in self.components]} def x11_focus_prog(): pid = wm_class = None window = xdisplay.get_input_focus().focus while window: pid = window.get_full_property(NET_WM_PID, 0) wm_class = window.get_wm_class() if wm_class and pid: break window = window.query_tree().parent try: name = psutil.Process(pid.value[0]).name() if pid else '' except Exception: name = '' return (wm_class[0], wm_class[1], name) if wm_class else (name, ) def x11_pointer_prog(): pid = wm_class = None window = xdisplay.screen().root.query_pointer().child for window in reversed(window.query_tree().children): pid = window.get_full_property(NET_WM_PID, 0) wm_class = window.get_wm_class() if wm_class: break window = window.query_tree().parent name = psutil.Process(pid.value[0]).name() if pid else '' return (wm_class[0], wm_class[1], name) if wm_class else (name, ) class Process(Condition): def __init__(self, process): self.process = process if not x11: _log.warn('X11 not available - rules cannot access current process - %s', self) if not isinstance(process, str): _log.warn('rule Process argument not a string: %s', process) self.process = str(process) def __str__(self): return 'Process: ' + str(self.process) def evaluate(self, feature, notification, device, status, last_result): if not isinstance(self.process, str): return False focus = x11_focus_prog() if x11 else None result = any(bool(s and s.startswith(self.process)) for s in focus) if focus else None return result def data(self): return {'Process': str(self.process)} class MouseProcess(Condition): def __init__(self, process): self.process = process if not x11: _log.warn('X11 not available - rules cannot access current mouse process - %s', self) if not isinstance(process, str): _log.warn('rule MouseProcess argument not a string: %s', process) self.process = str(process) def __str__(self): return 'MouseProcess: ' + str(self.process) def evaluate(self, feature, notification, device, status, last_result): if not isinstance(self.process, str): return False result = any(bool(s and s.startswith(self.process)) for s in x11_pointer_prog()) if x11 else None return result def data(self): return {'MouseProcess': str(self.process)} class Feature(Condition): def __init__(self, feature): if not (isinstance(feature, str) and feature in _F): _log.warn('rule Feature argument not name of a feature: %s', feature) self.feature = None self.feature = _F[feature] def __str__(self): return 'Feature: ' + str(self.feature) def evaluate(self, feature, notification, device, status, last_result): return feature == self.feature def data(self): return {'Feature': str(self.feature)} class Report(Condition): def __init__(self, report): if not (isinstance(report, int)): _log.warn('rule Report argument not an integer: %s', report) self.report = -1 self.report = report def __str__(self): return 'Report: ' + str(self.report) def evaluate(self, report, notification, device, status, last_result): return (notification.address >> 4) == self.report def data(self): return {'Report': self.report} # Setting(device, setting, [key], value...) class Setting(Condition): def __init__(self, args): if not (isinstance(args, list) and len(args) > 2): _log.warn('rule Setting argument not list with minimum length 3: %s', args) self.args = [] else: self.args = args def __str__(self): return 'Setting: ' + ' '.join([str(a) for a in self.args]) def evaluate(self, report, notification, device, status, last_result): import solaar.ui.window as _window if len(self.args) < 3: return None dev = _window.find_device(self.args[0]) if self.args[0] is not None else device if dev is None: _log.warn('Setting condition: device %s is not known', self.args[0]) return False setting = next((s for s in dev.settings if s.name == self.args[1]), None) if setting is None: _log.warn('Setting condition: setting %s is not the name of a setting for %s', self.args[1], dev.name) return None # should the value argument be checked to be sure it is acceptable?? needs to be careful about boolean toggle # TODO add compare methods for more validators try: result = setting.compare(self.args[2:], setting.read()) except Exception as e: _log.warn('Setting condition: error when checking setting %s: %s', self.args, e) result = False return result def data(self): return {'Setting': self.args[:]} MODIFIERS = { 'Shift': int(Gdk.ModifierType.SHIFT_MASK), 'Control': int(Gdk.ModifierType.CONTROL_MASK), 'Alt': int(Gdk.ModifierType.MOD1_MASK), 'Super': int(Gdk.ModifierType.MOD4_MASK) } MODIFIER_MASK = MODIFIERS['Shift'] + MODIFIERS['Control'] + MODIFIERS['Alt'] + MODIFIERS['Super'] class Modifiers(Condition): def __init__(self, modifiers): modifiers = [modifiers] if isinstance(modifiers, str) else modifiers self.desired = 0 self.modifiers = [] for k in modifiers: if k in MODIFIERS: self.desired += MODIFIERS.get(k, 0) self.modifiers.append(k) else: _log.warn('unknown rule Modifier value: %s', k) def __str__(self): return 'Modifiers: ' + str(self.desired) def evaluate(self, feature, notification, device, status, last_result): current = gkeymap.get_modifier_state() # get the current keyboard modifier return self.desired == (current & MODIFIER_MASK) def data(self): return {'Modifiers': [str(m) for m in self.modifiers]} class Key(Condition): DOWN = 'pressed' UP = 'released' def __init__(self, args): default_key = 0 default_action = self.DOWN key, action = None, None if not args or not isinstance(args, (list, str)): _log.warn('rule Key arguments unknown: %s' % args) key = default_key action = default_action elif isinstance(args, str): _log.debug('rule Key assuming action "%s" for "%s"' % (default_action, args)) key = args action = default_action elif isinstance(args, list): if len(args) == 1: _log.debug('rule Key assuming action "%s" for "%s"' % (default_action, args)) key, action = args[0], default_action elif len(args) >= 2: key, action = args[:2] if isinstance(key, str) and key in _CONTROL: self.key = _CONTROL[key] else: _log.warn('rule Key key name not name of a Logitech key: %s' % key) self.key = default_key if isinstance(action, str) and action in (self.DOWN, self.UP): self.action = action else: _log.warn('rule Key action unknown: %s, assuming %s' % (action, default_action)) self.action = default_action def __str__(self): return 'Key: %s (%s)' % ((str(self.key) if self.key else 'None'), self.action) def evaluate(self, feature, notification, device, status, last_result): return bool(self.key and self.key == (key_down if self.action == self.DOWN else key_up)) def data(self): return {'Key': [str(self.key), self.action]} def bit_test(start, end, bits): return lambda f, r, d: int.from_bytes(d[start:end], byteorder='big', signed=True) & bits def range_test(start, end, min, max): def range_test_helper(f, r, d): value = int.from_bytes(d[start:end], byteorder='big', signed=True) return min <= value <= max and (value if value else True) return range_test_helper class Test(Condition): def __init__(self, test): self.test = test if isinstance(test, str): if test in MOUSE_GESTURE_TESTS: _log.warn('mouse movement test %s deprecated, converting to a MouseGesture', test) self.__class__ = MouseGesture self.__init__(MOUSE_GESTURE_TESTS[test]) elif test in TESTS: self.function = TESTS[test] else: _log.warn('rule Test string argument not name of a test: %s', test) self.function = TESTS['False'] elif isinstance(test, list) and all(isinstance(t, int) for t in test): _log.warn('Test rules consisting of numbers are deprecated, converting to a TestBytes condition') self.__class__ = TestBytes self.__init__(test) else: _log.warn('rule Test argument not valid %s', test) def __str__(self): return 'Test: ' + str(self.test) def evaluate(self, feature, notification, device, status, last_result): return self.function(feature, notification.address, notification.data) def data(self): return {'Test': str(self.test)} class TestBytes(Condition): def __init__(self, test): self.test = test if ( isinstance(test, list) and 2 < len(test) <= 4 and all(isinstance(t, int) for t in test) and test[0] >= 0 and test[0] <= 16 and test[1] >= 0 and test[1] <= 16 and test[0] < test[1] ): self.function = bit_test(*test) if len(test) == 3 else range_test(*test) else: _log.warn('rule TestBytes argument not valid %s', test) def __str__(self): return 'TestBytes: ' + str(self.test) def evaluate(self, feature, notification, device, status, last_result): return self.function(feature, notification.address, notification.data) def data(self): return {'TestBytes': self.test[:]} class MouseGesture(Condition): MOVEMENTS = [ 'Mouse Up', 'Mouse Down', 'Mouse Left', 'Mouse Right', 'Mouse Up-left', 'Mouse Up-right', 'Mouse Down-left', 'Mouse Down-right' ] def __init__(self, movements): if isinstance(movements, str): movements = [movements] for x in movements: if x not in self.MOVEMENTS and x not in _CONTROL: _log.warn('rule Key argument not name of a Logitech key: %s', x) self.movements = movements def __str__(self): return 'MouseGesture: ' + ' '.join(self.movements) def evaluate(self, feature, notification, device, status, last_result): if feature == _F.MOUSE_GESTURE: d = notification.data count = _unpack('!h', d[:2])[0] data = _unpack('!' + ((int(len(d) / 2) - 1) * 'h'), d[2:]) if count != len(self.movements): return False x = 0 z = 0 while x < len(data): if data[x] == 0: direction = xy_direction(data[x + 1], data[x + 2]) if self.movements[z] != direction: return False x += 3 elif data[x] == 1: if data[x + 1] not in _CONTROL: return False if self.movements[z] != str(_CONTROL[data[x + 1]]): return False x += 2 z += 1 return True return False def data(self): return {'MouseGesture': [str(m) for m in self.movements]} class Action(RuleComponent): def __init__(self, *args): pass def evaluate(self, feature, notification, device, status, last_result): return None class KeyPress(Action): def __init__(self, keys): if isinstance(keys, str): keys = [keys] self.key_names = keys self.key_symbols = [XK_KEYS.get(k, None) for k in keys] if not all(self.key_symbols): _log.warn('rule KeyPress not sequence of key names %s', keys) self.key_symbols = [] if not x11: _log.warn('rule KeyPress action only available in X11 %s', keys) self.key_symbols = [] def keysym_to_keycode(self, keysym, modifiers): # maybe should take shift into account group = kbdgroup() or 0 keycodes = gkeymap.get_entries_for_keyval(keysym) if len(keycodes.keys) == 1: k = keycodes.keys[0] return k.keycode else: for k in keycodes.keys: if group == k.group: return k.keycode _log.warn('rule KeyPress key symbol not currently available %s', self) def __str__(self): return 'KeyPress: ' + ' '.join(self.key_names) def needed(self, k, modifiers): code = modifier_code(k) return not (code is not None and modifiers & (1 << code)) def keyDown(self, keysyms, modifiers): for k in keysyms: keycode = self.keysym_to_keycode(k, modifiers) if self.needed(keycode, modifiers) and x11 and keycode: Xlib.ext.xtest.fake_input(displayt, X.KeyPress, keycode) def keyUp(self, keysyms, modifiers): for k in keysyms: keycode = self.keysym_to_keycode(k, modifiers) if self.needed(keycode, modifiers) and x11 and keycode: Xlib.ext.xtest.fake_input(displayt, X.KeyRelease, keycode) def evaluate(self, feature, notification, device, status, last_result): current = gkeymap.get_modifier_state() if _log.isEnabledFor(_INFO): _log.info('KeyPress action: %s, modifiers %s %s', self.key_symbols, current, [hex(k) for k in self.key_symbols]) self.keyDown(self.key_symbols, current) self.keyUp(reversed(self.key_symbols), current) if x11: displayt.sync() return None def data(self): return {'KeyPress': [str(k) for k in self.key_names]} # KeyDown is dangerous as the key can auto-repeat and make your system unusable # class KeyDown(KeyPress): # def evaluate(self, feature, notification, device, status, last_result): # super().keyDown(self.keys, current_key_modifiers) # class KeyUp(KeyPress): # def evaluate(self, feature, notification, device, status, last_result): # super().keyUp(self.keys, current_key_modifiers) buttons = { 'unknown': None, 'left': 1, 'middle': 2, 'right': 3, 'scroll_up': 4, 'scroll_down': 5, 'scroll_left': 6, 'scroll_right': 7 } for i in range(8, 31): buttons['button%d' % i] = i def click(button, count): if x11: for _ in range(count): Xlib.ext.xtest.fake_input(displayt, Xlib.X.ButtonPress, button) Xlib.ext.xtest.fake_input(displayt, Xlib.X.ButtonRelease, button) else: _log.warn('X11 not available - rules cannot simulate mouse clicks') class MouseScroll(Action): def __init__(self, amounts): import numbers if len(amounts) == 1 and isinstance(amounts[0], list): amounts = amounts[0] if not (len(amounts) == 2 and all([isinstance(a, numbers.Number) for a in amounts])): _log.warn('rule MouseScroll argument not two numbers %s', amounts) amounts = [0, 0] self.amounts = amounts if not x11: _log.warn('X11 not available - rules cannot simulate mouse scrolling - %s', self) def __str__(self): return 'MouseScroll: ' + ' '.join([str(a) for a in self.amounts]) def evaluate(self, feature, notification, device, status, last_result): import math import numbers amounts = self.amounts if isinstance(last_result, numbers.Number): amounts = [math.floor(last_result * a) for a in self.amounts] if _log.isEnabledFor(_INFO): _log.info('MouseScroll action: %s %s %s', self.amounts, last_result, amounts) dx, dy = amounts if dx: click(button=buttons['scroll_right'] if dx > 0 else buttons['scroll_left'], count=abs(dx)) if dy: click(button=buttons['scroll_up'] if dy > 0 else buttons['scroll_down'], count=abs(dy)) if x11: displayt.sync() return None def data(self): return {'MouseScroll': self.amounts[:]} class MouseClick(Action): def __init__(self, args): if len(args) == 1 and isinstance(args[0], list): args = args[0] if not isinstance(args, list): args = [args] self.button = str(args[0]) if len(args) >= 0 else None if self.button not in buttons: _log.warn('rule MouseClick action: button %s not known', self.button) self.button = None count = args[1] if len(args) >= 2 else 1 try: self.count = int(count) except (ValueError, TypeError): _log.warn('rule MouseClick action: count %s should be an integer', count) self.count = 1 if not x11: _log.warn('X11 not available - rules cannot simulate mouse clicks - %s', self) def __str__(self): return 'MouseClick: %s (%d)' % (self.button, self.count) def evaluate(self, feature, notification, device, status, last_result): if _log.isEnabledFor(_INFO): _log.info('MouseClick action: %d %s' % (self.count, self.button)) if self.button and self.count: click(buttons[self.button], self.count) if x11: displayt.sync() return None def data(self): return {'MouseClick': [self.button, self.count]} class Set(Action): def __init__(self, args): if not (isinstance(args, list) and len(args) > 2): _log.warn('rule Set argument not list with minimum length 3: %s', args) self.args = [] else: self.args = args def __str__(self): return 'Set: ' + ' '.join([str(a) for a in self.args]) def evaluate(self, feature, notification, device, status, last_result): import solaar.ui.window as _window # importing here to avoid circular imports if len(self.args) < 3: return None if _log.isEnabledFor(_INFO): _log.info('Set action: %s', self.args) dev = _window.find_device(self.args[0]) if self.args[0] is not None else device if dev is None: _log.error('Set action: device %s is not known', self.args[0]) return None setting = next((s for s in dev.settings if s.name == self.args[1]), None) if setting is None: _log.error('Set action: setting %s is not the name of a setting for %s', self.args[1], dev.name) return None args = setting.acceptable(self.args[2:], setting.read()) if args is None: _log.error('Set Action: invalid args %s for setting %s of %s', self.args[2:], self.args[1], self.args[0]) return None _change_setting(dev, setting, args) return None def data(self): return {'Set': self.args[:]} class Execute(Action): def __init__(self, args): if isinstance(args, str): args = [args] if not (isinstance(args, list) and all(isinstance(arg), str) for arg in args): _log.warn('rule Execute argument not list of strings: %s', args) self.args = [] else: self.args = args def __str__(self): return 'Execute: ' + ' '.join([a for a in self.args]) def evaluate(self, feature, notification, device, status, last_result): import subprocess if _log.isEnabledFor(_INFO): _log.info('Execute action: %s', self.args) subprocess.Popen(self.args) return None def data(self): return {'Execute': self.args[:]} COMPONENTS = { 'Rule': Rule, 'Not': Not, 'Or': Or, 'And': And, 'Process': Process, 'MouseProcess': MouseProcess, 'Feature': Feature, 'Report': Report, 'Setting': Setting, 'Modifiers': Modifiers, 'Key': Key, 'Test': Test, 'TestBytes': TestBytes, 'MouseGesture': MouseGesture, 'KeyPress': KeyPress, 'MouseScroll': MouseScroll, 'MouseClick': MouseClick, 'Set': Set, 'Execute': Execute, } built_in_rules = Rule([]) if True: # x11 built_in_rules = Rule([ {'Rule': [ # Implement problematic keys for Craft and MX Master {'Rule': [{'Key': ['Brightness Down', 'pressed']}, {'KeyPress': 'XF86_MonBrightnessDown'}]}, {'Rule': [{'Key': ['Brightness Up', 'pressed']}, {'KeyPress': 'XF86_MonBrightnessUp'}]}, ]}, {'Rule': [ # In firefox, crown emits keys that move up and down if not pressed, rotate through tabs otherwise {'Process': 'firefox'}, {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_right_ratchet'}, {'KeyPress': ['Control_R', 'Tab']}]}, {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_left_ratchet'}, {'KeyPress': ['Control_R', 'Shift_R', 'Tab']}]}, {'Rule': [{'Test': 'crown_right_ratchet'}, {'KeyPress': 'Down'}]}, {'Rule': [{'Test': 'crown_left_ratchet'}, {'KeyPress': 'Up'}]}, ]}, {'Rule': [ # Otherwise, crown movements emit keys that modify volume if not pressed, move between tracks otherwise {'Feature': 'CROWN'}, {'Report': 0x0}, {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_right_ratchet'}, {'KeyPress': 'XF86_AudioNext'}]}, {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_left_ratchet'}, {'KeyPress': 'XF86_AudioPrev'}]}, {'Rule': [{'Test': 'crown_right_ratchet'}, {'KeyPress': 'XF86_AudioRaiseVolume'}]}, {'Rule': [{'Test': 'crown_left_ratchet'}, {'KeyPress': 'XF86_AudioLowerVolume'}]} ]}, {'Rule': [ # Thumb wheel does horizontal movement, doubled if control key not pressed {'Feature': 'THUMB WHEEL'}, # with control modifier on mouse scrolling sometimes does something different! {'Rule': [{'Modifiers': 'Control'}, {'Test': 'thumb_wheel_up'}, {'MouseScroll': [-1, 0]}]}, {'Rule': [{'Modifiers': 'Control'}, {'Test': 'thumb_wheel_down'}, {'MouseScroll': [-1, 0]}]}, {'Rule': [{'Or': [{'Test': 'thumb_wheel_up'}, {'Test': 'thumb_wheel_down'}]}, {'MouseScroll': [-2, 0]}]} ]} ]) keys_down = [] g_keys_down = [0, 0, 0, 0] m_keys_down = 0 mr_key_down = False # process a notification def process_notification(device, status, notification, feature): if False: # not x11 return global keys_down, g_keys_down, m_keys_down, mr_key_down, key_down, key_up key_down, key_up = None, None # need to keep track of keys that are down to find a new key down if feature == _F.REPROG_CONTROLS_V4 and notification.address == 0x00: new_keys_down = _unpack('!4H', notification.data[:8]) for key in new_keys_down: if key and key not in keys_down: key_down = key for key in keys_down: if key and key not in new_keys_down: key_up = key keys_down = new_keys_down # and also G keys down elif feature == _F.GKEY and notification.address == 0x00: new_g_keys_down = _unpack('!4B', notification.data[:4]) # process 32 bits, byte by byte for byte_idx in range(4): new_byte, old_byte = new_g_keys_down[byte_idx], g_keys_down[byte_idx] for i in range(1, 9): if new_byte & (0x01 << (i - 1)) and not old_byte & (0x01 << (i - 1)): key_down = _CONTROL['G' + str(i + 8 * byte_idx)] if old_byte & (0x01 << (i - 1)) and not new_byte & (0x01 << (i - 1)): key_up = _CONTROL['G' + str(i + 8 * byte_idx)] g_keys_down = new_g_keys_down # and also M keys down elif feature == _F.MKEYS and notification.address == 0x00: new_m_keys_down = _unpack('!1B', notification.data[:1])[0] for i in range(1, 9): if new_m_keys_down & (0x01 << (i - 1)) and not m_keys_down & (0x01 << (i - 1)): key_down = _CONTROL['M' + str(i)] if m_keys_down & (0x01 << (i - 1)) and not new_m_keys_down & (0x01 << (i - 1)): key_up = _CONTROL['M' + str(i)] m_keys_down = new_m_keys_down # and also MR key elif feature == _F.MR and notification.address == 0x00: new_mr_key_down = _unpack('!1B', notification.data[:1])[0] if not mr_key_down and new_mr_key_down: key_down = _CONTROL['MR'] if mr_key_down and not new_mr_key_down: key_up = _CONTROL['MR'] mr_key_down = new_mr_key_down GLib.idle_add(rules.evaluate, feature, notification, device, status, True) _XDG_CONFIG_HOME = _os.environ.get('XDG_CONFIG_HOME') or _path.expanduser(_path.join('~', '.config')) _file_path = _path.join(_XDG_CONFIG_HOME, 'solaar', 'rules.yaml') rules = built_in_rules def _save_config_rule_file(file_name=_file_path): # This is a trick to show str/float/int lists in-line (inspired by https://stackoverflow.com/a/14001707) class inline_list(list): pass def blockseq_rep(dumper, data): return dumper.represent_sequence('tag:yaml.org,2002:seq', data, flow_style=True) _yaml_add_representer(inline_list, blockseq_rep) def convert(elem): if isinstance(elem, list): if len(elem) == 1 and isinstance(elem[0], (int, str, float)): # All diversion classes that expect a list of scalars also support a single scalar without a list return elem[0] if all(isinstance(c, (int, str, float)) for c in elem): return inline_list([convert(c) for c in elem]) return [convert(c) for c in elem] if isinstance(elem, dict): return {k: convert(v) for k, v in elem.items()} if isinstance(elem, NamedInt): return int(elem) return elem # YAML format settings dump_settings = { 'encoding': 'utf-8', 'explicit_start': True, 'explicit_end': True, 'default_flow_style': False # 'version': (1, 3), # it would be printed for every rule } # Save only user-defined rules rules_to_save = sum((r.data()['Rule'] for r in rules.components if r.source == file_name), []) if rules_to_save: if _log.isEnabledFor(_INFO): _log.info('saving %d rule(s) to %s', len(rules_to_save), file_name) try: with open(file_name, 'w') as f: f.write('%YAML 1.3\n') # Write version manually _yaml_dump_all(convert([r['Rule'] for r in rules_to_save]), f, **dump_settings) except Exception as e: _log.error('failed to save to %s\n%s', file_name, e) return False return True def _load_config_rule_file(): global rules loaded_rules = [] if _path.isfile(_file_path): try: with open(_file_path) as config_file: loaded_rules = [] for loaded_rule in _yaml_safe_load_all(config_file): rule = Rule(loaded_rule, source=_file_path) if _log.isEnabledFor(_DEBUG): _log.debug('load rule: %s', rule) loaded_rules.append(rule) if _log.isEnabledFor(_INFO): _log.info('loaded %d rules from %s', len(loaded_rules), config_file.name) except Exception as e: _log.error('failed to load from %s\n%s', _file_path, e) rules = Rule([Rule(loaded_rules, source=_file_path), built_in_rules]) if True: # x11 _load_config_rule_file()