only dispatch notification events from the specialized listener thread

This commit is contained in:
Daniel Pavel 2013-04-28 15:09:09 +02:00
parent 6f0b61e6d8
commit 897dffc426
3 changed files with 55 additions and 57 deletions

View File

@ -192,13 +192,15 @@ def _read(handle, timeout):
return report_id, devnumber, data[2:]
#
#
#
def _skip_incoming(handle):
def _skip_incoming(handle, ihandle, notifications_hook):
"""Read anything already in the input buffer.
Used by request() and ping() before their write.
"""
ihandle = int(handle)
while True:
try:
@ -209,37 +211,18 @@ def _skip_incoming(handle):
raise NoReceiver(reason=reason)
if data:
_unhandled(report_id, ord(data[1:2]), data[2:])
if _log.isEnabledFor(_DEBUG):
report_id = ord(data[:1])
assert (report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE or
report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE or
report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE)
if notifications_hook:
n = make_notification(ord(data[1:2]), data[2:])
if n:
notifications_hook(n)
else:
return
#
#
#
"""The function that may be called on incoming notifications.
The hook must be a callable accepting one tuple parameter, with the format
``(<int> devnumber, <bytes[2]> request_id, <bytes> data)``.
This hook will only be called by the request()/ping() functions, when received
replies do not match the expected request_id. As such, it is not suitable for
intercepting broadcast notifications from the device (e.g. special keys being
pressed, battery charge notifications, etc), at least not in a timely manner.
"""
notifications_hook = None
def _unhandled(report_id, devnumber, data):
"""Deliver a possible notification to the notifications_hook (if any)."""
if notifications_hook:
n = make_notification(devnumber, data)
if n:
notifications_hook(n)
def make_notification(devnumber, data):
"""Guess if this is a notification (and not just a request reply), and
@ -258,6 +241,9 @@ _HIDPP_Notification.__str__ = lambda self: 'Notification(%d,%02X,%02X,%s)' % (se
_HIDPP_Notification.__unicode__ = _HIDPP_Notification.__str__
del namedtuple
#
#
#
def request(handle, devnumber, request_id, *params):
"""Makes a feature call to a device and waits for a matching reply.
@ -290,8 +276,9 @@ def request(handle, devnumber, request_id, *params):
# _log.debug("(%s) device %d request_id {%04X} params [%s]", handle, devnumber, request_id, _strhex(params))
request_data = _pack(b'!H', request_id) + params
_skip_incoming(handle)
ihandle = int(handle)
notifications_hook = getattr(handle, 'notifications_hook', None)
_skip_incoming(handle, ihandle, notifications_hook)
write(ihandle, devnumber, request_data)
while True:
@ -345,7 +332,10 @@ def request(handle, devnumber, request_id, *params):
else:
return reply_data[2:]
_unhandled(report_id, reply_devnumber, reply_data)
if notifications_hook:
n = make_notification(reply_devnumber, reply_data)
if n:
notifications_hook(n)
if delta >= timeout:
_log.warn("timeout on device %d request {%04X} params[%s]", devnumber, request_id, _strhex(params))
@ -370,13 +360,14 @@ def ping(handle, devnumber):
request_id = 0x0018 | _random_bits(3)
request_data = _pack(b'!HBBB', request_id, 0, 0, _random_bits(8))
_skip_incoming(handle)
ihandle = int(handle)
notifications_hook = getattr(handle, 'notifications_hook', None)
_skip_incoming(handle, ihandle, notifications_hook)
write(ihandle, devnumber, request_data)
while True:
now = _timestamp()
reply = _read(ihandle, _PING_TIMEOUT)
reply = _read(handle, _PING_TIMEOUT)
delta = _timestamp() - now
if reply:
@ -401,7 +392,10 @@ def ping(handle, devnumber):
_log.error("(%s) device %d error on ping request: unknown device", handle, devnumber)
raise NoSuchDevice(number=devnumber, request=request_id)
_unhandled(report_id, number, data)
if notifications_hook:
n = make_notification(reply_devnumber, reply_data)
if n:
notifications_hook(n)
if delta >= _PING_TIMEOUT:
_log.warn("(%s) timeout on device %d ping", handle, devnumber)

