From 7c6bd4202da1d9b3b210c1527ded07579fba0dc6 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Tue, 21 Jan 2020 09:22:12 -0500 Subject: [PATCH] receiver: use dictionary for expected message length checking; ignore messages with unknown report ids --- lib/logitech_receiver/base.py | 45 +++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 7d1162e8..29cfefca 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -44,6 +44,15 @@ _LONG_MESSAGE_SIZE = 20 _MEDIUM_MESSAGE_SIZE = 15 _MAX_READ_SIZE = 32 +# mapping from report_id to message length +report_lengths = { + 0x10: _SHORT_MESSAGE_SIZE, + 0x11: _LONG_MESSAGE_SIZE, + 0x20: _MEDIUM_MESSAGE_SIZE, + 0x21: _MAX_READ_SIZE +} + + """Default timeout on read (in seconds).""" DEFAULT_TIMEOUT = 4 # the receiver itself should reply very fast, within 500ms @@ -189,15 +198,16 @@ def read(handle, timeout=DEFAULT_TIMEOUT): return reply[1:] -# sanity checks on message and report incorrect sizes when debugging -def debug_message(data) : +# sanity checks on message report id and size +def check_message(data) : assert isinstance(data, bytes), (repr(data), type(data)) report_id = ord(data[:1]) - assert ((report_id & 0xF0 == 0) or - (report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE) or - (report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE) or - (report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE)), \ - "unexpected message size: report_id %02X message %s" % (report_id, _strhex(data)) + if report_id in report_lengths: # is this an HID++ or DJ message? + if report_lengths.get(report_id) == len(data): + return True + else: + _log.warn("unexpected message size: report_id %02X message %s" % (report_id, _strhex(data))) + return False def _read(handle, timeout): @@ -218,13 +228,8 @@ def _read(handle, timeout): close(handle) raise NoReceiver(reason=reason) - if data: - debug_message(data) + if data and check_message(data): # ignore messages that fail check report_id = ord(data[:1]) - if report_id & 0xF0 == 0x00: - if _log.isEnabledFor(_DEBUG): - _log.debug("(%s) => r[%02X %s] ignoring unknown report", handle, report_id, _strhex(data[1:])) - return devnumber = ord(data[1:2]) if _log.isEnabledFor(_DEBUG): @@ -252,12 +257,12 @@ def _skip_incoming(handle, ihandle, notifications_hook): raise NoReceiver(reason=reason) if data: - debug_message(data) - report_id = ord(data[:1]) - if notifications_hook and report_id & 0xF0: - n = make_notification(ord(data[1:2]), data[2:]) - if n: - notifications_hook(n) + if check_message(data): # only process messages that pass check + report_id = ord(data[:1]) + if notifications_hook: + n = make_notification(ord(data[1:2]), data[2:]) + if n: + notifications_hook(n) else: # nothing in the input buffer, we're done return @@ -274,7 +279,7 @@ def make_notification(devnumber, data): # regular keyboard key press reports and mouse movement reports are not notifications # it would be better to check for report_id 0x20 but that information is not sent here - if ( len(data) == _MEDIUM_MESSAGE_SIZE-2 and ( sub_id==0x02 or sub_id==0x01 ) ) : + if len(data) == _MEDIUM_MESSAGE_SIZE-2 and (sub_id==0x02 or sub_id==0x01): return address = ord(data[1:2])