From ade9c816c7d003b7ad63f042c770a069b8d38532 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Sun, 25 Oct 2020 13:37:45 -0400 Subject: [PATCH] receiver: add report_id to notifications and use where appropriate --- lib/logitech_receiver/base.py | 45 +++++++++++++++----------- lib/logitech_receiver/notifications.py | 4 +-- lib/solaar/listener.py | 2 +- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index b902615a..86eda65f 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -51,8 +51,17 @@ _LONG_MESSAGE_SIZE = 20 _MEDIUM_MESSAGE_SIZE = 15 _MAX_READ_SIZE = 32 +HIDPP_SHORT_MESSAGE_ID = 0x10 +HIDPP_LONG_MESSAGE_ID = 0x11 +DJ_MESSAGE_ID = 0x20 + # mapping from report_id to message length -report_lengths = {0x10: _SHORT_MESSAGE_SIZE, 0x11: _LONG_MESSAGE_SIZE, 0x20: _MEDIUM_MESSAGE_SIZE, 0x21: _MAX_READ_SIZE} +report_lengths = { + HIDPP_SHORT_MESSAGE_ID: _SHORT_MESSAGE_SIZE, + HIDPP_LONG_MESSAGE_ID: _LONG_MESSAGE_SIZE, + DJ_MESSAGE_ID: _MEDIUM_MESSAGE_SIZE, + 0x21: _MAX_READ_SIZE +} """Default timeout on read (in seconds).""" DEFAULT_TIMEOUT = 4 # the receiver itself should reply very fast, within 500ms @@ -199,9 +208,9 @@ def write(handle, devnumber, data, long_message=False): assert isinstance(data, bytes), (repr(data), type(data)) if long_message or len(data) > _SHORT_MESSAGE_SIZE - 2 or data[:1] == b'\x82': - wdata = _pack('!BB18s', 0x11, devnumber, data) + wdata = _pack('!BB18s', HIDPP_LONG_MESSAGE_ID, devnumber, data) else: - wdata = _pack('!BB5s', 0x10, devnumber, data) + wdata = _pack('!BB5s', HIDPP_SHORT_MESSAGE_ID, devnumber, data) if _log.isEnabledFor(_DEBUG): _log.debug('(%s) <= w[%02X %02X %s %s]', handle, ord(wdata[:1]), devnumber, _strhex(wdata[2:4]), _strhex(wdata[4:])) @@ -228,7 +237,7 @@ def read(handle, timeout=DEFAULT_TIMEOUT): """ reply = _read(handle, timeout) if reply: - return reply[1:] + return reply # sanity checks on message report id and size @@ -295,7 +304,7 @@ def _skip_incoming(handle, ihandle, notifications_hook): if check_message(data): # only process messages that pass check # report_id = ord(data[:1]) if notifications_hook: - n = make_notification(ord(data[1:2]), data[2:]) + n = make_notification(ord(data[:1]), ord(data[1:2]), data[2:]) if n: notifications_hook(n) else: @@ -303,7 +312,7 @@ def _skip_incoming(handle, ihandle, notifications_hook): return -def make_notification(devnumber, data): +def make_notification(report_id, devnumber, data): """Guess if this is a notification (and not just a request reply), and return a Notification tuple if it is.""" @@ -313,8 +322,7 @@ def make_notification(devnumber, data): return # DJ input records are not notifications - # it would be better to check for report_id 0x20 but that information is not sent here - if len(data) == _MEDIUM_MESSAGE_SIZE - 2 and (sub_id < 0x10): + if report_id == DJ_MESSAGE_ID and (sub_id < 0x10): return address = ord(data[1:2]) @@ -328,15 +336,14 @@ def make_notification(devnumber, data): # HID++ 2.0 feature notifications have the SoftwareID 0 (address & 0x0F == 0x00) ): # noqa: E129 - return _HIDPP_Notification(devnumber, sub_id, address, data[2:]) + return _HIDPP_Notification(report_id, devnumber, sub_id, address, data[2:]) -_HIDPP_Notification = namedtuple('_HIDPP_Notification', ('devnumber', 'sub_id', 'address', 'data')) -_HIDPP_Notification.__str__ = lambda self: 'Notification(%d,%02X,%02X,%s)' % ( - self.devnumber, self.sub_id, self.address, _strhex(self.data) +_HIDPP_Notification = namedtuple('_HIDPP_Notification', ('report_id', 'devnumber', 'sub_id', 'address', 'data')) +_HIDPP_Notification.__str__ = lambda self: 'Notification(%02x,%d,%02X,%02X,%s)' % ( + self.report_id, self.devnumber, self.sub_id, self.address, _strhex(self.data) ) _HIDPP_Notification.__unicode__ = _HIDPP_Notification.__str__ -DJ_NOTIFICATION_LENGTH = _MEDIUM_MESSAGE_SIZE - 4 # to allow easy distinguishing of DJ notifications del namedtuple # @@ -397,7 +404,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error if reply: report_id, reply_devnumber, reply_data = reply if reply_devnumber == devnumber: - if report_id == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2]: + if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2]: error = ord(reply_data[3:4]) # if error == _hidpp10.ERROR.resource_error: # device unreachable @@ -426,10 +433,10 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error if reply_data[:2] == request_data[:2]: if request_id & 0xFE00 == 0x8200: # long registry r/w should return a long reply - assert report_id == 0x11 + assert report_id == HIDPP_LONG_MESSAGE_ID elif request_id & 0xFE00 == 0x8000: # short registry r/w should return a short reply - assert report_id == 0x10 + assert report_id == HIDPP_SHORT_MESSAGE_ID if devnumber == 0xFF: if request_id == 0x83B5 or request_id == 0x81F1: @@ -449,7 +456,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error request_started = _timestamp() if notifications_hook: - n = make_notification(reply_devnumber, reply_data) + n = make_notification(report_id, reply_devnumber, reply_data) if n: notifications_hook(n) # elif _log.isEnabledFor(_DEBUG): @@ -508,7 +515,7 @@ def ping(handle, devnumber, long_message=False): # HID++ 2.0+ device, currently connected return ord(reply_data[2:3]) + ord(reply_data[3:4]) / 10.0 - if report_id == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2]: + if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and reply_data[1:3] == request_data[:2]: assert reply_data[-1:] == b'\x00' error = ord(reply_data[3:4]) @@ -523,7 +530,7 @@ def ping(handle, devnumber, long_message=False): raise NoSuchDevice(number=devnumber, request=request_id) if notifications_hook: - n = make_notification(reply_devnumber, reply_data) + n = make_notification(report_id, reply_devnumber, reply_data) if n: notifications_hook(n) # elif _log.isEnabledFor(_DEBUG): diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index c0c97f4d..c3cfc542 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -29,7 +29,7 @@ from logging import getLogger from . import hidpp10 as _hidpp10 from . import hidpp20 as _hidpp20 -from .base import DJ_NOTIFICATION_LENGTH as _DJ_NOTIFICATION_LENGTH +from .base import DJ_MESSAGE_ID as _DJ_MESSAGE_ID from .common import strhex as _strhex from .common import unpack as _unpack from .i18n import _ @@ -111,7 +111,7 @@ def _process_device_notification(device, status, n): # 0x40 to 0x7F appear to be HID++ 1.0 or DJ notifications if n.sub_id >= 0x40: - if len(n.data) == _DJ_NOTIFICATION_LENGTH: + if n.report_id == _DJ_MESSAGE_ID: return _process_dj_notification(device, status, n) else: return _process_hidpp10_notification(device, status, n) diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index 93a7c8fe..0f718822 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -205,7 +205,7 @@ class ReceiverListener(_listener.EventsListener): if n.sub_id == 0x40 and not already_known: return # disconnecting something that is not known - nothing to do - if n.sub_id == 0x41 and len(n.data) > _base._SHORT_MESSAGE_SIZE - 4: + if n.sub_id == 0x41 and n.report_id == _base.DJ_MESSAGE_ID: # DJ pairing notification - ignore - hid++ 1.0 pairing notification is all that is needed if _log.isEnabledFor(_INFO): _log.info('ignoring DJ pairing notification %s', n)