device: support unknown USB-connected and Bluetooth devices

This commit is contained in:
Peter F. Patel-Schneider 2020-10-06 08:09:54 -04:00
parent dba9d472a8
commit b19006104f
3 changed files with 98 additions and 57 deletions

View File

@ -85,15 +85,10 @@ def exit():
return True
# The filter is used to determine whether this is a device of interest to Solaar
def _match(action, device, filter):
vendor_id = filter.get('vendor_id')
product_id = filter.get('product_id')
interface_number = filter.get('usb_interface')
hid_driver = filter.get('hid_driver')
isDevice = filter.get('isDevice')
bus_id = filter.get('bus_id')
# The filterfn is used to determine whether this is a device of interest to Solaar.
# It is given the bus id, vendor id, and product id and returns a dictionary
# with the required hid_driver and usb_interface and whether this is a receiver or device.
def _match(action, device, filterfn):
hid_device = device.find_parent('hid')
if not hid_device:
return
@ -102,11 +97,14 @@ def _match(action, device, filter):
return # there are reports that sometimes the id isn't set up right so be defensive
bid, vid, pid = hid_id.split(':')
if not ((vendor_id is None or vendor_id == int(vid, 16)) and (product_id is None or product_id == int(pid, 16))):
return
if not (bus_id is None or bus_id == int(bid, 16)):
filter = filterfn(int(bid, 16), int(vid, 16), int(pid, 16))
if not filter:
return
hid_driver = filter.get('hid_driver')
interface_number = filter.get('usb_interface')
isDevice = filter.get('isDevice')
if action == 'add':
hid_driver_name = hid_device.get('DRIVER')
# print ("** found hid", action, device, "hid:", hid_device, hid_driver_name)
@ -119,26 +117,23 @@ def _match(action, device, filter):
intf_device = device.find_parent('usb', 'usb_interface')
# print ("*** usb interface", action, device, "usb_interface:", intf_device)
if interface_number is None:
usb_interface = None if intf_device is None else intf_device.attributes.asint('bInterfaceNumber')
else:
usb_interface = None if intf_device is None else intf_device.attributes.asint('bInterfaceNumber')
if usb_interface is None or interface_number != usb_interface:
return
usb_interface = None if intf_device is None else intf_device.attributes.asint('bInterfaceNumber')
if not (interface_number is None or interface_number == usb_interface):
return
attrs = intf_device.attributes if intf_device else None
d_info = DeviceInfo(
path=device.device_node,
bus_id=int(bid, 16),
vendor_id=vid[-4:],
product_id=pid[-4:],
driver=hid_driver_name,
interface=usb_interface,
isDevice=isDevice,
serial=hid_device.get('HID_UNIQ'),
release=attrs.get('bcdDevice') if attrs else None,
manufacturer=attrs.get('manufacturer') if attrs else None,
product=attrs.get('product') if attrs else None,
interface=usb_interface,
driver=hid_driver_name,
bus_id=bus_id,
isDevice=isDevice
product=attrs.get('product') if attrs else None
)
return d_info
@ -147,16 +142,16 @@ def _match(action, device, filter):
d_info = DeviceInfo(
path=device.device_node,
bus_id=None,
vendor_id=vid[-4:],
product_id=pid[-4:],
driver=None,
interface=None,
isDevice=isDevice,
serial=None,
release=None,
manufacturer=None,
product=None,
interface=None,
driver=None,
bus_id=None,
isDevice=isDevice
product=None
)
return d_info
@ -203,7 +198,7 @@ def find_paired_node_wpid(receiver_path, index):
return None
def monitor_glib(callback, *device_filters):
def monitor_glib(callback, filterfn):
from gi.repository import GLib
c = _Context()
@ -220,18 +215,16 @@ def monitor_glib(callback, *device_filters):
m = _Monitor.from_netlink(c)
m.filter_by(subsystem='hidraw')
def _process_udev_event(monitor, condition, cb, filters):
def _process_udev_event(monitor, condition, cb, filterfn):
if condition == GLib.IO_IN:
event = monitor.receive_device()
if event:
action, device = event
# print ("***", action, device)
if action == 'add':
for filter in filters:
d_info = _match(action, device, filter)
if d_info:
GLib.idle_add(cb, action, d_info)
break
d_info = _match(action, device, filterfn)
if d_info:
GLib.idle_add(cb, action, d_info)
elif action == 'remove':
# the GLib notification does _not_ match!
pass
@ -239,21 +232,21 @@ def monitor_glib(callback, *device_filters):
try:
# io_add_watch_full may not be available...
GLib.io_add_watch_full(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, device_filters)
GLib.io_add_watch_full(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, filterfn)
# print ("did io_add_watch_full")
except AttributeError:
try:
# and the priority parameter appeared later in the API
GLib.io_add_watch(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, device_filters)
GLib.io_add_watch(m, GLib.PRIORITY_LOW, GLib.IO_IN, _process_udev_event, callback, filterfn)
# print ("did io_add_watch with priority")
except Exception:
GLib.io_add_watch(m, GLib.IO_IN, _process_udev_event, callback, device_filters)
GLib.io_add_watch(m, GLib.IO_IN, _process_udev_event, callback, filterfn)
# print ("did io_add_watch")
m.start()
def enumerate(usb_id):
def enumerate(filterfn):
"""Enumerate the HID Devices.
List all the HID devices attached to the system, optionally filtering by
@ -263,7 +256,7 @@ def enumerate(usb_id):
"""
for dev in _Context().list_devices(subsystem='hidraw'):
dev_info = _match('add', dev, usb_id)
dev_info = _match('add', dev, filterfn)
if dev_info:
yield dev_info
@ -275,7 +268,10 @@ def open(vendor_id, product_id, serial=None):
:returns: an opaque device handle, or ``None``.
"""
for device in enumerate(vendor_id, product_id):
def matchfn(bid, vid, pid):
return vid == vendor_id and pid == product_id
for device in enumerate(matchfn):
if serial is None or serial == device.serial:
return open_path(device.path)
@ -283,8 +279,7 @@ def open(vendor_id, product_id, serial=None):
def open_path(device_path):
"""Open a HID device by its path name.
:param device_path: the path of a ``DeviceInfo`` tuple returned by
enumerate().
:param device_path: the path of a ``DeviceInfo`` tuple returned by enumerate().
:returns: an opaque device handle, or ``None``.
"""

View File

@ -33,7 +33,8 @@ import hidapi as _hid
from . import hidpp10 as _hidpp10
from . import hidpp20 as _hidpp20
from .base_usb import ALL as _RECEIVER_USB_IDS
from .base_usb import WIRED_DEVICES as _WIRED_DEVICE_IDS
from .base_usb import DEVICES as _DEVICE_IDS
from .base_usb import other_device_check as _other_device_check
from .common import KwException as _KwException
from .common import pack as _pack
from .common import strhex as _strhex
@ -89,22 +90,46 @@ class DeviceUnreachable(_KwException):
#
def match(record, bus_id, vendor_id, product_id):
return ((record.get('bus_id') is None or record.get('bus_id') == bus_id)
and (record.get('vendor_id') is None or record.get('vendor_id') == vendor_id)
and (record.get('product_id') is None or record.get('product_id') == product_id))
def filter_receivers(bus_id, vendor_id, product_id):
"""Check that this product is a Logitech receiver and if so return the receiver record for further checking"""
for record in _RECEIVER_USB_IDS: # known receivers
if match(record, bus_id, vendor_id, product_id):
return record
def receivers():
"""List all the Linux devices exposed by the UR attached to the machine."""
for receiver_usb_id in _RECEIVER_USB_IDS:
for d in _hid.enumerate(receiver_usb_id):
yield d
"""Enumerate all the receivers attached to the machine."""
for dev in _hid.enumerate(filter_receivers):
yield dev
def filter_devices(bus_id, vendor_id, product_id):
"""Check that this product is of interest and if so return the device record for further checking"""
for record in _DEVICE_IDS: # known devices
if match(record, bus_id, vendor_id, product_id):
return record
return _other_device_check(bus_id, vendor_id, product_id) # USB and BT devices unknown to Solaar
def wired_devices():
for device_usb_id in _WIRED_DEVICE_IDS:
for dev in _hid.enumerate(device_usb_id):
yield dev
"""Enumerate all the USB-connected and Bluetooth devices attached to the machine."""
for dev in _hid.enumerate(filter_devices):
yield dev
def filter_either(bus_id, vendor_id, product_id):
return filter_receivers(bus_id, vendor_id, product_id) or filter_devices(bus_id, vendor_id, product_id)
def notify_on_receivers_glib(callback):
"""Watch for matching devices and notifies the callback on the GLib thread."""
_hid.monitor_glib(callback, *_RECEIVER_USB_IDS, *_WIRED_DEVICE_IDS)
return _hid.monitor_glib(callback, filter_either)
#

View File

@ -17,6 +17,13 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
## According to Logitech, they use the following product IDs (as of September 2020)
## USB product IDs for receivers: 0xC526 - 0xC5xx
## Wireless PIDs for hidpp10 devices: 0x2006 - 0x2019
## Wireless PIDs for hidpp20 devices: 0x4002 - 0x4097, 0x4101 - 0x4102
## USB product IDs for hidpp20 devices: 0xC07D - 0xC093, 0xC32B - 0xC344
## Bluetooth product IDs (for hidpp20 devices): 0xB012 - 0xB0xx, 0xB32A - 0xB3xx
# USB ids of Logitech wireless receivers.
# Only receivers supporting the HID++ protocol can go in here.
@ -166,15 +173,26 @@ _wired_device = lambda product_id, interface: {
_bt_device = lambda product_id: {'vendor_id': 0x046d, 'product_id': product_id, 'bus_id': 0x5, 'isDevice': True}
WIRED_DEVICES = []
DEVICES = []
for _ignore, d in _DEVICES.items():
if d.usbid:
WIRED_DEVICES.append(_wired_device(d.usbid, d.interface if d.interface else 2))
DEVICES.append(_wired_device(d.usbid, d.interface if d.interface else 2))
if d.btid:
WIRED_DEVICES.append(_bt_device(d.btid))
DEVICES.append(_bt_device(d.btid))
del _DRIVER, _unifying_receiver, _nano_receiver, _lenovo_receiver, _lightspeed_receiver, _wired_device, _bt_device
def other_device_check(bus_id, vendor_id, product_id):
"""Check whether product is a Logitech USB-connected or Bluetooth device based on bus, vendor, and product IDs
This allows Solaar to support receiverless HID++ 2.0 devices that it knows nothing about"""
if vendor_id != 0x46d: # Logitech
return
if bus_id == 0x3: # USB
if (product_id >= 0xC07D and product_id <= 0xC093 or product_id >= 0xC32B and product_id <= 0xC344):
return _wired_device(product_id, 2)
elif bus_id == 0x5: # Bluetooth
if (product_id >= 0xB012 and product_id <= 0xB0FF or product_id >= 0xB32A and product_id <= 0xB3FF):
return _bt_device(product_id)
def product_information(usb_id):
@ -184,3 +202,6 @@ def product_information(usb_id):
if usb_id == r.get('product_id'):
return r
return {}
del _DRIVER, _unifying_receiver, _nano_receiver, _lenovo_receiver, _lightspeed_receiver