quicker detection of matching receivers in udev

This commit is contained in:
Daniel Pavel 2013-05-02 11:11:53 +02:00
parent 8fc45e5590
commit 0f80901bce
2 changed files with 78 additions and 63 deletions

View File

@ -57,66 +57,90 @@ def exit():
return True
def _match(device, hid_dev, vendor_id=None, product_id=None, interface_number=None, driver=None):
assert 'HID_ID' in hid_dev
bus, vid, pid = hid_dev['HID_ID'].split(':')
driver_name = hid_dev['DRIVER']
def _match(action, device, vendor_id=None, product_id=None, interface_number=None, driver=None):
usb_device = device.find_parent('usb', 'usb_device')
if not usb_device:
return
if ((vendor_id is None or vendor_id == int(vid, 16)) and
(product_id is None or product_id == int(pid, 16)) and
(driver is None or driver == driver_name)):
vid = usb_device['ID_VENDOR_ID']
pid = usb_device['ID_MODEL_ID']
if not ((vendor_id is None or vendor_id == int(vid, 16)) and
(product_id is None or product_id == int(pid, 16))):
return
if bus == '0003': # USB
intf_dev = device.find_parent('usb', 'usb_interface')
if intf_dev:
interface = intf_dev.attributes.asint('bInterfaceNumber')
if interface_number is None or interface_number == interface:
usb_dev = device.find_parent('usb', 'usb_device')
assert usb_dev
attrs = usb_dev.attributes
d_info = DeviceInfo(path=device.device_node,
vendor_id=vid[-4:],
product_id=pid[-4:],
serial=hid_dev.get('HID_UNIQ'),
release=attrs['bcdDevice'],
manufacturer=attrs['manufacturer'],
product=attrs['product'],
interface=interface,
driver=driver_name)
return d_info
if action == 'add':
hid_device = device.find_parent('hid')
if not hid_device:
return
hid_driver_name = hid_device['DRIVER']
if driver is not None and driver != hid_driver_name:
return
elif bus == '0005': # BLUETOOTH
# TODO
pass
intf_device = device.find_parent('usb', 'usb_interface')
if interface_number is None:
usb_interface = None if intf_device is None else intf_device.attributes.asint('bInterfaceNumber')
else:
usb_interface = None if intf_device is None else intf_device.attributes.asint('bInterfaceNumber')
if usb_interface is None or interface_number != usb_interface:
return
attrs = usb_device.attributes
d_info = DeviceInfo(path=device.device_node,
vendor_id=vid[-4:],
product_id=pid[-4:],
serial=hid_device.get('HID_UNIQ'),
release=attrs['bcdDevice'],
manufacturer=attrs['manufacturer'],
product=attrs['product'],
interface=usb_interface,
driver=hid_driver_name)
return d_info
elif action == 'remove':
# print (dict(device), dict(usb_device))
d_info = DeviceInfo(path=device.device_node,
vendor_id=vid[-4:],
product_id=pid[-4:],
serial=None,
release=None,
manufacturer=None,
product=None,
interface=None,
driver=None)
return d_info
def monitor_async(callback, *device_filters):
from threading import Thread as _Thread
t = _Thread(name='monitor_async_' + callback.__name__,
target=monitor,
args=[callback] + list(device_filters))
t.daemon = True
t.start()
def monitor(callback, *device_filters):
m = _Monitor.from_netlink(_Context())
c = _Context()
for device in c.list_devices(subsystem='hidraw'):
# print (device, dict(device), dict(device.attributes))
for filter in device_filters:
d_info = _match('add', device, *filter)
if d_info:
callback('add', d_info)
break
m = _Monitor.from_netlink(c)
m.filter_by(subsystem='hidraw')
for action, device in m:
if action == 'add':
hid_dev = device.find_parent(subsystem='hid')
# print ("****", action, device, hid_dev)
if hid_dev:
for filter in device_filters:
d_info = _match(device, hid_dev, *filter)
if d_info:
callback(action, d_info)
break
elif action == 'remove':
# this is ugly, but... well.
# signal the callback for each removed device; it will have figure
# out for itself if it's a device it should handle
d_info = DeviceInfo(path=device.device_node,
vendor_id=None,
product_id=None,
serial=None,
release=None,
manufacturer=None,
product=None,
interface=None,
driver=None)
callback(action, d_info)
# print ('----', action, device)
if action in ('add', 'remove'):
for filter in device_filters:
d_info = _match(action, device, *filter)
if d_info:
callback(action, d_info)
break
def enumerate(vendor_id=None, product_id=None, interface_number=None, driver=None):

View File

@ -76,20 +76,11 @@ def receivers():
def notify_on_receivers(callback):
"""Starts a thread that monitors receiver events from udev."""
from threading import Thread as _Thread
t = _Thread(name='receivers_monitor', target=_hid.monitor,
args=(callback,
_hid.monitor_async(callback,
DEVICE_UNIFYING_RECEIVER,
DEVICE_UNIFYING_RECEIVER_2,
# DEVICE_NANO_RECEIVER,
))
t.daemon = True
t.start()
# the HID monitor will only send events when devices are added/removed,
# so we need to trigger the callback for all currently detected receivers
for r in receivers():
callback('add', r)
)
def open_path(path):