View File

@ -23,24 +23,25 @@ from . import base as _base
#
#
class ThreadedHandle(object):
class _ThreadedHandle(object):
"""A thread-local wrapper with different open handles for each thread.
Closing a ThreadedHandle will close all handles.
"""
__slots__ = ['path', '_local', '_handles']
__slots__ = ['path', '_local', '_handles', '_listener']
def __init__(self, initial_handle, path):
assert initial_handle
if type(initial_handle) != int:
raise TypeError('expected int as initial handle, got %r' % initial_handle)
def __init__(self, listener, path, handle):
assert listener is not None
assert path is not None
assert handle is not None
assert type(handle) == int
assert path
self._listener = listener
self.path = path
self._local = _threading.local()
self._local.handle = initial_handle
self._handles = [initial_handle]
self._local.handle = handle
self._handles = [handle]
def _open(self):
handle = _base.open_path(self.path)
@ -61,7 +62,15 @@ class ThreadedHandle(object):
for h in handles:
_base.close(h)
@property
def notifications_hook(self):
if self._listener:
assert isinstance(self._listener, _threading.Thread)
if _threading.current_thread() == self._listener:
return self._listener._notifications_hook
def __del__(self):
self._listener = None
self.close()
def __index__(self):
@ -106,11 +115,14 @@ class EventsListener(_threading.Thread):
def __init__(self, receiver, notifications_callback):
super(EventsListener, self).__init__(name=self.__class__.__name__)
# replace the handle with a threaded one
receiver.handle = _ThreadedHandle(self, receiver.path, receiver.handle)
self.daemon = True
self._active = False
self.receiver = receiver
self._queued_notifications = _Queue(32)
self._queued_notifications = _Queue(16)
self._notifications_callback = notifications_callback
self.tick_period = 0
@ -118,10 +130,6 @@ class EventsListener(_threading.Thread):
def run(self):
self._active = True
# This is necessary because notification packets might be received
# during requests made by our callback.
_base.notifications_hook = self._notifications_hook
ihandle = int(self.receiver.handle)
_log.info("started with %s (%d)", self.receiver, ihandle)
@ -163,9 +171,7 @@ class EventsListener(_threading.Thread):
last_tick = now
self.tick(now)
_base.notifications_hook = None
del self._queued_notifications
self.has_stopped()
def stop(self):
@ -188,9 +194,10 @@ class EventsListener(_threading.Thread):
def _notifications_hook(self, n):
# Only consider unhandled notifications that were sent from this thread,
# i.e. triggered by a callback handling a previous notification.
if self._active and _threading.current_thread() == self:
if _log.isEnabledFor(_DEBUG):
_log.debug("queueing unhandled %s", n)
assert _threading.current_thread() == self
if self._active: # and _threading.current_thread() == self:
# if _log.isEnabledFor(_DEBUG):
# _log.debug("queueing unhandled %s", n)
self._queued_notifications.put(n)
def __bool__(self):

View File

@ -37,19 +37,15 @@ _POLL_TICK = 60 # seconds
class ReceiverListener(_listener.EventsListener):
"""Keeps the status of a Unifying Receiver.
"""Keeps the status of a Receiver.
"""
def __init__(self, receiver, status_changed_callback=None):
def __init__(self, receiver, status_changed_callback):
super(ReceiverListener, self).__init__(receiver, self._notifications_handler)
self.tick_period = _POLL_TICK
self._last_tick = 0
assert status_changed_callback
self.status_changed_callback = status_changed_callback
# make it a bit similar with the regular devices
receiver.kind = None
# replace the
receiver.handle = _listener.ThreadedHandle(receiver.handle, receiver.path)
receiver.status = _status.ReceiverStatus(receiver, self._status_changed)
def has_started(self):
@ -150,7 +146,8 @@ class ReceiverListener(_listener.EventsListener):
__unicode__ = __str__
@classmethod
def open(self, status_changed_callback=None):
def open(self, status_changed_callback):
assert status_changed_callback
receiver = Receiver.open()
if receiver:
rl = ReceiverListener(receiver, status_changed_callback)