From f3a276d7a35bc8230da1b6e87a39f458d2071e94 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Wed, 28 Oct 2020 12:37:09 -0400 Subject: [PATCH] receiver: add rule-based processing of diverted features --- lib/logitech_receiver/diversion.py | 620 +++++++++++++++++++++++++ lib/logitech_receiver/notifications.py | 96 ++-- 2 files changed, 666 insertions(+), 50 deletions(-) create mode 100644 lib/logitech_receiver/diversion.py diff --git a/lib/logitech_receiver/diversion.py b/lib/logitech_receiver/diversion.py new file mode 100644 index 00000000..903d78f0 --- /dev/null +++ b/lib/logitech_receiver/diversion.py @@ -0,0 +1,620 @@ +# -*- python-mode -*- +# -*- coding: UTF-8 -*- + +## Copyright (C) 2020 Peter Patel-Schneider +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License along +## with this program; if not, write to the Free Software Foundation, Inc., +## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +# Map diverted feature notifications to keyboard and mouse input + +import os as _os +import os.path as _path + +from logging import INFO as _INFO +from logging import getLogger + +import _thread +import psutil +import Xlib + +from pynput import keyboard as _keyboard +from pynput import mouse as _mouse +from Xlib import X +from Xlib.display import Display +from Xlib.ext import record +from Xlib.protocol import rq +from yaml import safe_load_all as _yaml_safe_load_all + +from .common import unpack as _unpack +from .hidpp20 import FEATURE as _F +from .special_keys import CONTROL as _CONTROL + +_log = getLogger(__name__) +del getLogger + +Xlib.XK.load_keysym_group('xf86') + +# determine name of active process + +disp_prog = Display() +NET_ACTIVE_WINDOW = disp_prog.intern_atom('_NET_ACTIVE_WINDOW') +NET_WM_PID = disp_prog.intern_atom('_NET_WM_PID') +root2 = disp_prog.screen().root +root2.change_attributes(event_mask=Xlib.X.PropertyChangeMask) +active_process_name = None + + +def active_program(): + try: + window_id = root2.get_full_property(NET_ACTIVE_WINDOW, Xlib.X.AnyPropertyType).value[0] + window = disp_prog.create_resource_object('window', window_id) + window_pid = window.get_full_property(NET_WM_PID, 0).value[0] + return psutil.Process(window_pid).name() + except Xlib.error.XError: # simplify dealing with BadWindow + return None + + +def determine_active_program(): + global active_process_name + active_process_name = active_program() + while True: + event = disp_prog.next_event() + if event.type == Xlib.X.PropertyNotify and event.atom == NET_ACTIVE_WINDOW: + active_process_name = active_program() + + +_thread.start_new_thread(determine_active_program, ()) + +# determine current key modifiers +# there must be a better way to do this + +display = Display() +context = display.record_create_context( + 0, [record.AllClients], [{ + 'core_requests': (0, 0), + 'core_replies': (0, 0), + 'ext_requests': (0, 0, 0, 0), + 'ext_replies': (0, 0, 0, 0), + 'delivered_events': (0, 0), + 'device_events': (X.KeyPress, X.KeyRelease), + 'errors': (0, 0), + 'client_started': False, + 'client_died': False, + }] +) + +modifier_keycodes = display.get_modifier_mapping() +current_key_modifiers = 0 + + +def modifier_code(keycode): + if keycode == 0: + return None + for m in range(0, len(modifier_keycodes)): + if keycode in modifier_keycodes[m]: + return m + + +def key_press_handler(reply): + global current_key_modifiers + data = reply.data + while len(data): + event, data = rq.EventField(None).parse_binary_value(data, display.display, None, None) + if event.type == X.KeyPress: + mod = modifier_code(event.detail) + current_key_modifiers = event.state | 1 << mod if mod is not None else event.state + # if mod is not None: + # print("press", event.detail, current_key_modifiers) + elif event.type == X.KeyRelease: + mod = modifier_code(event.detail) + current_key_modifiers = event.state & ~(1 << mod) if mod is not None else event.state + # if mod is not None: + # print("release", event.detail, current_key_modifiers) + + +_thread.start_new_thread(display.record_enable_context, (context, key_press_handler)) +# display.record_free_context(context) when should this be done?? + +# Solaar processes feature notifications from devices by evaluating a rule (usually consisting of a +# sequence of sub-rules) on them. +# +# A rule is a sequence of conditions, actions, and sub-rules. +# +# Rules are evaluated by evaluating each of their components in order. +# A rule evaluation is terminated early if a condition component evaluates to false or +# the last evaluated sub-component of a component is an action. +# A Rule is false if its last evaluated component evaluates to a false value. +# +# Not conditions are true if their component evaluates to a false value. +# Or conditions are evaluated by evaluating each of their components in order. +# An Or condition is terminated early if a component evaluates to true or +# the last evaluated sub-component of a component is an action. +# A Or condition is true if its last evaluated component evaluates to a true value. +# And conditions are evaluted the same as rules. +# +# Process conditions are true if the name of the active process starts with their argument. +# Feature conditions are if true if the name of the feature of the current notification is their argument. +# Report conditions are if true if the report number in the current notification is their argument. +# Modifiers conditions are true if the current keyboard modifiers are their arguments. +# The permissable modifiers are 'Shift', 'Control', 'Alt', and 'Super'. +# Key conditions are true if the Logitech name of the last key down is their argument. +# Test conditions are true if their test evaluates to true on the feature, report, and data of the current notification. +# Test conditions can return a number instead of a boolean. +# +# Test conditions consisting of a list of three or four integers use the first two to select bytes of the data. +# Three-element test conditions are true if the selected bytes bit-wise anded with its third element is non-zero. +# The value of these test conditions is the result of the and. +# Four-element test conditions are true if the selected bytes form a signed integer between the third and fourth elements. +# The value of test test condition is the signed value of the selected bytes if that is non-zero otherwise True. +# +# The other test conditions are mnemonic shorthands for meaningful feature, report, and data combinations. +# A crown_right test is the rotation amount of a CROWN right rotation. +# A crown_left test is the rotation amount of a CROWN left rotation. +# A crown_right_ratchet test is the ratchet amount of a CROWN right ratchet rotation. +# A crown_left_ratchet test is the ratchet amount of a CROWN left ratchet rotation. +# A crown_tap test is true for a CROWN tap. +# A crown_start_press test is true for the start of a CROWN press. +# A crown_stop_press test is true for the end of a CROWN press. +# A crown_pressed test is true for a CROWN notification with the Crown pressed. +# A thumb_wheel_up test is the rotation amount of a THUMB WHEEL upward rotation. +# A thumb_wheel_down test is the rotation amount of a THUMB WHEEL downward rotation. +# +# A KeyPress action takes X11 key symbols and simulates a chorded keypress on the keyboard. +# Any key symbols that correspond to modifier keys that are in the current keyboard modifiers are ignored. +# A MouseScroll action takes two numbers and simulates a horizontal and vertical mouse scroll of these amounts. +# If the previous condition in the parent rule returns a number the scroll amounts are multiplied by this number. +# An Execute actions takes a program and arguments and executes it asynchronously. +# +# Solaar reads rules from a configuration file (normally ~/.config/solaar/divert.yaml). +# This file contains zero or more documents, each a rule. +# Rule components are one-element dictionaries with key the component name (with initial Captial letter). +# If a component has multiple arguments they are written as a sequence. +# Solaar constructs the rule that it uses to process feature notifications from these rules (if any) +# followed by some built-in rules. + +keyboard = _keyboard.Controller() +mouse = _mouse.Controller() + +keys_down = [] +key_down = None + + +def signed(bytes): + return int.from_bytes(bytes, 'big', signed=True) + + +TESTS = { + 'crown_right': lambda f, r, d: f == _F.CROWN and r == 0 and d[1] < 128 and d[1], + 'crown_left': lambda f, r, d: f == _F.CROWN and r == 0 and d[1] >= 128 and 256 - d[1], + 'crown_right_ratchet': lambda f, r, d: f == _F.CROWN and r == 0 and d[2] < 128 and d[2], + 'crown_left_ratchet': lambda f, r, d: f == _F.CROWN and r == 0 and d[2] >= 128 and 256 - d[2], + 'crown_tap': lambda f, r, d: f == _F.CROWN and r == 0 and d[5] == 0x01 and d[5], + 'crown_start_press': lambda f, r, d: f == _F.CROWN and r == 0 and d[6] == 0x01 and d[6], + 'crown_end_press': lambda f, r, d: f == _F.CROWN and r == 0 and d[6] == 0x05 and d[6], + 'crown_pressed': lambda f, r, d: f == _F.CROWN and r == 0 and d[6] >= 0x01 and d[6] <= 0x04 and d[6], + 'thumb_wheel_up': lambda f, r, d: _F.THUMB_WHEEL and r == 0 and signed(d[0:2]) < 0 and signed(d[0:2]), + 'thumb_wheel_down': lambda f, r, d: _F.THUMB_WHEEL and r == 0 and signed(d[0:2]) > 0 and signed(d[0:2]), + 'False': lambda f, r, d: False, +} + +COMPONENTS = {} + + +class RuleComponent(object): + def compile(self, c): + if isinstance(c, RuleComponent): + return c + elif isinstance(c, dict) and len(c) == 1: + k, v = next(iter(c.items())) + if k in COMPONENTS: + return COMPONENTS[k](v) + _log.warn('illegal component in rule: %s', c) + return Condition() + + +class Rule(RuleComponent): + def __init__(self, args): + self.components = [self.compile(a) for a in args] + + def __str__(self): + return 'Rule[' + ', '.join([c.__str__() for c in self.components]) + ']' + + def evaluate(self, feature, notification, device, status, last_result): + result = True + for component in self.components: + result = component.evaluate(feature, notification, device, status, result) + if not isinstance(component, Action) and result is None: + return None + if isinstance(component, Condition) and not result: + return result + return result + + +class Condition(RuleComponent): + def __init__(self, *args): + pass + + def __str__(self): + return 'CONDITION' + + def evaluate(self, feature, notification, device, status, last_result): + return False + + +class Not(Condition): + def __init__(self, op): + self.op = op + self.component = self.compile(op) + + def __str__(self): + return 'NOT: ' + str(self.component) + + def evaluate(self, feature, notification, device, status, last_result): + result = self.component.evaluate(feature, notification, device, status, last_result) + return None if result is None else not result + + +class Or(Condition): + def __init__(self, args): + self.components = [self.compile(a) for a in args] + + def __str__(self): + return 'Or: [' + ', '.join(str(c) for c in self.components) + ']' + + def evaluate(self, feature, notification, device, status, last_result): + result = False + for component in self.components: + result = component.evaluate(feature, notification, device, status, last_result) + if not isinstance(component, Action) and result is None: + return None + if isinstance(component, Condition) and result: + return result + return result + + +class And(Condition): + def __init__(self, args): + self.components = [self.compile(a) for a in args] + + def __str__(self): + return 'AND: [' + ', '.join(str(c) for c in self.components) + ']' + + def evaluate(self, feature, notification, device, status, last_result): + result = True + for component in self.components: + result = component.evaluate(feature, notification, device, status, last_result) + if not isinstance(component, Action) and result is None: + return None + if isinstance(component, Condition) and not result: + return result + return result + + +class Process(Condition): + def __init__(self, process): + self.process = process + if not isinstance(process, str): + _log.warn('rule Process argument not a string: %s', process) + + def __str__(self): + return 'PROCESS: ' + str(self.process) + + def evaluate(self, feature, notification, device, status, last_result): + return active_process_name.startswith(self.process) if isinstance(self.process, str) else False + + +class Feature(Condition): + def __init__(self, feature): + if not (isinstance(feature, str) and feature in _F): + _log.warn('rule Feature argument not name of a feature: %s', feature) + self.feature = None + self.feature = _F[feature] + + def __str__(self): + return 'FEATURE: ' + str(self.feature) + + def evaluate(self, feature, notification, device, status, last_result): + return feature == self.feature + + +class Report(Condition): + def __init__(self, report): + if not (isinstance(report, int)): + _log.warn('rule Report argument not an integer: %s', report) + self.report = -1 + self.report = report + + def __str__(self): + return 'REPORT: ' + str(self.report) + + def evaluate(self, report, notification, device, status, last_result): + return (notification.address >> 4) == self.report + + +MODIFIERS = {'Shift': 0x01, 'Control': 0x04, 'Alt': 0x08, 'Super': 0x40} +MODIFIER_MASK = MODIFIERS['Shift'] + MODIFIERS['Control'] + MODIFIERS['Alt'] + MODIFIERS['Super'] + + +class Modifiers(Condition): + def __init__(self, modifiers): + modifiers = [modifiers] if isinstance(modifiers, str) else modifiers + self.desired = 0 + for k in modifiers: + if k in MODIFIERS: + self.desired += MODIFIERS.get(k, 0) + else: + _log.warn('unknown rule Modifier value: %s', k) + + def __str__(self): + return 'MODIFIERS: ' + str(self.desired) + + def evaluate(self, feature, notification, device, status, last_result): + return self.desired == (current_key_modifiers & MODIFIER_MASK) + + +class Key(Condition): + def __init__(self, key): + if isinstance(key, str) and key in _CONTROL: + self.key = _CONTROL[key] + else: + _log.warn('rule Key argument not name of a Logitech key: %s', key) + self.key = 0 + + def __str__(self): + return 'KEY: ' + (str(self.key) if self.key else 'None') + + def evaluate(self, feature, notification, device, status, last_result): + return self.key and self.key == key_down + + +def bit_test(start, end, bits): + return lambda f, r, d: int.from_bytes(d[start:end], byteorder='big', signed=True) & bits + + +def range_test(start, end, min, max): + def range_test_helper(f, r, d): + value = int.from_bytes(d[start:end], byteorder='big', signed=True) + return min <= value <= max and (value if value else True) + + return range_test_helper + + +class Test(Condition): + def __init__(self, test): + self.test = test + if isinstance(test, str): + if test in TESTS: + self.function = TESTS[test] + else: + _log.warn('rule Test string argument not name of a test: %s', test) + self.function = TESTS['False'] + elif ( + isinstance(test, list) and 2 < len(test) <= 4 and all(isinstance(t, int) for t in test) and test[0] >= 0 + and test[0] <= 16 and test[1] >= 0 and test[1] <= 16 and test[0] < test[1] + ): + self.function = bit_test(*test) if len(test) == 3 else range_test(*test) + else: + _log.warn('rule Test argument not valid %s', test) + + def __str__(self): + return 'TEST: ' + str(self.test) + + def evaluate(self, feature, notification, device, status, last_result): + return self.function(feature, notification.address, notification.data) + + +class Action(RuleComponent): + def __init__(self, *args): + pass + + def evaluate(self, feature, notification, device, status, last_result): + return None + + +class KeyPress(Action): + def __init__(self, keys): + if isinstance(keys, str): + keys = [keys] + self.key_symbols = keys + key_from_string = lambda s: s if isinstance(s, str) and len(s) == 1 else _keyboard.KeyCode._from_symbol(s) + self.keys = [isinstance(k, str) and key_from_string(k) for k in keys] + if not all(self.keys): + _log.warn('rule KeyPress argument not sequence of key names %s', keys) + self.keys = [] + + def __str__(self): + return 'KeyPress: ' + ' '.join(self.key_symbols) + + def needed(self, k, current_key_modifiers): + code = modifier_code(display.keysym_to_keycode(k.vk if isinstance(k, _keyboard.KeyCode) else k)) + return not (code and current_key_modifiers & (1 << code)) + + def keyDown(self, keys, modifiers): + for k in keys: + if self.needed(k, modifiers): + keyboard.press(k) + + def keyUp(self, keys, modifiers): + for k in keys: + if self.needed(k, modifiers): + keyboard.release(k) + + def evaluate(self, feature, notification, device, status, last_result): + current = current_key_modifiers + # print("KeyPress", self.key_symbols, current) + if _log.isEnabledFor(_INFO): + _log.info( + 'rule KeyPress action: %s, modifiers %s %s', self.key_symbols, current, + [(hex(k.vk) if isinstance(k, _keyboard.KeyCode) else k) for k in self.keys] + ) + self.keyDown(self.keys, current) + import time + time.sleep(0.1) + self.keyUp(reversed(self.keys), current) + return None + + +# KeyDown is dangerous as the key can auto-repeat and make your system unusable +# class KeyDown(KeyPress): +# def evaluate(self, feature, notification, device, status, last_result): +# print('KEYDOWN', [(hex(k.vk) if isinstance(k, _keyboard.KeyCode) else k) for k in self.keys], current_key_modifiers) +# super().keyDown(self.keys, current_key_modifiers) +# class KeyUp(KeyPress): +# def evaluate(self, feature, notification, device, status, last_result): +# print('KEYUP', [(hex(k.vk) if isinstance(k, _keyboard.KeyCode) else k) for k in self.keys], current_key_modifiers) +# super().keyUp(self.keys, current_key_modifiers) + + +class MouseScroll(Action): + def __init__(self, amounts): + import numbers + if len(amounts) == 1 and isinstance(amounts[0], list): + amounts = amounts[0] + if not (len(amounts) == 2 and all([isinstance(a, numbers.Number) for a in amounts])): + _log.warn('rule MouseScroll argument not two numbers %s', amounts) + self.amounts = amounts + + def __str__(self): + return 'MouseScroll: ' + ' '.join([str(a) for a in self.amounts]) + + def evaluate(self, feature, notification, device, status, last_result): + import math + import numbers + amounts = self.amounts + if isinstance(last_result, numbers.Number): + amounts = [math.floor(last_result * a) for a in self.amounts] + # print("MOUSESCROLL", self.amounts, last_result, amounts) + if _log.isEnabledFor(_INFO): + _log.info('rule MouseScroll action: %s %s %s', self.amounts, last_result, amounts) + mouse.scroll(*amounts) + return None + + +class Execute(Action): + def __init__(self, args): + if isinstance(args, str): + args = [args] + if not (isinstance(args, list) and all(isinstance(arg), str) for arg in args): + _log.warn('rule Execute argument not list of strings: %s', args) + self.args = None + else: + self.args = args + + def __str__(self): + return 'Execute: ' + ' '.join([a for a in self.args]) + + def evaluate(self, feature, notification, device, status, last_result): + import subprocess + if _log.isEnabledFor(_INFO): + _log.info('rule Execute action: %s', self.args) + subprocess.Popen(self.args) + return None + + +COMPONENTS = { + 'Rule': Rule, + 'Not': Not, + 'Or': Or, + 'And': And, + 'Process': Process, + 'Feature': Feature, + 'Report': Report, + 'Modifiers': Modifiers, + 'Key': Key, + 'Test': Test, + 'KeyPress': KeyPress, + 'MouseScroll': MouseScroll, + 'Execute': Execute, +} + + +rules = Rule([ + ## Some malformed Rules for testing + ## Rule([Process(0), Feature(0), Modifiers(['XX', 0]), Modifiers('XXX'), Modifiers([0]), + ## KeyPress(['XXXXX', 0]), KeyPress(['XXXXXX']), KeyPress(0), + ## MouseScroll(0), MouseScroll([0, 0, 0]), MouseScroll(['a', 0]), + ## Rule(["XXXXXXX"])]), + ## Rule([Feature(0)]), + ## Rule([Modifiers(['XXXXXXXXX', 0])]), + ## Rule([KeyPress(['XXXXXSSSSS', 0])]), + {'Rule': [ # Implement problematic keys for Craft and MX Master + {'Feature': 'REPROG_CONTROLS_V4'}, + {'Report': 0x0}, + {'Rule': [{'Key': 'Brightness Down'}, {'KeyPress': 'XF86_MonBrightnessDown'}]}, + {'Rule': [{'Key': 'Brightness Up'}, {'KeyPress': 'XF86_MonBrightnessUp'}]}, + ]}, + {'Rule': [ # In firefox, crown movements emits keys that move up and down if not pressed, rotate through tabs otherwise + {'Process': 'firefox'}, + {'Feature': 'CROWN'}, + {'Report': 0x0}, + {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_right_ratchet'}, {'KeyPress': ['Control_R', 'Tab']}]}, + {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_left_ratchet'}, {'KeyPress': ['Control_R', 'Shift_R', 'Tab']}]}, + {'Rule': [{'Test': 'crown_right_ratchet'}, {'KeyPress': 'Down'}]}, + Rule([Test('crown_left_ratchet'), KeyPress(['Up'])]), + ]}, + {'Rule': [ # Otherwise, crown movements emit keys that modify volume if not pressed, move between tracks otherwise + {'Feature': 'CROWN'}, {'Report': 0x0}, + {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_right_ratchet'}, {'KeyPress': 'XF86_AudioNext'}]}, + {'Rule': [{'Test': 'crown_pressed'}, {'Test': 'crown_left_ratchet'}, {'KeyPress': 'XF86_AudioPrev'}]}, + {'Rule': [{'Test': 'crown_right_ratchet'}, {'KeyPress': 'XF86_AudioRaiseVolume'}]}, + {'Rule': [{'Test': 'crown_left_ratchet'}, {'KeyPress': 'XF86_AudioLowerVolume'}]} + ]}, + {'Rule': [ # Thumb wheel does horizontal movement, doubled if control key not pressed + {'Feature': 'THUMB_WHEEL'}, # with control modifier on mouse scrolling sometimes does something different! + {'Rule': [{'Modifiers': 'Control'}, {'Test': 'thumb_wheel_up'}, {'MouseScroll': [-1, 0]}]}, + {'Rule': [{'Modifiers': 'Control'}, {'Test': 'thumb_wheel_down'}, {'MouseScroll': [-1, 0]}]}, + {'Rule': [{'Or': [{'Test': 'thumb_wheel_up'}, {'Test': 'thumb_wheel_down'}]}, {'MouseScroll': [-2, 0]}]} + ]} +]) + + +# process a notification +def process_notification(device, status, notification, feature): + global keys_down, key_down + key_down = None + # need to keep track of keys that are down to find a new key down + if feature == _F.REPROG_CONTROLS_V4 and notification.address == 0x00: + new_keys_down = _unpack('!4H', notification.data[:8]) + for key in new_keys_down: + if key and key not in keys_down: + key_down = key + keys_down = new_keys_down + rules.evaluate(feature, notification, device, status, True) + + +_XDG_CONFIG_HOME = _os.environ.get('XDG_CONFIG_HOME') or _path.expanduser(_path.join('~', '.config')) +_file_path = _path.join(_XDG_CONFIG_HOME, 'solaar', 'divert.yaml') + + +def _load_config_rule_file(): + global rules + if _path.isfile(_file_path): + try: + with open(_file_path, 'r') as config_file: + loaded_rules = [] + for loaded_rule in _yaml_safe_load_all(config_file): + rule = Rule(loaded_rule) + if _log.isEnabledFor(_INFO): + _log.info('load rule: %s', rule) + loaded_rules.append(rule) + if _log.isEnabledFor(_INFO): + _log.info('loaded %d rules from %s', len(loaded_rules), config_file) + loaded_rules.extend(rules.components) + rules = Rule(loaded_rules) + except Exception as e: + _log.error('failed to load from %s\n%s', _file_path, e) + + +_load_config_rule_file() diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index c3cfc542..2e74d231 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -27,6 +27,7 @@ from logging import INFO as _INFO from logging import WARNING as _WARNING from logging import getLogger +from . import diversion as _diversion from . import hidpp10 as _hidpp10 from . import hidpp20 as _hidpp20 from .base import DJ_MESSAGE_ID as _DJ_MESSAGE_ID @@ -285,50 +286,6 @@ def _process_feature_notification(device, status, n, feature): _log.warn('%s: unknown UNIFIED BATTERY %s', device, n) return True - # TODO: what are REPROG_CONTROLS_V{2,3}? - if feature == _F.REPROG_CONTROLS: - if n.address == 0x00: - if _log.isEnabledFor(_INFO): - _log.info('%s: reprogrammable key: %s', device, n) - else: - _log.warn('%s: unknown REPROG_CONTROLS %s', device, n) - return True - - if feature == _F.REPROG_CONTROLS_V4: - if n.address == 0x00: - if _log.isEnabledFor(_DEBUG): - cid1, cid2, cid3, cid4 = _unpack('!HHHH', n.data[:8]) - _log.debug('%s: diverted controls pressed: %i, %i, %i, %i', device, cid1, cid2, cid3, cid4) - return True - elif n.address == 0x10: - if _log.isEnabledFor(_DEBUG): - dx, dy = _unpack('!hh', n.data[:4]) - _log.debug('%s: rawXY dx=%i dy=%i', device, dx, dy) - return True - elif n.address == 0x20: - if _log.isEnabledFor(_DEBUG): - _log.debug('%s: received analyticsKeyEvents', device) - return True - elif _log.isEnabledFor(_WARNING): - _log.warn('%s: unknown REPROG_CONTROLS_V4 %s', device, n) - - if feature == _F.WIRELESS_DEVICE_STATUS: - if n.address == 0x00: - if _log.isEnabledFor(_DEBUG): - _log.debug('wireless status: %s', n) - reason = 'powered on' if n.data[2] == 1 else None - if n.data[1] == 1: # device is asking for software reconfiguration so need to change status - # only show a user notification if the device can change hosts - # as we want to notify when a device changes to this host - # but the only indication we get is this notification - alert = _ALERT.NOTIFICATION if _F.CHANGE_HOST in device.features else _ALERT.NONE - status.changed(active=True, alert=alert, reason=reason) - else: - _log.warn('%s: unknown WIRELESS %s', device, n) - else: - _log.warn('%s: unknown WIRELESS %s', device, n) - return True - if feature == _F.SOLAR_DASHBOARD: if n.data[5:9] == b'GOOD': charge, lux, adc = _unpack('!BHH', n.data[:5]) @@ -360,6 +317,23 @@ def _process_feature_notification(device, status, n, feature): _log.warn('%s: SOLAR CHARGE not GOOD? %s', device, n) return True + if feature == _F.WIRELESS_DEVICE_STATUS: + if n.address == 0x00: + if _log.isEnabledFor(_DEBUG): + _log.debug('wireless status: %s', n) + reason = 'powered on' if n.data[2] == 1 else None + if n.data[1] == 1: # device is asking for software reconfiguration so need to change status + # only show a user notification if the device can change hosts + # as we want to notify when a device changes to this host + # but the only indication we get is this notification + alert = _ALERT.NOTIFICATION if _F.CHANGE_HOST in device.features else _ALERT.NONE + status.changed(active=True, alert=alert, reason=reason) + else: + _log.warn('%s: unknown WIRELESS %s', device, n) + else: + _log.warn('%s: unknown WIRELESS %s', device, n) + return True + if feature == _F.TOUCHMOUSE_RAW_POINTS: if n.address == 0x00: if _log.isEnabledFor(_INFO): @@ -372,24 +346,46 @@ def _process_feature_notification(device, status, n, feature): _log.info('%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s', device, button_down, mouse_lifted) else: _log.warn('%s: unknown TOUCH MOUSE %s', device, n) - return True - if feature == _F.HIRES_WHEEL: + # TODO: what are REPROG_CONTROLS_V{2,3}? + elif feature == _F.REPROG_CONTROLS: + if n.address == 0x00: + if _log.isEnabledFor(_INFO): + _log.info('%s: reprogrammable key: %s', device, n) + else: + _log.warn('%s: unknown REPROG_CONTROLS %s', device, n) + + elif feature == _F.REPROG_CONTROLS_V4: + if n.address == 0x00: + if _log.isEnabledFor(_DEBUG): + cid1, cid2, cid3, cid4 = _unpack('!HHHH', n.data[:8]) + _log.debug('%s: diverted controls pressed: %i, %i, %i, %i', device, cid1, cid2, cid3, cid4) + elif n.address == 0x10: + if _log.isEnabledFor(_DEBUG): + dx, dy = _unpack('!hh', n.data[:4]) + _log.debug('%s: rawXY dx=%i dy=%i', device, dx, dy) + elif n.address == 0x20: + if _log.isEnabledFor(_DEBUG): + _log.debug('%s: received analyticsKeyEvents', device) + elif _log.isEnabledFor(_WARNING): + _log.warn('%s: unknown REPROG_CONTROLS_V4 %s', device, n) + + elif feature == _F.HIRES_WHEEL: if (n.address == 0x00): if _log.isEnabledFor(_INFO): flags, delta_v = _unpack('>bh', n.data[:3]) high_res = (flags & 0x10) != 0 periods = flags & 0x0f _log.info('%s: WHEEL: res: %d periods: %d delta V:%-3d', device, high_res, periods, delta_v) - return True elif (n.address == 0x10): if _log.isEnabledFor(_INFO): flags = ord(n.data[:1]) ratchet = flags & 0x01 _log.info('%s: WHEEL: ratchet: %d', device, ratchet) - return True else: _log.warn('%s: unknown WHEEL %s', device, n) - return True - _log.warn('%s: unrecognized %s for feature %s (index %02X)', device, n, feature, n.sub_id) + _diversion.process_notification(device, status, n, feature) + + if _log.isEnabledFor(_DEBUG): + _log.debug('%s: unrecognized %s for feature %s (index %02X)', device, n, feature, n.sub_id)