#
# Status tracking for receivers and their paired devices.
#

from __future__ import absolute_import, division, print_function, unicode_literals

from time import time as _timestamp
from struct import unpack as _unpack
from weakref import proxy as _proxy

from logging import getLogger, DEBUG as _DEBUG
_log = getLogger('LUR.status')
del getLogger

from .common import NamedInts as _NamedInts
from . import hidpp10 as _hidpp10
from . import hidpp20 as _hidpp20

#
#
#

ALERT = _NamedInts(NONE=0x00, LOW=0x01, MED=0x02, HIGH=0xFF)

# device properties that may be reported
ENCRYPTED = 'encrypted'
BATTERY_LEVEL = 'battery-level'
BATTERY_STATUS = 'battery-status'
LIGHT_LEVEL = 'light-level'
ERROR = 'error'

# If no updates have been received from the device for a while, assume
# it has gone offline and clear all its known properties.
_STATUS_TIMEOUT = 120  # seconds

#
#
#


class ReceiverStatus(dict):
    """Mutable status of a receiver, stored as a dict of properties.

    Besides the dict items (currently only ERROR), the instance tracks
    whether the pairing lock is open (``lock_open``) and a ``new_device``
    slot that is reset whenever the lock opens or a pairing error occurs.

    ``changed_callback`` is invoked as
    ``changed_callback(receiver, alert=..., reason=...)`` on every change.
    """

    def __init__(self, receiver, changed_callback):
        assert receiver
        # weak proxy, so the status does not keep the receiver alive
        self._receiver = _proxy(receiver)

        assert changed_callback
        self._changed_callback = changed_callback

        # self.updated = 0

        self.lock_open = False
        self.new_device = None

        self[ERROR] = None

    def __str__(self):
        count = len(self._receiver)
        if count == 0:
            return 'No devices found.'
        if count == 1:
            return '1 device found.'
        return '%d devices found.' % count
    __unicode__ = __str__

    def _changed(self, alert=ALERT.LOW, reason=None):
        """Forward a status change to the registered callback."""
        # self.updated = _timestamp()
        self._changed_callback(self._receiver, alert=alert, reason=reason)

    def process_notification(self, n):
        """Handle a notification addressed to the receiver.

        Only sub_id 0x4A (pairing lock change) is understood; it returns
        True once handled. Any other notification falls through and the
        method returns None.
        """
        if n.sub_id == 0x4A:
            self.lock_open = bool(n.address & 0x01)
            reason = 'pairing lock is ' + ('open' if self.lock_open else 'closed')
            _log.info("%s: %s", self._receiver, reason)

            if self.lock_open:
                # a fresh pairing attempt starts with a clean slate
                self[ERROR] = None
                self.new_device = None

            # first payload byte carries the pairing error code (0 == none)
            pair_error = ord(n.data[:1])
            if pair_error:
                self[ERROR] = _hidpp10.PAIRING_ERRORS[pair_error]
                self.new_device = None
                _log.warning("pairing error %d: %s", pair_error, self[ERROR])
            else:
                self[ERROR] = None

            self._changed(reason=reason)
            return True

#
#
#


