From 19cd40cfddaab5381617dade3d190beec1b6516a Mon Sep 17 00:00:00 2001 From: Daniel Pavel Date: Wed, 12 Dec 2012 21:03:07 +0200 Subject: [PATCH] renamed 'events' to 'notifications' in order to match the name in Logitech's documentation --- app/listener.py | 30 ++--- app/solaar.py | 2 +- app/solaar_cli.py | 51 ++++---- lib/logitech/unifying_receiver/base.py | 88 +++++++------- lib/logitech/unifying_receiver/listener.py | 66 ++++++---- lib/logitech/unifying_receiver/receiver.py | 5 +- lib/logitech/unifying_receiver/status.py | 134 +++++++++++---------- 7 files changed, 202 insertions(+), 174 deletions(-) diff --git a/app/listener.py b/app/listener.py index f7ed5576..852fff87 100644 --- a/app/listener.py +++ b/app/listener.py @@ -43,7 +43,7 @@ class ReceiverListener(_listener.EventsListener): """Keeps the status of a Unifying Receiver. """ def __init__(self, receiver, status_changed_callback=None): - super(ReceiverListener, self).__init__(receiver, self._events_handler) + super(ReceiverListener, self).__init__(receiver, self._notifications_handler) self.tick_period = _POLL_TICK self._last_tick = 0 @@ -65,13 +65,13 @@ class ReceiverListener(_listener.EventsListener): receiver.register_new_device = _MethodType(_register_with_status, receiver) def has_started(self): - _log.info("events listener has started") + _log.info("notifications listener has started") self.receiver.enable_notifications() self.receiver.notify_devices() self._status_changed(self.receiver, _status.ALERT.LOW) def has_stopped(self): - _log.info("events listener has stopped") + _log.info("notifications listener has stopped") if self.receiver: self.receiver.enable_notifications(False) self.receiver.close() @@ -119,26 +119,26 @@ class ReceiverListener(_listener.EventsListener): if device.status is None: self.status_changed_callback(r) - def _events_handler(self, event): + def _notifications_handler(self, n): assert self.receiver - if event.devnumber == 0xFF: - # a receiver event + if n.devnumber == 0xFF: + # a receiver notification if self.receiver.status is not None: - self.receiver.status.process_event(event) + self.receiver.status.process_notification(n) else: - # a device event - assert event.devnumber > 0 and event.devnumber <= self.receiver.max_devices - already_known = event.devnumber in self.receiver - dev = self.receiver[event.devnumber] + # a device notification + assert n.devnumber > 0 and n.devnumber <= self.receiver.max_devices + already_known = n.devnumber in self.receiver + dev = self.receiver[n.devnumber] if dev and dev.status is not None: - dev.status.process_event(event) + dev.status.process_notification(n) if self.receiver.status.lock_open and not already_known: - # this should be the first event after a device was paired - assert event.sub_id == 0x41 and event.address == 0x04 + # this should be the first notification after a device was paired + assert n.sub_id == 0x41 and n.address == 0x04 _log.info("pairing detected new device") self.receiver.status.new_device = dev else: - _log.warn("received event %s for invalid device %d: %s", event, event.devnumber, dev) + _log.warn("received notification %s for invalid device %d: %s", n, n.devnumber, dev) def __str__(self): return '' % (self.receiver.path, self.receiver.handle) diff --git a/app/solaar.py b/app/solaar.py index 7c81cea4..e97e1d9a 100644 --- a/app/solaar.py +++ b/app/solaar.py @@ -91,7 +91,7 @@ def _run(args): from logitech.unifying_receiver import status - # callback delivering status events from the receiver/devices to the UI + # callback delivering status notifications from the receiver/devices to the UI def status_changed(receiver, device=None, alert=status.ALERT.NONE, reason=None): if alert & status.ALERT.MED: GObject.idle_add(window.present) diff --git a/app/solaar_cli.py b/app/solaar_cli.py index 65d418c1..3497ba1f 100644 --- a/app/solaar_cli.py +++ b/app/solaar_cli.py @@ -87,12 +87,13 @@ def _print_receiver(receiver, verbose=False): print (" Has %d paired device(s)." % paired_count) - notifications = receiver.request(0x8100) - if notifications: - notifications = ord(notifications[0:1]) << 16 | ord(notifications[1:2]) << 8 - if notifications: + notification_flags = receiver.request(0x8100) + if notification_flags: + notification_flags = ord(notification_flags[0:1]) << 16 | ord(notification_flags[1:2]) << 8 + if notification_flags: from logitech.unifying_receiver import hidpp10 - print (" Enabled notifications: %06X = %s." % (notifications, ', '.join(hidpp10.NOTIFICATION_FLAG.flag_names(notifications)))) + notification_names = hidpp10.NOTIFICATION_FLAG.flag_names(notification_flags) + print (" Enabled notifications: 0x%06X = %s." % (notification_flags, ', '.join(notification_names))) else: print (" All notifications disabled.") @@ -184,37 +185,41 @@ def pair_device(receiver, args): done = [False] - def _events_handler(event): - if event.devnumber == 0xFF: - r_status.process_event(event) + def _notification_handler(n): + if n.devnumber == 0xFF: + r_status.process_notification(n) if not r_status.lock_open: done[0] = True - elif event.sub_id == 0x41 and event.address == 0x04: - if event.devnumber not in known_devices: - r_status.new_device = receiver[event.devnumber] + elif n.sub_id == 0x41 and n.address == 0x04: + if n.devnumber not in known_devices: + r_status.new_device = receiver[n.devnumber] from logitech.unifying_receiver import base - base.events_hook = _events_handler + base.notifications_hook = _notification_handler # check if it's necessary to set the notification flags - notifications = receiver.request(0x8100) - if notifications: - notifications = ord(notifications[:1]) + ord(notifications[1:2]) + ord(notifications[2:3]) - if not notifications: + notification_flags = receiver.request(0x8100) + if notification_flags: + # just to see if any bits are set + notification_flags = ord(notification_flags[:1]) + ord(notification_flags[1:2]) + ord(notification_flags[2:3]) + if not notification_flags: + # if there are any notifications set, just assume the one we need is already set receiver.enable_notifications() receiver.set_lock(False, timeout=20) print ("Pairing: turn your new device on (timing out in 20 seconds).") while not done[0]: - event = base.read(receiver.handle, 2000) - if event: - event = base.make_event(*event) - if event: - _events_handler(event) + n = base.read(receiver.handle, 2000) + if n: + n = base.make_notification(*n) + if n: + _notification_handler(n) - if not notifications: + if not notification_flags: + # only clear the flags if they weren't set before, otherwise a + # concurrently running Solaar app will stop working properly receiver.enable_notifications(False) - base.events_hook = None + base.notifications_hook = None if r_status.new_device: dev = r_status.new_device diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index 90f47566..5393250c 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -218,42 +218,42 @@ def _skip_incoming(handle): # # -"""The function that may be called on incoming events. +"""The function that may be called on incoming notifications. The hook must be a callable accepting one tuple parameter, with the format ``( devnumber, request_id, data)``. This hook will only be called by the request()/ping() functions, when received replies do not match the expected request_id. As such, it is not suitable for -intercepting broadcast events from the device (e.g. special keys being pressed, -battery charge events, etc), at least not in a timely manner. +intercepting broadcast notifications from the device (e.g. special keys being +pressed, battery charge notifications, etc), at least not in a timely manner. """ -events_hook = None +notifications_hook = None def _unhandled(report_id, devnumber, data): - """Deliver a possible event to the unhandled_hook (if any).""" - if events_hook: - event = make_event(devnumber, data) - if event: - events_hook(event) + """Deliver a possible notification to the notifications_hook (if any).""" + if notifications_hook: + n = make_notification(devnumber, data) + if n: + notifications_hook(n) +def make_notification(devnumber, data): + """Guess if this is a notification (and not just a request reply), and + return a Notification tuple if it is.""" + sub_id = ord(data[:1]) + if sub_id & 0x80 != 0x80: + # HID++ 1.0 standard notifications are 0x40 - 0x7F + # HID++ 2.0 feature notifications have the SoftwareID 0 + address = ord(data[1:2]) + if sub_id >= 0x40 or address & 0x0F == 0x00: + return _HIDPP_Notification(devnumber, sub_id, address, data[2:]) from collections import namedtuple -_Event = namedtuple('_Event', ['devnumber', 'sub_id', 'address', 'data']) -_Event.__str__ = lambda self: 'Event(%d,%02X,%02X,%s)' % (self.devnumber, self.sub_id, self.address, _strhex(self.data)) -_Event.__unicode__ = _Event.__str__ +_HIDPP_Notification = namedtuple('_HIDPP_Notification', ['devnumber', 'sub_id', 'address', 'data']) +_HIDPP_Notification.__str__ = lambda self: 'Notification(%d,%02X,%02X,%s)' % (self.devnumber, self.sub_id, self.address, _strhex(self.data)) +_HIDPP_Notification.__unicode__ = _HIDPP_Notification.__str__ del namedtuple -def make_event(devnumber, data): - sub_id = ord(data[:1]) - if devnumber == 0xFF: - if sub_id == 0x4A: # receiver lock event - return _Event(devnumber, sub_id, ord(data[1:2]), data[2:]) - elif sub_id & 0x80 != 0x80: - address = ord(data[1:2]) - if sub_id >= 0x40 or address & 0x01 == 0: - return _Event(devnumber, sub_id, address, data[2:]) - def request(handle, devnumber, request_id, *params): """Makes a feature call to a device and waits for a matching reply. @@ -269,21 +269,22 @@ def request(handle, devnumber, request_id, *params): assert isinstance(request_id, int) if devnumber != 0xFF and request_id < 0x8000: timeout = _DEVICE_REQUEST_TIMEOUT - # for HID++ 2.0 feature request, randomize the swid to make it easier to - # recognize the reply for this request. also, always set the last bit - # (0) in swid, to make events easier to identify - request_id = (request_id & 0xFFF0) | _random_bits(4) | 0x01 + # for HID++ 2.0 feature requests, randomize the SoftwareId to make it + # easier to recognize the reply for this request. also, always set the + # most significant bit (8) in SoftwareId, to make notifications easier + # to distinguish from request replies + request_id = (request_id & 0xFFF0) | 0x08 | _random_bits(3) else: timeout = _RECEIVER_REQUEST_TIMEOUT - request_str = _pack(b'!H', request_id) - params = b''.join(_pack(b'B', p) if type(p) == int else p for p in params) + params = b''.join(_pack(b'B', p) if isinstance(p, int) else p for p in params) # if _log.isEnabledFor(_DEBUG): # _log.debug("(%s) device %d request_id {%04X} params [%s]", handle, devnumber, request_id, _strhex(params)) + request_data = _pack(b'!H', request_id) + params _skip_incoming(handle) ihandle = int(handle) - write(ihandle, devnumber, request_str + params) + write(ihandle, devnumber, request_data) while True: now = _timestamp() @@ -293,7 +294,7 @@ def request(handle, devnumber, request_id, *params): if reply: report_id, reply_devnumber, reply_data = reply if reply_devnumber == devnumber: - if report_id == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_str: + if report_id == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2]: error = ord(reply_data[3:4]) # if error == _hidpp10.ERROR.resource_error: # device unreachable @@ -304,18 +305,18 @@ def request(handle, devnumber, request_id, *params): # _log.error("(%s) device %d error on request {%04X}: unknown device", handle, devnumber, request_id) # raise NoSuchDevice(number=devnumber, request=request_id) - _log.debug("(%s) device %d error on request {%04X}: %d = %s", + _log.debug("(%s) device 0x%02X error on request {%04X}: %d = %s", handle, devnumber, request_id, error, _hidpp10.ERROR[error]) break - if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_str: + if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]: # a HID++ 2.0 feature call returned with an error error = ord(reply_data[3:4]) _log.error("(%s) device %d error on feature request {%04X}: %d = %s", handle, devnumber, request_id, error, _hidpp20.ERROR[error]) raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params) - if reply_data[:2] == request_str: + if reply_data[:2] == request_data[:2]: if request_id & 0xFF00 == 0x8300: # long registry r/w should return a long reply assert report_id == 0x11 @@ -329,7 +330,7 @@ def request(handle, devnumber, request_id, *params): if reply_data[2:3] == params[:1]: return reply_data[2:] else: - # hm, not mathing my request, and certainly not an event + # hm, not mathing my request, and certainly not a notification continue else: return reply_data[2:] @@ -352,16 +353,15 @@ def ping(handle, devnumber): if _log.isEnabledFor(_DEBUG): _log.debug("(%s) pinging device %d", handle, devnumber) + # randomize the SoftwareId and mark byte to be able to identify the ping + # reply, and set most significant (0x8) bit in SoftwareId so that the reply + # is always distinguishable from notifications + request_id = 0x0018 | _random_bits(3) + request_data = _pack(b'!HBBB', request_id, 0, 0, _random_bits(8)) + _skip_incoming(handle) ihandle = int(handle) - - # randomize the swid and mark byte to positively identify the ping reply, - # and set the last (0) bit in swid to make it easier to distinguish requests - # from events - request_id = 0x0010 | _random_bits(4) | 0x01 - request_str = _pack(b'!H', request_id) - ping_mark = _pack(b'B', _random_bits(8)) - write(ihandle, devnumber, request_str + b'\x00\x00' + ping_mark) + write(ihandle, devnumber, request_data) while True: now = _timestamp() @@ -371,11 +371,11 @@ def ping(handle, devnumber): if reply: report_id, number, data = reply if number == devnumber: - if data[:2] == request_str and data[4:5] == ping_mark: + if data[:2] == request_data[:2] and data[4:5] == request_data[-1:]: # HID++ 2.0+ device, currently connected return ord(data[2:3]) + ord(data[3:4]) / 10.0 - if report_id == 0x10 and data[:1] == b'\x8F' and data[1:3] == request_str: + if report_id == 0x10 and data[:1] == b'\x8F' and data[1:3] == request_data[:2]: assert data[-1:] == b'\x00' error = ord(data[3:4]) diff --git a/lib/logitech/unifying_receiver/listener.py b/lib/logitech/unifying_receiver/listener.py index 4df2cff7..e217831a 100644 --- a/lib/logitech/unifying_receiver/listener.py +++ b/lib/logitech/unifying_receiver/listener.py @@ -24,7 +24,10 @@ from . import base as _base # class ThreadedHandle(object): - """A thread-local wrapper with different open handles for each thread.""" + """A thread-local wrapper with different open handles for each thread. + + Closing a ThreadedHandle will close all handles. + """ __slots__ = ['path', '_local', '_handles'] @@ -85,30 +88,40 @@ class ThreadedHandle(object): # # +# How long to wait during a read for the next packet. +# Ideally this should be rather long (10s ?), but the read is blocking +# and this means that when the thread is signalled to stop, it would take +# a while for it to acknowledge it. _EVENT_READ_TIMEOUT = 500 + +# After this many read that did not produce a packet, call the tick() method. _IDLE_READS = 4 class EventsListener(_threading.Thread): - """Listener thread for events from the Unifying Receiver. + """Listener thread for notifications from the Unifying Receiver. Incoming packets will be passed to the callback function in sequence. """ - def __init__(self, receiver, events_callback): + def __init__(self, receiver, notifications_callback): super(EventsListener, self).__init__(name=self.__class__.__name__) self.daemon = True self._active = False self.receiver = receiver - self._queued_events = _Queue(32) - self._events_callback = events_callback + self._queued_notifications = _Queue(32) + self._notifications_callback = notifications_callback self.tick_period = 0 def run(self): self._active = True - _base.events_hook = self._events_hook + + # This is necessary because notification packets might be received + # during requests made by our callback. + _base.notifications_hook = self._notifications_hook + ihandle = int(self.receiver.handle) _log.info("started with %s (%d)", self.receiver, ihandle) @@ -118,28 +131,28 @@ class EventsListener(_threading.Thread): idle_reads = 0 while self._active: - if self._queued_events.empty(): + if self._queued_notifications.empty(): try: - # _log.debug("read next event") - event = _base.read(ihandle, _EVENT_READ_TIMEOUT) + # _log.debug("read next notification") + n = _base.read(ihandle, _EVENT_READ_TIMEOUT) except _base.NoReceiver: _log.warning("receiver disconnected") self.receiver.close() break - if event: - event = _base.make_event(*event) + if n: + n = _base.make_notification(*n) else: - # deliver any queued events - event = self._queued_events.get() + # deliver any queued notifications + n = self._queued_notifications.get() - if event: + if n: # if _log.isEnabledFor(_DEBUG): - # _log.debug("processing event %s", event) + # _log.debug("processing notification %s", n) try: - self._events_callback(event) + self._notifications_callback(n) except: - _log.exception("processing event %s", event) + _log.exception("processing notification %s", n) elif self.tick_period: idle_reads += 1 if idle_reads % _IDLE_READS == 0: @@ -149,8 +162,8 @@ class EventsListener(_threading.Thread): last_tick = now self.tick(now) - _base.unhandled_hook = None - del self._queued_events + _base.notifications_hook = None + del self._queued_notifications self.has_stopped() @@ -159,7 +172,8 @@ class EventsListener(_threading.Thread): self._active = False def has_started(self): - """Called right after the thread has started.""" + """Called right after the thread has started, and before it starts + reading notification packets.""" pass def has_stopped(self): @@ -167,16 +181,16 @@ class EventsListener(_threading.Thread): pass def tick(self, timestamp): - """Called about every tick_period seconds, if set.""" + """Called about every tick_period seconds.""" pass - def _events_hook(self, event): - # only consider unhandled events that were sent from this thread, - # i.e. triggered during a callback of a previous event + def _notifications_hook(self, n): + # Only consider unhandled notifications that were sent from this thread, + # i.e. triggered by a callback handling a previous notification. if self._active and _threading.current_thread() == self: if _log.isEnabledFor(_DEBUG): - _log.debug("queueing unhandled event %s", event) - self._queued_events.put(event) + _log.debug("queueing unhandled notification %s", n) + self._queued_notifications.put(n) def __bool__(self): return bool(self._active and self.receiver) diff --git a/lib/logitech/unifying_receiver/receiver.py b/lib/logitech/unifying_receiver/receiver.py index ed8f33e7..a72bebb8 100644 --- a/lib/logitech/unifying_receiver/receiver.py +++ b/lib/logitech/unifying_receiver/receiver.py @@ -247,7 +247,8 @@ class Receiver(object): return self._firmware def enable_notifications(self, enable=True): - """Enable or disable device (dis)connection events on this receiver.""" + """Enable or disable device (dis)connection notifications on this + receiver.""" if not self.handle: return False if enable: @@ -267,7 +268,7 @@ class Receiver(object): """Scan all devices.""" if self.handle: if not self.request(0x8002, 0x02): - _log.warn("failed to trigger device events") + _log.warn("failed to trigger device link notifications") def register_new_device(self, number): if self._devices.get(number) is not None: diff --git a/lib/logitech/unifying_receiver/status.py b/lib/logitech/unifying_receiver/status.py index a7a87a17..c34dec1c 100644 --- a/lib/logitech/unifying_receiver/status.py +++ b/lib/logitech/unifying_receiver/status.py @@ -62,16 +62,16 @@ class ReceiverStatus(dict): # self.updated = _timestamp() self._changed_callback(self._receiver, alert=alert, reason=reason) - def process_event(self, event): - if event.sub_id == 0x4A: - self.lock_open = bool(event.address & 0x01) + def process_notification(self, n): + if n.sub_id == 0x4A: + self.lock_open = bool(n.address & 0x01) reason = 'pairing lock is ' + ('open' if self.lock_open else 'closed') _log.info("%s: %s", self._receiver, reason) if self.lock_open: self[ERROR] = None self.new_device = None - pair_error = ord(event.data[:1]) + pair_error = ord(n.data[:1]) if pair_error: self[ERROR] = _hidpp10.PAIRING_ERRORS[pair_error] self.new_device = None @@ -98,15 +98,27 @@ class DeviceStatus(dict): self.updated = 0 def __str__(self): - t = [] - if self.get(BATTERY_LEVEL) is not None: - b = 'Battery: %d%%' % self[BATTERY_LEVEL] - if self.get(BATTERY_STATUS): - b += ' (' + self[BATTERY_STATUS] + ')' - t.append(b) - if self.get(LIGHT_LEVEL) is not None: - t.append('Light: %d lux' % self[LIGHT_LEVEL]) - return ', '.join(t) + def _item(name, format): + value = self.get(name) + if value is not None: + return format % value + + def _items(): + battery_level = _item(BATTERY_LEVEL, 'Battery: %d%%') + if battery_level: + yield battery_level + battery_status = _item(BATTERY_STATUS, ' (%s)') + if battery_status: + yield battery_status + + light_level = _item(LIGHT_LEVEL, 'Light: %d lux') + if light_level: + if battery_level: + yield ', ' + yield light_level + + return ''.join(i for i in _items()) + __unicode__ = __str__ def __bool__(self): @@ -156,25 +168,25 @@ class DeviceStatus(dict): self.clear() self._changed(active=False, alert=ALERT.LOW, timestamp=timestamp) - def process_event(self, event): - assert event.sub_id < 0x80 + def process_notification(self, n): + assert n.sub_id < 0x80 - if event.sub_id == 0x40: - if event.address == 0x02: + if n.sub_id == 0x40: + if n.address == 0x02: # device un-paired self.clear() self._device.status = None self._changed(False, ALERT.HIGH, 'unpaired') else: - _log.warn("device %d disconnection notification %s with unknown type %02X", self._device.number, event, event.address) + _log.warn("device %d disconnection notification %s with unknown type %02X", self._device.number, n, n.address) return True - if event.sub_id == 0x41: - if event.address == 0x04: # unifying protocol - # wpid = _strhex(event.data[4:5] + event.data[3:4]) + if n.sub_id == 0x41: + if n.address == 0x04: # unifying protocol + # wpid = _strhex(n.data[4:5] + n.data[3:4]) # assert wpid == device.wpid - flags = ord(event.data[:1]) & 0xF0 + flags = ord(n.data[:1]) & 0xF0 link_encrypyed = bool(flags & 0x20) link_established = not (flags & 0x40) if _log.isEnabledFor(_DEBUG): @@ -185,47 +197,43 @@ class DeviceStatus(dict): self[ENCRYPTED] = link_encrypyed self._changed(link_established) - elif event.address == 0x03: - _log.warn("device %d connection notification %s with eQuad protocol, ignored", self._device.number, event) + elif n.address == 0x03: + _log.warn("device %d connection notification %s with eQuad protocol, ignored", self._device.number, n) else: - _log.warn("device %d connection notification %s with unknown protocol %02X", self._device.number, event, event.address) + _log.warn("device %d connection notification %s with unknown protocol %02X", self._device.number, n, n.address) return True - if event.sub_id == 0x49: + if n.sub_id == 0x49: # raw input event? just ignore it - # if event.address == 0x01, no idea what it is - # if event.address == 0x03, it's an actual input event + # if n.address == 0x01, no idea what it is, but they keep on coming + # if n.address == 0x03, it's an actual input event return True - if event.sub_id == 0x4B: - if event.address == 0x01: - _log.debug("device came online %d", event.devnumber) + if n.sub_id == 0x4B: + if n.address == 0x01: + _log.debug("device came online %d", n.devnumber) self._changed(alert=ALERT.LOW, reason='powered on') else: - _log.warn("unknown event %s", event) + _log.warn("unknown notification %s", n) return True - if event.sub_id >= 0x40: - _log.warn("don't know how to handle event %s", event) - return False - - # this must be a feature event, assuming no device has more than 0x40 features - if event.sub_id >= len(self._device.features): - _log.warn("device %d got event from unknown feature index %02X", self._device.number, event.sub_id) + # this must be a feature notification, assuming no device has more than 0x40 features + if n.sub_id >= len(self._device.features): + _log.warn("device %s got notification from invalid feature index %02X", self._device, n.sub_id) return False try: - feature = self._device.features[event.sub_id] + feature = self._device.features[n.sub_id] except IndexError: - _log.warn("don't know how to handle event %s for feature with invalid index %02X", event, event.sub_id) + _log.warn("device %s got notification from invalid feature index %02X", self._device, n.sub_id) return False if feature == _hidpp20.FEATURE.BATTERY: - if event.address == 0x00: - discharge = ord(event.data[:1]) - battery_status = ord(event.data[1:2]) + if n.address == 0x00: + discharge = ord(n.data[:1]) + battery_status = ord(n.data[1:2]) self[BATTERY_LEVEL] = discharge self[BATTERY_STATUS] = BATTERY_STATUS[battery_status] if _hidpp20.BATTERY_OK(battery_status): @@ -236,40 +244,40 @@ class DeviceStatus(dict): reason = self[ERROR] = self[BATTERY_STATUS] self._changed(alert=alert, reason=reason) else: - _log.warn("don't know how to handle BATTERY event %s", event) + _log.warn("don't know how to handle BATTERY notification %s", n) return True if feature == _hidpp20.FEATURE.REPROGRAMMABLE_KEYS: - if event.address == 0x00: - _log.debug('reprogrammable key: %s', event) + if n.address == 0x00: + _log.warn('unknown reprogrammable key: %s', n) else: - _log.warn("don't know how to handle REPROGRAMMABLE KEYS event %s", event) + _log.warn("don't know how to handle REPROGRAMMABLE KEYS notification %s", n) return True if feature == _hidpp20.FEATURE.WIRELESS: - if event.address == 0x00: - _log.debug("wireless status: %s", event) - if event.data[0:3] == b'\x01\x01\x01': + if n.address == 0x00: + _log.debug("wireless status: %s", n) + if n.data[0:3] == b'\x01\x01\x01': self._changed(alert=ALERT.LOW, reason='powered on') else: - _log.warn("don't know how to handle WIRELESS event %s", event) + _log.warn("don't know how to handle WIRELESS notification %s", n) return True if feature == _hidpp20.FEATURE.SOLAR_CHARGE: - if event.data[5:9] == b'GOOD': - charge, lux, adc = _unpack(b'!BHH', event.data[:5]) + if n.data[5:9] == b'GOOD': + charge, lux, adc = _unpack(b'!BHH', n.data[:5]) self[BATTERY_LEVEL] = charge # guesstimate the battery voltage, emphasis on 'guess' self[BATTERY_STATUS] = '%1.2fV' % (adc * 2.67793237653 / 0x0672) - if event.address == 0x00: + if n.address == 0x00: self[LIGHT_LEVEL] = None self._changed() - elif event.address == 0x10: + elif n.address == 0x10: self[LIGHT_LEVEL] = lux if lux > 200: # guesstimate self[BATTERY_STATUS] += ', charging' self._changed() - elif event.address == 0x20: + elif n.address == 0x20: _log.debug("Solar key pressed") # first cancel any reporting self._device.feature_request(_hidpp20.FEATURE.SOLAR_CHARGE) @@ -281,17 +289,17 @@ class DeviceStatus(dict): else: self._changed() else: - _log.warn("SOLAR CHARGE event not GOOD? %s", event) + _log.warn("SOLAR CHARGE notification not GOOD? %s", n) return True if feature == _hidpp20.FEATURE.TOUCH_MOUSE: - if event.address == 0x00: - _log.debug("TOUCH MOUSE points event: %s", event) - elif event.address == 0x10: - touch = ord(event.data[:1]) + if n.address == 0x00: + _log.debug("TOUCH MOUSE points notification: %s", n) + elif n.address == 0x10: + touch = ord(n.data[:1]) button_down = bool(touch & 0x02) mouse_lifted = bool(touch & 0x01) _log.debug("TOUCH MOUSE status: button_down=%s mouse_lifted=%s", button_down, mouse_lifted) return True - _log.warn("don't know how to handle event %s for feature %s (%02X)", event, feature, event.sub_id) + _log.warn("don't know how to handle %s for feature %s (%02X)", n, feature, n.sub_id)