receiver: initial implementation of boolean GESTURE 2 settings

This commit is contained in:
Vinícius 2020-08-29 14:57:49 -03:00 committed by Peter F. Patel-Schneider
parent 633760e261
commit 4a5c0ea523
5 changed files with 264 additions and 123 deletions

View File

@ -725,6 +725,7 @@ GESTURE = _NamedInts(
Finger8=97,
Finger9=98,
Finger10=99,
DeviceSpecificRawData=100,
)
GESTURE._fallback = lambda x: 'unknown:%04X' % x
@ -771,9 +772,11 @@ ACTION_ID._fallback = lambda x: 'unknown:%04X' % x
class Gesture(object):
enable_index = 0
def __init__(self, low, high):
index = {}
def __init__(self, device, low, high):
self._device = device
self.id = low
self.gesture = GESTURE[low]
self.can_be_enabled = high & 0x01
@ -782,33 +785,43 @@ class Gesture(object):
self.desired_software_default = high & 0x08
self.persistent = high & 0x10
self.default_enabled = high & 0x20
self.enable_index = None
self.index = None
if self.can_be_enabled or self.default_enabled:
self.enable_index = Gesture.enable_index
Gesture.enable_index += 1
self.index = Gesture.index.get(device, 0)
Gesture.index[device] = self.index + 1
self.offset, self.mask = self._offset_mask()
def enable_offset_mask(self): # offset and mask to enable or disable
if self.enable_index is not None:
offset = self.enable_index >> 3 # 8 gestures per byte
mask = 0x1 << (self.enable_index % 8)
def _offset_mask(self): # offset and mask
if self.index is not None:
offset = self.index >> 3 # 8 gestures per byte
mask = 0x1 << (self.index % 8)
return (offset, mask)
else:
return (None, None)
def enabled(self, device): # is the gesture enabled?
offset, mask = self.enable_offset_mask()
if offset is not None:
result = feature_request(device, FEATURE.GESTURE_2, 0x10, offset, 0x01, mask)
return bool(result[0] & mask) if result else None
def enabled(self): # is the gesture enabled?
if self.offset is not None:
result = feature_request(self._device, FEATURE.GESTURE_2, 0x10, self.offset, 0x01, self.mask)
return bool(result[0] & self.mask) if result else None
def set(self, device, enable): # enable or disable the gesture
def set(self, enable): # enable or disable the gesture
if not self.can_be_enabled:
return None
offset, mask = self.enable_offset_mask()
if offset is not None:
reply = feature_request(device, FEATURE.GESTURE_2, 0x20, offset, 0x01, mask, mask if enable else 0x00)
if self.offset is not None:
reply = feature_request(
self._device, FEATURE.GESTURE_2, 0x20, self.offset, 0x01, self.mask, self.mask if enable else 0x00
)
return reply
def as_int(self):
return self.gesture
def __int__(self):
return self.id
def __repr__(self):
return f'<Gesture {self.gesture} offset={self.offset} mask={self.mask}>'
# allow a gesture to be used as a settings reader/writer to enable and disable the gesture
read = enabled
write = set
@ -817,27 +830,28 @@ class Gesture(object):
class Param(object):
param_index = 0
def __init__(self, low, high):
def __init__(self, device, low, high):
self._device = device
self.id = low
self.param = PARAM(low)
self.param = PARAM[low]
self.size = high & 0x0F
self.show_in_ui = bool(high & 0x1F)
self._value = None
self.index = Param.param_index
Param.param_index += 1
def value(self, device):
return self._value if self._value is not None else self.read(device)
def value(self):
return self._value if self._value is not None else self.read()
def read(self, device): # returns the bytes for the parameter
result = feature_request(device, FEATURE.GESTURE_2, 0x70, self.index, 0xFF)
def read(self): # returns the bytes for the parameter
result = feature_request(self._device, FEATURE.GESTURE_2, 0x70, self.index, 0xFF)
if result:
self._value = result[:self.size]
return self._value
def write(self, device, bytes):
def write(self, bytes):
self._value = bytes
return feature_request(device, FEATURE.GESTURE_2, 0x80, self.index, bytes, 0xFF)
return feature_request(self._device, FEATURE.GESTURE_2, 0x80, self.index, bytes, 0xFF)
class Gestures(object):
@ -862,10 +876,10 @@ class Gestures(object):
if field_high == 0x1: # end of fields
break
elif field_high & 0x80:
gesture = Gesture(field_low, field_high)
gesture = Gesture(device, field_low, field_high)
self.gestures[gesture.gesture] = gesture
elif field_high & 0xF0 == 0x30 or field_high & 0xF0 == 0x20:
param = Param(field_low, field_high)
param = Param(device, field_low, field_high)
self.params[param.param] = param
elif field_high == 0x04:
if field_low != 0x00:
@ -873,6 +887,7 @@ class Gestures(object):
else:
_log.warn(f'Unimplemented GESTURE_2 field {field_low} {field_high} found.')
index += 1
device._gestures = self
def gesture(self, gesture):
return self.gestures.get(gesture, None)