class DeviceStatus(dict):
    """Mutable status of a paired device, stored as a dict of properties.

    Keys are the module-level property names (ENCRYPTED, BATTERY_LEVEL,
    BATTERY_STATUS, LIGHT_LEVEL, ERROR). The instance is truthy while the
    device is considered active.

    ``changed_callback`` is invoked as
    ``changed_callback(device, alert, reason)`` on every change.
    """

    def __init__(self, device, changed_callback):
        assert device
        # weak proxy, so the status does not keep the device alive
        self._device = _proxy(device)

        assert changed_callback
        self._changed_callback = changed_callback

        # None until the first change is recorded, then True/False
        self._active = None
        # timestamp of the last change; 0 means "never updated"
        self.updated = 0

    def __str__(self):
        def _item(name, format):
            # formatted property, or None when the property is unknown
            value = self.get(name)
            if value is not None:
                return format % value

        def _items():
            battery_level = _item(BATTERY_LEVEL, 'Battery: %d%%')
            if battery_level:
                yield battery_level
                # the status is only meaningful as a suffix to the level
                battery_status = _item(BATTERY_STATUS, ' (%s)')
                if battery_status:
                    yield battery_status

            light_level = _item(LIGHT_LEVEL, 'Light: %d lux')
            if light_level:
                if battery_level:
                    yield ', '
                yield light_level

        return ''.join(_items())
    __unicode__ = __str__

    def __bool__(self):
        return bool(self._active)
    __nonzero__ = __bool__

    def _changed(self, active=True, alert=ALERT.NONE, reason=None, timestamp=None):
        """Record a status change and notify the callback.

        When the device becomes inactive, every property except the last
        known battery level is dropped. The very first change always
        carries at least ALERT.LOW.
        """
        assert self._changed_callback
        self._active = active
        if not active:
            # keep only the battery level across an inactive transition
            battery = self.get(BATTERY_LEVEL)
            self.clear()
            if battery is not None:
                self[BATTERY_LEVEL] = battery
        if self.updated == 0:
            alert |= ALERT.LOW
        self.updated = timestamp or _timestamp()

        # if _log.isEnabledFor(_DEBUG):
        #     _log.debug("device %d changed: active=%s %s", self._device.number, self._active, dict(self))
        self._changed_callback(self._device, alert, reason)

    def poll(self, timestamp):
        """Periodic check, called with the current time in seconds.

        While active, reads the battery if its level is not known yet.
        While inactive, clears stale properties once nothing has been
        updated for more than _STATUS_TIMEOUT seconds.
        """
        if self._active:
            d = self._device

            # read these in case they haven't been read already
            d.protocol, d.serial, d.firmware

            if BATTERY_LEVEL not in self:
                battery = _hidpp10.get_battery(d)
                if battery is None and d.protocol >= 2.0:
                    battery = _hidpp20.get_battery(d)

                    # if battery is None and _hidpp20.FEATURE.SOLAR_CHARGE in d.features:
                    #     d.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE, 0x00, 1, 1)
                    #     return

                if battery:
                    self[BATTERY_LEVEL], self[BATTERY_STATUS] = battery
                    self._changed(timestamp=timestamp)
                elif BATTERY_STATUS in self:
                    self[BATTERY_STATUS] = None
                    self._changed(timestamp=timestamp)

        elif len(self) > 0 and timestamp - self.updated > _STATUS_TIMEOUT:
            # if the device has been inactive for too long, clear out any known
            # properties, they are most likely obsolete anyway
            self.clear()
            self._changed(active=False, alert=ALERT.LOW, timestamp=timestamp)

    def process_notification(self, n):
        """Handle a notification coming from the device.

        Returns True when the notification was recognized, False when its
        sub_id does not map to a known feature index, and None when the
        feature is known but has no handler here.
        """
        assert n.sub_id < 0x80

        if n.sub_id == 0x40:
            if n.address == 0x02:
                # device un-paired
                self.clear()
                self._device.status = None
                self._changed(False, ALERT.HIGH, 'unpaired')
            else:
                _log.warning("device %d disconnection notification %s with unknown type %02X",
                             self._device.number, n, n.address)
            return True

        if n.sub_id == 0x41:
            if n.address == 0x04:  # unifying protocol
                # wpid = _strhex(n.data[4:5] + n.data[3:4])
                # assert wpid == device.wpid

                flags = ord(n.data[:1]) & 0xF0
                link_encrypted = bool(flags & 0x20)
                link_established = not (flags & 0x40)
                if _log.isEnabledFor(_DEBUG):
                    sw_present = bool(flags & 0x10)
                    has_payload = bool(flags & 0x80)
                    _log.debug("device %d connection notification: software=%s, encrypted=%s, link=%s, payload=%s",
                               self._device.number, sw_present, link_encrypted, link_established, has_payload)
                self[ENCRYPTED] = link_encrypted
                self._changed(link_established)

            elif n.address == 0x03:
                _log.warning("device %d connection notification %s with eQuad protocol, ignored",
                             self._device.number, n)

            else:
                _log.warning("device %d connection notification %s with unknown protocol %02X",
                             self._device.number, n, n.address)

            return True

        if n.sub_id == 0x49:
            # raw input event? just ignore it
            # if n.address == 0x01, no idea what it is, but they keep on coming
            # if n.address == 0x03, it's an actual input event
            return True

        if n.sub_id == 0x4B:
            if n.address == 0x01:
                _log.debug("device came online %d", n.devnumber)
                self._changed(alert=ALERT.LOW, reason='powered on')
            else:
                _log.warning("unknown notification %s", n)
            return True

        # this must be a feature notification, assuming no device has more than 0x40 features
        if n.sub_id >= len(self._device.features):
            _log.warning("device %s got notification from invalid feature index %02X", self._device, n.sub_id)
            return False

        try:
            feature = self._device.features[n.sub_id]
        except IndexError:
            _log.warning("device %s got notification from invalid feature index %02X", self._device, n.sub_id)
            return False

        if feature == _hidpp20.FEATURE.BATTERY:
            if n.address == 0x00:
                discharge = ord(n.data[:1])
                battery_status = ord(n.data[1:2])
                self[BATTERY_LEVEL] = discharge
                # The module-level BATTERY_STATUS is only the dict key string;
                # indexing it would store a single character of
                # 'battery-status'. Look the code up in the HID++ 2.0 table.
                # NOTE(review): assumes _hidpp20.BATTERY_STATUS maps status
                # codes to names (it sits next to BATTERY_OK) -- confirm.
                self[BATTERY_STATUS] = _hidpp20.BATTERY_STATUS[battery_status]
                if _hidpp20.BATTERY_OK(battery_status):
                    alert = ALERT.NONE
                    reason = self[ERROR] = None
                else:
                    alert = ALERT.MED
                    reason = self[ERROR] = self[BATTERY_STATUS]
                self._changed(alert=alert, reason=reason)
            else:
                _log.warning("don't know how to handle BATTERY notification %s", n)
            return True

        if feature == _hidpp20.FEATURE.REPROGRAMMABLE_KEYS:
            if n.address == 0x00:
                _log.warning('unknown reprogrammable key: %s', n)
            else:
                _log.warning("don't know how to handle REPROGRAMMABLE KEYS notification %s", n)
            return True

        if feature == _hidpp20.FEATURE.WIRELESS:
            if n.address == 0x00:
                _log.debug("wireless status: %s", n)
                if n.data[0:3] == b'\x01\x01\x01':
                    self._changed(alert=ALERT.LOW, reason='powered on')
            else:
                _log.warning("don't know how to handle WIRELESS notification %s", n)
            return True

        if feature == _hidpp20.FEATURE.SOLAR_CHARGE:
            if n.data[5:9] == b'GOOD':
                charge, lux, adc = _unpack(b'!BHH', n.data[:5])
                self[BATTERY_LEVEL] = charge
                # guesstimate the battery voltage, emphasis on 'guess'
                self[BATTERY_STATUS] = '%1.2fV' % (adc * 2.67793237653 / 0x0672)
                if n.address == 0x00:
                    self[LIGHT_LEVEL] = None
                    self._changed()
                elif n.address == 0x10:
                    self[LIGHT_LEVEL] = lux
                    if lux > 200:  # guesstimate
                        self[BATTERY_STATUS] += ', charging'
                    self._changed()
                elif n.address == 0x20:
                    _log.debug("Solar key pressed")
                    # first cancel any reporting
                    self._device.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE)
                    reports_count = 15
                    reports_period = 2  # seconds
                    self._changed(alert=ALERT.MED)
                    # trigger a new report chain
                    self._device.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE, 0x00, reports_count, reports_period)
                else:
                    self._changed()
            else:
                _log.warning("SOLAR CHARGE notification not GOOD? %s", n)
            return True

        if feature == _hidpp20.FEATURE.TOUCH_MOUSE:
            if n.address == 0x00:
                _log.debug("TOUCH MOUSE points notification: %s", n)
            elif n.address == 0x10:
                touch = ord(n.data[:1])
                button_down = bool(touch & 0x02)
                mouse_lifted = bool(touch & 0x01)
                _log.debug("TOUCH MOUSE status: button_down=%s mouse_lifted=%s", button_down, mouse_lifted)
            return True

        _log.warning("don't know how to handle %s for feature %s (%02X)", n, feature, n.sub_id)