base_usb: Add external interface
Clean up, type hint and tests base_usb and related modules.
This commit is contained in:
		
							parent
							
								
									a75c4b9679
								
							
						
					
					
						commit
						8d0672ac3c
					
				|  | @ -78,7 +78,7 @@ def exit(): | |||
| # The filterfn is used to determine whether this is a device of interest to Solaar. | ||||
| # It is given the bus id, vendor id, and product id and returns a dictionary | ||||
| # with the required hid_driver and usb_interface and whether this is a receiver or device. | ||||
| def _match(action, device, filterfn): | ||||
| def _match(action, device, filter_func: typing.Callable[[int, int, int, bool, bool], dict[str, typing.Any]]): | ||||
|     if logger.isEnabledFor(logging.DEBUG): | ||||
|         logger.debug(f"Dbus event {action} {device}") | ||||
|     hid_device = device.find_parent("hid") | ||||
|  | @ -113,11 +113,11 @@ def _match(action, device, filterfn): | |||
|             "Report Descriptor not processed for DEVICE %s BID %s VID %s PID %s: %s", device.device_node, bid, vid, pid, e | ||||
|         ) | ||||
| 
 | ||||
|     filter = filterfn(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short, hidpp_long) | ||||
|     if not filter: | ||||
|     filtered_result = filter_func(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short, hidpp_long) | ||||
|     if not filtered_result: | ||||
|         return | ||||
|     interface_number = filter.get("usb_interface") | ||||
|     isDevice = filter.get("isDevice") | ||||
|     interface_number = filtered_result.get("usb_interface") | ||||
|     isDevice = filtered_result.get("isDevice") | ||||
| 
 | ||||
|     if action == "add": | ||||
|         hid_driver_name = hid_device.properties.get("DRIVER") | ||||
|  | @ -260,7 +260,7 @@ def monitor_glib(glib: GLib, callback, filterfn): | |||
|     m.start() | ||||
| 
 | ||||
| 
 | ||||
| def enumerate(filterfn): | ||||
| def enumerate(filter_func: typing.Callable[[int, int, int, bool, bool], dict[str, typing.Any]]): | ||||
|     """Enumerate the HID Devices. | ||||
| 
 | ||||
|     List all the HID devices attached to the system, optionally filtering by | ||||
|  | @ -272,7 +272,7 @@ def enumerate(filterfn): | |||
|     if logger.isEnabledFor(logging.DEBUG): | ||||
|         logger.debug("Starting dbus enumeration") | ||||
|     for dev in pyudev.Context().list_devices(subsystem="hidraw"): | ||||
|         dev_info = _match("add", dev, filterfn) | ||||
|         dev_info = _match("add", dev, filter_func) | ||||
|         if dev_info: | ||||
|             yield dev_info | ||||
| 
 | ||||
|  |  | |||
|  | @ -94,26 +94,23 @@ for _ignore, d in descriptors.DEVICES.items(): | |||
|         KNOWN_DEVICE_IDS.append(_bluetooth_device(d.btid)) | ||||
| 
 | ||||
| 
 | ||||
| def other_device_check(bus_id: int, vendor_id: int, product_id: int): | ||||
| def other_device_check(bus_id: int, vendor_id: int, product_id: int) -> dict[str, Any] | None: | ||||
|     """Check whether product is a Logitech USB-connected or Bluetooth device based on bus, vendor, and product IDs | ||||
|     This allows Solaar to support receiverless HID++ 2.0 devices that it knows nothing about""" | ||||
|     if vendor_id != LOGITECH_VENDOR_ID: | ||||
|         return | ||||
| 
 | ||||
|     if bus_id == BusID.USB: | ||||
|         if product_id >= 0xC07D and product_id <= 0xC094 or product_id >= 0xC32B and product_id <= 0xC344: | ||||
|             return _usb_device(product_id, 2) | ||||
|     elif bus_id == BusID.BLUETOOTH: | ||||
|         if product_id >= 0xB012 and product_id <= 0xB0FF or product_id >= 0xB317 and product_id <= 0xB3FF: | ||||
|             return _bluetooth_device(product_id) | ||||
|     device_info = None | ||||
|     if bus_id == BusID.USB and (0xC07D <= product_id <= 0xC094 or 0xC32B <= product_id <= 0xC344): | ||||
|         device_info = _usb_device(product_id, 2) | ||||
|     elif bus_id == BusID.BLUETOOTH and (0xB012 <= product_id <= 0xB0FF or 0xB317 <= product_id <= 0xB3FF): | ||||
|         device_info = _bluetooth_device(product_id) | ||||
|     return device_info | ||||
| 
 | ||||
| 
 | ||||
| def product_information(usb_id: int) -> dict[str, Any]: | ||||
|     """Returns hardcoded information from USB receiver.""" | ||||
|     for receiver in base_usb.KNOWN_RECEIVER: | ||||
|         if usb_id == receiver.get("product_id"): | ||||
|             return receiver | ||||
|     raise ValueError(f"Unknown receiver type: 0x{usb_id:02X}") | ||||
|     return base_usb.get_receiver_info(usb_id) | ||||
| 
 | ||||
| 
 | ||||
| _SHORT_MESSAGE_SIZE = 7 | ||||
|  | @ -142,7 +139,7 @@ _DEVICE_REQUEST_TIMEOUT = DEFAULT_TIMEOUT | |||
| _PING_TIMEOUT = DEFAULT_TIMEOUT | ||||
| 
 | ||||
| 
 | ||||
| def match(record, bus_id, vendor_id, product_id): | ||||
| def _match(record: dict[str, Any], bus_id: int, vendor_id: int, product_id: int): | ||||
|     return ( | ||||
|         (record.get("bus_id") is None or record.get("bus_id") == bus_id) | ||||
|         and (record.get("vendor_id") is None or record.get("vendor_id") == vendor_id) | ||||
|  | @ -150,14 +147,22 @@ def match(record, bus_id, vendor_id, product_id): | |||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def filter_receivers(bus_id: int, vendor_id: int, product_id: int, hidpp_short=False, hidpp_long=False): | ||||
|     """Check that this product is a Logitech receiver | ||||
| def filter_receivers( | ||||
|     bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False | ||||
| ) -> dict[str, Any]: | ||||
|     """Check that this product is a Logitech receiver. | ||||
| 
 | ||||
|     Filters based on bus_id, vendor_id and product_id. | ||||
| 
 | ||||
|     If so return the receiver record for further checking. | ||||
|     """ | ||||
|     for record in base_usb.KNOWN_RECEIVER:  # known receivers | ||||
|         if match(record, bus_id, vendor_id, product_id): | ||||
|     try: | ||||
|         record = base_usb.get_receiver_info(product_id) | ||||
|         if _match(record, bus_id, vendor_id, product_id): | ||||
|             return record | ||||
|     except ValueError: | ||||
|         pass | ||||
| 
 | ||||
