receiver: serialize requests per handle so that threads do not receive response for a different request

This commit is contained in:
Peter F. Patel-Schneider 2021-07-11 19:12:32 -04:00
parent d898edc4a3
commit 538ab9c947
2 changed files with 165 additions and 139 deletions

View File

@ -22,8 +22,12 @@
from __future__ import absolute_import, division, print_function, unicode_literals from __future__ import absolute_import, division, print_function, unicode_literals
import threading as _threading
from collections import namedtuple from collections import namedtuple
from contextlib import contextmanager
from logging import DEBUG as _DEBUG from logging import DEBUG as _DEBUG
from logging import INFO as _INFO
from logging import getLogger from logging import getLogger
from random import getrandbits as _random_bits from random import getrandbits as _random_bits
from time import time as _timestamp from time import time as _timestamp
@ -350,6 +354,31 @@ del namedtuple
# #
# #
request_lock = _threading.Lock() # serialize all requests
handles_lock = {}
def handle_lock(handle):
with request_lock:
if handles_lock.get(int(handle)) is None:
if _log.isEnabledFor(_INFO):
_log.info('New lock %s', int(handle))
handles_lock[int(handle)] = _threading.Lock() # Serialize requests on the handle
return handles_lock[int(handle)]
# context manager for locks with a timeout
@contextmanager
def acquire_timeout(lock, handle, timeout):
result = lock.acquire(timeout=timeout)
try:
if not result:
_log.error('lock on handle %d not acquired, probably due to timeout', int(handle))
yield result
finally:
if result:
lock.release()
# a very few requests (e.g., host switching) do not expect a reply, but use no_reply=True with extreme caution # a very few requests (e.g., host switching) do not expect a reply, but use no_reply=True with extreme caution
def request(handle, devnumber, request_id, *params, no_reply=False, return_error=False, long_message=False): def request(handle, devnumber, request_id, *params, no_reply=False, return_error=False, long_message=False):
@ -364,119 +393,113 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
# import inspect as _inspect # import inspect as _inspect
# print ('\n '.join(str(s) for s in _inspect.stack())) # print ('\n '.join(str(s) for s in _inspect.stack()))
assert isinstance(request_id, int) with acquire_timeout(handle_lock(handle), handle, 10.):
if devnumber != 0xFF and request_id < 0x8000: assert isinstance(request_id, int)
# For HID++ 2.0 feature requests, randomize the SoftwareId to make it if devnumber != 0xFF and request_id < 0x8000:
# easier to recognize the reply for this request. also, always set the # For HID++ 2.0 feature requests, randomize the SoftwareId to make it
# most significant bit (8) in SoftwareId, to make notifications easier # easier to recognize the reply for this request. also, always set the
# to distinguish from request replies. # most significant bit (8) in SoftwareId, to make notifications easier
# This only applies to peripheral requests, ofc. # to distinguish from request replies.
request_id = (request_id & 0xFFF0) | 0x08 | _random_bits(3) # This only applies to peripheral requests, ofc.
request_id = (request_id & 0xFFF0) | 0x08 | _random_bits(3)
timeout = _RECEIVER_REQUEST_TIMEOUT if devnumber == 0xFF else _DEVICE_REQUEST_TIMEOUT timeout = _RECEIVER_REQUEST_TIMEOUT if devnumber == 0xFF else _DEVICE_REQUEST_TIMEOUT
# be extra patient on long register read # be extra patient on long register read
if request_id & 0xFF00 == 0x8300: if request_id & 0xFF00 == 0x8300:
timeout *= 2 timeout *= 2
if params: if params:
params = b''.join(_pack('B', p) if isinstance(p, int) else p for p in params) params = b''.join(_pack('B', p) if isinstance(p, int) else p for p in params)
else: else:
params = b'' params = b''
# if _log.isEnabledFor(_DEBUG): # if _log.isEnabledFor(_DEBUG):
# _log.debug("(%s) device %d request_id {%04X} params [%s]", handle, devnumber, request_id, _strhex(params)) # _log.debug("(%s) device %d request_id {%04X} params [%s]", handle, devnumber, request_id, _strhex(params))
request_data = _pack('!H', request_id) + params request_data = _pack('!H', request_id) + params
ihandle = int(handle) ihandle = int(handle)
notifications_hook = getattr(handle, 'notifications_hook', None) notifications_hook = getattr(handle, 'notifications_hook', None)
try: try:
_skip_incoming(handle, ihandle, notifications_hook) _skip_incoming(handle, ihandle, notifications_hook)
except NoReceiver: except NoReceiver:
_log.warn('device or receiver disconnected') _log.warn('device or receiver disconnected')
return None return None
write(ihandle, devnumber, request_data, long_message) write(ihandle, devnumber, request_data, long_message)
if no_reply: if no_reply:
return None return None
# we consider timeout from this point # we consider timeout from this point
request_started = _timestamp() request_started = _timestamp()
delta = 0 delta = 0
while delta < timeout: while delta < timeout:
reply = _read(handle, timeout) reply = _read(handle, timeout)
if reply: if reply:
report_id, reply_devnumber, reply_data = reply report_id, reply_devnumber, reply_data = reply
if reply_devnumber == devnumber: if reply_devnumber == devnumber:
if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2]: if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2
error = ord(reply_data[3:4]) ]:
error = ord(reply_data[3:4])
# if error == _hidpp10.ERROR.resource_error: # device unreachable if _log.isEnabledFor(_DEBUG):
# _log.warn("(%s) device %d error on request {%04X}: unknown device", handle, devnumber, request_id) _log.debug(
# raise DeviceUnreachable(number=devnumber, request=request_id) '(%s) device 0x%02X error on request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp10.ERROR[error]
# if error == _hidpp10.ERROR.unknown_device: # unknown device )
# _log.error("(%s) device %d error on request {%04X}: unknown device", handle, devnumber, request_id) return _hidpp10.ERROR[error] if return_error else None
# raise NoSuchDevice(number=devnumber, request=request_id) if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]:
# a HID++ 2.0 feature call returned with an error
if _log.isEnabledFor(_DEBUG): error = ord(reply_data[3:4])
_log.debug( _log.error(
'(%s) device 0x%02X error on request {%04X}: %d = %s', handle, devnumber, request_id, error, '(%s) device %d error on feature request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp10.ERROR[error] _hidpp20.ERROR[error]
) )
return _hidpp10.ERROR[error] if return_error else None raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params)
if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]:
# a HID++ 2.0 feature call returned with an error
error = ord(reply_data[3:4])
_log.error(
'(%s) device %d error on feature request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp20.ERROR[error]
)
raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params)
if reply_data[:2] == request_data[:2]: if reply_data[:2] == request_data[:2]:
if request_id & 0xFE00 == 0x8200: if request_id & 0xFE00 == 0x8200:
# long registry r/w should return a long reply # long registry r/w should return a long reply
assert report_id == HIDPP_LONG_MESSAGE_ID assert report_id == HIDPP_LONG_MESSAGE_ID
elif request_id & 0xFE00 == 0x8000: elif request_id & 0xFE00 == 0x8000:
# short registry r/w should return a short reply # short registry r/w should return a short reply
assert report_id == HIDPP_SHORT_MESSAGE_ID assert report_id == HIDPP_SHORT_MESSAGE_ID
if devnumber == 0xFF: if devnumber == 0xFF:
if request_id == 0x83B5 or request_id == 0x81F1: if request_id == 0x83B5 or request_id == 0x81F1:
# these replies have to match the first parameter as well # these replies have to match the first parameter as well
if reply_data[2:3] == params[:1]: if reply_data[2:3] == params[:1]:
return reply_data[2:] return reply_data[2:]
else:
# hm, not matching my request, and certainly not a notification
continue
else: else:
# hm, not matching my request, and certainly not a notification return reply_data[2:]
continue
else: else:
return reply_data[2:] return reply_data[2:]
else: else:
return reply_data[2:] # a reply was received, but did not match our request in any way
else: # reset the timeout starting point
# a reply was received, but did not match our request in any way request_started = _timestamp()
# reset the timeout starting point
request_started = _timestamp()
if notifications_hook: if notifications_hook:
n = make_notification(report_id, reply_devnumber, reply_data) n = make_notification(report_id, reply_devnumber, reply_data)
if n: if n:
notifications_hook(n) notifications_hook(n)
# elif _log.isEnabledFor(_DEBUG):
# _log.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data))
# elif _log.isEnabledFor(_DEBUG): # elif _log.isEnabledFor(_DEBUG):
# _log.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data)) # _log.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data))
# elif _log.isEnabledFor(_DEBUG):
# _log.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data))
delta = _timestamp() - request_started delta = _timestamp() - request_started
# if _log.isEnabledFor(_DEBUG): # if _log.isEnabledFor(_DEBUG):
# _log.debug("(%s) still waiting for reply, delta %f", handle, delta) # _log.debug("(%s) still waiting for reply, delta %f", handle, delta)
_log.warn( _log.warn(
'timeout (%0.2f/%0.2f) on device %d request {%04X} params [%s]', delta, timeout, devnumber, request_id, 'timeout (%0.2f/%0.2f) on device %d request {%04X} params [%s]', delta, timeout, devnumber, request_id,
_strhex(params) _strhex(params)
) )
# raise DeviceUnreachable(number=devnumber, request=request_id) # raise DeviceUnreachable(number=devnumber, request=request_id)
def ping(handle, devnumber, long_message=False): def ping(handle, devnumber, long_message=False):
@ -494,58 +517,61 @@ def ping(handle, devnumber, long_message=False):
assert devnumber >= 0x00 assert devnumber >= 0x00
assert devnumber < 0x0F assert devnumber < 0x0F
# randomize the SoftwareId and mark byte to be able to identify the ping with acquire_timeout(handle_lock(handle), handle, 10.):
# reply, and set most significant (0x8) bit in SoftwareId so that the reply
# is always distinguishable from notifications
request_id = 0x0018 | _random_bits(3)
request_data = _pack('!HBBB', request_id, 0, 0, _random_bits(8))
ihandle = int(handle) # randomize the SoftwareId and mark byte to be able to identify the ping
notifications_hook = getattr(handle, 'notifications_hook', None) # reply, and set most significant (0x8) bit in SoftwareId so that the reply
try: # is always distinguishable from notifications
_skip_incoming(handle, ihandle, notifications_hook) request_id = 0x0018 | _random_bits(3)
except NoReceiver: request_data = _pack('!HBBB', request_id, 0, 0, _random_bits(8))
_log.warn('device or receiver disconnected')
return
write(ihandle, devnumber, request_data, long_message) ihandle = int(handle)
notifications_hook = getattr(handle, 'notifications_hook', None)
try:
_skip_incoming(handle, ihandle, notifications_hook)
except NoReceiver:
_log.warn('device or receiver disconnected')
return
# we consider timeout from this point write(ihandle, devnumber, request_data, long_message)
request_started = _timestamp()
delta = 0
while delta < _PING_TIMEOUT: # we consider timeout from this point
reply = _read(handle, _PING_TIMEOUT) request_started = _timestamp()
delta = 0
if reply: while delta < _PING_TIMEOUT:
report_id, reply_devnumber, reply_data = reply reply = _read(handle, _PING_TIMEOUT)
if reply_devnumber == devnumber:
if reply_data[:2] == request_data[:2] and reply_data[4:5] == request_data[-1:]:
# HID++ 2.0+ device, currently connected
return ord(reply_data[2:3]) + ord(reply_data[3:4]) / 10.0
if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2]: if reply:
assert reply_data[-1:] == b'\x00' report_id, reply_devnumber, reply_data = reply
error = ord(reply_data[3:4]) if reply_devnumber == devnumber:
if reply_data[:2] == request_data[:2] and reply_data[4:5] == request_data[-1:]:
# HID++ 2.0+ device, currently connected
return ord(reply_data[2:3]) + ord(reply_data[3:4]) / 10.0
if error == _hidpp10.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2
return 1.0 ]:
assert reply_data[-1:] == b'\x00'
error = ord(reply_data[3:4])
if error == _hidpp10.ERROR.resource_error: # device unreachable if error == _hidpp10.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
return return 1.0
if error == _hidpp10.ERROR.unknown_device: # no paired device with that number if error == _hidpp10.ERROR.resource_error: # device unreachable
_log.error('(%s) device %d error on ping request: unknown device', handle, devnumber) return
raise NoSuchDevice(number=devnumber, request=request_id)
if notifications_hook: if error == _hidpp10.ERROR.unknown_device: # no paired device with that number
n = make_notification(report_id, reply_devnumber, reply_data) _log.error('(%s) device %d error on ping request: unknown device', handle, devnumber)
if n: raise NoSuchDevice(number=devnumber, request=request_id)
notifications_hook(n)
# elif _log.isEnabledFor(_DEBUG):
# _log.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data))
delta = _timestamp() - request_started if notifications_hook:
n = make_notification(report_id, reply_devnumber, reply_data)
if n:
notifications_hook(n)
# elif _log.isEnabledFor(_DEBUG):
# _log.debug("(%s) ignoring reply %02X [%s]", handle, reply_devnumber, _strhex(reply_data))
_log.warn('(%s) timeout (%0.2f/%0.2f) on device %d ping', handle, delta, _PING_TIMEOUT, devnumber) delta = _timestamp() - request_started
# raise DeviceUnreachable(number=devnumber, request=request_id)
_log.warn('(%s) timeout (%0.2f/%0.2f) on device %d ping', handle, delta, _PING_TIMEOUT, devnumber)
# raise DeviceUnreachable(number=devnumber, request=request_id)

View File

@ -129,7 +129,7 @@ class _ThreadedHandle(object):
# a while for it to acknowledge it. # a while for it to acknowledge it.
# Forcibly closing the file handle on another thread does _not_ interrupt the # Forcibly closing the file handle on another thread does _not_ interrupt the
# read on Linux systems. # read on Linux systems.
_EVENT_READ_TIMEOUT = 0.4 # in seconds _EVENT_READ_TIMEOUT = 1. # in seconds
# After this many reads that did not produce a packet, call the tick() method. # After this many reads that did not produce a packet, call the tick() method.
# This only happens if tick_period is enabled (>0) for the Listener instance. # This only happens if tick_period is enabled (>0) for the Listener instance.
@ -174,7 +174,7 @@ class EventsListener(_threading.Thread):
if self._queued_notifications.empty(): if self._queued_notifications.empty():
try: try:
# _log.debug("read next notification") # _log.debug("read next notification")
n = _base.read(ihandle, _EVENT_READ_TIMEOUT) n = _base.read(self.receiver.handle, _EVENT_READ_TIMEOUT)
except _base.NoReceiver: except _base.NoReceiver:
_log.warning('receiver disconnected') _log.warning('receiver disconnected')
self.receiver.close() self.receiver.close()