rules: add Solaar rules command to set settings

testing
This commit is contained in:
Peter F. Patel-Schneider 2022-01-02 15:20:48 -05:00
parent 5b1125cd11
commit 8a87b9b013
5 changed files with 156 additions and 46 deletions

View File

@ -27,7 +27,9 @@ from math import sqrt as _sqrt
import _thread import _thread
import psutil import psutil
import solaar.ui.window as _window
from solaar.ui.config_panel import change_setting as _change_setting
from yaml import add_representer as _yaml_add_representer from yaml import add_representer as _yaml_add_representer
from yaml import dump_all as _yaml_dump_all from yaml import dump_all as _yaml_dump_all
from yaml import safe_load_all as _yaml_safe_load_all from yaml import safe_load_all as _yaml_safe_load_all
@ -686,6 +688,41 @@ class MouseClick(Action):
return {'MouseClick': [self.button, self.count]} return {'MouseClick': [self.button, self.count]}
class Set(Action):
def __init__(self, args):
if not (isinstance(args, list) and len(args) > 2):
_log.warn('rule Set argument not list with minimum length 3: %s', args)
self.args = []
else:
self.args = args
def __str__(self):
return 'Set: ' + ' '.join([str(a) for a in self.args])
def evaluate(self, feature, notification, device, status, last_result):
if len(self.args) < 3:
return None
if _log.isEnabledFor(_INFO):
_log.info('Set action: %s', self.args)
dev = _window.find_device(self.args[0]) if self.args[0] is not None else device
if dev is None:
_log.error('Set action: device %s is not known', self.args[0])
return None
setting = next((s for s in dev.settings if s.name == self.args[1]), None)
if setting is None:
_log.error('Set action: setting %s is not the name of a setting for %s', self.args[1], dev.name)
return None
args = setting.acceptable(self.args[2:], setting.read())
if args is None:
_log.error('Set Action: invalid args %s for setting %s of %s', self.args[2:], self.args[1], self.args[0])
return None
_change_setting(dev, setting, args)
return None
def data(self):
return {'Set': self.args[:]}
class Execute(Action): class Execute(Action):
def __init__(self, args): def __init__(self, args):
if isinstance(args, str): if isinstance(args, str):
@ -726,6 +763,7 @@ COMPONENTS = {
'KeyPress': KeyPress, 'KeyPress': KeyPress,
'MouseScroll': MouseScroll, 'MouseScroll': MouseScroll,
'MouseClick': MouseClick, 'MouseClick': MouseClick,
'Set': Set,
'Execute': Execute, 'Execute': Execute,
} }

View File