|     if vendor_id == LOGITECH_VENDOR_ID and 0xC500 <= product_id <= 0xC5FF:  # unknown receiver | ||||
|         return {"vendor_id": vendor_id, "product_id": product_id, "bus_id": bus_id, "isDevice": False} | ||||
| 
 | ||||
|  | @ -167,13 +172,14 @@ def receivers(): | |||
|     yield from hidapi.enumerate(filter_receivers) | ||||
| 
 | ||||
| 
 | ||||
| def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False): | ||||
| def filter(bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False): | ||||
|     """Check that this product is of interest and if so return the device record for further checking""" | ||||
|     record = filter_receivers(bus_id, vendor_id, product_id, hidpp_short, hidpp_long) | ||||
|     if record:  # known or unknown receiver | ||||
|         return record | ||||
| 
 | ||||
|     for record in KNOWN_DEVICE_IDS: | ||||
|         if match(record, bus_id, vendor_id, product_id): | ||||
|         if _match(record, bus_id, vendor_id, product_id): | ||||
|             return record | ||||
|     if hidpp_short or hidpp_long:  # unknown devices that use HID++ | ||||
|         return {"vendor_id": vendor_id, "product_id": product_id, "bus_id": bus_id, "isDevice": True} | ||||
|  |  | |||
|  | @ -14,20 +14,21 @@ | |||
| ## with this program; if not, write to the Free Software Foundation, Inc., | ||||
| ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||||
| 
 | ||||
