parent
5d86c74df4
commit
1e6af7fa7d
|
@ -87,8 +87,6 @@ class HIDAPI(typing.Protocol):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
hidapi = typing.cast(HIDAPI, hidapi)
|
|
||||||
|
|
||||||
SHORT_MESSAGE_SIZE = 7
|
SHORT_MESSAGE_SIZE = 7
|
||||||
_LONG_MESSAGE_SIZE = 20
|
_LONG_MESSAGE_SIZE = 20
|
||||||
_MEDIUM_MESSAGE_SIZE = 15
|
_MEDIUM_MESSAGE_SIZE = 15
|
||||||
|
@ -108,6 +106,11 @@ _DEVICE_REQUEST_TIMEOUT = DEFAULT_TIMEOUT
|
||||||
# when pinging, be extra patient (no longer)
|
# when pinging, be extra patient (no longer)
|
||||||
_PING_TIMEOUT = DEFAULT_TIMEOUT
|
_PING_TIMEOUT = DEFAULT_TIMEOUT
|
||||||
|
|
||||||
|
hidapi = typing.cast(HIDAPI, hidapi)
|
||||||
|
|
||||||
|
request_lock = threading.Lock() # serialize all requests
|
||||||
|
handles_lock = {}
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class HIDPPNotification:
|
class HIDPPNotification:
|
||||||
|
@ -146,25 +149,48 @@ for _ignore, d in descriptors.DEVICES.items():
|
||||||
KNOWN_DEVICE_IDS.append(_bluetooth_device(d.btid))
|
KNOWN_DEVICE_IDS.append(_bluetooth_device(d.btid))
|
||||||
|
|
||||||
|
|
||||||
def _other_device_check(bus_id: int, vendor_id: int, product_id: int) -> dict[str, Any] | None:
|
|
||||||
"""Check whether product is a Logitech USB-connected or Bluetooth device based on bus, vendor, and product IDs
|
|
||||||
This allows Solaar to support receiverless HID++ 2.0 devices that it knows nothing about"""
|
|
||||||
if vendor_id != LOGITECH_VENDOR_ID:
|
|
||||||
return
|
|
||||||
|
|
||||||
device_info = None
|
|
||||||
if bus_id == BusID.USB and (0xC07D <= product_id <= 0xC094 or 0xC32B <= product_id <= 0xC344):
|
|
||||||
device_info = _usb_device(product_id, 2)
|
|
||||||
elif bus_id == BusID.BLUETOOTH and (0xB012 <= product_id <= 0xB0FF or 0xB317 <= product_id <= 0xB3FF):
|
|
||||||
device_info = _bluetooth_device(product_id)
|
|
||||||
return device_info
|
|
||||||
|
|
||||||
|
|
||||||
def product_information(usb_id: int) -> dict[str, Any]:
|
def product_information(usb_id: int) -> dict[str, Any]:
|
||||||
"""Returns hardcoded information from USB receiver."""
|
"""Returns hardcoded information from USB receiver."""
|
||||||
return base_usb.get_receiver_info(usb_id)
|
return base_usb.get_receiver_info(usb_id)
|
||||||
|
|
||||||
|
|
||||||
|
def receivers():
|
||||||
|
"""Enumerate all the receivers attached to the machine."""
|
||||||
|
yield from hidapi.enumerate(_filter_receivers)
|
||||||
|
|
||||||
|
|
||||||
|
def filter_products_of_interest(
|
||||||
|
bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False
|
||||||
|
) -> dict[str, Any] | None:
|
||||||
|
"""Check that this product is of interest and if so return the device record for further checking"""
|
||||||
|
|
||||||
|
def _other_device_check(bus_id: int, vendor_id: int, product_id: int) -> dict[str, Any] | None:
|
||||||
|
"""Check whether product is a Logitech USB-connected or Bluetooth device based on bus, vendor, and product IDs
|
||||||
|
This allows Solaar to support receiverless HID++ 2.0 devices that it knows nothing about"""
|
||||||
|
if vendor_id != LOGITECH_VENDOR_ID:
|
||||||
|
return
|
||||||
|
|
||||||
|
device_info = None
|
||||||
|
if bus_id == BusID.USB and (0xC07D <= product_id <= 0xC094 or 0xC32B <= product_id <= 0xC344):
|
||||||
|
device_info = _usb_device(product_id, 2)
|
||||||
|
elif bus_id == BusID.BLUETOOTH and (0xB012 <= product_id <= 0xB0FF or 0xB317 <= product_id <= 0xB3FF):
|
||||||
|
device_info = _bluetooth_device(product_id)
|
||||||
|
return device_info
|
||||||
|
|
||||||
|
record = _filter_receivers(bus_id, vendor_id, product_id, hidpp_short, hidpp_long)
|
||||||
|
if record: # known or unknown receiver
|
||||||
|
return record
|
||||||
|
|
||||||
|
for record in KNOWN_DEVICE_IDS:
|
||||||
|
if _match(record, bus_id, vendor_id, product_id):
|
||||||
|
return record
|
||||||
|
if hidpp_short or hidpp_long: # unknown devices that use HID++
|
||||||
|
return {"vendor_id": vendor_id, "product_id": product_id, "bus_id": bus_id, "isDevice": True}
|
||||||
|
elif hidpp_short is None and hidpp_long is None: # unknown devices in correct range of IDs
|
||||||
|
return _other_device_check(bus_id, vendor_id, product_id)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _match(record: dict[str, Any], bus_id: int, vendor_id: int, product_id: int):
|
def _match(record: dict[str, Any], bus_id: int, vendor_id: int, product_id: int):
|
||||||
return (
|
return (
|
||||||
(record.get("bus_id") is None or record.get("bus_id") == bus_id)
|
(record.get("bus_id") is None or record.get("bus_id") == bus_id)
|
||||||
|
@ -194,29 +220,6 @@ def _filter_receivers(
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def receivers():
|
|
||||||
"""Enumerate all the receivers attached to the machine."""
|
|
||||||
yield from hidapi.enumerate(_filter_receivers)
|
|
||||||
|
|
||||||
|
|
||||||
def filter_products_of_interest(
|
|
||||||
bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False
|
|
||||||
) -> dict[str, Any] | None:
|
|
||||||
"""Check that this product is of interest and if so return the device record for further checking"""
|
|
||||||
record = _filter_receivers(bus_id, vendor_id, product_id, hidpp_short, hidpp_long)
|
|
||||||
if record: # known or unknown receiver
|
|
||||||
return record
|
|
||||||
|
|
||||||
for record in KNOWN_DEVICE_IDS:
|
|
||||||
if _match(record, bus_id, vendor_id, product_id):
|
|
||||||
return record
|
|
||||||
if hidpp_short or hidpp_long: # unknown devices that use HID++
|
|
||||||
return {"vendor_id": vendor_id, "product_id": product_id, "bus_id": bus_id, "isDevice": True}
|
|
||||||
elif hidpp_short is None and hidpp_long is None: # unknown devices in correct range of IDs
|
|
||||||
return _other_device_check(bus_id, vendor_id, product_id)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def receivers_and_devices():
|
def receivers_and_devices():
|
||||||
"""Enumerate all the receivers and devices directly attached to the machine."""
|
"""Enumerate all the receivers and devices directly attached to the machine."""
|
||||||
yield from hidapi.enumerate(filter_products_of_interest)
|
yield from hidapi.enumerate(filter_products_of_interest)
|
||||||
|
@ -393,33 +396,6 @@ def _read(handle, timeout) -> tuple[int, int, bytes]:
|
||||||
return report_id, devnumber, data[2:]
|
return report_id, devnumber, data[2:]
|
||||||
|
|
||||||
|
|
||||||
def _skip_incoming(handle, ihandle, notifications_hook):
|
|
||||||
"""Read anything already in the input buffer.
|
|
||||||
|
|
||||||
Used by request() and ping() before their write.
|
|
||||||
"""
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# read whatever is already in the buffer, if any
|
|
||||||
data = hidapi.read(ihandle, _MAX_READ_SIZE, 0)
|
|
||||||
except Exception as reason:
|
|
||||||
logger.error("read failed, assuming receiver %s no longer available", handle)
|
|
||||||
close(handle)
|
|
||||||
raise exceptions.NoReceiver(reason=reason) from reason
|
|
||||||
|
|
||||||
if data:
|
|
||||||
if _is_relevant_message(data): # only process messages that pass check
|
|
||||||
# report_id = ord(data[:1])
|
|
||||||
if notifications_hook:
|
|
||||||
n = make_notification(ord(data[:1]), ord(data[1:2]), data[2:])
|
|
||||||
if n:
|
|
||||||
notifications_hook(n)
|
|
||||||
else:
|
|
||||||
# nothing in the input buffer, we're done
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
def make_notification(report_id: int, devnumber: int, data: bytes) -> HIDPPNotification | None:
|
def make_notification(report_id: int, devnumber: int, data: bytes) -> HIDPPNotification | None:
|
||||||
"""Guess if this is a notification (and not just a request reply), and
|
"""Guess if this is a notification (and not just a request reply), and
|
||||||
return a Notification if it is."""
|
return a Notification if it is."""
|
||||||
|
@ -455,10 +431,6 @@ def make_notification(report_id: int, devnumber: int, data: bytes) -> HIDPPNotif
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
request_lock = threading.Lock() # serialize all requests
|
|
||||||
handles_lock = {}
|
|
||||||
|
|
||||||
|
|
||||||
def handle_lock(handle):
|
def handle_lock(handle):
|
||||||
with request_lock:
|
with request_lock:
|
||||||
if handles_lock.get(handle) is None:
|
if handles_lock.get(handle) is None:
|
||||||
|
@ -481,22 +453,6 @@ def acquire_timeout(lock, handle, timeout):
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
||||||
|
|
||||||
def _get_next_sw_id() -> int:
|
|
||||||
"""Returns 'random' software ID to separate replies from different devices.
|
|
||||||
|
|
||||||
Cycle the HID++ 2.0 software ID from 0x2 to 0xF to separate
|
|
||||||
results and notifications.
|
|
||||||
"""
|
|
||||||
if not hasattr(_get_next_sw_id, "software_id"):
|
|
||||||
_get_next_sw_id.software_id = 0xF
|
|
||||||
|
|
||||||
if _get_next_sw_id.software_id < 0xF:
|
|
||||||
_get_next_sw_id.software_id += 1
|
|
||||||
else:
|
|
||||||
_get_next_sw_id.software_id = 2
|
|
||||||
return _get_next_sw_id.software_id
|
|
||||||
|
|
||||||
|
|
||||||
def find_paired_node(receiver_path: str, index: int, timeout: int):
|
def find_paired_node(receiver_path: str, index: int, timeout: int):
|
||||||
"""Find the node of a device paired with a receiver."""
|
"""Find the node of a device paired with a receiver."""
|
||||||
return hidapi.find_paired_node(receiver_path, index, timeout)
|
return hidapi.find_paired_node(receiver_path, index, timeout)
|
||||||
|
@ -693,3 +649,46 @@ def ping(handle, devnumber, long_message: bool = False):
|
||||||
delta = time() - request_started
|
delta = time() - request_started
|
||||||
|
|
||||||
logger.warning("(%s) timeout (%0.2f/%0.2f) on device %d ping", handle, delta, _PING_TIMEOUT, devnumber)
|
logger.warning("(%s) timeout (%0.2f/%0.2f) on device %d ping", handle, delta, _PING_TIMEOUT, devnumber)
|
||||||
|
|
||||||
|
|
||||||
|
def _skip_incoming(handle, ihandle, notifications_hook):
|
||||||
|
"""Read anything already in the input buffer.
|
||||||
|
|
||||||
|
Used by request() and ping() before their write.
|
||||||
|
"""
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# read whatever is already in the buffer, if any
|
||||||
|
data = hidapi.read(ihandle, _MAX_READ_SIZE, 0)
|
||||||
|
except Exception as reason:
|
||||||
|
logger.error("read failed, assuming receiver %s no longer available", handle)
|
||||||
|
close(handle)
|
||||||
|
raise exceptions.NoReceiver(reason=reason) from reason
|
||||||
|
|
||||||
|
if data:
|
||||||
|
if _is_relevant_message(data): # only process messages that pass check
|
||||||
|
# report_id = ord(data[:1])
|
||||||
|
if notifications_hook:
|
||||||
|
n = make_notification(ord(data[:1]), ord(data[1:2]), data[2:])
|
||||||
|
if n:
|
||||||
|
notifications_hook(n)
|
||||||
|
else:
|
||||||
|
# nothing in the input buffer, we're done
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def _get_next_sw_id() -> int:
|
||||||
|
"""Returns 'random' software ID to separate replies from different devices.
|
||||||
|
|
||||||
|
Cycle the HID++ 2.0 software ID from 0x2 to 0xF to separate
|
||||||
|
results and notifications.
|
||||||
|
"""
|
||||||
|
if not hasattr(_get_next_sw_id, "software_id"):
|
||||||
|
_get_next_sw_id.software_id = 0xF
|
||||||
|
|
||||||
|
if _get_next_sw_id.software_id < 0xF:
|
||||||
|
_get_next_sw_id.software_id += 1
|
||||||
|
else:
|
||||||
|
_get_next_sw_id.software_id = 2
|
||||||
|
return _get_next_sw_id.software_id
|
||||||
|
|
Loading…
Reference in New Issue