From 39855408edc67b0664175c95464a8b807cb30beb Mon Sep 17 00:00:00 2001 From: Daniel Pavel Date: Fri, 2 Nov 2012 22:09:31 +0200 Subject: [PATCH] added pyudev-based implementation of hidapi --- lib/hidapi/__init__.py | 8 +- lib/hidapi/udev.py | 255 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+), 1 deletion(-) create mode 100644 lib/hidapi/udev.py diff --git a/lib/hidapi/__init__.py b/lib/hidapi/__init__.py index 92f1b29f..a6f14ef7 100644 --- a/lib/hidapi/__init__.py +++ b/lib/hidapi/__init__.py @@ -1,5 +1,7 @@ """Generic Human Interface Device API.""" +from __future__ import absolute_import + __author__ = "Daniel Pavel" __license__ = "GPL" __version__ = "0.3" @@ -8,4 +10,8 @@ __version__ = "0.3" # This package exists in case a future pure-Python implementation is feasible. # -from .native import * + +try: + from hidapi.udev import * +except ImportError: + from hidapi.native import * diff --git a/lib/hidapi/udev.py b/lib/hidapi/udev.py new file mode 100644 index 00000000..b7db6fe7 --- /dev/null +++ b/lib/hidapi/udev.py @@ -0,0 +1,255 @@ +# +# Partial Python implementation of the native hidapi. +# Requires pyudev +# + +import os as _os +import select as _select +from pyudev import Context as _Context +from pyudev import Device as _Device + + +native_implementation = 'udev' + + +# the tuple object we'll expose when enumerating devices +from collections import namedtuple +DeviceInfo = namedtuple('DeviceInfo', [ + 'path', + 'vendor_id', + 'product_id', + 'serial', + 'release', + 'manufacturer', + 'product', + 'interface']) +del namedtuple + +# +# exposed API +# docstrings mostly copied from hidapi.h +# + +def init(): + """Initialize the HIDAPI library. + + This function initializes the HIDAPI library. Calling it is not strictly + necessary, as it will be called automatically by enumerate() and any of the + open_*() functions if it is needed. This function should be called at the + beginning of execution however, if there is a chance of HIDAPI handles + being opened by different threads simultaneously. + + :returns: ``True`` if successful. + """ + return True + + +def exit(): + """Finalize the HIDAPI library. + + This function frees all of the static data associated with HIDAPI. It should + be called at the end of execution to avoid memory leaks. + + :returns: ``True`` if successful. + """ + return True + + +def enumerate(vendor_id=None, product_id=None, interface_number=None): + """Enumerate the HID Devices. + + List all the HID devices attached to the system, optionally filtering by + vendor_id, product_id, and/or interface_number. + + :returns: a list of matching ``DeviceInfo`` tuples. + """ + for dev in _Context().list_devices(subsystem='hidraw'): + hid_dev = dev.find_parent('hid') + if not hid_dev or 'HID_ID' not in hid_dev: + continue + + bus, vid, pid = hid_dev['HID_ID'].split(':') + if vendor_id is not None and vendor_id != int(vid, 16): + continue + if product_id is not None and product_id != int(pid, 16): + continue + + if bus == '0003': # USB + intf_dev = dev.find_parent('usb', 'usb_interface') + if not intf_dev: + continue + + # interface = int(intf_dev.attributes['bInterfaceNumber'], 16) + interface = intf_dev.attributes.asint('bInterfaceNumber') + if interface_number is not None and interface_number != interface: + continue + + serial = hid_dev['HID_UNIQ'] if 'HID_UNIQ' in hid_dev else None + + usb_dev = dev.find_parent('usb', 'usb_device') + if usb_dev: + attrs = usb_dev.attributes + devinfo = DeviceInfo(path=dev.device_node, + vendor_id=vid[-4:], + product_id=pid[-4:], + serial=serial, + release=attrs['bcdDevice'], + manufacturer=attrs['manufacturer'], + product=attrs['product'], + interface=interface) + yield devinfo + + if bus == '0005': # BLUETOOTH + # TODO + pass + + +def open(vendor_id, product_id, serial=None): + """Open a HID device by its Vendor ID, Product ID and optional serial number. + + If no serial is provided, the first device with the specified IDs is opened. + + :returns: an opaque device handle, or ``None``. + """ + for device in enumerate(vendor_id, product_id): + if serial is None or serial == device.serial: + return open_path(device.path) + + +def open_path(device_path): + """Open a HID device by its path name. + + :param device_path: the path of a ``DeviceInfo`` tuple returned by + enumerate(). + + :returns: an opaque device handle, or ``None``. + """ + try: + return _os.open(device_path, _os.O_RDWR | _os.O_SYNC) + except: + pass + + +def close(device_handle): + """Close a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + """ + _os.close(device_handle) + + +def write(device_handle, data): + """Write an Output report to a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + :param data: the data bytes to send including the report number as the + first byte. + + The first byte of data[] must contain the Report ID. For + devices which only support a single report, this must be set + to 0x0. The remaining bytes contain the report data. Since + the Report ID is mandatory, calls to hid_write() will always + contain one more byte than the report contains. For example, + if a hid report is 16 bytes long, 17 bytes must be passed to + hid_write(), the Report ID (or 0x0, for devices with a + single report), followed by the report data (16 bytes). In + this example, the length passed in would be 17. + + write() will send the data on the first OUT endpoint, if + one exists. If it does not, it will send the data through + the Control Endpoint (Endpoint 0). + + :returns: ``True`` if the write was successful. + """ + try: + bytes_written = _os.write(device_handle, data) + return bytes_written == len(data) + except: + pass + + +def read(device_handle, bytes_count, timeout_ms=-1): + """Read an Input report from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + :param bytes_count: maximum number of bytes to read. + :param timeout_ms: can be -1 (default) to wait for data indefinitely, 0 to + read whatever is in the device's input buffer, or a positive integer to + wait that many milliseconds. + + Input reports are returned to the host through the INTERRUPT IN endpoint. + The first byte will contain the Report number if the device uses numbered + reports. + + :returns: the data packet read, an empty bytes string if a timeout was + reached, or None if there was an error while reading. + """ + try: + timeout = None if timeout_ms < 0 else timeout_ms / 1000.0 + rlist, wlist, xlist = _select.select([device_handle], [], [], timeout) + if rlist: + assert rlist == [device_handle] + return _os.read(device_handle, bytes_count) + return b'' + except OSError: + pass + + +_DEVICE_STRINGS = { + 0: 'manufacturer', + 1: 'product', + 2: 'serial', +} + + +def get_manufacturer(device_handle): + """Get the Manufacturer String from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + """ + return get_indexed_string(device_handle, 0) + + +def get_product(device_handle): + """Get the Product String from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + """ + return get_indexed_string(device_handle, 1) + + +def get_serial(device_handle): + """Get the serial number from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + """ + serial = get_indexed_string(device_handle, 2) + if serial is not None: + return ''.join(hex(ord(c)) for c in serial) + + +def get_indexed_string(device_handle, index): + """Get a string from a HID device, based on its string index. + + Note: currently not working in the ``hidraw`` native implementation. + + :param device_handle: a device handle returned by open() or open_path(). + :param index: the index of the string to get. + """ + if index not in _DEVICE_STRINGS: + return None + + stat = _os.fstat(device_handle) + dev = _Device.from_device_number(_Context(), 'char', stat.st_rdev) + if dev: + hid_dev = dev.find_parent('hid') + if hid_dev and 'HID_ID' in hid_dev: + bus, _, _ = hid_dev['HID_ID'].split(':') + + if bus == '0003': # USB + usb_dev = dev.find_parent('usb', 'usb_device') + if usb_dev: + attrs = usb_dev.attributes + key = _DEVICE_STRINGS[index] + if key in attrs: + return attrs[key]