| ## According to Logitech, they use the following product IDs (as of September 2020) | ||||
| ## USB product IDs for receivers: 0xC526 - 0xC5xx | ||||
| ## Wireless PIDs for hidpp10 devices: 0x2006 - 0x2019 | ||||
| ## Wireless PIDs for hidpp20 devices: 0x4002 - 0x4097, 0x4101 - 0x4102 | ||||
| ## USB product IDs for hidpp20 devices: 0xC07D - 0xC094, 0xC32B - 0xC344 | ||||
| ## Bluetooth product IDs (for hidpp20 devices): 0xB012 - 0xB0xx, 0xB32A - 0xB3xx | ||||
| """Collection of known Logitech product IDs. | ||||
| 
 | ||||
| # USB ids of Logitech wireless receivers. | ||||
| # Only receivers supporting the HID++ protocol can go in here. | ||||
| According to Logitech, they use the following product IDs (as of September 2020) | ||||
| USB product IDs for receivers: 0xC526 - 0xC5xx | ||||
| Wireless PIDs for hidpp10 devices: 0x2006 - 0x2019 | ||||
| Wireless PIDs for hidpp20 devices: 0x4002 - 0x4097, 0x4101 - 0x4102 | ||||
| USB product IDs for hidpp20 devices: 0xC07D - 0xC094, 0xC32B - 0xC344 | ||||
| Bluetooth product IDs (for hidpp20 devices): 0xB012 - 0xB0xx, 0xB32A - 0xB3xx | ||||
| 
 | ||||
| USB ids of Logitech wireless receivers. | ||||
| Only receivers supporting the HID++ protocol can go in here. | ||||
| """ | ||||
| 
 | ||||
| from solaar.i18n import _ | ||||
| 
 | ||||
| from logitech_receiver.common import LOGITECH_VENDOR_ID | ||||
| 
 | ||||
| # max_devices is only used for receivers that do not support reading from Registers.RECEIVER_INFO offset 0x03, default | ||||
| # to 1. | ||||
| # may_unpair is only used for receivers that do not support reading from Registers.RECEIVER_INFO offset 0x03, | ||||
|  | @ -36,8 +37,10 @@ from logitech_receiver.common import LOGITECH_VENDOR_ID | |||
| ## should this last be changed so that may_unpair is used for all receivers? writing to Registers.RECEIVER_PAIRING | ||||
| ## doesn't seem right | ||||
| 
 | ||||
| LOGITECH_VENDOR_ID = 0x046D | ||||
| 
 | ||||
| def _bolt_receiver(product_id): | ||||
| 
 | ||||
| def _bolt_receiver(product_id: int) -> dict: | ||||
|     return { | ||||
|         "vendor_id": LOGITECH_VENDOR_ID, | ||||
|         "product_id": product_id, | ||||
|  | @ -49,7 +52,7 @@ def _bolt_receiver(product_id): | |||
|     } | ||||
| 
 | ||||
| 
 | ||||
| def _unifying_receiver(product_id): | ||||
| def _unifying_receiver(product_id: int) -> dict: | ||||
|     return { | ||||
|         "vendor_id": LOGITECH_VENDOR_ID, | ||||
|         "product_id": product_id, | ||||
|  | @ -60,7 +63,7 @@ def _unifying_receiver(product_id): | |||
|     } | ||||
| 
 | ||||
| 
 | ||||
| def _nano_receiver(product_id): | ||||
| def _nano_receiver(product_id: int) -> dict: | ||||
|     return { | ||||
|         "vendor_id": LOGITECH_VENDOR_ID, | ||||
|         "product_id": product_id, | ||||
|  | @ -72,7 +75,7 @@ def _nano_receiver(product_id): | |||
|     } | ||||
| 
 | ||||
| 
 | ||||
| def _nano_receiver_no_unpair(product_id): | ||||
| def _nano_receiver_no_unpair(product_id: int) -> dict: | ||||
|     return { | ||||
|         "vendor_id": LOGITECH_VENDOR_ID, | ||||
|         "product_id": product_id, | ||||
|  | @ -85,7 +88,7 @@ def _nano_receiver_no_unpair(product_id): | |||
|     } | ||||
| 
 | ||||
| 
 | ||||
| def _nano_receiver_max2(product_id): | ||||
| def _nano_receiver_max2(product_id: int) -> dict: | ||||
|     return { | ||||
|         "vendor_id": LOGITECH_VENDOR_ID, | ||||
|         "product_id": product_id, | ||||
|  | @ -98,20 +101,7 @@ def _nano_receiver_max2(product_id): | |||
|     } | ||||
| 
 | ||||
| 
 | ||||
