cleanup in the pyudev hidapi implementation, incremented app version

This commit is contained in:
Daniel Pavel 2012-11-03 07:49:10 +02:00
parent 39855408ed
commit d5f8a4bc45
8 changed files with 50 additions and 61 deletions

View File

@ -2,7 +2,7 @@
APPNAME = 'Solaar' APPNAME = 'Solaar'
__author__ = "Daniel Pavel <daniel.pavel@gmail.com>" __author__ = "Daniel Pavel <daniel.pavel@gmail.com>"
__version__ = '0.6' __version__ = '0.7'
__license__ = "GPL" __license__ = "GPL"
# #

View File

@ -1,7 +1,7 @@
# pass # pass
APPNAME = 'Solaar' APPNAME = 'Solaar'
APPVERSION = '0.6' APPVERSION = '0.7'
from . import (notify, status_icon, main_window, pair_window, action) from . import (notify, status_icon, main_window, pair_window, action)

View File

@ -21,16 +21,6 @@ def _toggle_action(name, label, function, *args):
action.connect('activate', function, *args) action.connect('activate', function, *args)
return action return action
def wrap_action(action, prefire):
def _wrap(aw, aa):
prefire(aa)
aa.activate()
wrapper = _action(action.get_name(), action.get_label(), None)
wrapper.set_icon_name(action.get_icon_name())
wrapper.connect('activate', _wrap, action)
return wrapper
# #
# #
# #

View File

@ -1,15 +1,8 @@
"""Generic Human Interface Device API.""" """Generic Human Interface Device API."""
from __future__ import absolute_import
__author__ = "Daniel Pavel" __author__ = "Daniel Pavel"
__license__ = "GPL" __license__ = "GPL"
__version__ = "0.3" __version__ = "0.4"
#
# This package exists in case a future pure-Python implementation is feasible.
#
try: try:
from hidapi.udev import * from hidapi.udev import *

View File

@ -74,7 +74,9 @@ DeviceInfo = namedtuple('DeviceInfo', [
'release', 'release',
'manufacturer', 'manufacturer',
'product', 'product',
'interface']) 'interface',
'driver',
])
del namedtuple del namedtuple
@ -88,7 +90,8 @@ def _makeDeviceInfo(native_device_info):
release=hex(native_device_info.release)[2:], release=hex(native_device_info.release)[2:],
manufacturer=native_device_info.manufacturer, manufacturer=native_device_info.manufacturer,
product=native_device_info.product, product=native_device_info.product,
interface=native_device_info.interface) interface=native_device_info.interface,
driver=None)
# #

View File

