enable notifications for peripherals; fixes #27
This commit is contained in:
parent
e2cf9255ac
commit
a5eeac6e5a
|
@ -85,7 +85,7 @@ class NamedInts(object):
|
||||||
if the value already exists in the set (int or string), ValueError will be
|
if the value already exists in the set (int or string), ValueError will be
|
||||||
raised.
|
raised.
|
||||||
"""
|
"""
|
||||||
__slots__ = ['__dict__', '_values', '_indexed', '_fallback']
|
__slots__ = ['__dict__', '_values', '_indexed', '_fallback', '_all_bits']
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
def _readable_name(n):
|
def _readable_name(n):
|
||||||
|
@ -99,6 +99,7 @@ class NamedInts(object):
|
||||||
self._values = sorted(list(values.values()))
|
self._values = sorted(list(values.values()))
|
||||||
self._indexed = {int(v): v for v in self._values}
|
self._indexed = {int(v): v for v in self._values}
|
||||||
self._fallback = None
|
self._fallback = None
|
||||||
|
self._all_bits = sum(self._values)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def range(cls, from_value, to_value, name_generator=lambda x: str(x), step=1):
|
def range(cls, from_value, to_value, name_generator=lambda x: str(x), step=1):
|
||||||
|
@ -116,6 +117,9 @@ class NamedInts(object):
|
||||||
if unknown_bits:
|
if unknown_bits:
|
||||||
yield 'unknown:%06X' % unknown_bits
|
yield 'unknown:%06X' % unknown_bits
|
||||||
|
|
||||||
|
def all_bits(self):
|
||||||
|
return self._all_bits
|
||||||
|
|
||||||
def __getitem__(self, index):
|
def __getitem__(self, index):
|
||||||
if isinstance(index, int):
|
if isinstance(index, int):
|
||||||
if index in self._indexed:
|
if index in self._indexed:
|
||||||
|
|
|
@ -40,6 +40,7 @@ POWER_SWITCH_LOCATION = _NamedInts(
|
||||||
|
|
||||||
NOTIFICATION_FLAG = _NamedInts(
|
NOTIFICATION_FLAG = _NamedInts(
|
||||||
battery_status=0x100000,
|
battery_status=0x100000,
|
||||||
|
# reserved_r1b4=0x001000, unknown
|
||||||
wireless=0x000100,
|
wireless=0x000100,
|
||||||
software_present=0x000800)
|
software_present=0x000800)
|
||||||
|
|
||||||
|
@ -76,7 +77,8 @@ def get_register(device, name, default_number=-1):
|
||||||
return reply
|
return reply
|
||||||
|
|
||||||
if not known_register and device.ping():
|
if not known_register and device.ping():
|
||||||
_log.warn("%s: failed to read '%s' from default register 0x%02X, blacklisting", device, name, default_number)
|
_log.warn("%s: failed to read '%s' from default register 0x%02X, blacklisting",
|
||||||
|
device, name, default_number)
|
||||||
device.registers[name] = -default_number
|
device.registers[name] = -default_number
|
||||||
|
|
||||||
|
|
||||||
|
@ -160,3 +162,16 @@ def get_firmware(device):
|
||||||
firmware.append(bl)
|
firmware.append(bl)
|
||||||
|
|
||||||
return tuple(firmware)
|
return tuple(firmware)
|
||||||
|
|
||||||
|
|
||||||
|
def get_notification_flags(device):
|
||||||
|
flags = device.request(0x8100)
|
||||||
|
if flags is not None:
|
||||||
|
assert len(flags) == 3
|
||||||
|
return ord(flags[0:1]) << 16 | ord(flags[1:2]) << 8 | ord(flags[2:3])
|
||||||
|
|
||||||
|
|
||||||
|
def set_notification_flags(device, *flag_bits):
|
||||||
|
flag_bits = sum(int(b) for b in flag_bits)
|
||||||
|
result = device.request(0x8000, 0xFF & (flag_bits >> 16), 0xFF & (flag_bits >> 8), 0xFF & flag_bits)
|
||||||
|
return result is not None
|
||||||
|
|
|
@ -165,6 +165,25 @@ class PairedDevice(object):
|
||||||
_descriptors.check_features(self, self._settings)
|
_descriptors.check_features(self, self._settings)
|
||||||
return self._settings
|
return self._settings
|
||||||
|
|
||||||
|
def enable_notifications(self, enable=True):
|
||||||
|
"""Enable or disable device (dis)connection notifications on this
|
||||||
|
receiver."""
|
||||||
|
if not self.receiver or not self.receiver.handle:
|
||||||
|
return False
|
||||||
|
|
||||||
|
set_flag_bits = _hidpp10.NOTIFICATION_FLAG.all_bits() if enable else 0
|
||||||
|
ok = _hidpp10.set_notification_flags(self, set_flag_bits)
|
||||||
|
|
||||||
|
flag_bits = _hidpp10.get_notification_flags(self)
|
||||||
|
if flag_bits is not None:
|
||||||
|
flag_bits = tuple(_hidpp10.NOTIFICATION_FLAG.flag_names(flag_bits))
|
||||||
|
|
||||||
|
if ok:
|
||||||
|
_log.info("%s: device notifications %s %s", self, 'enabled' if enable else 'disabled', flag_bits)
|
||||||
|
else:
|
||||||
|
_log.warn("%s: failed to %s device notifications %s", self, 'enable' if enable else 'disable', flag_bits)
|
||||||
|
return ok
|
||||||
|
|
||||||
def request(self, request_id, *params):
|
def request(self, request_id, *params):
|
||||||
return _base.request(self.receiver.handle, self.number, request_id, *params)
|
return _base.request(self.receiver.handle, self.number, request_id, *params)
|
||||||
|
|
||||||
|
@ -252,22 +271,18 @@ class Receiver(object):
|
||||||
receiver."""
|
receiver."""
|
||||||
if not self.handle:
|
if not self.handle:
|
||||||
return False
|
return False
|
||||||
if enable:
|
|
||||||
# set all possible flags
|
|
||||||
ok = self.request(0x8000, 0xFF, 0xFF, 0xFF)
|
|
||||||
else:
|
|
||||||
# clear out all possible flags
|
|
||||||
ok = self.request(0x8000)
|
|
||||||
|
|
||||||
flags = self.request(0x8100)
|
flag_bits = _hidpp10.NOTIFICATION_FLAG.all_bits() if enable else 0
|
||||||
if flags:
|
ok = _hidpp10.set_notification_flags(self, flag_bits)
|
||||||
flags = ord(flags[0:1]) << 16 | ord(flags[1:2]) << 8 | ord(flags[2:3])
|
|
||||||
flags = tuple(_hidpp10.NOTIFICATION_FLAG.flag_names(flags))
|
flag_bits = _hidpp10.get_notification_flags(self)
|
||||||
|
if flag_bits is not None:
|
||||||
|
flag_bits = tuple(_hidpp10.NOTIFICATION_FLAG.flag_names(flag_bits))
|
||||||
|
|
||||||
if ok:
|
if ok:
|
||||||
_log.info("%s: device notifications %s %s", self, 'enabled' if enable else 'disabled', flags)
|
_log.info("%s: device notifications %s %s", self, 'enabled' if enable else 'disabled', flag_bits)
|
||||||
else:
|
else:
|
||||||
_log.warn("%s: failed to %s device notifications %s", self, 'enable' if enable else 'disable', flags)
|
_log.warn("%s: failed to %s device notifications %s", self, 'enable' if enable else 'disable', flag_bits)
|
||||||
return ok
|
return ok
|
||||||
|
|
||||||
def notify_devices(self):
|
def notify_devices(self):
|
||||||
|
@ -375,7 +390,7 @@ class Receiver(object):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def open(self, path):
|
def open(self, path):
|
||||||
"""Opens a Logitech Receiver found attached to the machine, by device path.
|
"""Opens a Logitech Receiver found attached to the machine, by Linux device path.
|
||||||
|
|
||||||
:returns: An open file handle for the found receiver, or ``None``.
|
:returns: An open file handle for the found receiver, or ``None``.
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -173,8 +173,11 @@ class DeviceStatus(dict):
|
||||||
|
|
||||||
def _changed(self, active=True, alert=ALERT.NONE, reason=None, timestamp=None):
|
def _changed(self, active=True, alert=ALERT.NONE, reason=None, timestamp=None):
|
||||||
assert self._changed_callback
|
assert self._changed_callback
|
||||||
self._active = active
|
was_active, self._active = self._active, active
|
||||||
if not active:
|
if active:
|
||||||
|
if not was_active:
|
||||||
|
self._device.enable_notifications()
|
||||||
|
else:
|
||||||
battery = self.get(BATTERY_LEVEL)
|
battery = self.get(BATTERY_LEVEL)
|
||||||
self.clear()
|
self.clear()
|
||||||
if battery is not None:
|
if battery is not None:
|
||||||
|
|
|
@ -59,7 +59,8 @@ class ReceiverListener(_listener.EventsListener):
|
||||||
r.status = 'The device was unplugged.'
|
r.status = 'The device was unplugged.'
|
||||||
if r:
|
if r:
|
||||||
try:
|
try:
|
||||||
r.enable_notifications(False)
|
pass
|
||||||
|
# r.enable_notifications(False)
|
||||||
except:
|
except:
|
||||||
_log.exception("disabling notifications on receiver %s" % r)
|
_log.exception("disabling notifications on receiver %s" % r)
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -7,7 +7,7 @@ from __future__ import absolute_import, division, print_function, unicode_litera
|
||||||
from gi.repository import Gtk, Gdk, GLib
|
from gi.repository import Gtk, Gdk, GLib
|
||||||
|
|
||||||
from solaar import NAME
|
from solaar import NAME
|
||||||
from logitech.unifying_receiver import status as _status
|
from logitech.unifying_receiver import status as _status, hidpp10 as _hidpp10
|
||||||
from . import config_panel as _config_panel
|
from . import config_panel as _config_panel
|
||||||
from . import action as _action, icons as _icons
|
from . import action as _action, icons as _icons
|
||||||
|
|
||||||
|
@ -67,10 +67,18 @@ def _make_receiver_box(receiver):
|
||||||
|
|
||||||
def _update_info_label(f):
|
def _update_info_label(f):
|
||||||
device = f._device
|
device = f._device
|
||||||
if f._info_label.get_visible() and '\n' not in f._info_label.get_text():
|
if True: # f._info_label.get_visible() and '\n' not in f._info_label.get_text():
|
||||||
items = [('Path', device.path), ('Serial', device.serial)] + \
|
items = [('Path', device.path), ('Serial', device.serial)] + \
|
||||||
[(fw.kind, fw.version) for fw in device.firmware]
|
[(fw.kind, fw.version) for fw in device.firmware]
|
||||||
f._info_label.set_markup('<small><tt>%s</tt></small>' % '\n'.join('%-13s: %s' % item for item in items))
|
|
||||||
|
notification_flags = _hidpp10.get_notification_flags(device)
|
||||||
|
if notification_flags:
|
||||||
|
notification_flags = _hidpp10.NOTIFICATION_FLAG.flag_names(notification_flags)
|
||||||
|
else:
|
||||||
|
notification_flags = ('(none)',)
|
||||||
|
items.append(('Notifications', ('\n%16s' % ' ').join(notification_flags)))
|
||||||
|
|
||||||
|
f._info_label.set_markup('<small><tt>%s</tt></small>' % '\n'.join('%-14s: %s' % item for item in items))
|
||||||
f._info_label.set_sensitive(True)
|
f._info_label.set_sensitive(True)
|
||||||
|
|
||||||
def _toggle_info_label(action, f):
|
def _toggle_info_label(action, f):
|
||||||
|
@ -170,11 +178,11 @@ def _make_device_box(index):
|
||||||
frame._info_label = info_label
|
frame._info_label = info_label
|
||||||
|
|
||||||
def _update_info_label(f):
|
def _update_info_label(f):
|
||||||
if frame._info_label.get_text().count('\n') < 4:
|
if True: # frame._info_label.get_text().count('\n') < 4:
|
||||||
device = f._device
|
device = f._device
|
||||||
assert device
|
assert device
|
||||||
|
|
||||||
items = [None, None, None, None, None, None, None, None]
|
items = [None, None, None, None, None, None, None, None, None]
|
||||||
hid = device.protocol
|
hid = device.protocol
|
||||||
items[0] = ('Protocol', 'HID++ %1.1f' % hid if hid else 'unknown')
|
items[0] = ('Protocol', 'HID++ %1.1f' % hid if hid else 'unknown')
|
||||||
items[1] = ('Polling rate', '%d ms' % device.polling_rate)
|
items[1] = ('Polling rate', '%d ms' % device.polling_rate)
|
||||||
|
@ -184,7 +192,17 @@ def _make_device_box(index):
|
||||||
if firmware:
|
if firmware:
|
||||||
items[4:] = [(fw.kind, (fw.name + ' ' + fw.version).strip()) for fw in firmware]
|
items[4:] = [(fw.kind, (fw.name + ' ' + fw.version).strip()) for fw in firmware]
|
||||||
|
|
||||||
frame._info_label.set_markup('<small><tt>%s</tt></small>' % '\n'.join('%-13s: %s' % i for i in items if i))
|
if device.status:
|
||||||
|
notification_flags = _hidpp10.get_notification_flags(device)
|
||||||
|
if notification_flags:
|
||||||
|
notification_flags = _hidpp10.NOTIFICATION_FLAG.flag_names(notification_flags)
|
||||||
|
else:
|
||||||
|
notification_flags = ('(none)',)
|
||||||
|
items[-1] = ('Notifications', ('\n%16s' % ' ').join(notification_flags))
|
||||||
|
else:
|
||||||
|
items[-1] = None
|
||||||
|
|
||||||
|
frame._info_label.set_markup('<small><tt>%s</tt></small>' % '\n'.join('%-14s: %s' % i for i in items if i))
|
||||||
frame._info_label.set_sensitive(True)
|
frame._info_label.set_sensitive(True)
|
||||||
|
|
||||||
def _toggle_info_label(action, f):
|
def _toggle_info_label(action, f):
|
||||||
|
|
Loading…
Reference in New Issue