made notifications handling clearer in status.py

This commit is contained in:
Daniel Pavel 2012-12-12 21:39:04 +02:00
parent 19cd40cfdd
commit 0ed623caf9
1 changed files with 61 additions and 31 deletions

View File

@ -143,7 +143,11 @@ class DeviceStatus(dict):
def poll(self, timestamp):
if self._active:
d = self._device
# read these in case they haven't been read already
if not d:
_log.error("polling status of invalid device")
return
# read these from the device in case they haven't been read already
d.protocol, d.serial, d.firmware
if BATTERY_LEVEL not in self:
@ -151,6 +155,8 @@ class DeviceStatus(dict):
if battery is None and d.protocol >= 2.0:
battery = _hidpp20.get_battery(d)
# really unnecessary, if the device has SOLAR_CHARGE it should be
# broadcasting it's battery status anyway, it will just take a little while
# if battery is None and _hidpp20.FEATURE.SOLAR_CHARGE in d.features:
# d.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE, 0x00, 1, 1)
# return
@ -162,6 +168,9 @@ class DeviceStatus(dict):
self[BATTERY_STATUS] = None
self._changed(timestamp=timestamp)
# make sure we know all the features of the device
# d.features[:]
elif len(self) > 0 and timestamp - self.updated > _STATUS_TIMEOUT:
# if the device has been inactive for too long, clear out any known
# properties, they are most likely obsolete anyway
@ -169,8 +178,28 @@ class DeviceStatus(dict):
self._changed(active=False, alert=ALERT.LOW, timestamp=timestamp)
def process_notification(self, n):
# incoming packets with SubId >= 0x80 are supposedly replies from
# HID++ 1.0 requests, should never get here
assert n.sub_id < 0x80
# 0x40 to 0x7F appear to be HID++ 1.0 notifications
if n.sub_id >= 0x40:
return self._process_hidpp10_notification(n)
# if n.sub_id >= len(self._device.features):
# _log.warn("%s: notification from invalid feature index %02X", self._device, n.sub_id)
# return False
# assuming 0x00 to 0x3F are device feature (HID++ 2.0) notifications
try:
feature = self._device.features[n.sub_id]
except IndexError:
_log.warn("%s: notification from invalid feature index %02X: %s", self._device, n.sub_id, n)
return False
return self._process_feature_notification(n, feature)
def _process_hidpp10_notification(self, n):
if n.sub_id == 0x40:
if n.address == 0x02:
# device un-paired
@ -178,7 +207,7 @@ class DeviceStatus(dict):
self._device.status = None
self._changed(False, ALERT.HIGH, 'unpaired')
else:
_log.warn("device %d disconnection notification %s with unknown type %02X", self._device.number, n, n.address)
_log.warn("%s: disconnection with unknown type %02X: %s", self._device, n.address, n)
return True
if n.sub_id == 0x41:
@ -192,16 +221,16 @@ class DeviceStatus(dict):
if _log.isEnabledFor(_DEBUG):
sw_present = bool(flags & 0x10)
has_payload = bool(flags & 0x80)
_log.debug("device %d connection notification: software=%s, encrypted=%s, link=%s, payload=%s",
self._device.number, sw_present, link_encrypyed, link_established, has_payload)
_log.debug("%s: connection notification: software=%s, encrypted=%s, link=%s, payload=%s",
self._device, sw_present, link_encrypyed, link_established, has_payload)
self[ENCRYPTED] = link_encrypyed
self._changed(link_established)
elif n.address == 0x03:
_log.warn("device %d connection notification %s with eQuad protocol, ignored", self._device.number, n)
_log.warn("%s: connection notification with eQuad protocol, ignored: %s", self._device.number, n)
else:
_log.warn("device %d connection notification %s with unknown protocol %02X", self._device.number, n, n.address)
_log.warn("%s: connection notification with unknown protocol %02X: %s", self._device.number, n.address, n)
return True
@ -213,23 +242,16 @@ class DeviceStatus(dict):
if n.sub_id == 0x4B:
if n.address == 0x01:
_log.debug("device came online %d", n.devnumber)
if _log.isEnabledFor(_DEBUG):
_log.debug("%s: device powered on", self._device)
self._changed(alert=ALERT.LOW, reason='powered on')
else:
_log.warn("unknown notification %s", n)
_log.info("%s: unknown %s", self._device, n)
return True
# this must be a feature notification, assuming no device has more than 0x40 features
if n.sub_id >= len(self._device.features):
_log.warn("device %s got notification from invalid feature index %02X", self._device, n.sub_id)
return False
try:
feature = self._device.features[n.sub_id]
except IndexError:
_log.warn("device %s got notification from invalid feature index %02X", self._device, n.sub_id)
return False
_log.warn("%s: unrecognized %s", self._device, n)
def _process_feature_notification(self, n, feature):
if feature == _hidpp20.FEATURE.BATTERY:
if n.address == 0x00:
discharge = ord(n.data[:1])
@ -239,28 +261,34 @@ class DeviceStatus(dict):
if _hidpp20.BATTERY_OK(battery_status):
alert = ALERT.NONE
reason = self[ERROR] = None
if _log.isEnabledFor(_DEBUG):
_log.debug("%s: battery %d% charged, %s", self._device, discharge, self[BATTERY_STATUS])
else:
alert = ALERT.MED
reason = self[ERROR] = self[BATTERY_STATUS]
_log.warn("%s: battery %d% charged, ALERT %s", self._device, discharge, reason)
self._changed(alert=alert, reason=reason)
else:
_log.warn("don't know how to handle BATTERY notification %s", n)
_log.info("%s: unknown BATTERY %s", self._device, n)
return True
if feature == _hidpp20.FEATURE.REPROGRAMMABLE_KEYS:
if n.address == 0x00:
_log.warn('unknown reprogrammable key: %s', n)
_log.info("%s: reprogrammable key: %s", self._device, n)
else:
_log.warn("don't know how to handle REPROGRAMMABLE KEYS notification %s", n)
_log.info("%s: unknown REPROGRAMMABLE KEYS %s", self._device, n)
return True
if feature == _hidpp20.FEATURE.WIRELESS:
if n.address == 0x00:
_log.debug("wireless status: %s", n)
if _log.isEnabledFor(_DEBUG):
_log.debug("wireless status: %s", n)
if n.data[0:3] == b'\x01\x01\x01':
self._changed(alert=ALERT.LOW, reason='powered on')
else:
_log.info("%s: unknown WIRELESS %s", self._device, n)
else:
_log.warn("don't know how to handle WIRELESS notification %s", n)
_log.info("%s: unknown WIRELESS %s", self._device, n)
return True
if feature == _hidpp20.FEATURE.SOLAR_CHARGE:
@ -278,28 +306,30 @@ class DeviceStatus(dict):
self[BATTERY_STATUS] += ', charging'
self._changed()
elif n.address == 0x20:
_log.debug("Solar key pressed")
_log.debug("%s: Solar key pressed", self._device)
self._changed(alert=ALERT.MED)
# first cancel any reporting
self._device.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE)
# trigger a new report chain
reports_count = 15
reports_period = 2 # seconds
self._changed(alert=ALERT.MED)
# trigger a new report chain
self._device.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE, 0x00, reports_count, reports_period)
else:
self._changed()
_log.info("%s: unknown SOLAR CHAGE %s", self._device, n)
else:
_log.warn("SOLAR CHARGE notification not GOOD? %s", n)
_log.warn("%s: SOLAR CHARGE not GOOD? %s", self._device, n)
return True
if feature == _hidpp20.FEATURE.TOUCH_MOUSE:
if n.address == 0x00:
_log.debug("TOUCH MOUSE points notification: %s", n)
_log.info("%s: TOUCH MOUSE points %s", self._device, n)
elif n.address == 0x10:
touch = ord(n.data[:1])
button_down = bool(touch & 0x02)
mouse_lifted = bool(touch & 0x01)
_log.debug("TOUCH MOUSE status: button_down=%s mouse_lifted=%s", button_down, mouse_lifted)
_log.info("%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s", self._device, button_down, mouse_lifted)
else:
_log.info("%s: unknown TOUCH MOUSE %s", self._device, n)
return True
_log.warn("don't know how to handle %s for feature %s (%02X)", n, feature, n.sub_id)
_log.info("%s: unrecognized %s for feature %s (index %02X)", self._device, n, feature, n.sub_id)