View File

@ -101,23 +101,26 @@ class Setting(object):
if self._validator.kind == KIND.range:
return (self._validator.min_value, self._validator.max_value)
def read(self, cached=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings read %r from %s', self.name, self._value, self._device)
if self.persist and self._value is None and self._device.persister:
def _pre_read(self, cached, key=None):
if self.persist and self._value is None and getattr(self._device, 'persister', None):
# We haven't read a value from the device yet,
# maybe we have something in the configuration.
self._value = self._device.persister.get(self.name)
if cached and self._value is not None:
if self.persist and self._device.persister and self.name not in self._device.persister:
if self.persist and getattr(self._device, 'persister', None) and self.name not in self._device.persister:
# If this is a new device (or a new setting for an old device),
# make sure to save its current value for the next time.
self._device.persister[self.name] = self._value
def read(self, cached=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings read %r from %s', self.name, self._value, self._device)
self._pre_read(cached)
if cached and self._value is not None:
return self._value
if self._device.online:
@ -130,6 +133,13 @@ class Setting(object):
self._device.persister[self.name] = self._value
return self._value
def _pre_write(self):
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
if self.persist and self._device.persister:
self._device.persister[self.name] = self._value
def write(self, value):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
@ -139,12 +149,8 @@ class Setting(object):
_log.debug('%s: settings write %r to %s', self.name, value, self._device)
if self._device.online:
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
self._value = value
if self.persist and self._device.persister:
self._device.persister[self.name] = value
self._pre_write()
current_value = None
if self._validator.needs_current_value:
@ -191,20 +197,12 @@ class Settings(Setting):
def read(self, cached=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings read %r from %s', self.name, self._value, self._device)
if self.persist and self._value is None and getattr(self._device, 'persister', None):
# We haven't read a value from the device yet,
# maybe we have something in the configuration.
self._value = self._device.persister.get(self.name)
self._pre_read(cached)
if cached and self._value is not None:
if self.persist and getattr(self._device, 'persister', None) and self.name not in self._device.persister:
# If this is a new device (or a new setting for an old device),
# make sure to save its current value for the next time.
self._device.persister[self.name] = self._value
return self._value
if self._device.online:
@ -226,16 +224,11 @@ class Settings(Setting):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
assert key is not None
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings read %r key %r from %s', self.name, self._value, key, self._device)
if self._value is None and getattr(self._device, 'persister', None):
self._value = self._device.persister.get(self.name)
self._pre_read(cached)
if cached and self._value is not None:
if self.persist and getattr(self._device, 'persister', None) and self.name not in self._device.persister:
self._device.persister[self.name] = self._value
return self._value[str(int(key))]
if self._device.online:
@ -255,13 +248,8 @@ class Settings(Setting):
_log.debug('%s: settings write %r to %s', self.name, map, self._device)
if self._device.online:
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
self._value = map
if self.persist and self._device.persister:
self._device.persister[self.name] = map
self._pre_write()
for key, value in map.items():
data_bytes = self._validator.prepare_write(int(key), value)
if data_bytes is not None:
@ -270,7 +258,6 @@ class Settings(Setting):
reply = self._rw.write(self._device, int(key), data_bytes)
if not reply:
return None
return map
def write_key_value(self, key, value):
@ -283,14 +270,10 @@ class Settings(Setting):
_log.debug('%s: settings write key %r value %r to %s', self.name, key, value, self._device)
if self._device.online:
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
try:
data_bytes = self._validator.prepare_write(int(key), value)
self._value[str(key)] = value
if self.persist and self._device.persister:
self._device.persister[self.name] = self._value
self._pre_write()
except ValueError:
data_bytes = value = None
if data_bytes is not None:
@ -309,25 +292,17 @@ class BitFieldSetting(Setting):
def read(self, cached=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings read %r from %s', self.name, self._value, self._device)
if self._value is None and getattr(self._device, 'persister', None):
# We haven't read a value from the device yet,
# maybe we have something in the configuration.
self._value = self._device.persister.get(self.name)
self._pre_read(cached)
if cached and self._value is not None:
if self.persist and getattr(self._device, 'persister', None) and self.name not in self._device.persister:
# If this is a new device (or a new setting for an old device),
# make sure to save its current value for the next time.
self._device.persister[self.name] = self._value
return self._value
if self._device.online:
reply_map = {}
reply = self._rw.read(self._device)
reply = self._do_read()
if reply:
# keys are ints, because that is what the device uses,
# encoded into strings because JSON requires strings as keys
@ -339,30 +314,32 @@ class BitFieldSetting(Setting):
self._device.persister[self.name] = self._value
return self._value
def _do_read(self):
return self._rw.read(self._device)
def read_key(self, key, cached=True):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
assert key is not None
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings read %r key %r from %s', self.name, self._value, key, self._device)
if self._value is None and getattr(self._device, 'persister', None):
self._value = self._device.persister.get(self.name)
self._pre_read(cached)
if cached and self._value is not None:
if self.persist and getattr(self._device, 'persister', None) and self.name not in self._device.persister:
self._device.persister[self.name] = self._value
return self._value[str(int(key))]
if self._device.online:
reply = self._rw.read(self._device, key)
reply = self._do_read_key(key)
if reply:
self._value = self._validator.validate_read(reply)
if self.persist and getattr(self._device, 'persister', None) and self.name not in self._device.persister:
self._device.persister[self.name] = self._value
return self._value[str(int(key))]
def _do_read_key(self, key):
return self._rw.read(self._device, key)
def write(self, map):
assert hasattr(self, '_value')
assert hasattr(self, '_device')
@ -372,19 +349,18 @@ class BitFieldSetting(Setting):
_log.debug('%s: settings write %r to %s', self.name, map, self._device)
if self._device.online:
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
self._value = map
if self.persist and self._device.persister:
self._device.persister[self.name] = map
self._pre_write()
data_bytes = self._validator.prepare_write(self._value)
if data_bytes is not None:
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings prepare map write(%s) => %r', self.name, self._value, data_bytes)
reply = self._rw.write(self._device, data_bytes)
if not reply:
return None
# if prepare_write returns a list, write one item at a time
seq = data_bytes if isinstance(data_bytes, list) else [data_bytes]
for b in seq:
reply = self._rw.write(self._device, b)
if not reply:
return None
return map
def write_key_value(self, key, value):
@ -397,26 +373,36 @@ class BitFieldSetting(Setting):
_log.debug('%s: settings write key %r value %r to %s', self.name, key, value, self._device)
if self._device.online:
# Remember the value we're trying to set, even if the write fails.
# This way even if the device is offline or some other error occurs,
# the last value we've tried to write is remembered in the configuration.
value = bool(value)
self._value[str(key)] = value
if self.persist and self._device.persister:
self._device.persister[self.name] = self._value
self._pre_write()
data_bytes = self._validator.prepare_write(self._value)
if data_bytes is not None:
if _log.isEnabledFor(_DEBUG):
_log.debug('%s: settings prepare key value write(%s,%s) => %r', self.name, key, str(value), data_bytes)
reply = self._rw.write(self._device, data_bytes)
if not reply:
# tell whomever is calling that the write failed
return None
# if prepare_write returns a list, write one item at a time
seq = data_bytes if isinstance(data_bytes, list) else [data_bytes]
for b in seq:
reply = self._rw.write(self._device, b)
if not reply:
return None
return value
class BitFieldWithOffsetAndMaskSetting(BitFieldSetting):
"""A setting descriptor for a set of choices represented by one bit each,
each one having an offset, being a map from options to booleans.
Needs to be instantiated for each specific device."""
def _do_read(self):
return {r: self._rw.read(self._device, r) for r in self._validator.prepare_read()}
def _do_read_key(self, key):
r = self._validator.prepare_read_key(key)
return {r: self._rw.read(self._device, r)}
#
# read/write low-level operators
#
@ -452,9 +438,9 @@ class FeatureRW(object):
self.write_fnid = write_fnid
self.no_reply = no_reply
def read(self, device):
def read(self, device, data_bytes=b''):
assert self.feature is not None
return device.feature_request(self.feature, self.read_fnid)
return device.feature_request(self.feature, self.read_fnid, data_bytes)
def write(self, device, data_bytes):
assert self.feature is not None
@ -641,6 +627,89 @@ class BitFieldValidator(object):
w |= int(k)
return _int2bytes(w, self.byte_count)
def all_options(self):
return self.options
class BitFieldWithOffsetAndMaskValidator(object):
__slots__ = ('byte_count', 'options', '_option_from_key', '_mask_from_offset', '_option_from_offset_mask')
kind = KIND.multiple_toggle
sep = 0x01
def __init__(self, options, byte_count=None):
assert (isinstance(options, list))
self.options = options
# to retrieve the options efficiently:
self._option_from_key = {}
self._mask_from_offset = {}
self._option_from_offset_mask = {}
for opt in options:
self._option_from_key[opt.gesture] = opt
try:
self._mask_from_offset[opt.offset] |= opt.mask
except KeyError:
self._mask_from_offset[opt.offset] = opt.mask
try:
mask_to_opt = self._option_from_offset_mask[opt.offset]
except KeyError:
mask_to_opt = {}
self._option_from_offset_mask[opt.offset] = mask_to_opt
mask_to_opt[opt.mask] = opt
self.byte_count = (max(x.mask.bit_length() for x in options) + 7) // 8
if byte_count:
assert (isinstance(byte_count, int) and byte_count >= self.byte_count)
self.byte_count = byte_count
def prepare_read(self):
r = []
for offset, mask in self._mask_from_offset.items():
b = (offset << (8 * (self.byte_count + 1)))
b |= (self.sep << (8 * self.byte_count)) | mask
r.append(_int2bytes(b, self.byte_count + 2))
return r
def prepare_read_key(self, key):
option = self._option_from_key.get(key, None)
if option is None:
return None
b = option.offset << (8 * (self.byte_count + 1))
b |= (self.sep << (8 * self.byte_count)) | option.mask
return _int2bytes(b, self.byte_count + 2)
def validate_read(self, reply_bytes_dict):
values = {str(int(k)): False for k in self.options}
for query, b in reply_bytes_dict.items():
offset = _bytes2int(query[0:1])
b += (self.byte_count - len(b)) * b'\x00'
value = _bytes2int(b[:self.byte_count])
mask_to_opt = self._option_from_offset_mask.get(offset, {})
m = 1
for _ in range(8 * self.byte_count):
if m in mask_to_opt:
values[str(int(mask_to_opt[m]))] = bool(value & m)
m <<= 1
return values
def prepare_write(self, new_value):
assert (isinstance(new_value, dict))
w = {}
for k, v in new_value.items():
option = self._option_from_key[int(k)]
if option.offset not in w:
w[option.offset] = 0
if v:
w[option.offset] |= option.mask
return [
_int2bytes((offset << (8 * (2 * self.byte_count + 1)))
| (self.sep << (16 * self.byte_count))
| (self._mask_from_offset[offset] << (8 * self.byte_count))
| value, 2 * self.byte_count + 2) for offset, value in w.items()
]
def all_options(self):
return [int(opt) if isinstance(opt, int) else opt.as_int() for opt in self.options]
class ChoicesValidator(object):
kind = KIND.choice

View File

@ -33,6 +33,8 @@ from .common import unpack as _unpack
from .i18n import _
from .settings import BitFieldSetting as _BitFieldSetting
from .settings import BitFieldValidator as _BitFieldV
from .settings import BitFieldWithOffsetAndMaskSetting as _BitFieldOMSetting
from .settings import BitFieldWithOffsetAndMaskValidator as _BitFieldOMV
from .settings import BooleanValidator as _BooleanV
from .settings import ChoicesMapValidator as _ChoicesMapV
from .settings import ChoicesValidator as _ChoicesV
@ -91,6 +93,7 @@ _THUMB_SCROLL_MODE = ('thumb-scroll-mode', _('HID++ Thumb Scrolling'),
_('HID++ mode for horizontal scroll with the thumb wheel.') + '\n' +
_('Effectively turns off thumb scrolling in Linux.'))
_THUMB_SCROLL_INVERT = ('thumb-scroll-invert', _('Thumb Scroll Invert'), _('Invert thumb scroll direction.'))
_GESTURE2_GESTURES = ('gesture2-gestures', _('Touchpad gestures'), _('Tweaks the touchpad behaviour.'))
# yapf: enable
# Setting template functions need to set up the setting itself, the validator, and the reader/writer.
@ -401,6 +404,18 @@ def _feature_thumb_invert():
return _Setting(_THUMB_SCROLL_INVERT, rw, validator, device_kind=(_DK.mouse, _DK.trackball))
def _feature_gesture2_gesture_callback(device):
options = [g for g in _hidpp20.get_gestures(device).gestures.values() if g.can_be_enabled or g.default_enabled]
return _BitFieldOMV(options) if options else None
def _feature_gesture2_gesture():
rw = _FeatureRW(_F.GESTURE_2, read_fnid=0x10, write_fnid=0x20)
return _BitFieldOMSetting(
_GESTURE2_GESTURES, rw, callback=_feature_gesture2_gesture_callback, device_kind=(_DK.touchpad, )
)
#
#
#
@ -432,6 +447,7 @@ _SETTINGS_TABLE = [
_S(_CHANGE_HOST, _F.CHANGE_HOST, _feature_change_host),
_S(_THUMB_SCROLL_MODE, _F.THUMB_WHEEL, _feature_thumb_mode),
_S(_THUMB_SCROLL_INVERT, _F.THUMB_WHEEL, _feature_thumb_invert),
_S(_GESTURE2_GESTURES, _F.GESTURE_2, _feature_gesture2_gesture),
]
_SETTINGS_LIST = namedtuple('_SETTINGS_LIST', [s[4] for s in _SETTINGS_TABLE])

View File

@ -206,9 +206,9 @@ def _print_device(dev, num=None):
if dev.online and dev.gestures:
print(' Has %d gestures and %d param:' % (len(dev.gestures.gestures), len(dev.gestures.params)))
for k in dev.gestures.gestures.values():
print(' %-26s Enabled (%4s): %s' % (k.gesture, k.enable_index, k.enabled(dev)))
print(' %-26s Enabled (%4s): %s' % (k.gesture, k.index, k.enabled()))
for k in dev.gestures.params.values():
print(' %-26s Value (%4s): %s' % (k.param, k.index, k.value(dev)))
print(' %-26s Value (%4s): %s' % (k.param, k.index, k.value()))
if dev.online:
battery = _hidpp20.get_battery(dev)
if battery is None:

View File

@ -167,6 +167,46 @@ def _create_slider_control(setting):
return control.gtk_range
def _create_multiple_toggle_control(setting):
def _toggle_notify(control, _, setting):
if control.get_sensitive():
key = control._setting_key
new_state = control.get_active()
if setting._value[key] != new_state:
setting._value[key] = new_state
_write_async_key_value(setting, key, new_state, control.get_parent().get_parent().get_parent().get_parent())
def _toggle_display(lb):
lb._showing = not lb._showing
if not lb._showing:
for c in lb.get_children()[1:]:
lb._hidden_rows.append(c)
lb.remove(c)
else:
for c in lb._hidden_rows:
lb.add(c)
lb._hidden_rows = []
lb = Gtk.ListBox()
lb._hidden_rows = []
lb._toggle_display = (lambda l: (lambda: _toggle_display(l)))(lb)
lb.set_selection_mode(Gtk.SelectionMode.NONE)
btn = Gtk.Button('? / ?')
lb.add(btn)
lb._showing = True
for k in setting._validator.all_options():
h = Gtk.HBox(homogeneous=False, spacing=0)
lbl = Gtk.Label(k)
control = Gtk.Switch()
control._setting_key = str(int(k))
control.connect('notify::active', _toggle_notify, setting)
h.pack_start(lbl, False, False, 0)
h.pack_end(control, False, False, 0)
lb.add(h)
btn.connect('clicked', lambda _: lb._toggle_display())
return lb
#
#
#
@ -195,22 +235,9 @@ def _create_sbox(s):
control = _create_map_choice_control(s)
sbox.pack_end(control, True, True, 0)
elif s.kind == _SETTING_KIND.multiple_toggle:
# ugly temporary hack!
choices = {k: [False, True] for k in s._validator.options}
class X:
def __init__(self, obj, ext):
self.obj = obj
self.ext = ext
def __getattr__(self, attr):
try:
return self.ext[attr]
except KeyError:
return getattr(self.obj, attr)
control = _create_map_choice_control(X(s, {'choices': choices}))
sbox.pack_end(control, True, True, 0)
control = _create_multiple_toggle_control(s)
sbox.get_children()[0].set_valign(Gtk.Align.START)
sbox.pack_end(control, False, False, 0)
else:
raise Exception('NotImplemented')
@ -222,6 +249,7 @@ def _create_sbox(s):
sbox.set_tooltip_text(s.description)
sbox.show_all()
spinner.start() # the first read will stop it
failed.set_visible(False)
@ -250,6 +278,19 @@ def _update_setting_item(sbox, value, is_online=True):
kbox, vbox = control.get_children() # depends on box layout
if value.get(kbox.get_active_id()):
vbox.set_active_id(str(value.get(kbox.get_active_id())))
elif isinstance(control, Gtk.ListBox):
hidden = getattr(control, '_hidden_rows', [])
total = len(control.get_children()) + len(hidden) - 1
active = 0
for ch in control.get_children()[1:] + hidden:
elem = ch.get_children()[0].get_children()[-1]
v = value.get(elem._setting_key, None)
if v is not None:
elem.set_active(v)
if elem.get_active():
active += 1
control.get_children()[0].get_children()[0].set_label(f'{active} / {total}')
else:
raise Exception('NotImplemented')
control.set_sensitive(True)