From b19006104f035f63b20aa96cc54c99b04ec9abc3 Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Tue, 6 Oct 2020 08:09:54 -0400 Subject: [PATCH] device: support unknown USB-connected and Bluetooth devices --- lib/hidapi/udev.py | 83 +++++++++++++++---------------- lib/logitech_receiver/base.py | 43 ++++++++++++---- lib/logitech_receiver/base_usb.py | 29 +++++++++-- 3 files changed, 98 insertions(+), 57 deletions(-) diff --git a/lib/hidapi/udev.py b/lib/hidapi/udev.py index 05c18b7f..a2833499 100644 --- a/lib/hidapi/udev.py +++ b/lib/hidapi/udev.py @@ -85,15 +85,10 @@ def exit(): return True -# The filter is used to determine whether this is a device of interest to Solaar -def _match(action, device, filter): - vendor_id = filter.get('vendor_id') - product_id = filter.get('product_id') - interface_number = filter.get('usb_interface') - hid_driver = filter.get('hid_driver') - isDevice = filter.get('isDevice') - bus_id = filter.get('bus_id') - +# The filterfn is used to determine whether this is a device of interest to Solaar. +# It is given the bus id, vendor id, and product id and returns a dictionary +# with the required hid_driver and usb_interface and whether this is a receiver or device. +def _match(action, device, filterfn): hid_device = device.find_parent('hid') if not hid_device: return @@ -102,11 +97,14 @@ def _match(action, device, filter): return # there are reports that sometimes the id isn't set up right so be defensive bid, vid, pid = hid_id.split(':') - if not ((vendor_id is None or vendor_id == int(vid, 16)) and (product_id is None or product_id == int(pid, 16))): - return - if not (bus_id is None or bus_id == int(bid, 16)): + filter = filterfn(int(bid, 16), int(vid, 16), int(pid, 16)) + if not filter: return + hid_driver = filter.get('hid_driver') + interface_number = filter.get('usb_interface') + isDevice = filter.get('isDevice') + if action == 'add': hid_driver_name = hid_device.get('DRIVER') # print ("** found hid", action, device, "hid:", hid_device, hid_driver_name) @@ -119,26 +117,23 @@ def _match(action, device, filter): intf_device = device.find_parent('usb', 'usb_interface') # print ("*** usb interface", action, device, "usb_interface:", intf_device) - if interface_number is None: - usb_interface = None if intf_device is None else intf_device.attributes.asint('bInterfaceNumber') - else: - usb_interface = None if intf_device is None else intf_device.attributes.asint('bInterfaceNumber') - if usb_interface is None or interface_number != usb_interface: - return - + usb_interface = None if intf_device is None else intf_device.attributes.asint('bInterfaceNumber') + if not (interface_number is None or interface_number == usb_interface): + return attrs = intf_device.attributes if intf_device else None + d_info = DeviceInfo( path=device.device_node, + bus_id=int(bid, 16), vendor_id=vid[-4:], product_id=pid[-4:], + driver=hid_driver_name, + interface=usb_interface, + isDevice=isDevice, serial=hid_device.get('HID_UNIQ'), release=attrs.get('bcdDevice') if attrs else None, manufacturer=attrs.get('manufacturer') if attrs else None, - product=attrs.get('product') if attrs else None, - interface=usb_interface, - driver=hid_driver_name, - bus_id=bus_id, - isDevice=isDevice + product=attrs.get('product') if attrs else None ) return d_info @@ -147,16 +142,16 @@ def _match(action, device, filter): d_info = DeviceInfo( path=device.device_node, + bus_id=None, vendor_id=vid[-4:], product_id=pid[-4:], + driver=None, + interface=None, + isDevice=isDevice, serial=None, release=None, manufacturer=None, - product=None, - interface=None, - driver=None, - bus_id=None, - isDevice=isDevice + product=None ) return d_info @@ -203,7 +198,7 @@ def find_paired_node_wpid(receiver_path, index): return None -def monitor_glib(callback, *device_filters): +def monitor_glib(callback, filterfn): from gi.repository import GLib c = _Context() @@ -220,18 +215,16 @@ def monitor_glib(callback, *device_filters): m = _Monitor.from_netlink(c) m.filter_by(subsystem='hidraw') - def _process_udev_event(monitor, condition, cb, filters): + def _process_udev_event(monitor, condition, cb, filterfn): if condition == GLib.IO_IN: event = monitor.receive_device() if event: action, device = event # print ("***", action, device) if action == 'add': - for filter in filters: - d_info = _match(action, device, filter) - if d_info: - GLib.idle_add(cb, action, d_info) - break + d_info = _match(action, device, filterfn) + if d_info: + GLib.idle_add(cb, action, d_info) elif action == 'remove': # the GLib notification does _not_ match! pass @@ -239,21 +232,21 @@ def monitor_glib(callback, *device_filters): try: # io_add_watch_full may not be available... - GLib.io_add_watch_full(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, device_filters) + GLib.io_add_watch_full(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, filterfn) # print ("did io_add_watch_full") except AttributeError: try: # and the priority parameter appeared later in the API - GLib.io_add_watch(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, device_filters) + GLib.io_add_watch(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, filterfn) # print ("did io_add_watch with priority") except Exception: - GLib.io_add_watch(m, GLib.IO_IN, _process_udev_event, callback, device_filters) + GLib.io_add_watch(m, GLib.IO_IN, _process_udev_event, callback, filterfn) # print ("did io_add_watch") m.start() -def enumerate(usb_id): +def enumerate(filterfn): """Enumerate the HID Devices. List all the HID devices attached to the system, optionally filtering by @@ -263,7 +256,7 @@ def enumerate(usb_id): """ for dev in _Context().list_devices(subsystem='hidraw'): - dev_info = _match('add', dev, usb_id) + dev_info = _match('add', dev, filterfn) if dev_info: yield dev_info @@ -275,7 +268,10 @@ def open(vendor_id, product_id, serial=None): :returns: an opaque device handle, or ``None``. """ - for device in enumerate(vendor_id, product_id): + def matchfn(bid, vid, pid): + return vid == vendor_id and pid == product_id + + for device in enumerate(matchfn): if serial is None or serial == device.serial: return open_path(device.path) @@ -283,8 +279,7 @@ def open(vendor_id, product_id, serial=None): def open_path(device_path): """Open a HID device by its path name. - :param device_path: the path of a ``DeviceInfo`` tuple returned by - enumerate(). + :param device_path: the path of a ``DeviceInfo`` tuple returned by enumerate(). :returns: an opaque device handle, or ``None``. """ diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index a6d5f7cd..b902615a 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -33,7 +33,8 @@ import hidapi as _hid from . import hidpp10 as _hidpp10 from . import hidpp20 as _hidpp20 from .base_usb import ALL as _RECEIVER_USB_IDS -from .base_usb import WIRED_DEVICES as _WIRED_DEVICE_IDS +from .base_usb import DEVICES as _DEVICE_IDS +from .base_usb import other_device_check as _other_device_check from .common import KwException as _KwException from .common import pack as _pack from .common import strhex as _strhex @@ -89,22 +90,46 @@ class DeviceUnreachable(_KwException): # +def match(record, bus_id, vendor_id, product_id): + return ((record.get('bus_id') is None or record.get('bus_id') == bus_id) + and (record.get('vendor_id') is None or record.get('vendor_id') == vendor_id) + and (record.get('product_id') is None or record.get('product_id') == product_id)) + + +def filter_receivers(bus_id, vendor_id, product_id): + """Check that this product is a Logitech receiver and if so return the receiver record for further checking""" + for record in _RECEIVER_USB_IDS: # known receivers + if match(record, bus_id, vendor_id, product_id): + return record + + def receivers(): - """List all the Linux devices exposed by the UR attached to the machine.""" - for receiver_usb_id in _RECEIVER_USB_IDS: - for d in _hid.enumerate(receiver_usb_id): - yield d + """Enumerate all the receivers attached to the machine.""" + for dev in _hid.enumerate(filter_receivers): + yield dev + + +def filter_devices(bus_id, vendor_id, product_id): + """Check that this product is of interest and if so return the device record for further checking""" + for record in _DEVICE_IDS: # known devices + if match(record, bus_id, vendor_id, product_id): + return record + return _other_device_check(bus_id, vendor_id, product_id) # USB and BT devices unknown to Solaar def wired_devices(): - for device_usb_id in _WIRED_DEVICE_IDS: - for dev in _hid.enumerate(device_usb_id): - yield dev + """Enumerate all the USB-connected and Bluetooth devices attached to the machine.""" + for dev in _hid.enumerate(filter_devices): + yield dev + + +def filter_either(bus_id, vendor_id, product_id): + return filter_receivers(bus_id, vendor_id, product_id) or filter_devices(bus_id, vendor_id, product_id) def notify_on_receivers_glib(callback): """Watch for matching devices and notifies the callback on the GLib thread.""" - _hid.monitor_glib(callback, *_RECEIVER_USB_IDS, *_WIRED_DEVICE_IDS) + return _hid.monitor_glib(callback, filter_either) # diff --git a/lib/logitech_receiver/base_usb.py b/lib/logitech_receiver/base_usb.py index fee5f3cb..86550ab6 100644 --- a/lib/logitech_receiver/base_usb.py +++ b/lib/logitech_receiver/base_usb.py @@ -17,6 +17,13 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +## According to Logitech, they use the following product IDs (as of September 2020) +## USB product IDs for receivers: 0xC526 - 0xC5xx +## Wireless PIDs for hidpp10 devices: 0x2006 - 0x2019 +## Wireless PIDs for hidpp20 devices: 0x4002 - 0x4097, 0x4101 - 0x4102 +## USB product IDs for hidpp20 devices: 0xC07D - 0xC093, 0xC32B - 0xC344 +## Bluetooth product IDs (for hidpp20 devices): 0xB012 - 0xB0xx, 0xB32A - 0xB3xx + # USB ids of Logitech wireless receivers. # Only receivers supporting the HID++ protocol can go in here. @@ -166,15 +173,26 @@ _wired_device = lambda product_id, interface: { _bt_device = lambda product_id: {'vendor_id': 0x046d, 'product_id': product_id, 'bus_id': 0x5, 'isDevice': True} -WIRED_DEVICES = [] +DEVICES = [] for _ignore, d in _DEVICES.items(): if d.usbid: - WIRED_DEVICES.append(_wired_device(d.usbid, d.interface if d.interface else 2)) + DEVICES.append(_wired_device(d.usbid, d.interface if d.interface else 2)) if d.btid: - WIRED_DEVICES.append(_bt_device(d.btid)) + DEVICES.append(_bt_device(d.btid)) -del _DRIVER, _unifying_receiver, _nano_receiver, _lenovo_receiver, _lightspeed_receiver, _wired_device, _bt_device + +def other_device_check(bus_id, vendor_id, product_id): + """Check whether product is a Logitech USB-connected or Bluetooth device based on bus, vendor, and product IDs + This allows Solaar to support receiverless HID++ 2.0 devices that it knows nothing about""" + if vendor_id != 0x46d: # Logitech + return + if bus_id == 0x3: # USB + if (product_id >= 0xC07D and product_id <= 0xC093 or product_id >= 0xC32B and product_id <= 0xC344): + return _wired_device(product_id, 2) + elif bus_id == 0x5: # Bluetooth + if (product_id >= 0xB012 and product_id <= 0xB0FF or product_id >= 0xB32A and product_id <= 0xB3FF): + return _bt_device(product_id) def product_information(usb_id): @@ -184,3 +202,6 @@ def product_information(usb_id): if usb_id == r.get('product_id'): return r return {} + + +del _DRIVER, _unifying_receiver, _nano_receiver, _lenovo_receiver, _lightspeed_receiver