more chair switching -- moved notifications handling into own .py
This commit is contained in:
parent
3275aa4c12
commit
a03cc9ce64
|
@ -8,20 +8,12 @@ from __future__ import absolute_import, division, print_function, unicode_litera
|
||||||
from time import time as _timestamp
|
from time import time as _timestamp
|
||||||
from random import getrandbits as _random_bits
|
from random import getrandbits as _random_bits
|
||||||
|
|
||||||
from struct import pack as _pack
|
|
||||||
try:
|
|
||||||
unicode
|
|
||||||
# if Python2, unicode_literals will mess our first (un)pack() argument
|
|
||||||
_pack_str = _pack
|
|
||||||
_pack = lambda x, *args: _pack_str(str(x), *args)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
from logging import getLogger, DEBUG as _DEBUG
|
from logging import getLogger, DEBUG as _DEBUG
|
||||||
_log = getLogger('LUR.base')
|
_log = getLogger('LUR.base')
|
||||||
del getLogger
|
del getLogger
|
||||||
|
|
||||||
from .common import strhex as _strhex, KwException as _KwException
|
|
||||||
|
from .common import strhex as _strhex, KwException as _KwException, pack as _pack
|
||||||
from . import hidpp10 as _hidpp10
|
from . import hidpp10 as _hidpp10
|
||||||
from . import hidpp20 as _hidpp20
|
from . import hidpp20 as _hidpp20
|
||||||
import hidapi as _hid
|
import hidapi as _hid
|
||||||
|
@ -242,7 +234,8 @@ def _skip_incoming(handle, ihandle, notifications_hook):
|
||||||
report_id = ord(data[:1])
|
report_id = ord(data[:1])
|
||||||
assert (report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE or
|
assert (report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE or
|
||||||
report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE or
|
report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE or
|
||||||
report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE)
|
report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE), \
|
||||||
|
"unexpected message size: report_id %02X message %s" % (report_id, _strhex(data))
|
||||||
if notifications_hook:
|
if notifications_hook:
|
||||||
n = make_notification(ord(data[1:2]), data[2:])
|
n = make_notification(ord(data[1:2]), data[2:])
|
||||||
if n:
|
if n:
|
||||||
|
|
|
@ -5,14 +5,14 @@
|
||||||
from __future__ import absolute_import, division, print_function, unicode_literals
|
from __future__ import absolute_import, division, print_function, unicode_literals
|
||||||
|
|
||||||
from binascii import hexlify as _hexlify
|
from binascii import hexlify as _hexlify
|
||||||
from struct import pack as _pack, unpack as _unpack
|
from struct import pack, unpack
|
||||||
try:
|
try:
|
||||||
unicode
|
unicode
|
||||||
# if Python2, unicode_literals will mess our first (un)pack() argument
|
# if Python2, unicode_literals will mess our first (un)pack() argument
|
||||||
_pack_str = _pack
|
_pack_str = pack
|
||||||
_unpack_str = _unpack
|
_unpack_str = unpack
|
||||||
_pack = lambda x, *args: _pack_str(str(x), *args)
|
pack = lambda x, *args: _pack_str(str(x), *args)
|
||||||
_unpack = lambda x, *args: _unpack_str(str(x), *args)
|
unpack = lambda x, *args: _unpack_str(str(x), *args)
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -57,7 +57,6 @@ class NamedInt(int):
|
||||||
if other is not None:
|
if other is not None:
|
||||||
raise TypeError("Unsupported type " + str(type(other)))
|
raise TypeError("Unsupported type " + str(type(other)))
|
||||||
|
|
||||||
|
|
||||||
def __ne__(self, other):
|
def __ne__(self, other):
|
||||||
return not self.__eq__(other)
|
return not self.__eq__(other)
|
||||||
|
|
||||||
|
@ -205,7 +204,7 @@ def bytes2int(x):
|
||||||
assert isinstance(x, bytes)
|
assert isinstance(x, bytes)
|
||||||
assert len(x) < 9
|
assert len(x) < 9
|
||||||
qx = (b'\x00' * 8) + x
|
qx = (b'\x00' * 8) + x
|
||||||
result, = _unpack('!Q', qx[-8:])
|
result, = unpack('!Q', qx[-8:])
|
||||||
# assert x == int2bytes(result, len(x))
|
# assert x == int2bytes(result, len(x))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@ -216,7 +215,7 @@ def int2bytes(x, count=None):
|
||||||
If 'count' is not given, the necessary number of bytes is computed.
|
If 'count' is not given, the necessary number of bytes is computed.
|
||||||
"""
|
"""
|
||||||
assert isinstance(x, int)
|
assert isinstance(x, int)
|
||||||
result = _pack('!Q', x)
|
result = pack('!Q', x)
|
||||||
assert isinstance(result, bytes)
|
assert isinstance(result, bytes)
|
||||||
# assert x == bytes2int(result)
|
# assert x == bytes2int(result)
|
||||||
|
|
||||||
|
|
|
@ -8,23 +8,15 @@ from logging import getLogger, DEBUG as _DEBUG
|
||||||
_log = getLogger('LUR.hidpp20')
|
_log = getLogger('LUR.hidpp20')
|
||||||
del getLogger
|
del getLogger
|
||||||
|
|
||||||
from struct import pack as _pack, unpack as _unpack
|
|
||||||
try:
|
|
||||||
unicode
|
|
||||||
# if Python2, unicode_literals will mess our first (un)pack() argument
|
|
||||||
_pack_str = _pack
|
|
||||||
_unpack_str = _unpack
|
|
||||||
_pack = lambda x, *args: _pack_str(str(x), *args)
|
|
||||||
_unpack = lambda x, *args: _unpack_str(str(x), *args)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# from weakref import proxy as _proxy
|
# from weakref import proxy as _proxy
|
||||||
|
|
||||||
from .common import (FirmwareInfo as _FirmwareInfo,
|
from .common import (FirmwareInfo as _FirmwareInfo,
|
||||||
ReprogrammableKeyInfo as _ReprogrammableKeyInfo,
|
ReprogrammableKeyInfo as _ReprogrammableKeyInfo,
|
||||||
KwException as _KwException,
|
KwException as _KwException,
|
||||||
NamedInts as _NamedInts)
|
NamedInts as _NamedInts,
|
||||||
|
pack as _pack,
|
||||||
|
unpack as _unpack)
|
||||||
from . import special_keys
|
from . import special_keys
|
||||||
|
|
||||||
#
|
#
|
||||||
|
|
|
@ -0,0 +1,249 @@
|
||||||
|
#
|
||||||
|
# Handles incoming events from the receiver/devices, updating the related
|
||||||
|
# status object as appropiate.
|
||||||
|
#
|
||||||
|
|
||||||
|
from __future__ import absolute_import, division, print_function, unicode_literals
|
||||||
|
|
||||||
|
from logging import getLogger, DEBUG as _DEBUG
|
||||||
|
_log = getLogger('LUR.notifications')
|
||||||
|
del getLogger
|
||||||
|
|
||||||
|
from .common import strhex as _strhex, unpack as _unpack
|
||||||
|
from . import hidpp10 as _hidpp10
|
||||||
|
from . import hidpp20 as _hidpp20
|
||||||
|
from .status import KEYS as _K, ALERT as _ALERT
|
||||||
|
|
||||||
|
_R = _hidpp10.REGISTERS
|
||||||
|
_F = _hidpp20.FEATURE
|
||||||
|
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
|
def process(device, notification):
|
||||||
|
assert device
|
||||||
|
assert notification
|
||||||
|
|
||||||
|
assert hasattr(device, 'status')
|
||||||
|
status = device.status
|
||||||
|
assert status is not None
|
||||||
|
|
||||||
|
if device.kind is None:
|
||||||
|
return _process_receiver_notification(device, status, notification)
|
||||||
|
|
||||||
|
return _process_device_notification(device, status, notification)
|
||||||
|
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
|
def _process_receiver_notification(receiver, status, n):
|
||||||
|
# supposedly only 0x4x notifications arrive for the receiver
|
||||||
|
assert n.sub_id & 0x40 == 0x40
|
||||||
|
|
||||||
|
# pairing lock notification
|
||||||
|
if n.sub_id == 0x4A:
|
||||||
|
status.lock_open = bool(n.address & 0x01)
|
||||||
|
reason = 'pairing lock is ' + ('open' if status.lock_open else 'closed')
|
||||||
|
_log.info("%s: %s", receiver, reason)
|
||||||
|
|
||||||
|
status[_K.ERROR] = None
|
||||||
|
if status.lock_open:
|
||||||
|
status.new_device = None
|
||||||
|
|
||||||
|
pair_error = ord(n.data[:1])
|
||||||
|
if pair_error:
|
||||||
|
status[_K.ERROR] = error_string = _hidpp10.PAIRING_ERRORS[pair_error]
|
||||||
|
status.new_device = None
|
||||||
|
_log.warn("pairing error %d: %s", pair_error, error_string)
|
||||||
|
|
||||||
|
status.changed(reason=reason)
|
||||||
|
return True
|
||||||
|
|
||||||
|
_log.warn("%s: unhandled notification %s", receiver, n)
|
||||||
|
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
|
def _process_device_notification(device, status, n):
|
||||||
|
# incoming packets with SubId >= 0x80 are supposedly replies from
|
||||||
|
# HID++ 1.0 requests, should never get here
|
||||||
|
assert n.sub_id & 0x80 == 0
|
||||||
|
|
||||||
|
# 0x40 to 0x7F appear to be HID++ 1.0 notifications
|
||||||
|
if n.sub_id >= 0x40:
|
||||||
|
return _process_hidpp10_notification(device, status, n)
|
||||||
|
|
||||||
|
# At this point, we need to know the device's protocol, otherwise it's
|
||||||
|
# possible to not know how to handle it.
|
||||||
|
assert device.protocol is not None
|
||||||
|
|
||||||
|
# some custom battery events for HID++ 1.0 devices
|
||||||
|
if device.protocol < 2.0:
|
||||||
|
return _process_hidpp10_custom_notification(device, status, n)
|
||||||
|
|
||||||
|
# assuming 0x00 to 0x3F are feature (HID++ 2.0) notifications
|
||||||
|
assert device.features
|
||||||
|
try:
|
||||||
|
feature = device.features[n.sub_id]
|
||||||
|
except IndexError:
|
||||||
|
_log.warn("%s: notification from invalid feature index %02X: %s", device, n.sub_id, n)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return _process_feature_notification(device, status, n, feature)
|
||||||
|
|
||||||
|
|
||||||
|
def _process_hidpp10_custom_notification(device, status, n):
|
||||||
|
if _log.isEnabledFor(_DEBUG):
|
||||||
|
_log.debug("%s (%s) custom notification %s", device, device.protocol, n)
|
||||||
|
|
||||||
|
if n.sub_id in (_R.battery_status, _R.battery_charge):
|
||||||
|
assert n.data[-1:] == b'\x00'
|
||||||
|
# message layout: 10 ix <register> <xx> <yy> <zz> <00>
|
||||||
|
data = '%c%s' % (n.address, n.data)
|
||||||
|
charge, status_text = _hidpp10.parse_battery_status(n.sub_id, data)
|
||||||
|
status.set_battery_info(charge, status_text)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if n.sub_id == _R.illumination:
|
||||||
|
# message layout: 10 ix 17("address") <??> <?> <??> <light level 1=off..5=max>
|
||||||
|
# TODO anything we can do with this?
|
||||||
|
_log.info("illumination event: %s", n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
_log.warn("%s: unrecognized %s", device, n)
|
||||||
|
|
||||||
|
|
||||||
|
def _process_hidpp10_notification(device, status, n):
|
||||||
|
# unpair notification
|
||||||
|
if n.sub_id == 0x40:
|
||||||
|
if n.address == 0x02:
|
||||||
|
# device un-paired
|
||||||
|
status.clear()
|
||||||
|
device.wpid = None
|
||||||
|
device.status = None
|
||||||
|
status.changed(active=False, alert=_ALERT.ALL, reason='unpaired')
|
||||||
|
else:
|
||||||
|
_log.warn("%s: disconnection with unknown type %02X: %s", device, n.address, n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
# wireless link notification
|
||||||
|
if n.sub_id == 0x41:
|
||||||
|
protocol_name = ('unifying (eQuad DJ)' if n.address == 0x04
|
||||||
|
else 'eQuad' if n.address == 0x03
|
||||||
|
else None)
|
||||||
|
if protocol_name:
|
||||||
|
if _log.isEnabledFor(_DEBUG):
|
||||||
|
wpid = _strhex(n.data[2:3] + n.data[1:2])
|
||||||
|
assert wpid == device.wpid, "%s wpid mismatch, got %s" % (device, wpid)
|
||||||
|
|
||||||
|
flags = ord(n.data[:1]) & 0xF0
|
||||||
|
link_encrypyed = bool(flags & 0x20)
|
||||||
|
link_established = not (flags & 0x40)
|
||||||
|
if _log.isEnabledFor(_DEBUG):
|
||||||
|
sw_present = bool(flags & 0x10)
|
||||||
|
has_payload = bool(flags & 0x80)
|
||||||
|
_log.debug("%s: %s connection notification: software=%s, encrypted=%s, link=%s, payload=%s",
|
||||||
|
device, protocol_name, sw_present, link_encrypyed, link_established, has_payload)
|
||||||
|
status[_K.LINK_ENCRYPTED] = link_encrypyed
|
||||||
|
status.changed(active=link_established)
|
||||||
|
else:
|
||||||
|
_log.warn("%s: connection notification with unknown protocol %02X: %s", device.number, n.address, n)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
if n.sub_id == 0x49:
|
||||||
|
# raw input event? just ignore it
|
||||||
|
# if n.address == 0x01, no idea what it is, but they keep on coming
|
||||||
|
# if n.address == 0x03, appears to be an actual input event,
|
||||||
|
# because they only come when input happents
|
||||||
|
return True
|
||||||
|
|
||||||
|
# power notification
|
||||||
|
if n.sub_id == 0x4B:
|
||||||
|
if n.address == 0x01:
|
||||||
|
if _log.isEnabledFor(_DEBUG):
|
||||||
|
_log.debug("%s: device powered on", device)
|
||||||
|
reason = str(status) or 'powered on'
|
||||||
|
status.changed(active=True, alert=_ALERT.NOTIFICATION, reason=reason)
|
||||||
|
else:
|
||||||
|
_log.info("%s: unknown %s", device, n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
_log.warn("%s: unrecognized %s", device, n)
|
||||||
|
|
||||||
|
|
||||||
|
def _process_feature_notification(device, status, n, feature):
|
||||||
|
if feature == _F.BATTERY_STATUS:
|
||||||
|
if n.address == 0x00:
|
||||||
|
discharge = ord(n.data[:1])
|
||||||
|
battery_status = ord(n.data[1:2])
|
||||||
|
status.set_battery_info(discharge, _hidpp20.BATTERY_STATUS[battery_status])
|
||||||
|
else:
|
||||||
|
_log.info("%s: unknown BATTERY %s", device, n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
# TODO: what are REPROG_CONTROLS_V{2,3}?
|
||||||
|
if feature == _F.REPROG_CONTROLS:
|
||||||
|
if n.address == 0x00:
|
||||||
|
_log.info("%s: reprogrammable key: %s", device, n)
|
||||||
|
else:
|
||||||
|
_log.info("%s: unknown REPROGRAMMABLE KEYS %s", device, n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if feature == _F.WIRELESS_DEVICE_STATUS:
|
||||||
|
if n.address == 0x00:
|
||||||
|
if _log.isEnabledFor(_DEBUG):
|
||||||
|
_log.debug("wireless status: %s", n)
|
||||||
|
if n.data[0:3] == b'\x01\x01\x01':
|
||||||
|
status.changed(active=True, alert=_ALERT.NOTIFICATION, reason='powered on')
|
||||||
|
else:
|
||||||
|
_log.info("%s: unknown WIRELESS %s", device, n)
|
||||||
|
else:
|
||||||
|
_log.info("%s: unknown WIRELESS %s", device, n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if feature == _F.SOLAR_DASHBOARD:
|
||||||
|
if n.data[5:9] == b'GOOD':
|
||||||
|
charge, lux, adc = _unpack('!BHH', n.data[:5])
|
||||||
|
status[_K.BATTERY_LEVEL] = charge
|
||||||
|
# guesstimate the battery voltage, emphasis on 'guess'
|
||||||
|
status[_K.BATTERY_STATUS] = '%1.2fV' % (adc * 2.67793237653 / 0x0672)
|
||||||
|
if n.address == 0x00:
|
||||||
|
status[_K.LIGHT_LEVEL] = None
|
||||||
|
status[_K.BATTERY_CHARGING] = None
|
||||||
|
status.changed(active=True)
|
||||||
|
elif n.address == 0x10:
|
||||||
|
status[_K.LIGHT_LEVEL] = lux
|
||||||
|
status[_K.BATTERY_CHARGING] = lux > 200
|
||||||
|
status.changed(active=True)
|
||||||
|
elif n.address == 0x20:
|
||||||
|
_log.debug("%s: Light Check button pressed", device)
|
||||||
|
status.changed(alert=_ALERT.SHOW_WINDOW)
|
||||||
|
# first cancel any reporting
|
||||||
|
# device.feature_request(_F.SOLAR_DASHBOARD)
|
||||||
|
# trigger a new report chain
|
||||||
|
reports_count = 15
|
||||||
|
reports_period = 2 # seconds
|
||||||
|
device.feature_request(_F.SOLAR_DASHBOARD, 0x00, reports_count, reports_period)
|
||||||
|
else:
|
||||||
|
_log.info("%s: unknown SOLAR CHAGE %s", device, n)
|
||||||
|
else:
|
||||||
|
_log.warn("%s: SOLAR CHARGE not GOOD? %s", device, n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if feature == _F.TOUCHMOUSE_RAW_POINTS:
|
||||||
|
if n.address == 0x00:
|
||||||
|
_log.info("%s: TOUCH MOUSE points %s", device, n)
|
||||||
|
elif n.address == 0x10:
|
||||||
|
touch = ord(n.data[:1])
|
||||||
|
button_down = bool(touch & 0x02)
|
||||||
|
mouse_lifted = bool(touch & 0x01)
|
||||||
|
_log.info("%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s", device, button_down, mouse_lifted)
|
||||||
|
else:
|
||||||
|
_log.warn("%s: unknown TOUCH MOUSE %s", device, n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
_log.info("%s: unrecognized %s for feature %s (index %02X)", device, n, feature, n.sub_id)
|
|
@ -7,20 +7,12 @@ from __future__ import absolute_import, division, print_function, unicode_litera
|
||||||
from time import time as _timestamp
|
from time import time as _timestamp
|
||||||
# from weakref import proxy as _proxy
|
# from weakref import proxy as _proxy
|
||||||
|
|
||||||
from struct import unpack as _unpack
|
|
||||||
try:
|
|
||||||
unicode
|
|
||||||
# if Python2, unicode_literals will mess our first (un)pack() argument
|
|
||||||
_unpack_str = _unpack
|
|
||||||
_unpack = lambda x, *args: _unpack_str(str(x), *args)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
from logging import getLogger, DEBUG as _DEBUG
|
from logging import getLogger, DEBUG as _DEBUG
|
||||||
_log = getLogger('LUR.status')
|
_log = getLogger('LUR.status')
|
||||||
del getLogger
|
del getLogger
|
||||||
|
|
||||||
from .common import NamedInts as _NamedInts, NamedInt as _NamedInt, strhex as _strhex
|
|
||||||
|
from .common import NamedInts as _NamedInts, NamedInt as _NamedInt
|
||||||
from . import hidpp10 as _hidpp10
|
from . import hidpp10 as _hidpp10
|
||||||
from . import hidpp20 as _hidpp20
|
from . import hidpp20 as _hidpp20
|
||||||
|
|
||||||
|
@ -48,7 +40,20 @@ _BATTERY_ATTENTION_LEVEL = 5
|
||||||
|
|
||||||
# If no updates have been receiver from the device for a while, ping the device
|
# If no updates have been receiver from the device for a while, ping the device
|
||||||
# and update it status accordinly.
|
# and update it status accordinly.
|
||||||
_STATUS_TIMEOUT = 5 * 60 # seconds
|
# _STATUS_TIMEOUT = 5 * 60 # seconds
|
||||||
|
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
|
def attach_to(device, changed_callback):
|
||||||
|
assert device
|
||||||
|
assert changed_callback
|
||||||
|
|
||||||
|
if device.kind is None:
|
||||||
|
device.status = ReceiverStatus(device, changed_callback)
|
||||||
|
else:
|
||||||
|
device.status = DeviceStatus(device, changed_callback)
|
||||||
|
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
@ -71,7 +76,6 @@ class ReceiverStatus(dict):
|
||||||
self.new_device = None
|
self.new_device = None
|
||||||
|
|
||||||
self[KEYS.ERROR] = None
|
self[KEYS.ERROR] = None
|
||||||
# self[KEYS.NOTIFICATION_FLAGS] = receiver.enable_notifications()
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
count = len(self._receiver)
|
count = len(self._receiver)
|
||||||
|
@ -80,7 +84,7 @@ class ReceiverStatus(dict):
|
||||||
'%d paired devices.' % count)
|
'%d paired devices.' % count)
|
||||||
__unicode__ = __str__
|
__unicode__ = __str__
|
||||||
|
|
||||||
def _changed(self, alert=ALERT.NOTIFICATION, reason=None):
|
def changed(self, alert=ALERT.NOTIFICATION, reason=None):
|
||||||
# self.updated = _timestamp()
|
# self.updated = _timestamp()
|
||||||
self._changed_callback(self._receiver, alert=alert, reason=reason)
|
self._changed_callback(self._receiver, alert=alert, reason=reason)
|
||||||
|
|
||||||
|
@ -97,25 +101,6 @@ class ReceiverStatus(dict):
|
||||||
# # get an update of the notification flags
|
# # get an update of the notification flags
|
||||||
# # self[KEYS.NOTIFICATION_FLAGS] = _hidpp10.get_notification_flags(r)
|
# # self[KEYS.NOTIFICATION_FLAGS] = _hidpp10.get_notification_flags(r)
|
||||||
|
|
||||||
def process_notification(self, n):
|
|
||||||
if n.sub_id == 0x4A:
|
|
||||||
self.lock_open = bool(n.address & 0x01)
|
|
||||||
reason = 'pairing lock is ' + ('open' if self.lock_open else 'closed')
|
|
||||||
_log.info("%s: %s", self._receiver, reason)
|
|
||||||
|
|
||||||
self[KEYS.ERROR] = None
|
|
||||||
if self.lock_open:
|
|
||||||
self.new_device = None
|
|
||||||
|
|
||||||
pair_error = ord(n.data[:1])
|
|
||||||
if pair_error:
|
|
||||||
self[KEYS.ERROR] = error_string = _hidpp10.PAIRING_ERRORS[pair_error]
|
|
||||||
self.new_device = None
|
|
||||||
_log.warn("pairing error %d: %s", pair_error, error_string)
|
|
||||||
|
|
||||||
self._changed(reason=reason)
|
|
||||||
return True
|
|
||||||
|
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
@ -203,7 +188,7 @@ class DeviceStatus(dict):
|
||||||
if changed or reason:
|
if changed or reason:
|
||||||
# update the leds on the device, if any
|
# update the leds on the device, if any
|
||||||
_hidpp10.set_3leds(self._device, level, charging=charging, warning=bool(alert))
|
_hidpp10.set_3leds(self._device, level, charging=charging, warning=bool(alert))
|
||||||
self._changed(alert=alert, reason=reason, timestamp=timestamp)
|
self.changed(alert=alert, reason=reason, timestamp=timestamp)
|
||||||
|
|
||||||
def read_battery(self, timestamp=None):
|
def read_battery(self, timestamp=None):
|
||||||
if self._active:
|
if self._active:
|
||||||
|
@ -229,9 +214,9 @@ class DeviceStatus(dict):
|
||||||
elif KEYS.BATTERY_STATUS in self:
|
elif KEYS.BATTERY_STATUS in self:
|
||||||
self[KEYS.BATTERY_STATUS] = None
|
self[KEYS.BATTERY_STATUS] = None
|
||||||
self[KEYS.BATTERY_CHARGING] = None
|
self[KEYS.BATTERY_CHARGING] = None
|
||||||
self._changed()
|
self.changed()
|
||||||
|
|
||||||
def _changed(self, active=None, alert=ALERT.NONE, reason=None, timestamp=None):
|
def changed(self, active=None, alert=ALERT.NONE, reason=None, timestamp=None):
|
||||||
assert self._changed_callback
|
assert self._changed_callback
|
||||||
d = self._device
|
d = self._device
|
||||||
# assert d # may be invalid when processing the 'unpaired' notification
|
# assert d # may be invalid when processing the 'unpaired' notification
|
||||||
|
@ -296,7 +281,7 @@ class DeviceStatus(dict):
|
||||||
# if d.ping():
|
# if d.ping():
|
||||||
# timestamp = self.updated = _timestamp()
|
# timestamp = self.updated = _timestamp()
|
||||||
# else:
|
# else:
|
||||||
# self._changed(active=False, reason='out of range')
|
# self.changed(active=False, reason='out of range')
|
||||||
#
|
#
|
||||||
# # if still active, make sure we know the battery level
|
# # if still active, make sure we know the battery level
|
||||||
# if KEYS.BATTERY_LEVEL not in self:
|
# if KEYS.BATTERY_LEVEL not in self:
|
||||||
|
@ -304,182 +289,6 @@ class DeviceStatus(dict):
|
||||||
#
|
#
|
||||||
# elif timestamp - self.updated > _STATUS_TIMEOUT:
|
# elif timestamp - self.updated > _STATUS_TIMEOUT:
|
||||||
# if d.ping():
|
# if d.ping():
|
||||||
# self._changed(active=True)
|
# self.changed(active=True)
|
||||||
# else:
|
# else:
|
||||||
# self.updated = _timestamp()
|
# self.updated = _timestamp()
|
||||||
|
|
||||||
def process_notification(self, n):
|
|
||||||
# incoming packets with SubId >= 0x80 are supposedly replies from
|
|
||||||
# HID++ 1.0 requests, should never get here
|
|
||||||
assert n.sub_id < 0x80
|
|
||||||
|
|
||||||
# 0x40 to 0x7F appear to be HID++ 1.0 notifications
|
|
||||||
if n.sub_id >= 0x40:
|
|
||||||
return self._process_hidpp10_notification(n)
|
|
||||||
|
|
||||||
# some custom battery events for HID++ 1.0 devices
|
|
||||||
if self._device.protocol < 2.0:
|
|
||||||
# README assuming HID++ 2.0 devices don't use the 0x07/0x0D registers
|
|
||||||
# however, this has not been fully verified yet
|
|
||||||
if n.sub_id in (_R.battery_charge, _R.battery_status) and len(n.data) == 3 and n.data[2:3] == b'\x00':
|
|
||||||
return self._process_hidpp10_custom_notification(n)
|
|
||||||
if n.sub_id == _R.illumination and len(n.data) == 3:
|
|
||||||
return self._process_hidpp10_custom_notification(n)
|
|
||||||
else:
|
|
||||||
# assuming 0x00 to 0x3F are feature (HID++ 2.0) notifications
|
|
||||||
try:
|
|
||||||
feature = self._device.features[n.sub_id]
|
|
||||||
except IndexError:
|
|
||||||
_log.warn("%s: notification from invalid feature index %02X: %s", self._device, n.sub_id, n)
|
|
||||||
return False
|
|
||||||
|
|
||||||
return self._process_feature_notification(n, feature)
|
|
||||||
|
|
||||||
def _process_hidpp10_custom_notification(self, n):
|
|
||||||
if _log.isEnabledFor(_DEBUG):
|
|
||||||
_log.debug("%s (%s) custom battery notification %s", self._device, self._device.protocol, n)
|
|
||||||
|
|
||||||
if n.sub_id in (_R.battery_status, _R.battery_charge):
|
|
||||||
data = '%c%s' % (n.address, n.data)
|
|
||||||
level, status = _hidpp10.parse_battery_status(n.sub_id, data)
|
|
||||||
self.set_battery_info(level, status)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if n.sub_id == _R.illumination:
|
|
||||||
# message layout: 10 ix 17("address") <??> <?> <??> <light level 1=off..5=max>
|
|
||||||
# TODO anything we can do with this?
|
|
||||||
_log.info("illumination event: %s", n)
|
|
||||||
return True
|
|
||||||
|
|
||||||
_log.warn("%s: unrecognized %s", self._device, n)
|
|
||||||
|
|
||||||
def _process_hidpp10_notification(self, n):
|
|
||||||
# unpair notification
|
|
||||||
if n.sub_id == 0x40:
|
|
||||||
if n.address == 0x02:
|
|
||||||
# device un-paired
|
|
||||||
self.clear()
|
|
||||||
dev = self._device
|
|
||||||
dev.wpid = None
|
|
||||||
dev.status = None
|
|
||||||
self._changed(active=False, alert=ALERT.ALL, reason='unpaired')
|
|
||||||
else:
|
|
||||||
_log.warn("%s: disconnection with unknown type %02X: %s", self._device, n.address, n)
|
|
||||||
return True
|
|
||||||
|
|
||||||
# wireless link notification
|
|
||||||
if n.sub_id == 0x41:
|
|
||||||
protocol_name = ('unifying (eQuad DJ)' if n.address == 0x04
|
|
||||||
else 'eQuad' if n.address == 0x03
|
|
||||||
else None)
|
|
||||||
if protocol_name:
|
|
||||||
if _log.isEnabledFor(_DEBUG):
|
|
||||||
wpid = _strhex(n.data[2:3] + n.data[1:2])
|
|
||||||
assert wpid == self._device.wpid, "%s wpid mismatch, got %s" % (self._device, wpid)
|
|
||||||
|
|
||||||
flags = ord(n.data[:1]) & 0xF0
|
|
||||||
link_encrypyed = bool(flags & 0x20)
|
|
||||||
link_established = not (flags & 0x40)
|
|
||||||
if _log.isEnabledFor(_DEBUG):
|
|
||||||
sw_present = bool(flags & 0x10)
|
|
||||||
has_payload = bool(flags & 0x80)
|
|
||||||
_log.debug("%s: %s connection notification: software=%s, encrypted=%s, link=%s, payload=%s",
|
|
||||||
self._device, protocol_name, sw_present, link_encrypyed, link_established, has_payload)
|
|
||||||
self[KEYS.LINK_ENCRYPTED] = link_encrypyed
|
|
||||||
self._changed(active=link_established)
|
|
||||||
else:
|
|
||||||
_log.warn("%s: connection notification with unknown protocol %02X: %s", self._device.number, n.address, n)
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
if n.sub_id == 0x49:
|
|
||||||
# raw input event? just ignore it
|
|
||||||
# if n.address == 0x01, no idea what it is, but they keep on coming
|
|
||||||
# if n.address == 0x03, it's an actual input event
|
|
||||||
return True
|
|
||||||
|
|
||||||
# power notification
|
|
||||||
if n.sub_id == 0x4B:
|
|
||||||
if n.address == 0x01:
|
|
||||||
if _log.isEnabledFor(_DEBUG):
|
|
||||||
_log.debug("%s: device powered on", self._device)
|
|
||||||
reason = str(self) or 'powered on'
|
|
||||||
self._changed(active=True, alert=ALERT.NOTIFICATION, reason=reason)
|
|
||||||
else:
|
|
||||||
_log.info("%s: unknown %s", self._device, n)
|
|
||||||
return True
|
|
||||||
|
|
||||||
_log.warn("%s: unrecognized %s", self._device, n)
|
|
||||||
|
|
||||||
def _process_feature_notification(self, n, feature):
|
|
||||||
if feature == _hidpp20.FEATURE.BATTERY_STATUS:
|
|
||||||
if n.address == 0x00:
|
|
||||||
discharge = ord(n.data[:1])
|
|
||||||
battery_status = ord(n.data[1:2])
|
|
||||||
self.set_battery_info(discharge, _hidpp20.BATTERY_STATUS[battery_status])
|
|
||||||
else:
|
|
||||||
_log.info("%s: unknown BATTERY %s", self._device, n)
|
|
||||||
return True
|
|
||||||
|
|
||||||
# TODO: what are REPROG_CONTROLS_V{2,3}?
|
|
||||||
if feature == _hidpp20.FEATURE.REPROG_CONTROLS:
|
|
||||||
if n.address == 0x00:
|
|
||||||
_log.info("%s: reprogrammable key: %s", self._device, n)
|
|
||||||
else:
|
|
||||||
_log.info("%s: unknown REPROGRAMMABLE KEYS %s", self._device, n)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if feature == _hidpp20.FEATURE.WIRELESS_DEVICE_STATUS:
|
|
||||||
if n.address == 0x00:
|
|
||||||
if _log.isEnabledFor(_DEBUG):
|
|
||||||
_log.debug("wireless status: %s", n)
|
|
||||||
if n.data[0:3] == b'\x01\x01\x01':
|
|
||||||
self._changed(active=True, alert=ALERT.NOTIFICATION, reason='powered on')
|
|
||||||
else:
|
|
||||||
_log.info("%s: unknown WIRELESS %s", self._device, n)
|
|
||||||
else:
|
|
||||||
_log.info("%s: unknown WIRELESS %s", self._device, n)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if feature == _hidpp20.FEATURE.SOLAR_DASHBOARD:
|
|
||||||
if n.data[5:9] == b'GOOD':
|
|
||||||
charge, lux, adc = _unpack('!BHH', n.data[:5])
|
|
||||||
self[KEYS.BATTERY_LEVEL] = charge
|
|
||||||
# guesstimate the battery voltage, emphasis on 'guess'
|
|
||||||
self[KEYS.BATTERY_STATUS] = '%1.2fV' % (adc * 2.67793237653 / 0x0672)
|
|
||||||
if n.address == 0x00:
|
|
||||||
self[KEYS.LIGHT_LEVEL] = None
|
|
||||||
self[KEYS.BATTERY_CHARGING] = None
|
|
||||||
self._changed(active=True)
|
|
||||||
elif n.address == 0x10:
|
|
||||||
self[KEYS.LIGHT_LEVEL] = lux
|
|
||||||
self[KEYS.BATTERY_CHARGING] = lux > 200
|
|
||||||
self._changed(active=True)
|
|
||||||
elif n.address == 0x20:
|
|
||||||
_log.debug("%s: Light Check button pressed", self._device)
|
|
||||||
self._changed(alert=ALERT.SHOW_WINDOW)
|
|
||||||
# first cancel any reporting
|
|
||||||
# self._device.feature_request(_hidpp20.FEATURE.SOLAR_DASHBOARD)
|
|
||||||
# trigger a new report chain
|
|
||||||
reports_count = 15
|
|
||||||
reports_period = 2 # seconds
|
|
||||||
self._device.feature_request(_hidpp20.FEATURE.SOLAR_DASHBOARD, 0x00, reports_count, reports_period)
|
|
||||||
else:
|
|
||||||
_log.info("%s: unknown SOLAR CHAGE %s", self._device, n)
|
|
||||||
else:
|
|
||||||
_log.warn("%s: SOLAR CHARGE not GOOD? %s", self._device, n)
|
|
||||||
return True
|
|
||||||
|
|
||||||
if feature == _hidpp20.FEATURE.TOUCHMOUSE_RAW_POINTS:
|
|
||||||
if n.address == 0x00:
|
|
||||||
_log.info("%s: TOUCH MOUSE points %s", self._device, n)
|
|
||||||
elif n.address == 0x10:
|
|
||||||
touch = ord(n.data[:1])
|
|
||||||
button_down = bool(touch & 0x02)
|
|
||||||
mouse_lifted = bool(touch & 0x01)
|
|
||||||
_log.info("%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s", self._device, button_down, mouse_lifted)
|
|
||||||
else:
|
|
||||||
_log.info("%s: unknown TOUCH MOUSE %s", self._device, n)
|
|
||||||
return True
|
|
||||||
|
|
||||||
_log.info("%s: unrecognized %s for feature %s (index %02X)", self._device, n, feature, n.sub_id)
|
|
||||||
|
|
|
@ -4,14 +4,15 @@
|
||||||
|
|
||||||
from __future__ import absolute_import, division, print_function, unicode_literals
|
from __future__ import absolute_import, division, print_function, unicode_literals
|
||||||
|
|
||||||
from logging import getLogger, DEBUG as _DEBUG
|
from logging import getLogger, INFO as _INFO
|
||||||
_log = getLogger(__name__)
|
_log = getLogger(__name__)
|
||||||
del getLogger
|
del getLogger
|
||||||
|
|
||||||
from . import configuration
|
from . import configuration
|
||||||
from logitech.unifying_receiver import (Receiver,
|
from logitech.unifying_receiver import (Receiver,
|
||||||
listener as _listener,
|
listener as _listener,
|
||||||
status as _status)
|
status as _status,
|
||||||
|
notifications as _notifications)
|
||||||
|
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
@ -52,10 +53,11 @@ class ReceiverListener(_listener.EventsListener):
|
||||||
|
|
||||||
assert status_changed_callback
|
assert status_changed_callback
|
||||||
self.status_changed_callback = status_changed_callback
|
self.status_changed_callback = status_changed_callback
|
||||||
receiver.status = _status.ReceiverStatus(receiver, self._status_changed)
|
_status.attach_to(receiver, self._status_changed)
|
||||||
|
|
||||||
def has_started(self):
|
def has_started(self):
|
||||||
_log.info("%s: notifications listener has started (%s)", self.receiver, self.receiver.handle)
|
if _log.isEnabledFor(_INFO):
|
||||||
|
_log.info("%s: notifications listener has started (%s)", self.receiver, self.receiver.handle)
|
||||||
notification_flags = self.receiver.enable_notifications()
|
notification_flags = self.receiver.enable_notifications()
|
||||||
self.receiver.status[_status.KEYS.NOTIFICATION_FLAGS] = notification_flags
|
self.receiver.status[_status.KEYS.NOTIFICATION_FLAGS] = notification_flags
|
||||||
self.receiver.notify_devices()
|
self.receiver.notify_devices()
|
||||||
|
@ -64,7 +66,8 @@ class ReceiverListener(_listener.EventsListener):
|
||||||
def has_stopped(self):
|
def has_stopped(self):
|
||||||
r, self.receiver = self.receiver, None
|
r, self.receiver = self.receiver, None
|
||||||
assert r is not None
|
assert r is not None
|
||||||
_log.info("%s: notifications listener has stopped", r)
|
if _log.isEnabledFor(_INFO):
|
||||||
|
_log.info("%s: notifications listener has stopped", r)
|
||||||
|
|
||||||
# because udev is not notifying us about device removal,
|
# because udev is not notifying us about device removal,
|
||||||
# make sure to clean up in _all_listeners
|
# make sure to clean up in _all_listeners
|
||||||
|
@ -117,15 +120,16 @@ class ReceiverListener(_listener.EventsListener):
|
||||||
|
|
||||||
def _status_changed(self, device, alert=_status.ALERT.NONE, reason=None):
|
def _status_changed(self, device, alert=_status.ALERT.NONE, reason=None):
|
||||||
assert device is not None
|
assert device is not None
|
||||||
if device.kind is None:
|
if _log.isEnabledFor(_INFO):
|
||||||
_log.info("status_changed %s: %s, %s (%X) %s", device,
|
if device.kind is None:
|
||||||
'present' if bool(device) else 'removed',
|
_log.info("status_changed %s: %s, %s (%X) %s", device,
|
||||||
device.status, alert, reason or '')
|
'present' if bool(device) else 'removed',
|
||||||
else:
|
device.status, alert, reason or '')
|
||||||
_log.info("status_changed %s: %s %s, %s (%X) %s", device,
|
else:
|
||||||
'paired' if bool(device) else 'unpaired',
|
_log.info("status_changed %s: %s %s, %s (%X) %s", device,
|
||||||
'online' if device.online else 'offline',
|
'paired' if bool(device) else 'unpaired',
|
||||||
device.status, alert, reason or '')
|
'online' if device.online else 'offline',
|
||||||
|
device.status, alert, reason or '')
|
||||||
|
|
||||||
if device.kind is None:
|
if device.kind is None:
|
||||||
assert device == self.receiver
|
assert device == self.receiver
|
||||||
|
@ -152,8 +156,7 @@ class ReceiverListener(_listener.EventsListener):
|
||||||
# _log.debug("%s: handling %s", self.receiver, n)
|
# _log.debug("%s: handling %s", self.receiver, n)
|
||||||
if n.devnumber == 0xFF:
|
if n.devnumber == 0xFF:
|
||||||
# a receiver notification
|
# a receiver notification
|
||||||
if self.receiver.status is not None:
|
_notifications.process(self.receiver, n)
|
||||||
self.receiver.status.process_notification(n)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# a device notification
|
# a device notification
|
||||||
|
@ -169,21 +172,23 @@ class ReceiverListener(_listener.EventsListener):
|
||||||
return
|
return
|
||||||
|
|
||||||
if not already_known:
|
if not already_known:
|
||||||
_log.info("%s triggered new device %s (%s)", n, dev, dev.kind)
|
if _log.isEnabledFor(_INFO):
|
||||||
|
_log.info("%s triggered new device %s (%s)", n, dev, dev.kind)
|
||||||
# If there are saved configs, bring the device's settings up-to-date.
|
# If there are saved configs, bring the device's settings up-to-date.
|
||||||
# They will be applied when the device is marked as online.
|
# They will be applied when the device is marked as online.
|
||||||
configuration.attach_to(dev)
|
configuration.attach_to(dev)
|
||||||
dev.status = _status.DeviceStatus(dev, self._status_changed)
|
_status.attach_to(dev, self._status_changed)
|
||||||
# the receiver changed status as well
|
# the receiver changed status as well
|
||||||
self._status_changed(self.receiver)
|
self._status_changed(self.receiver)
|
||||||
|
|
||||||
assert dev
|
assert dev
|
||||||
assert dev.status is not None
|
assert dev.status is not None
|
||||||
dev.status.process_notification(n)
|
_notifications.process(dev, n)
|
||||||
if self.receiver.status.lock_open and not already_known:
|
if self.receiver.status.lock_open and not already_known:
|
||||||
# this should be the first notification after a device was paired
|
# this should be the first notification after a device was paired
|
||||||
assert n.sub_id == 0x41 and n.address == 0x04
|
assert n.sub_id == 0x41 and n.address == 0x04
|
||||||
_log.info("%s: pairing detected new device", self.receiver)
|
if _log.isEnabledFor(_INFO):
|
||||||
|
_log.info("%s: pairing detected new device", self.receiver)
|
||||||
self.receiver.status.new_device = dev
|
self.receiver.status.new_device = dev
|
||||||
else:
|
else:
|
||||||
if dev.online is None:
|
if dev.online is None:
|
||||||
|
@ -210,15 +215,16 @@ def _start(device_info):
|
||||||
rl.start()
|
rl.start()
|
||||||
_all_listeners[device_info.path] = rl
|
_all_listeners[device_info.path] = rl
|
||||||
return rl
|
return rl
|
||||||
else:
|
|
||||||
_log.warn("failed to open %s", device_info)
|
_log.warn("failed to open %s", device_info)
|
||||||
|
|
||||||
|
|
||||||
def start_all():
|
def start_all():
|
||||||
# just in case this it called twice in a row...
|
# just in case this it called twice in a row...
|
||||||
stop_all()
|
stop_all()
|
||||||
|
|
||||||
_log.info("starting receiver listening threads")
|
if _log.isEnabledFor(_INFO):
|
||||||
|
_log.info("starting receiver listening threads")
|
||||||
for device_info in _base.receivers():
|
for device_info in _base.receivers():
|
||||||
_process_receiver_event('add', device_info)
|
_process_receiver_event('add', device_info)
|
||||||
|
|
||||||
|
@ -228,7 +234,8 @@ def stop_all():
|
||||||
_all_listeners.clear()
|
_all_listeners.clear()
|
||||||
|
|
||||||
if listeners:
|
if listeners:
|
||||||
_log.info("stopping receiver listening threads %s", listeners)
|
if _log.isEnabledFor(_INFO):
|
||||||
|
_log.info("stopping receiver listening threads %s", listeners)
|
||||||
|
|
||||||
for l in listeners:
|
for l in listeners:
|
||||||
l.stop()
|
l.stop()
|
||||||
|
@ -266,7 +273,8 @@ def _process_receiver_event(action, device_info):
|
||||||
assert device_info is not None
|
assert device_info is not None
|
||||||
assert _error_callback
|
assert _error_callback
|
||||||
|
|
||||||
_log.info("receiver event %s %s", action, device_info)
|
if _log.isEnabledFor(_INFO):
|
||||||
|
_log.info("receiver event %s %s", action, device_info)
|
||||||
|
|
||||||
# whatever the action, stop any previous receivers at this path
|
# whatever the action, stop any previous receivers at this path
|
||||||
l = _all_listeners.pop(device_info.path, None)
|
l = _all_listeners.pop(device_info.path, None)
|
||||||
|
|
Loading…
Reference in New Issue