diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 88f593eb..57e307df 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -422,13 +422,24 @@ def acquire_timeout(lock, handle, timeout): lock.release() -# cycle the HID++ 2.0 software ID from x2 to xF, inclusive, to separate results from each other, notifications, and driver -sw_id = 0xF +def _get_next_sw_id() -> int: + """Returns 'random' software ID to separate replies from different devices. + + Cycle the HID++ 2.0 software ID from 0x2 to 0xF to separate + results and notifications. + """ + if not hasattr(_get_next_sw_id, "software_id"): + _get_next_sw_id.software_id = 0xF + + if _get_next_sw_id.software_id < 0xF: + _get_next_sw_id.software_id += 1 + else: + _get_next_sw_id.software_id = 2 + return _get_next_sw_id.software_id # a very few requests (e.g., host switching) do not expect a reply, but use no_reply=True with extreme caution def request(handle, devnumber, request_id, *params, no_reply=False, return_error=False, long_message=False, protocol=1.0): - global sw_id """Makes a feature call to a device and waits for a matching reply. :param handle: an open UR handle. :param devnumber: attached device number. @@ -439,12 +450,10 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error with acquire_timeout(handle_lock(handle), handle, 10.0): assert isinstance(request_id, int) if (devnumber != 0xFF or protocol >= 2.0) and request_id < 0x8000: - # For HID++ 2.0 feature requests, randomize the SoftwareId to make it - # easier to recognize the reply for this request. also, always set the - # most significant bit (8) in SoftwareId, to make notifications easier - # to distinguish from request replies. + # Always set the most significant bit (8) in SoftwareId, + # to make notifications easier to distinguish from request replies. # This only applies to peripheral requests, ofc. - sw_id = sw_id + 1 if sw_id < 0xF else 2 + sw_id = _get_next_sw_id() request_id = (request_id & 0xFFF0) | sw_id # was 0x08 | getrandbits(3) timeout = _RECEIVER_REQUEST_TIMEOUT if devnumber == 0xFF else _DEVICE_REQUEST_TIMEOUT @@ -549,7 +558,6 @@ def ping(handle, devnumber, long_message=False): """Check if a device is connected to the receiver. :returns: The HID protocol supported by the device, as a floating point number, if the device is active. """ - global sw_id if logger.isEnabledFor(logging.DEBUG): logger.debug("(%s) pinging device %d", handle, devnumber) with acquire_timeout(handle_lock(handle), handle, 10.0): @@ -561,8 +569,7 @@ def ping(handle, devnumber, long_message=False): return # randomize the mark byte to be able to identify the ping reply - # cycle the sw_id byte from 2 to 15 (see above) - sw_id = sw_id + 1 if sw_id < 0xF else 2 + sw_id = _get_next_sw_id() request_id = 0x0010 | sw_id # was 0x0018 | getrandbits(3) request_data = struct.pack("!HBBB", request_id, 0, 0, getrandbits(8)) write(int(handle), devnumber, request_data, long_message) diff --git a/tests/logitech_receiver/test_base.py b/tests/logitech_receiver/test_base.py index a8a79152..0575b770 100644 --- a/tests/logitech_receiver/test_base.py +++ b/tests/logitech_receiver/test_base.py @@ -22,3 +22,11 @@ def test_product_information(usb_id, expected_name, expected_receiver_kind): if expected_receiver_kind: assert res["receiver_kind"] == expected_receiver_kind + + +def test_get_next_sw_id(): + res1 = base._get_next_sw_id() + res2 = base._get_next_sw_id() + + assert res1 == 2 + assert res2 == 3