| def _nano_receiver_maxn(product_id, max): | ||||
|     return { | ||||
|         "vendor_id": LOGITECH_VENDOR_ID, | ||||
|         "product_id": product_id, | ||||
|         "usb_interface": 1, | ||||
|         "name": _("Nano Receiver"), | ||||
|         "receiver_kind": "nano", | ||||
|         "max_devices": max, | ||||
|         "may_unpair": False, | ||||
|         "re_pairs": True, | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| def _lenovo_receiver(product_id): | ||||
| def _lenovo_receiver(product_id: int) -> dict: | ||||
|     return { | ||||
|         "vendor_id": 6127, | ||||
|         "product_id": product_id, | ||||
|  | @ -122,7 +112,7 @@ def _lenovo_receiver(product_id): | |||
|     } | ||||
| 
 | ||||
| 
 | ||||
| def _lightspeed_receiver(product_id): | ||||
| def _lightspeed_receiver(product_id: int) -> dict: | ||||
|     return { | ||||
|         "vendor_id": LOGITECH_VENDOR_ID, | ||||
|         "product_id": product_id, | ||||
|  | @ -133,7 +123,7 @@ def _lightspeed_receiver(product_id): | |||
|     } | ||||
| 
 | ||||
| 
 | ||||
| def _ex100_receiver(product_id): | ||||
| def _ex100_receiver(product_id: int) -> dict: | ||||
|     return { | ||||
|         "vendor_id": LOGITECH_VENDOR_ID, | ||||
|         "product_id": product_id, | ||||
|  | @ -147,7 +137,7 @@ def _ex100_receiver(product_id): | |||
| 
 | ||||
| 
 | ||||
| # Receivers added here should also be listed in | ||||
| # share/solaar/io.github.pwr_solaar.solaar.metainfo.xml | ||||
| # share/solaar/io.github.pwr_solaar.solaar.meta-info.xml | ||||
| # Look in https://github.com/torvalds/linux/blob/master/drivers/hid/hid-ids.h | ||||
| 
 | ||||
| # Bolt receivers (marked with the yellow lightning bolt logo) | ||||
|  | @ -184,7 +174,7 @@ LIGHTSPEED_RECEIVER_C547 = _lightspeed_receiver(0xC547) | |||
| # EX100 old style receiver pre-unifying protocol | ||||
| EX100_27MHZ_RECEIVER_C517 = _ex100_receiver(0xC517) | ||||
| 
 | ||||
| KNOWN_RECEIVER = ( | ||||
| KNOWN_RECEIVERS = ( | ||||
|     BOLT_RECEIVER_C548, | ||||
|     UNIFYING_RECEIVER_C52B, | ||||
|     UNIFYING_RECEIVER_C532, | ||||
|  | @ -210,3 +200,23 @@ KNOWN_RECEIVER = ( | |||
|     LIGHTSPEED_RECEIVER_C547, | ||||
|     EX100_27MHZ_RECEIVER_C517, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| def get_receiver_info(product_id: int) -> dict: | ||||
|     """Returns hardcoded information about Logitech receiver. | ||||
| 
 | ||||
|     Parameters | ||||
|     ---------- | ||||
|     product_id | ||||
|         Product ID of receiver e.g. 0xC548 for a Logitech Bolt receiver. | ||||
| 
 | ||||
|     Returns | ||||
|     ------- | ||||
|     dict | ||||
|         Product info with mandatory vendor_id, product_id, | ||||
|         usb_interface, name, receiver_kind | ||||
|     """ | ||||
|     for receiver in KNOWN_RECEIVERS: | ||||
|         if product_id == receiver.get("product_id"): | ||||
|             return receiver | ||||
|     raise ValueError(f"Unknown product ID '0x{product_id:02X}") | ||||
|  |  | |||
|  | @ -24,6 +24,28 @@ def test_product_information(usb_id, expected_name, expected_receiver_kind): | |||
|         assert res["receiver_kind"] == expected_receiver_kind | ||||
| 
 | ||||
| 
 | ||||
| def test_filter_receivers_known(): | ||||
|     bus_id = 2 | ||||
|     vendor_id = 0x046D | ||||
|     product_id = 0xC548 | ||||
| 
 | ||||
|     receiver_info = base.filter_receivers(bus_id, vendor_id, product_id) | ||||
| 
 | ||||
|     assert receiver_info["name"] == "Bolt Receiver" | ||||
|     assert receiver_info["receiver_kind"] == "bolt" | ||||
| 
 | ||||
| 
 | ||||
| def test_filter_receivers_unknown(): | ||||
|     bus_id = 1 | ||||
|     vendor_id = 0x046D | ||||
|     product_id = 0xC500 | ||||
| 
 | ||||
|     receiver_info = base.filter_receivers(bus_id, vendor_id, product_id) | ||||
| 
 | ||||
|     assert receiver_info["bus_id"] == bus_id | ||||
|     assert receiver_info["product_id"] == product_id | ||||
| 
 | ||||
| 
 | ||||
| def test_get_next_sw_id(): | ||||
|     res1 = base._get_next_sw_id() | ||||
|     res2 = base._get_next_sw_id() | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue