From 897dffc42630df0309c7824f5b1bde3e897c9894 Mon Sep 17 00:00:00 2001 From: Daniel Pavel Date: Sun, 28 Apr 2013 15:09:09 +0200 Subject: [PATCH] only dispatch notification events from the specialized listener thread --- lib/logitech/unifying_receiver/base.py | 54 ++++++++++------------ lib/logitech/unifying_receiver/listener.py | 45 ++++++++++-------- lib/solaar/listener.py | 13 ++---- 3 files changed, 55 insertions(+), 57 deletions(-) diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index 0d9b6957..86a4ad64 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -192,13 +192,15 @@ def _read(handle, timeout): return report_id, devnumber, data[2:] +# +# +# -def _skip_incoming(handle): +def _skip_incoming(handle, ihandle, notifications_hook): """Read anything already in the input buffer. Used by request() and ping() before their write. """ - ihandle = int(handle) while True: try: @@ -209,37 +211,18 @@ def _skip_incoming(handle): raise NoReceiver(reason=reason) if data: - _unhandled(report_id, ord(data[1:2]), data[2:]) if _log.isEnabledFor(_DEBUG): report_id = ord(data[:1]) assert (report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE or report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE or report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE) + if notifications_hook: + n = make_notification(ord(data[1:2]), data[2:]) + if n: + notifications_hook(n) else: return -# -# -# - -"""The function that may be called on incoming notifications. - -The hook must be a callable accepting one tuple parameter, with the format -``( devnumber, request_id, data)``. - -This hook will only be called by the request()/ping() functions, when received -replies do not match the expected request_id. As such, it is not suitable for -intercepting broadcast notifications from the device (e.g. special keys being -pressed, battery charge notifications, etc), at least not in a timely manner. -""" -notifications_hook = None - -def _unhandled(report_id, devnumber, data): - """Deliver a possible notification to the notifications_hook (if any).""" - if notifications_hook: - n = make_notification(devnumber, data) - if n: - notifications_hook(n) def make_notification(devnumber, data): """Guess if this is a notification (and not just a request reply), and @@ -258,6 +241,9 @@ _HIDPP_Notification.__str__ = lambda self: 'Notification(%d,%02X,%02X,%s)' % (se _HIDPP_Notification.__unicode__ = _HIDPP_Notification.__str__ del namedtuple +# +# +# def request(handle, devnumber, request_id, *params): """Makes a feature call to a device and waits for a matching reply. @@ -290,8 +276,9 @@ def request(handle, devnumber, request_id, *params): # _log.debug("(%s) device %d request_id {%04X} params [%s]", handle, devnumber, request_id, _strhex(params)) request_data = _pack(b'!H', request_id) + params - _skip_incoming(handle) ihandle = int(handle) + notifications_hook = getattr(handle, 'notifications_hook', None) + _skip_incoming(handle, ihandle, notifications_hook) write(ihandle, devnumber, request_data) while True: @@ -345,7 +332,10 @@ def request(handle, devnumber, request_id, *params): else: return reply_data[2:] - _unhandled(report_id, reply_devnumber, reply_data) + if notifications_hook: + n = make_notification(reply_devnumber, reply_data) + if n: + notifications_hook(n) if delta >= timeout: _log.warn("timeout on device %d request {%04X} params[%s]", devnumber, request_id, _strhex(params)) @@ -370,13 +360,14 @@ def ping(handle, devnumber): request_id = 0x0018 | _random_bits(3) request_data = _pack(b'!HBBB', request_id, 0, 0, _random_bits(8)) - _skip_incoming(handle) ihandle = int(handle) + notifications_hook = getattr(handle, 'notifications_hook', None) + _skip_incoming(handle, ihandle, notifications_hook) write(ihandle, devnumber, request_data) while True: now = _timestamp() - reply = _read(ihandle, _PING_TIMEOUT) + reply = _read(handle, _PING_TIMEOUT) delta = _timestamp() - now if reply: @@ -401,7 +392,10 @@ def ping(handle, devnumber): _log.error("(%s) device %d error on ping request: unknown device", handle, devnumber) raise NoSuchDevice(number=devnumber, request=request_id) - _unhandled(report_id, number, data) + if notifications_hook: + n = make_notification(reply_devnumber, reply_data) + if n: + notifications_hook(n) if delta >= _PING_TIMEOUT: _log.warn("(%s) timeout on device %d ping", handle, devnumber) diff --git a/lib/logitech/unifying_receiver/listener.py b/lib/logitech/unifying_receiver/listener.py index 57436e49..d56c3a66 100644 --- a/lib/logitech/unifying_receiver/listener.py +++ b/lib/logitech/unifying_receiver/listener.py @@ -23,24 +23,25 @@ from . import base as _base # # -class ThreadedHandle(object): +class _ThreadedHandle(object): """A thread-local wrapper with different open handles for each thread. Closing a ThreadedHandle will close all handles. """ - __slots__ = ['path', '_local', '_handles'] + __slots__ = ['path', '_local', '_handles', '_listener'] - def __init__(self, initial_handle, path): - assert initial_handle - if type(initial_handle) != int: - raise TypeError('expected int as initial handle, got %r' % initial_handle) + def __init__(self, listener, path, handle): + assert listener is not None + assert path is not None + assert handle is not None + assert type(handle) == int - assert path + self._listener = listener self.path = path self._local = _threading.local() - self._local.handle = initial_handle - self._handles = [initial_handle] + self._local.handle = handle + self._handles = [handle] def _open(self): handle = _base.open_path(self.path) @@ -61,7 +62,15 @@ class ThreadedHandle(object): for h in handles: _base.close(h) + @property + def notifications_hook(self): + if self._listener: + assert isinstance(self._listener, _threading.Thread) + if _threading.current_thread() == self._listener: + return self._listener._notifications_hook + def __del__(self): + self._listener = None self.close() def __index__(self): @@ -106,11 +115,14 @@ class EventsListener(_threading.Thread): def __init__(self, receiver, notifications_callback): super(EventsListener, self).__init__(name=self.__class__.__name__) + # replace the handle with a threaded one + receiver.handle = _ThreadedHandle(self, receiver.path, receiver.handle) + self.daemon = True self._active = False self.receiver = receiver - self._queued_notifications = _Queue(32) + self._queued_notifications = _Queue(16) self._notifications_callback = notifications_callback self.tick_period = 0 @@ -118,10 +130,6 @@ class EventsListener(_threading.Thread): def run(self): self._active = True - # This is necessary because notification packets might be received - # during requests made by our callback. - _base.notifications_hook = self._notifications_hook - ihandle = int(self.receiver.handle) _log.info("started with %s (%d)", self.receiver, ihandle) @@ -163,9 +171,7 @@ class EventsListener(_threading.Thread): last_tick = now self.tick(now) - _base.notifications_hook = None del self._queued_notifications - self.has_stopped() def stop(self): @@ -188,9 +194,10 @@ class EventsListener(_threading.Thread): def _notifications_hook(self, n): # Only consider unhandled notifications that were sent from this thread, # i.e. triggered by a callback handling a previous notification. - if self._active and _threading.current_thread() == self: - if _log.isEnabledFor(_DEBUG): - _log.debug("queueing unhandled %s", n) + assert _threading.current_thread() == self + if self._active: # and _threading.current_thread() == self: + # if _log.isEnabledFor(_DEBUG): + # _log.debug("queueing unhandled %s", n) self._queued_notifications.put(n) def __bool__(self): diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index a58302e3..594f1c52 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -37,19 +37,15 @@ _POLL_TICK = 60 # seconds class ReceiverListener(_listener.EventsListener): - """Keeps the status of a Unifying Receiver. + """Keeps the status of a Receiver. """ - def __init__(self, receiver, status_changed_callback=None): + def __init__(self, receiver, status_changed_callback): super(ReceiverListener, self).__init__(receiver, self._notifications_handler) self.tick_period = _POLL_TICK self._last_tick = 0 + assert status_changed_callback self.status_changed_callback = status_changed_callback - - # make it a bit similar with the regular devices - receiver.kind = None - # replace the - receiver.handle = _listener.ThreadedHandle(receiver.handle, receiver.path) receiver.status = _status.ReceiverStatus(receiver, self._status_changed) def has_started(self): @@ -150,7 +146,8 @@ class ReceiverListener(_listener.EventsListener): __unicode__ = __str__ @classmethod - def open(self, status_changed_callback=None): + def open(self, status_changed_callback): + assert status_changed_callback receiver = Receiver.open() if receiver: rl = ReceiverListener(receiver, status_changed_callback)