receiver: use dictionary for expected message length checking; ignore messages with unknown report ids

This commit is contained in:
Peter F. Patel-Schneider 2020-01-21 09:22:12 -05:00
parent 1740a9a8c4
commit 7c6bd4202d
1 changed files with 25 additions and 20 deletions

View File

@ -44,6 +44,15 @@ _LONG_MESSAGE_SIZE = 20
_MEDIUM_MESSAGE_SIZE = 15
_MAX_READ_SIZE = 32
# mapping from report_id to message length
report_lengths = {
0x10: _SHORT_MESSAGE_SIZE,
0x11: _LONG_MESSAGE_SIZE,
0x20: _MEDIUM_MESSAGE_SIZE,
0x21: _MAX_READ_SIZE
}
"""Default timeout on read (in seconds)."""
DEFAULT_TIMEOUT = 4
# the receiver itself should reply very fast, within 500ms
@ -189,15 +198,16 @@ def read(handle, timeout=DEFAULT_TIMEOUT):
return reply[1:]
# sanity checks on message and report incorrect sizes when debugging
def debug_message(data) :
# sanity checks on message report id and size
def check_message(data) :
assert isinstance(data, bytes), (repr(data), type(data))
report_id = ord(data[:1])
assert ((report_id & 0xF0 == 0) or
(report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE) or
(report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE) or
(report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE)), \
"unexpected message size: report_id %02X message %s" % (report_id, _strhex(data))
if report_id in report_lengths: # is this an HID++ or DJ message?
if report_lengths.get(report_id) == len(data):
return True
else:
_log.warn("unexpected message size: report_id %02X message %s" % (report_id, _strhex(data)))
return False
def _read(handle, timeout):
@ -218,13 +228,8 @@ def _read(handle, timeout):
close(handle)
raise NoReceiver(reason=reason)
if data:
debug_message(data)
if data and check_message(data): # ignore messages that fail check
report_id = ord(data[:1])
if report_id & 0xF0 == 0x00:
if _log.isEnabledFor(_DEBUG):
_log.debug("(%s) => r[%02X %s] ignoring unknown report", handle, report_id, _strhex(data[1:]))
return
devnumber = ord(data[1:2])
if _log.isEnabledFor(_DEBUG):
@ -252,12 +257,12 @@ def _skip_incoming(handle, ihandle, notifications_hook):
raise NoReceiver(reason=reason)
if data:
debug_message(data)
report_id = ord(data[:1])
if notifications_hook and report_id & 0xF0:
n = make_notification(ord(data[1:2]), data[2:])
if n:
notifications_hook(n)
if check_message(data): # only process messages that pass check
report_id = ord(data[:1])
if notifications_hook:
n = make_notification(ord(data[1:2]), data[2:])
if n:
notifications_hook(n)
else:
# nothing in the input buffer, we're done
return
@ -274,7 +279,7 @@ def make_notification(devnumber, data):
# regular keyboard key press reports and mouse movement reports are not notifications
# it would be better to check for report_id 0x20 but that information is not sent here
if ( len(data) == _MEDIUM_MESSAGE_SIZE-2 and ( sub_id==0x02 or sub_id==0x01 ) ) :
if len(data) == _MEDIUM_MESSAGE_SIZE-2 and (sub_id==0x02 or sub_id==0x01):
return
address = ord(data[1:2])