parent
2442299539
commit
aa354dc4c4
|
@ -422,13 +422,24 @@ def acquire_timeout(lock, handle, timeout):
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
||||||
|
|
||||||
# cycle the HID++ 2.0 software ID from x2 to xF, inclusive, to separate results from each other, notifications, and driver
|
def _get_next_sw_id() -> int:
|
||||||
sw_id = 0xF
|
"""Returns 'random' software ID to separate replies from different devices.
|
||||||
|
|
||||||
|
Cycle the HID++ 2.0 software ID from 0x2 to 0xF to separate
|
||||||
|
results and notifications.
|
||||||
|
"""
|
||||||
|
if not hasattr(_get_next_sw_id, "software_id"):
|
||||||
|
_get_next_sw_id.software_id = 0xF
|
||||||
|
|
||||||
|
if _get_next_sw_id.software_id < 0xF:
|
||||||
|
_get_next_sw_id.software_id += 1
|
||||||
|
else:
|
||||||
|
_get_next_sw_id.software_id = 2
|
||||||
|
return _get_next_sw_id.software_id
|
||||||
|
|
||||||
|
|
||||||
# a very few requests (e.g., host switching) do not expect a reply, but use no_reply=True with extreme caution
|
# a very few requests (e.g., host switching) do not expect a reply, but use no_reply=True with extreme caution
|
||||||
def request(handle, devnumber, request_id, *params, no_reply=False, return_error=False, long_message=False, protocol=1.0):
|
def request(handle, devnumber, request_id, *params, no_reply=False, return_error=False, long_message=False, protocol=1.0):
|
||||||
global sw_id
|
|
||||||
"""Makes a feature call to a device and waits for a matching reply.
|
"""Makes a feature call to a device and waits for a matching reply.
|
||||||
:param handle: an open UR handle.
|
:param handle: an open UR handle.
|
||||||
:param devnumber: attached device number.
|
:param devnumber: attached device number.
|
||||||
|
@ -439,12 +450,10 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
|
||||||
with acquire_timeout(handle_lock(handle), handle, 10.0):
|
with acquire_timeout(handle_lock(handle), handle, 10.0):
|
||||||
assert isinstance(request_id, int)
|
assert isinstance(request_id, int)
|
||||||
if (devnumber != 0xFF or protocol >= 2.0) and request_id < 0x8000:
|
if (devnumber != 0xFF or protocol >= 2.0) and request_id < 0x8000:
|
||||||
# For HID++ 2.0 feature requests, randomize the SoftwareId to make it
|
# Always set the most significant bit (8) in SoftwareId,
|
||||||
# easier to recognize the reply for this request. also, always set the
|
# to make notifications easier to distinguish from request replies.
|
||||||
# most significant bit (8) in SoftwareId, to make notifications easier
|
|
||||||
# to distinguish from request replies.
|
|
||||||
# This only applies to peripheral requests, ofc.
|
# This only applies to peripheral requests, ofc.
|
||||||
sw_id = sw_id + 1 if sw_id < 0xF else 2
|
sw_id = _get_next_sw_id()
|
||||||
request_id = (request_id & 0xFFF0) | sw_id # was 0x08 | getrandbits(3)
|
request_id = (request_id & 0xFFF0) | sw_id # was 0x08 | getrandbits(3)
|
||||||
|
|
||||||
timeout = _RECEIVER_REQUEST_TIMEOUT if devnumber == 0xFF else _DEVICE_REQUEST_TIMEOUT
|
timeout = _RECEIVER_REQUEST_TIMEOUT if devnumber == 0xFF else _DEVICE_REQUEST_TIMEOUT
|
||||||
|
@ -549,7 +558,6 @@ def ping(handle, devnumber, long_message=False):
|
||||||
"""Check if a device is connected to the receiver.
|
"""Check if a device is connected to the receiver.
|
||||||
:returns: The HID protocol supported by the device, as a floating point number, if the device is active.
|
:returns: The HID protocol supported by the device, as a floating point number, if the device is active.
|
||||||
"""
|
"""
|
||||||
global sw_id
|
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
logger.debug("(%s) pinging device %d", handle, devnumber)
|
logger.debug("(%s) pinging device %d", handle, devnumber)
|
||||||
with acquire_timeout(handle_lock(handle), handle, 10.0):
|
with acquire_timeout(handle_lock(handle), handle, 10.0):
|
||||||
|
@ -561,8 +569,7 @@ def ping(handle, devnumber, long_message=False):
|
||||||
return
|
return
|
||||||
|
|
||||||
# randomize the mark byte to be able to identify the ping reply
|
# randomize the mark byte to be able to identify the ping reply
|
||||||
# cycle the sw_id byte from 2 to 15 (see above)
|
sw_id = _get_next_sw_id()
|
||||||
sw_id = sw_id + 1 if sw_id < 0xF else 2
|
|
||||||
request_id = 0x0010 | sw_id # was 0x0018 | getrandbits(3)
|
request_id = 0x0010 | sw_id # was 0x0018 | getrandbits(3)
|
||||||
request_data = struct.pack("!HBBB", request_id, 0, 0, getrandbits(8))
|
request_data = struct.pack("!HBBB", request_id, 0, 0, getrandbits(8))
|
||||||
write(int(handle), devnumber, request_data, long_message)
|
write(int(handle), devnumber, request_data, long_message)
|
||||||
|
|
|
@ -22,3 +22,11 @@ def test_product_information(usb_id, expected_name, expected_receiver_kind):
|
||||||
|
|
||||||
if expected_receiver_kind:
|
if expected_receiver_kind:
|
||||||
assert res["receiver_kind"] == expected_receiver_kind
|
assert res["receiver_kind"] == expected_receiver_kind
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_next_sw_id():
|
||||||
|
res1 = base._get_next_sw_id()
|
||||||
|
res2 = base._get_next_sw_id()
|
||||||
|
|
||||||
|
assert res1 == 2
|
||||||
|
assert res2 == 3
|
||||||
|
|
Loading…
Reference in New Issue