From 1e6af7fa7d2562f1998d6bd284406164401c8fd6 Mon Sep 17 00:00:00 2001 From: MattHag <16444067+MattHag@users.noreply.github.com> Date: Sun, 29 Dec 2024 01:29:17 +0100 Subject: [PATCH] base: Reorder private functions and variable definitions Related #2273 --- lib/logitech_receiver/base.py | 171 +++++++++++++++++----------------- 1 file changed, 85 insertions(+), 86 deletions(-) diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 5701743e..1af147c3 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -87,8 +87,6 @@ class HIDAPI(typing.Protocol): ... -hidapi = typing.cast(HIDAPI, hidapi) - SHORT_MESSAGE_SIZE = 7 _LONG_MESSAGE_SIZE = 20 _MEDIUM_MESSAGE_SIZE = 15 @@ -108,6 +106,11 @@ _DEVICE_REQUEST_TIMEOUT = DEFAULT_TIMEOUT # when pinging, be extra patient (no longer) _PING_TIMEOUT = DEFAULT_TIMEOUT +hidapi = typing.cast(HIDAPI, hidapi) + +request_lock = threading.Lock() # serialize all requests +handles_lock = {} + @dataclasses.dataclass class HIDPPNotification: @@ -146,25 +149,48 @@ for _ignore, d in descriptors.DEVICES.items(): KNOWN_DEVICE_IDS.append(_bluetooth_device(d.btid)) -def _other_device_check(bus_id: int, vendor_id: int, product_id: int) -> dict[str, Any] | None: - """Check whether product is a Logitech USB-connected or Bluetooth device based on bus, vendor, and product IDs - This allows Solaar to support receiverless HID++ 2.0 devices that it knows nothing about""" - if vendor_id != LOGITECH_VENDOR_ID: - return - - device_info = None - if bus_id == BusID.USB and (0xC07D <= product_id <= 0xC094 or 0xC32B <= product_id <= 0xC344): - device_info = _usb_device(product_id, 2) - elif bus_id == BusID.BLUETOOTH and (0xB012 <= product_id <= 0xB0FF or 0xB317 <= product_id <= 0xB3FF): - device_info = _bluetooth_device(product_id) - return device_info - - def product_information(usb_id: int) -> dict[str, Any]: """Returns hardcoded information from USB receiver.""" return base_usb.get_receiver_info(usb_id) +def receivers(): + """Enumerate all the receivers attached to the machine.""" + yield from hidapi.enumerate(_filter_receivers) + + +def filter_products_of_interest( + bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False +) -> dict[str, Any] | None: + """Check that this product is of interest and if so return the device record for further checking""" + + def _other_device_check(bus_id: int, vendor_id: int, product_id: int) -> dict[str, Any] | None: + """Check whether product is a Logitech USB-connected or Bluetooth device based on bus, vendor, and product IDs + This allows Solaar to support receiverless HID++ 2.0 devices that it knows nothing about""" + if vendor_id != LOGITECH_VENDOR_ID: + return + + device_info = None + if bus_id == BusID.USB and (0xC07D <= product_id <= 0xC094 or 0xC32B <= product_id <= 0xC344): + device_info = _usb_device(product_id, 2) + elif bus_id == BusID.BLUETOOTH and (0xB012 <= product_id <= 0xB0FF or 0xB317 <= product_id <= 0xB3FF): + device_info = _bluetooth_device(product_id) + return device_info + + record = _filter_receivers(bus_id, vendor_id, product_id, hidpp_short, hidpp_long) + if record: # known or unknown receiver + return record + + for record in KNOWN_DEVICE_IDS: + if _match(record, bus_id, vendor_id, product_id): + return record + if hidpp_short or hidpp_long: # unknown devices that use HID++ + return {"vendor_id": vendor_id, "product_id": product_id, "bus_id": bus_id, "isDevice": True} + elif hidpp_short is None and hidpp_long is None: # unknown devices in correct range of IDs + return _other_device_check(bus_id, vendor_id, product_id) + return None + + def _match(record: dict[str, Any], bus_id: int, vendor_id: int, product_id: int): return ( (record.get("bus_id") is None or record.get("bus_id") == bus_id) @@ -194,29 +220,6 @@ def _filter_receivers( return None -def receivers(): - """Enumerate all the receivers attached to the machine.""" - yield from hidapi.enumerate(_filter_receivers) - - -def filter_products_of_interest( - bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False -) -> dict[str, Any] | None: - """Check that this product is of interest and if so return the device record for further checking""" - record = _filter_receivers(bus_id, vendor_id, product_id, hidpp_short, hidpp_long) - if record: # known or unknown receiver - return record - - for record in KNOWN_DEVICE_IDS: - if _match(record, bus_id, vendor_id, product_id): - return record - if hidpp_short or hidpp_long: # unknown devices that use HID++ - return {"vendor_id": vendor_id, "product_id": product_id, "bus_id": bus_id, "isDevice": True} - elif hidpp_short is None and hidpp_long is None: # unknown devices in correct range of IDs - return _other_device_check(bus_id, vendor_id, product_id) - return None - - def receivers_and_devices(): """Enumerate all the receivers and devices directly attached to the machine.""" yield from hidapi.enumerate(filter_products_of_interest) @@ -393,33 +396,6 @@ def _read(handle, timeout) -> tuple[int, int, bytes]: return report_id, devnumber, data[2:] -def _skip_incoming(handle, ihandle, notifications_hook): - """Read anything already in the input buffer. - - Used by request() and ping() before their write. - """ - - while True: - try: - # read whatever is already in the buffer, if any - data = hidapi.read(ihandle, _MAX_READ_SIZE, 0) - except Exception as reason: - logger.error("read failed, assuming receiver %s no longer available", handle) - close(handle) - raise exceptions.NoReceiver(reason=reason) from reason - - if data: - if _is_relevant_message(data): # only process messages that pass check - # report_id = ord(data[:1]) - if notifications_hook: - n = make_notification(ord(data[:1]), ord(data[1:2]), data[2:]) - if n: - notifications_hook(n) - else: - # nothing in the input buffer, we're done - return - - def make_notification(report_id: int, devnumber: int, data: bytes) -> HIDPPNotification | None: """Guess if this is a notification (and not just a request reply), and return a Notification if it is.""" @@ -455,10 +431,6 @@ def make_notification(report_id: int, devnumber: int, data: bytes) -> HIDPPNotif return None -request_lock = threading.Lock() # serialize all requests -handles_lock = {} - - def handle_lock(handle): with request_lock: if handles_lock.get(handle) is None: @@ -481,22 +453,6 @@ def acquire_timeout(lock, handle, timeout): lock.release() -def _get_next_sw_id() -> int: - """Returns 'random' software ID to separate replies from different devices. - - Cycle the HID++ 2.0 software ID from 0x2 to 0xF to separate - results and notifications. - """ - if not hasattr(_get_next_sw_id, "software_id"): - _get_next_sw_id.software_id = 0xF - - if _get_next_sw_id.software_id < 0xF: - _get_next_sw_id.software_id += 1 - else: - _get_next_sw_id.software_id = 2 - return _get_next_sw_id.software_id - - def find_paired_node(receiver_path: str, index: int, timeout: int): """Find the node of a device paired with a receiver.""" return hidapi.find_paired_node(receiver_path, index, timeout) @@ -693,3 +649,46 @@ def ping(handle, devnumber, long_message: bool = False): delta = time() - request_started logger.warning("(%s) timeout (%0.2f/%0.2f) on device %d ping", handle, delta, _PING_TIMEOUT, devnumber) + + +def _skip_incoming(handle, ihandle, notifications_hook): + """Read anything already in the input buffer. + + Used by request() and ping() before their write. + """ + + while True: + try: + # read whatever is already in the buffer, if any + data = hidapi.read(ihandle, _MAX_READ_SIZE, 0) + except Exception as reason: + logger.error("read failed, assuming receiver %s no longer available", handle) + close(handle) + raise exceptions.NoReceiver(reason=reason) from reason + + if data: + if _is_relevant_message(data): # only process messages that pass check + # report_id = ord(data[:1]) + if notifications_hook: + n = make_notification(ord(data[:1]), ord(data[1:2]), data[2:]) + if n: + notifications_hook(n) + else: + # nothing in the input buffer, we're done + return + + +def _get_next_sw_id() -> int: + """Returns 'random' software ID to separate replies from different devices. + + Cycle the HID++ 2.0 software ID from 0x2 to 0xF to separate + results and notifications. + """ + if not hasattr(_get_next_sw_id, "software_id"): + _get_next_sw_id.software_id = 0xF + + if _get_next_sw_id.software_id < 0xF: + _get_next_sw_id.software_id += 1 + else: + _get_next_sw_id.software_id = 2 + return _get_next_sw_id.software_id