diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index 9e3e4a71..5e74dce0 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -8,20 +8,12 @@ from __future__ import absolute_import, division, print_function, unicode_litera from time import time as _timestamp from random import getrandbits as _random_bits -from struct import pack as _pack -try: - unicode - # if Python2, unicode_literals will mess our first (un)pack() argument - _pack_str = _pack - _pack = lambda x, *args: _pack_str(str(x), *args) -except: - pass - from logging import getLogger, DEBUG as _DEBUG _log = getLogger('LUR.base') del getLogger -from .common import strhex as _strhex, KwException as _KwException + +from .common import strhex as _strhex, KwException as _KwException, pack as _pack from . import hidpp10 as _hidpp10 from . import hidpp20 as _hidpp20 import hidapi as _hid @@ -242,7 +234,8 @@ def _skip_incoming(handle, ihandle, notifications_hook): report_id = ord(data[:1]) assert (report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE or report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE or - report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE) + report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE), \ + "unexpected message size: report_id %02X message %s" % (report_id, _strhex(data)) if notifications_hook: n = make_notification(ord(data[1:2]), data[2:]) if n: diff --git a/lib/logitech/unifying_receiver/common.py b/lib/logitech/unifying_receiver/common.py index 1b4bf790..692722d0 100644 --- a/lib/logitech/unifying_receiver/common.py +++ b/lib/logitech/unifying_receiver/common.py @@ -5,14 +5,14 @@ from __future__ import absolute_import, division, print_function, unicode_literals from binascii import hexlify as _hexlify -from struct import pack as _pack, unpack as _unpack +from struct import pack, unpack try: unicode # if Python2, unicode_literals will mess our first (un)pack() argument - _pack_str = _pack - _unpack_str = _unpack - _pack = lambda x, *args: _pack_str(str(x), *args) - _unpack = lambda x, *args: _unpack_str(str(x), *args) + _pack_str = pack + _unpack_str = unpack + pack = lambda x, *args: _pack_str(str(x), *args) + unpack = lambda x, *args: _unpack_str(str(x), *args) except: pass @@ -57,7 +57,6 @@ class NamedInt(int): if other is not None: raise TypeError("Unsupported type " + str(type(other))) - def __ne__(self, other): return not self.__eq__(other) @@ -205,7 +204,7 @@ def bytes2int(x): assert isinstance(x, bytes) assert len(x) < 9 qx = (b'\x00' * 8) + x - result, = _unpack('!Q', qx[-8:]) + result, = unpack('!Q', qx[-8:]) # assert x == int2bytes(result, len(x)) return result @@ -216,7 +215,7 @@ def int2bytes(x, count=None): If 'count' is not given, the necessary number of bytes is computed. """ assert isinstance(x, int) - result = _pack('!Q', x) + result = pack('!Q', x) assert isinstance(result, bytes) # assert x == bytes2int(result) diff --git a/lib/logitech/unifying_receiver/hidpp20.py b/lib/logitech/unifying_receiver/hidpp20.py index 4b40d52c..638e2f23 100644 --- a/lib/logitech/unifying_receiver/hidpp20.py +++ b/lib/logitech/unifying_receiver/hidpp20.py @@ -8,23 +8,15 @@ from logging import getLogger, DEBUG as _DEBUG _log = getLogger('LUR.hidpp20') del getLogger -from struct import pack as _pack, unpack as _unpack -try: - unicode - # if Python2, unicode_literals will mess our first (un)pack() argument - _pack_str = _pack - _unpack_str = _unpack - _pack = lambda x, *args: _pack_str(str(x), *args) - _unpack = lambda x, *args: _unpack_str(str(x), *args) -except: - pass # from weakref import proxy as _proxy from .common import (FirmwareInfo as _FirmwareInfo, ReprogrammableKeyInfo as _ReprogrammableKeyInfo, KwException as _KwException, - NamedInts as _NamedInts) + NamedInts as _NamedInts, + pack as _pack, + unpack as _unpack) from . import special_keys # diff --git a/lib/logitech/unifying_receiver/notifications.py b/lib/logitech/unifying_receiver/notifications.py new file mode 100644 index 00000000..75affa7b --- /dev/null +++ b/lib/logitech/unifying_receiver/notifications.py @@ -0,0 +1,249 @@ +# +# Handles incoming events from the receiver/devices, updating the related +# status object as appropiate. +# + +from __future__ import absolute_import, division, print_function, unicode_literals + +from logging import getLogger, DEBUG as _DEBUG +_log = getLogger('LUR.notifications') +del getLogger + +from .common import strhex as _strhex, unpack as _unpack +from . import hidpp10 as _hidpp10 +from . import hidpp20 as _hidpp20 +from .status import KEYS as _K, ALERT as _ALERT + +_R = _hidpp10.REGISTERS +_F = _hidpp20.FEATURE + +# +# +# + +def process(device, notification): + assert device + assert notification + + assert hasattr(device, 'status') + status = device.status + assert status is not None + + if device.kind is None: + return _process_receiver_notification(device, status, notification) + + return _process_device_notification(device, status, notification) + +# +# +# + +def _process_receiver_notification(receiver, status, n): + # supposedly only 0x4x notifications arrive for the receiver + assert n.sub_id & 0x40 == 0x40 + + # pairing lock notification + if n.sub_id == 0x4A: + status.lock_open = bool(n.address & 0x01) + reason = 'pairing lock is ' + ('open' if status.lock_open else 'closed') + _log.info("%s: %s", receiver, reason) + + status[_K.ERROR] = None + if status.lock_open: + status.new_device = None + + pair_error = ord(n.data[:1]) + if pair_error: + status[_K.ERROR] = error_string = _hidpp10.PAIRING_ERRORS[pair_error] + status.new_device = None + _log.warn("pairing error %d: %s", pair_error, error_string) + + status.changed(reason=reason) + return True + + _log.warn("%s: unhandled notification %s", receiver, n) + +# +# +# + +def _process_device_notification(device, status, n): + # incoming packets with SubId >= 0x80 are supposedly replies from + # HID++ 1.0 requests, should never get here + assert n.sub_id & 0x80 == 0 + + # 0x40 to 0x7F appear to be HID++ 1.0 notifications + if n.sub_id >= 0x40: + return _process_hidpp10_notification(device, status, n) + + # At this point, we need to know the device's protocol, otherwise it's + # possible to not know how to handle it. + assert device.protocol is not None + + # some custom battery events for HID++ 1.0 devices + if device.protocol < 2.0: + return _process_hidpp10_custom_notification(device, status, n) + + # assuming 0x00 to 0x3F are feature (HID++ 2.0) notifications + assert device.features + try: + feature = device.features[n.sub_id] + except IndexError: + _log.warn("%s: notification from invalid feature index %02X: %s", device, n.sub_id, n) + return False + + return _process_feature_notification(device, status, n, feature) + + +def _process_hidpp10_custom_notification(device, status, n): + if _log.isEnabledFor(_DEBUG): + _log.debug("%s (%s) custom notification %s", device, device.protocol, n) + + if n.sub_id in (_R.battery_status, _R.battery_charge): + assert n.data[-1:] == b'\x00' + # message layout: 10 ix <00> + data = '%c%s' % (n.address, n.data) + charge, status_text = _hidpp10.parse_battery_status(n.sub_id, data) + status.set_battery_info(charge, status_text) + return True + + if n.sub_id == _R.illumination: + # message layout: 10 ix 17("address") + # TODO anything we can do with this? + _log.info("illumination event: %s", n) + return True + + _log.warn("%s: unrecognized %s", device, n) + + +def _process_hidpp10_notification(device, status, n): + # unpair notification + if n.sub_id == 0x40: + if n.address == 0x02: + # device un-paired + status.clear() + device.wpid = None + device.status = None + status.changed(active=False, alert=_ALERT.ALL, reason='unpaired') + else: + _log.warn("%s: disconnection with unknown type %02X: %s", device, n.address, n) + return True + + # wireless link notification + if n.sub_id == 0x41: + protocol_name = ('unifying (eQuad DJ)' if n.address == 0x04 + else 'eQuad' if n.address == 0x03 + else None) + if protocol_name: + if _log.isEnabledFor(_DEBUG): + wpid = _strhex(n.data[2:3] + n.data[1:2]) + assert wpid == device.wpid, "%s wpid mismatch, got %s" % (device, wpid) + + flags = ord(n.data[:1]) & 0xF0 + link_encrypyed = bool(flags & 0x20) + link_established = not (flags & 0x40) + if _log.isEnabledFor(_DEBUG): + sw_present = bool(flags & 0x10) + has_payload = bool(flags & 0x80) + _log.debug("%s: %s connection notification: software=%s, encrypted=%s, link=%s, payload=%s", + device, protocol_name, sw_present, link_encrypyed, link_established, has_payload) + status[_K.LINK_ENCRYPTED] = link_encrypyed + status.changed(active=link_established) + else: + _log.warn("%s: connection notification with unknown protocol %02X: %s", device.number, n.address, n) + + return True + + if n.sub_id == 0x49: + # raw input event? just ignore it + # if n.address == 0x01, no idea what it is, but they keep on coming + # if n.address == 0x03, appears to be an actual input event, + # because they only come when input happents + return True + + # power notification + if n.sub_id == 0x4B: + if n.address == 0x01: + if _log.isEnabledFor(_DEBUG): + _log.debug("%s: device powered on", device) + reason = str(status) or 'powered on' + status.changed(active=True, alert=_ALERT.NOTIFICATION, reason=reason) + else: + _log.info("%s: unknown %s", device, n) + return True + + _log.warn("%s: unrecognized %s", device, n) + + +def _process_feature_notification(device, status, n, feature): + if feature == _F.BATTERY_STATUS: + if n.address == 0x00: + discharge = ord(n.data[:1]) + battery_status = ord(n.data[1:2]) + status.set_battery_info(discharge, _hidpp20.BATTERY_STATUS[battery_status]) + else: + _log.info("%s: unknown BATTERY %s", device, n) + return True + + # TODO: what are REPROG_CONTROLS_V{2,3}? + if feature == _F.REPROG_CONTROLS: + if n.address == 0x00: + _log.info("%s: reprogrammable key: %s", device, n) + else: + _log.info("%s: unknown REPROGRAMMABLE KEYS %s", device, n) + return True + + if feature == _F.WIRELESS_DEVICE_STATUS: + if n.address == 0x00: + if _log.isEnabledFor(_DEBUG): + _log.debug("wireless status: %s", n) + if n.data[0:3] == b'\x01\x01\x01': + status.changed(active=True, alert=_ALERT.NOTIFICATION, reason='powered on') + else: + _log.info("%s: unknown WIRELESS %s", device, n) + else: + _log.info("%s: unknown WIRELESS %s", device, n) + return True + + if feature == _F.SOLAR_DASHBOARD: + if n.data[5:9] == b'GOOD': + charge, lux, adc = _unpack('!BHH', n.data[:5]) + status[_K.BATTERY_LEVEL] = charge + # guesstimate the battery voltage, emphasis on 'guess' + status[_K.BATTERY_STATUS] = '%1.2fV' % (adc * 2.67793237653 / 0x0672) + if n.address == 0x00: + status[_K.LIGHT_LEVEL] = None + status[_K.BATTERY_CHARGING] = None + status.changed(active=True) + elif n.address == 0x10: + status[_K.LIGHT_LEVEL] = lux + status[_K.BATTERY_CHARGING] = lux > 200 + status.changed(active=True) + elif n.address == 0x20: + _log.debug("%s: Light Check button pressed", device) + status.changed(alert=_ALERT.SHOW_WINDOW) + # first cancel any reporting + # device.feature_request(_F.SOLAR_DASHBOARD) + # trigger a new report chain + reports_count = 15 + reports_period = 2 # seconds + device.feature_request(_F.SOLAR_DASHBOARD, 0x00, reports_count, reports_period) + else: + _log.info("%s: unknown SOLAR CHAGE %s", device, n) + else: + _log.warn("%s: SOLAR CHARGE not GOOD? %s", device, n) + return True + + if feature == _F.TOUCHMOUSE_RAW_POINTS: + if n.address == 0x00: + _log.info("%s: TOUCH MOUSE points %s", device, n) + elif n.address == 0x10: + touch = ord(n.data[:1]) + button_down = bool(touch & 0x02) + mouse_lifted = bool(touch & 0x01) + _log.info("%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s", device, button_down, mouse_lifted) + else: + _log.warn("%s: unknown TOUCH MOUSE %s", device, n) + return True + + _log.info("%s: unrecognized %s for feature %s (index %02X)", device, n, feature, n.sub_id) diff --git a/lib/logitech/unifying_receiver/status.py b/lib/logitech/unifying_receiver/status.py index ef326256..9f1ab3ee 100644 --- a/lib/logitech/unifying_receiver/status.py +++ b/lib/logitech/unifying_receiver/status.py @@ -7,20 +7,12 @@ from __future__ import absolute_import, division, print_function, unicode_litera from time import time as _timestamp # from weakref import proxy as _proxy -from struct import unpack as _unpack -try: - unicode - # if Python2, unicode_literals will mess our first (un)pack() argument - _unpack_str = _unpack - _unpack = lambda x, *args: _unpack_str(str(x), *args) -except: - pass - from logging import getLogger, DEBUG as _DEBUG _log = getLogger('LUR.status') del getLogger -from .common import NamedInts as _NamedInts, NamedInt as _NamedInt, strhex as _strhex + +from .common import NamedInts as _NamedInts, NamedInt as _NamedInt from . import hidpp10 as _hidpp10 from . import hidpp20 as _hidpp20 @@ -48,7 +40,20 @@ _BATTERY_ATTENTION_LEVEL = 5 # If no updates have been receiver from the device for a while, ping the device # and update it status accordinly. -_STATUS_TIMEOUT = 5 * 60 # seconds +# _STATUS_TIMEOUT = 5 * 60 # seconds + +# +# +# + +def attach_to(device, changed_callback): + assert device + assert changed_callback + + if device.kind is None: + device.status = ReceiverStatus(device, changed_callback) + else: + device.status = DeviceStatus(device, changed_callback) # # @@ -71,7 +76,6 @@ class ReceiverStatus(dict): self.new_device = None self[KEYS.ERROR] = None - # self[KEYS.NOTIFICATION_FLAGS] = receiver.enable_notifications() def __str__(self): count = len(self._receiver) @@ -80,7 +84,7 @@ class ReceiverStatus(dict): '%d paired devices.' % count) __unicode__ = __str__ - def _changed(self, alert=ALERT.NOTIFICATION, reason=None): + def changed(self, alert=ALERT.NOTIFICATION, reason=None): # self.updated = _timestamp() self._changed_callback(self._receiver, alert=alert, reason=reason) @@ -97,25 +101,6 @@ class ReceiverStatus(dict): # # get an update of the notification flags # # self[KEYS.NOTIFICATION_FLAGS] = _hidpp10.get_notification_flags(r) - def process_notification(self, n): - if n.sub_id == 0x4A: - self.lock_open = bool(n.address & 0x01) - reason = 'pairing lock is ' + ('open' if self.lock_open else 'closed') - _log.info("%s: %s", self._receiver, reason) - - self[KEYS.ERROR] = None - if self.lock_open: - self.new_device = None - - pair_error = ord(n.data[:1]) - if pair_error: - self[KEYS.ERROR] = error_string = _hidpp10.PAIRING_ERRORS[pair_error] - self.new_device = None - _log.warn("pairing error %d: %s", pair_error, error_string) - - self._changed(reason=reason) - return True - # # # @@ -203,7 +188,7 @@ class DeviceStatus(dict): if changed or reason: # update the leds on the device, if any _hidpp10.set_3leds(self._device, level, charging=charging, warning=bool(alert)) - self._changed(alert=alert, reason=reason, timestamp=timestamp) + self.changed(alert=alert, reason=reason, timestamp=timestamp) def read_battery(self, timestamp=None): if self._active: @@ -229,9 +214,9 @@ class DeviceStatus(dict): elif KEYS.BATTERY_STATUS in self: self[KEYS.BATTERY_STATUS] = None self[KEYS.BATTERY_CHARGING] = None - self._changed() + self.changed() - def _changed(self, active=None, alert=ALERT.NONE, reason=None, timestamp=None): + def changed(self, active=None, alert=ALERT.NONE, reason=None, timestamp=None): assert self._changed_callback d = self._device # assert d # may be invalid when processing the 'unpaired' notification @@ -296,7 +281,7 @@ class DeviceStatus(dict): # if d.ping(): # timestamp = self.updated = _timestamp() # else: - # self._changed(active=False, reason='out of range') + # self.changed(active=False, reason='out of range') # # # if still active, make sure we know the battery level # if KEYS.BATTERY_LEVEL not in self: @@ -304,182 +289,6 @@ class DeviceStatus(dict): # # elif timestamp - self.updated > _STATUS_TIMEOUT: # if d.ping(): - # self._changed(active=True) + # self.changed(active=True) # else: # self.updated = _timestamp() - - def process_notification(self, n): - # incoming packets with SubId >= 0x80 are supposedly replies from - # HID++ 1.0 requests, should never get here - assert n.sub_id < 0x80 - - # 0x40 to 0x7F appear to be HID++ 1.0 notifications - if n.sub_id >= 0x40: - return self._process_hidpp10_notification(n) - - # some custom battery events for HID++ 1.0 devices - if self._device.protocol < 2.0: - # README assuming HID++ 2.0 devices don't use the 0x07/0x0D registers - # however, this has not been fully verified yet - if n.sub_id in (_R.battery_charge, _R.battery_status) and len(n.data) == 3 and n.data[2:3] == b'\x00': - return self._process_hidpp10_custom_notification(n) - if n.sub_id == _R.illumination and len(n.data) == 3: - return self._process_hidpp10_custom_notification(n) - else: - # assuming 0x00 to 0x3F are feature (HID++ 2.0) notifications - try: - feature = self._device.features[n.sub_id] - except IndexError: - _log.warn("%s: notification from invalid feature index %02X: %s", self._device, n.sub_id, n) - return False - - return self._process_feature_notification(n, feature) - - def _process_hidpp10_custom_notification(self, n): - if _log.isEnabledFor(_DEBUG): - _log.debug("%s (%s) custom battery notification %s", self._device, self._device.protocol, n) - - if n.sub_id in (_R.battery_status, _R.battery_charge): - data = '%c%s' % (n.address, n.data) - level, status = _hidpp10.parse_battery_status(n.sub_id, data) - self.set_battery_info(level, status) - return True - - if n.sub_id == _R.illumination: - # message layout: 10 ix 17("address") - # TODO anything we can do with this? - _log.info("illumination event: %s", n) - return True - - _log.warn("%s: unrecognized %s", self._device, n) - - def _process_hidpp10_notification(self, n): - # unpair notification - if n.sub_id == 0x40: - if n.address == 0x02: - # device un-paired - self.clear() - dev = self._device - dev.wpid = None - dev.status = None - self._changed(active=False, alert=ALERT.ALL, reason='unpaired') - else: - _log.warn("%s: disconnection with unknown type %02X: %s", self._device, n.address, n) - return True - - # wireless link notification - if n.sub_id == 0x41: - protocol_name = ('unifying (eQuad DJ)' if n.address == 0x04 - else 'eQuad' if n.address == 0x03 - else None) - if protocol_name: - if _log.isEnabledFor(_DEBUG): - wpid = _strhex(n.data[2:3] + n.data[1:2]) - assert wpid == self._device.wpid, "%s wpid mismatch, got %s" % (self._device, wpid) - - flags = ord(n.data[:1]) & 0xF0 - link_encrypyed = bool(flags & 0x20) - link_established = not (flags & 0x40) - if _log.isEnabledFor(_DEBUG): - sw_present = bool(flags & 0x10) - has_payload = bool(flags & 0x80) - _log.debug("%s: %s connection notification: software=%s, encrypted=%s, link=%s, payload=%s", - self._device, protocol_name, sw_present, link_encrypyed, link_established, has_payload) - self[KEYS.LINK_ENCRYPTED] = link_encrypyed - self._changed(active=link_established) - else: - _log.warn("%s: connection notification with unknown protocol %02X: %s", self._device.number, n.address, n) - - return True - - if n.sub_id == 0x49: - # raw input event? just ignore it - # if n.address == 0x01, no idea what it is, but they keep on coming - # if n.address == 0x03, it's an actual input event - return True - - # power notification - if n.sub_id == 0x4B: - if n.address == 0x01: - if _log.isEnabledFor(_DEBUG): - _log.debug("%s: device powered on", self._device) - reason = str(self) or 'powered on' - self._changed(active=True, alert=ALERT.NOTIFICATION, reason=reason) - else: - _log.info("%s: unknown %s", self._device, n) - return True - - _log.warn("%s: unrecognized %s", self._device, n) - - def _process_feature_notification(self, n, feature): - if feature == _hidpp20.FEATURE.BATTERY_STATUS: - if n.address == 0x00: - discharge = ord(n.data[:1]) - battery_status = ord(n.data[1:2]) - self.set_battery_info(discharge, _hidpp20.BATTERY_STATUS[battery_status]) - else: - _log.info("%s: unknown BATTERY %s", self._device, n) - return True - - # TODO: what are REPROG_CONTROLS_V{2,3}? - if feature == _hidpp20.FEATURE.REPROG_CONTROLS: - if n.address == 0x00: - _log.info("%s: reprogrammable key: %s", self._device, n) - else: - _log.info("%s: unknown REPROGRAMMABLE KEYS %s", self._device, n) - return True - - if feature == _hidpp20.FEATURE.WIRELESS_DEVICE_STATUS: - if n.address == 0x00: - if _log.isEnabledFor(_DEBUG): - _log.debug("wireless status: %s", n) - if n.data[0:3] == b'\x01\x01\x01': - self._changed(active=True, alert=ALERT.NOTIFICATION, reason='powered on') - else: - _log.info("%s: unknown WIRELESS %s", self._device, n) - else: - _log.info("%s: unknown WIRELESS %s", self._device, n) - return True - - if feature == _hidpp20.FEATURE.SOLAR_DASHBOARD: - if n.data[5:9] == b'GOOD': - charge, lux, adc = _unpack('!BHH', n.data[:5]) - self[KEYS.BATTERY_LEVEL] = charge - # guesstimate the battery voltage, emphasis on 'guess' - self[KEYS.BATTERY_STATUS] = '%1.2fV' % (adc * 2.67793237653 / 0x0672) - if n.address == 0x00: - self[KEYS.LIGHT_LEVEL] = None - self[KEYS.BATTERY_CHARGING] = None - self._changed(active=True) - elif n.address == 0x10: - self[KEYS.LIGHT_LEVEL] = lux - self[KEYS.BATTERY_CHARGING] = lux > 200 - self._changed(active=True) - elif n.address == 0x20: - _log.debug("%s: Light Check button pressed", self._device) - self._changed(alert=ALERT.SHOW_WINDOW) - # first cancel any reporting - # self._device.feature_request(_hidpp20.FEATURE.SOLAR_DASHBOARD) - # trigger a new report chain - reports_count = 15 - reports_period = 2 # seconds - self._device.feature_request(_hidpp20.FEATURE.SOLAR_DASHBOARD, 0x00, reports_count, reports_period) - else: - _log.info("%s: unknown SOLAR CHAGE %s", self._device, n) - else: - _log.warn("%s: SOLAR CHARGE not GOOD? %s", self._device, n) - return True - - if feature == _hidpp20.FEATURE.TOUCHMOUSE_RAW_POINTS: - if n.address == 0x00: - _log.info("%s: TOUCH MOUSE points %s", self._device, n) - elif n.address == 0x10: - touch = ord(n.data[:1]) - button_down = bool(touch & 0x02) - mouse_lifted = bool(touch & 0x01) - _log.info("%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s", self._device, button_down, mouse_lifted) - else: - _log.info("%s: unknown TOUCH MOUSE %s", self._device, n) - return True - - _log.info("%s: unrecognized %s for feature %s (index %02X)", self._device, n, feature, n.sub_id) diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index 58bf689f..deb620da 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -4,14 +4,15 @@ from __future__ import absolute_import, division, print_function, unicode_literals -from logging import getLogger, DEBUG as _DEBUG +from logging import getLogger, INFO as _INFO _log = getLogger(__name__) del getLogger from . import configuration from logitech.unifying_receiver import (Receiver, listener as _listener, - status as _status) + status as _status, + notifications as _notifications) # # @@ -52,10 +53,11 @@ class ReceiverListener(_listener.EventsListener): assert status_changed_callback self.status_changed_callback = status_changed_callback - receiver.status = _status.ReceiverStatus(receiver, self._status_changed) + _status.attach_to(receiver, self._status_changed) def has_started(self): - _log.info("%s: notifications listener has started (%s)", self.receiver, self.receiver.handle) + if _log.isEnabledFor(_INFO): + _log.info("%s: notifications listener has started (%s)", self.receiver, self.receiver.handle) notification_flags = self.receiver.enable_notifications() self.receiver.status[_status.KEYS.NOTIFICATION_FLAGS] = notification_flags self.receiver.notify_devices() @@ -64,7 +66,8 @@ class ReceiverListener(_listener.EventsListener): def has_stopped(self): r, self.receiver = self.receiver, None assert r is not None - _log.info("%s: notifications listener has stopped", r) + if _log.isEnabledFor(_INFO): + _log.info("%s: notifications listener has stopped", r) # because udev is not notifying us about device removal, # make sure to clean up in _all_listeners @@ -117,15 +120,16 @@ class ReceiverListener(_listener.EventsListener): def _status_changed(self, device, alert=_status.ALERT.NONE, reason=None): assert device is not None - if device.kind is None: - _log.info("status_changed %s: %s, %s (%X) %s", device, - 'present' if bool(device) else 'removed', - device.status, alert, reason or '') - else: - _log.info("status_changed %s: %s %s, %s (%X) %s", device, - 'paired' if bool(device) else 'unpaired', - 'online' if device.online else 'offline', - device.status, alert, reason or '') + if _log.isEnabledFor(_INFO): + if device.kind is None: + _log.info("status_changed %s: %s, %s (%X) %s", device, + 'present' if bool(device) else 'removed', + device.status, alert, reason or '') + else: + _log.info("status_changed %s: %s %s, %s (%X) %s", device, + 'paired' if bool(device) else 'unpaired', + 'online' if device.online else 'offline', + device.status, alert, reason or '') if device.kind is None: assert device == self.receiver @@ -152,8 +156,7 @@ class ReceiverListener(_listener.EventsListener): # _log.debug("%s: handling %s", self.receiver, n) if n.devnumber == 0xFF: # a receiver notification - if self.receiver.status is not None: - self.receiver.status.process_notification(n) + _notifications.process(self.receiver, n) return # a device notification @@ -169,21 +172,23 @@ class ReceiverListener(_listener.EventsListener): return if not already_known: - _log.info("%s triggered new device %s (%s)", n, dev, dev.kind) + if _log.isEnabledFor(_INFO): + _log.info("%s triggered new device %s (%s)", n, dev, dev.kind) # If there are saved configs, bring the device's settings up-to-date. # They will be applied when the device is marked as online. configuration.attach_to(dev) - dev.status = _status.DeviceStatus(dev, self._status_changed) + _status.attach_to(dev, self._status_changed) # the receiver changed status as well self._status_changed(self.receiver) assert dev assert dev.status is not None - dev.status.process_notification(n) + _notifications.process(dev, n) if self.receiver.status.lock_open and not already_known: # this should be the first notification after a device was paired assert n.sub_id == 0x41 and n.address == 0x04 - _log.info("%s: pairing detected new device", self.receiver) + if _log.isEnabledFor(_INFO): + _log.info("%s: pairing detected new device", self.receiver) self.receiver.status.new_device = dev else: if dev.online is None: @@ -210,15 +215,16 @@ def _start(device_info): rl.start() _all_listeners[device_info.path] = rl return rl - else: - _log.warn("failed to open %s", device_info) + + _log.warn("failed to open %s", device_info) def start_all(): # just in case this it called twice in a row... stop_all() - _log.info("starting receiver listening threads") + if _log.isEnabledFor(_INFO): + _log.info("starting receiver listening threads") for device_info in _base.receivers(): _process_receiver_event('add', device_info) @@ -228,7 +234,8 @@ def stop_all(): _all_listeners.clear() if listeners: - _log.info("stopping receiver listening threads %s", listeners) + if _log.isEnabledFor(_INFO): + _log.info("stopping receiver listening threads %s", listeners) for l in listeners: l.stop() @@ -266,7 +273,8 @@ def _process_receiver_event(action, device_info): assert device_info is not None assert _error_callback - _log.info("receiver event %s %s", action, device_info) + if _log.isEnabledFor(_INFO): + _log.info("receiver event %s %s", action, device_info) # whatever the action, stop any previous receivers at this path l = _all_listeners.pop(device_info.path, None)