@ -172,6 +172,9 @@ class Setting:
return value return value
def acceptable(self, args, current):
return self._validator.acceptable(args, current) if self._validator else None
def apply(self): def apply(self):
assert hasattr(self, '_value') assert hasattr(self, '_value')
assert hasattr(self, '_device') assert hasattr(self, '_device')
@ -371,7 +374,7 @@ class LongSettings(Setting):
return None return None
return map return map
def write_item_value(self, item, value): def write_key_value(self, item, value):
assert hasattr(self, '_value') assert hasattr(self, '_value')
assert hasattr(self, '_device') assert hasattr(self, '_device')
assert item is not None assert item is not None
@ -689,7 +692,7 @@ class BooleanValidator:
if new_value is None: if new_value is None:
new_value = False new_value = False
else: else:
assert isinstance(new_value, bool) assert isinstance(new_value, bool), 'New value %s for boolean setting is not a boolean' % new_value
to_write = self.true_value if new_value else self.false_value to_write = self.true_value if new_value else self.false_value
@ -720,6 +723,12 @@ class BooleanValidator:
return self.write_prefix_bytes + to_write return self.write_prefix_bytes + to_write
def acceptable(self, args, current):
if len(args) != 1:
return None
val = [args[0]] if type(args[0]) == bool else [not current] if args[0] == '~' else None
return val
class BitFieldValidator: class BitFieldValidator:
__slots__ = ('byte_count', 'options') __slots__ = ('byte_count', 'options')
@ -755,6 +764,15 @@ class BitFieldValidator:
def all_options(self): def all_options(self):
return self.options return self.options
def acceptable(self, args, current):
if len(args) != 2:
return None
key = next((key for key in self.options if key == args[0]), None)
if key is None:
return None
val = args[1] if type(args[1]) == bool else not current[str(int(key))] if args[1] == '~' else None
return None if val is None else [str(int(key)), val]
class BitFieldWithOffsetAndMaskValidator: class BitFieldWithOffsetAndMaskValidator:
__slots__ = ('byte_count', 'options', '_option_from_key', '_mask_from_offset', '_option_from_offset_mask') __slots__ = ('byte_count', 'options', '_option_from_key', '_mask_from_offset', '_option_from_offset_mask')
@ -837,6 +855,15 @@ class BitFieldWithOffsetAndMaskValidator:
def all_options(self): def all_options(self):
return [int(opt) if isinstance(opt, int) else opt.as_int() for opt in self.options] return [int(opt) if isinstance(opt, int) else opt.as_int() for opt in self.options]
def acceptable(self, args, current):
if len(args) != 2:
return None
key = next((option.id for option in self.options if option.as_int() == args[0]), None)
if key is None:
return None
val = args[1] if type(args[1]) == bool else not current[str(int(key))] if args[1] == '~' else None
return None if val is None else [str(key), val]
class ChoicesValidator: class ChoicesValidator:
kind = KIND.choice kind = KIND.choice
@ -870,21 +897,31 @@ class ChoicesValidator:
def prepare_write(self, new_value, current_value=None): def prepare_write(self, new_value, current_value=None):
if new_value is None: if new_value is None:
choice = self.choices[:][0] value = self.choices[:][0]
else: else:
if isinstance(new_value, int): value = self.choice(new_value)
choice = self.choices[new_value] if value is None:
elif int(new_value) in self.choices:
choice = self.choices[int(new_value)]
elif new_value in self.choices:
choice = self.choices[new_value]
else:
raise ValueError(new_value)
if choice is None:
raise ValueError('invalid choice %r' % new_value) raise ValueError('invalid choice %r' % new_value)
assert isinstance(choice, _NamedInt) assert isinstance(value, _NamedInt)
return self._write_prefix_bytes + choice.bytes(self._byte_count) return self._write_prefix_bytes + value.bytes(self._byte_count)
def choice(self, value):
if isinstance(value, int):
return self.choices[value]
try:
int(value)
if int(value) in self.choices:
return self.choices[int(value)]
except Exception:
pass
if value in self.choices:
return self.choices[value]
else:
return None
def acceptable(self, args, current):
choice = self.choice(args[0]) if len(args) == 1 else None
return None if choice is None else [choice]
class ChoicesMapValidator(ChoicesValidator): class ChoicesMapValidator(ChoicesValidator):
@ -949,6 +986,15 @@ class ChoicesMapValidator(ChoicesValidator):
new_value = new_value | self.activate new_value = new_value | self.activate
return self._write_prefix_bytes + new_value.to_bytes(self._byte_count, 'big') return self._write_prefix_bytes + new_value.to_bytes(self._byte_count, 'big')
def acceptable(self, args, current):
if len(args) != 2:
return None
key, choices = next(((key, item) for key, item in self.choices.items() if key == args[0]), (None, None))
if choices is None or args[1] not in choices:
return None
choice = next((item for item in choices if item == args[1]), None)
return [str(int(key)), int(choice)] if choice is not None else None
class RangeValidator: class RangeValidator:
__slots__ = ('min_value', 'max_value', 'flag', '_byte_count', 'needs_current_value') __slots__ = ('min_value', 'max_value', 'flag', '_byte_count', 'needs_current_value')
@ -982,6 +1028,11 @@ class RangeValidator:
raise ValueError('invalid choice %r' % new_value) raise ValueError('invalid choice %r' % new_value)
return _int2bytes(new_value, self._byte_count) return _int2bytes(new_value, self._byte_count)
def acceptable(self, args, current):
arg = args[0]
# None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args)
return None if len(args) != 1 or type(arg) != int or arg < self.min_value or arg > self.max_value else args
class MultipleRangeValidator: class MultipleRangeValidator:
@ -1052,6 +1103,9 @@ class MultipleRangeValidator:
w += _int2bytes(v, sub_item.length) w += _int2bytes(v, sub_item.length)
return w + b'\xFF' return w + b'\xFF'
def acceptable(self, args, current):
pass # not implemented yet
class ActionSettingRW: class ActionSettingRW:
"""Special RW class for settings that turn on and off special processing when a key or button is depressed""" """Special RW class for settings that turn on and off special processing when a key or button is depressed"""