@ -4,7 +4,7 @@
# #
import os as _os import os as _os
import select as _select from select import select as _select
from pyudev import Context as _Context from pyudev import Context as _Context
from pyudev import Device as _Device from pyudev import Device as _Device
@ -22,7 +22,9 @@ DeviceInfo = namedtuple('DeviceInfo', [
'release', 'release',
'manufacturer', 'manufacturer',
'product', 'product',
'interface']) 'interface',
'driver',
])
del namedtuple del namedtuple
# #
@ -31,26 +33,19 @@ del namedtuple
# #
def init(): def init():
"""Initialize the HIDAPI library. """This function is a no-op, and exists only to match the native hidapi
implementation.
This function initializes the HIDAPI library. Calling it is not strictly :returns: ``True``.
necessary, as it will be called automatically by enumerate() and any of the
open_*() functions if it is needed. This function should be called at the
beginning of execution however, if there is a chance of HIDAPI handles
being opened by different threads simultaneously.
:returns: ``True`` if successful.
""" """
return True return True
def exit(): def exit():
"""Finalize the HIDAPI library. """This function is a no-op, and exists only to match the native hidapi
implementation.
This function frees all of the static data associated with HIDAPI. It should :returns: ``True``.
be called at the end of execution to avoid memory leaks.
:returns: ``True`` if successful.
""" """
return True return True
@ -65,9 +60,10 @@ def enumerate(vendor_id=None, product_id=None, interface_number=None):
""" """
for dev in _Context().list_devices(subsystem='hidraw'): for dev in _Context().list_devices(subsystem='hidraw'):
hid_dev = dev.find_parent('hid') hid_dev = dev.find_parent('hid')
if not hid_dev or 'HID_ID' not in hid_dev: if not hid_dev:
continue continue
assert 'HID_ID' in hid_dev
bus, vid, pid = hid_dev['HID_ID'].split(':') bus, vid, pid = hid_dev['HID_ID'].split(':')
if vendor_id is not None and vendor_id != int(vid, 16): if vendor_id is not None and vendor_id != int(vid, 16):
continue continue
@ -79,7 +75,6 @@ def enumerate(vendor_id=None, product_id=None, interface_number=None):
if not intf_dev: if not intf_dev:
continue continue
# interface = int(intf_dev.attributes['bInterfaceNumber'], 16)
interface = intf_dev.attributes.asint('bInterfaceNumber') interface = intf_dev.attributes.asint('bInterfaceNumber')
if interface_number is not None and interface_number != interface: if interface_number is not None and interface_number != interface:
continue continue
@ -87,19 +82,20 @@ def enumerate(vendor_id=None, product_id=None, interface_number=None):
serial = hid_dev['HID_UNIQ'] if 'HID_UNIQ' in hid_dev else None serial = hid_dev['HID_UNIQ'] if 'HID_UNIQ' in hid_dev else None
usb_dev = dev.find_parent('usb', 'usb_device') usb_dev = dev.find_parent('usb', 'usb_device')
if usb_dev: assert usb_dev
attrs = usb_dev.attributes attrs = usb_dev.attributes
devinfo = DeviceInfo(path=dev.device_node, d_info = DeviceInfo(path=dev.device_node,
vendor_id=vid[-4:], vendor_id=vid[-4:],
product_id=pid[-4:], product_id=pid[-4:],
serial=serial, serial=serial,
release=attrs['bcdDevice'], release=attrs['bcdDevice'],
manufacturer=attrs['manufacturer'], manufacturer=attrs['manufacturer'],
product=attrs['product'], product=attrs['product'],
interface=interface) interface=interface,
yield devinfo driver=hid_dev['DRIVER'])
yield d_info
if bus == '0005': # BLUETOOTH elif bus == '0005': # BLUETOOTH
# TODO # TODO
pass pass
@ -186,7 +182,7 @@ def read(device_handle, bytes_count, timeout_ms=-1):
""" """
try: try:
timeout = None if timeout_ms < 0 else timeout_ms / 1000.0 timeout = None if timeout_ms < 0 else timeout_ms / 1000.0
rlist, wlist, xlist = _select.select([device_handle], [], [], timeout) rlist, wlist, xlist = _select([device_handle], [], [], timeout)
if rlist: if rlist:
assert rlist == [device_handle] assert rlist == [device_handle]
return _os.read(device_handle, bytes_count) return _os.read(device_handle, bytes_count)
@ -243,13 +239,18 @@ def get_indexed_string(device_handle, index):
dev = _Device.from_device_number(_Context(), 'char', stat.st_rdev) dev = _Device.from_device_number(_Context(), 'char', stat.st_rdev)
if dev: if dev:
hid_dev = dev.find_parent('hid') hid_dev = dev.find_parent('hid')
if hid_dev and 'HID_ID' in hid_dev: if hid_dev:
assert 'HID_ID' in hid_dev
bus, _, _ = hid_dev['HID_ID'].split(':') bus, _, _ = hid_dev['HID_ID'].split(':')
if bus == '0003': # USB if bus == '0003': # USB
usb_dev = dev.find_parent('usb', 'usb_device') usb_dev = dev.find_parent('usb', 'usb_device')
if usb_dev: assert usb_dev
attrs = usb_dev.attributes
key = _DEVICE_STRINGS[index] key = _DEVICE_STRINGS[index]
attrs = usb_dev.attributes
if key in attrs: if key in attrs:
return attrs[key] return attrs[key]
elif bus == '0005': # BLUETOOTH
# TODO
pass

View File

@ -2,4 +2,4 @@
__author__ = "Daniel Pavel" __author__ = "Daniel Pavel"
__license__ = "GPL" __license__ = "GPL"
__version__ = "0.4" __version__ = "0.5"

View File

@ -75,7 +75,9 @@ def list_receiver_devices():
"""List all the Linux devices exposed by the UR attached to the machine.""" """List all the Linux devices exposed by the UR attached to the machine."""
# (Vendor ID, Product ID) = ('Logitech', 'Unifying Receiver') # (Vendor ID, Product ID) = ('Logitech', 'Unifying Receiver')
# interface 2 if the actual receiver interface # interface 2 if the actual receiver interface
return _hid.enumerate(0x046d, 0xc52b, 2) for d in _hid.enumerate(0x046d, 0xc52b, 2):
if d.driver is None or d.driver == 'logitech-djreceiver':
yield d
_COUNT_DEVICES_REQUEST = b'\x10\xFF\x81\x00\x00\x00\x00' _COUNT_DEVICES_REQUEST = b'\x10\xFF\x81\x00\x00\x00\x00'
@ -104,7 +106,7 @@ def try_open(path):
# if this is the right hidraw device, we'll receive a 'bad device' from the UR # if this is the right hidraw device, we'll receive a 'bad device' from the UR
# otherwise, the read should produce nothing # otherwise, the read should produce nothing
reply = _hid.read(receiver_handle, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT) reply = _hid.read(receiver_handle, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT / 2)
if reply: if reply:
if reply[:5] == _COUNT_DEVICES_REQUEST[:5]: if reply[:5] == _COUNT_DEVICES_REQUEST[:5]:
# 'device 0 unreachable' is the expected reply from a valid receiver handle # 'device 0 unreachable' is the expected reply from a valid receiver handle