View File

@ -211,7 +211,7 @@ def set(dev, setting, args):
else: else:
raise Exception("%s: key '%s' not in setting" % (setting.name, key)) raise Exception("%s: key '%s' not in setting" % (setting.name, key))
message = 'Setting %s key %s parameter %s to %r' % (setting.name, k, args.extra_subkey, item[args.extra_subkey]) message = 'Setting %s key %s parameter %s to %r' % (setting.name, k, args.extra_subkey, item[args.extra_subkey])
result = setting.write_item_value(int(k), item) result = setting.write_key_value(int(k), item)
else: else:
raise Exception('NotImplemented') raise Exception('NotImplemented')

View File

@ -37,18 +37,24 @@ def _read_async(setting, force_read, sbox, device_is_online, sensitive):
_ui_async(_do_read, setting, force_read, sbox, device_is_online, sensitive) _ui_async(_do_read, setting, force_read, sbox, device_is_online, sensitive)
def _write_async(setting, value, sbox): def _write_async(setting, value, sbox, sensitive=True, key=None):
failed, spinner, control = _get_failed_spinner_control(sbox) if sbox:
control.set_sensitive(False) failed, spinner, control = _get_failed_spinner_control(sbox)
failed.set_visible(False) control.set_sensitive(False)
spinner.set_visible(True) failed.set_visible(False)
spinner.start() spinner.set_visible(True)
spinner.start()
def _do_write(s, v, sb): def _do_write(s, v, sb, key):
v = setting.write(v) if key is None:
GLib.idle_add(_update_setting_item, sb, v, True, priority=99) v = setting.write(v)
else:
v = setting.write_key_value(key, v)
v = {key: v}
if sb:
GLib.idle_add(_update_setting_item, sb, v, True, sensitive, priority=99)
_ui_async(_do_write, setting, value, sbox) _ui_async(_do_write, setting, value, sbox, key)
def _write_async_key_value(setting, key, value, sbox): def _write_async_key_value(setting, key, value, sbox):
@ -65,20 +71,6 @@ def _write_async_key_value(setting, key, value, sbox):
_ui_async(_do_write_key_value, setting, key, value, sbox) _ui_async(_do_write_key_value, setting, key, value, sbox)
def _write_async_item_value(setting, item, value, sbox):
failed, spinner, control = _get_failed_spinner_control(sbox)
control.set_sensitive(False)
failed.set_visible(False)
spinner.set_visible(True)
spinner.start()
def _do_write_item_value(s, k, v, sb):
v = setting.write_item_value(k, v)
GLib.idle_add(_update_setting_item, sb, {k: v}, True, priority=99)
_ui_async(_do_write_item_value, setting, item, value, sbox)
# #
# #
# #
@ -257,7 +249,7 @@ def _create_multiple_range_control(setting, change):
p = control p = control
for _i in range(7): for _i in range(7):
p = p.get_parent() p = p.get_parent()
_write_async_item_value(setting, str(int(item)), setting._value[str(int(item))], p) _write_async_key_value(setting, str(int(item)), setting._value[str(int(item))], p)
def _changed(control, setting, item, sub_item): def _changed(control, setting, item, sub_item):
if control.get_sensitive(): if control.get_sensitive():
@ -385,9 +377,10 @@ def _change_click(button, arg):
def _change_icon(allowed, icon): def _change_icon(allowed, icon):
allowed = allowed if allowed in _allowables_icons else True if allowed in _allowables_icons:
icon.set_from_icon_name(_allowables_icons[allowed], Gtk.IconSize.LARGE_TOOLBAR) icon._allowed = allowed
icon.set_tooltip_text(_allowables_tooltips[allowed]) icon.set_from_icon_name(_allowables_icons[allowed], Gtk.IconSize.LARGE_TOOLBAR)
icon.set_tooltip_text(_allowables_tooltips[allowed])
def _create_sbox(s, device): def _create_sbox(s, device):
@ -478,7 +471,7 @@ def _update_setting_item(sbox, value, is_online=True, sensitive=True):
control.set_sensitive(False) control.set_sensitive(False)
failed.set_visible(False) failed.set_visible(False)
if isinstance(control, Gtk.Switch): if isinstance(control, Gtk.Switch):
control.set_active(value) control.set_state(value)
elif isinstance(control, Gtk.ComboBoxText): elif isinstance(control, Gtk.ComboBoxText):
control.set_active_id(str(int(value))) control.set_active_id(str(int(value)))
elif isinstance(control, Gtk.Scale): elif isinstance(control, Gtk.Scale):
@ -527,6 +520,7 @@ def _update_setting_item(sbox, value, is_online=True, sensitive=True):
else: else:
raise Exception('NotImplemented') raise Exception('NotImplemented')
sensitive = sbox._change_icon._allowed if sensitive is None else sensitive
control.set_sensitive(sensitive is True) control.set_sensitive(sensitive is True)
_change_icon(sensitive, sbox._change_icon) _change_icon(sensitive, sbox._change_icon)
@ -583,7 +577,6 @@ def update(device, is_online=None):
else: else:
sbox = _items[k] = _create_sbox(s, device) sbox = _items[k] = _create_sbox(s, device)
_box.pack_start(sbox, False, False, 0) _box.pack_start(sbox, False, False, 0)
sensitive = device.persister.get_sensitivity(s.name) if device.persister else True sensitive = device.persister.get_sensitivity(s.name) if device.persister else True
_read_async(s, False, sbox, is_online, sensitive) _read_async(s, False, sbox, is_online, sensitive)
@ -606,3 +599,13 @@ def destroy():
global _box global _box
_box = None _box = None
_items.clear() _items.clear()
def change_setting(device, setting, values):
device = setting._device
device_path = device.receiver.path if device.receiver else device.path
if (device_path, device.number, setting.name) in _items:
sbox = _items[(device_path, device.number, setting.name)]
else:
sbox = None
_write_async(setting, values[-1], sbox, None, key=values[0] if len(values) > 1 else None)

View File

@ -926,3 +926,18 @@ def update_device(device, item, selected_device_id, need_popup, full=False):
elif selected_device_id == (device.receiver.path if device.receiver else device.path, device.number): elif selected_device_id == (device.receiver.path if device.receiver else device.path, device.number):
full_update = full or was_online != is_online full_update = full or was_online != is_online
_update_info_panel(device, full=full_update) _update_info_panel(device, full=full_update)
def find_device(serial):
assert serial, 'need serial number or unit ID to find a device'
result = None
def check(_store, _treepath, row):
nonlocal result
device = _model.get_value(row, _COLUMN.DEVICE)
if device and device.kind and (device.unitId == serial or device.serial == serial):
result = device
return True
_model.foreach(check)
return result