diff --git a/logitech/__init__.py b/logitech/__init__.py new file mode 100644 index 00000000..2cbf40e6 --- /dev/null +++ b/logitech/__init__.py @@ -0,0 +1,3 @@ +# pass + +__all__ = [ 'unifying_receiver' ] diff --git a/logitech/hidapi.py b/logitech/hidapi.py new file mode 100644 index 00000000..1f6d32f6 --- /dev/null +++ b/logitech/hidapi.py @@ -0,0 +1,358 @@ +"""Human Interface Device API. + +It is little more than a thin ctypes layer over a native hidapi implementation. +The docstrings are mostly copied from the hidapi API header, with changes where +necessary. + +The native HID API implemenation is available at https://github.com/signal11/hidapi. + +Using the native hidraw implementation is recommended. +Currently the native libusb implementation (temporarily) detaches the device's +USB driver from the kernel, and it may cause the device to become unresponsive. +""" + +__version__ = '0.2-hidapi-0.7.0' + + +import os.path +from collections import namedtuple + +from ctypes import ( + cdll, create_string_buffer, create_unicode_buffer, + c_int, c_ushort, c_size_t, c_char_p, c_wchar_p, c_void_p, POINTER, Structure +) + + +_hidapi = None +native_path = os.path.dirname(__file__) +for native_implementation in ('hidraw', 'libusb'): + try: + native_lib = os.path.join(native_path, 'libhidapi-' + native_implementation + '.so') + _hidapi = cdll.LoadLibrary(native_lib) + break + except OSError: + pass +del native_path, native_lib, native_implementation +if _hidapi is None: + raise ImportError(__file__, 'failed to load any HID API native implementation') + + +# internally used by native hidapi, no need to expose it +class _DeviceInfo(Structure): + pass +_DeviceInfo._fields_ = [ + ('path', c_char_p), + ('vendor_id', c_ushort), + ('product_id', c_ushort), + ('serial', c_wchar_p), + ('release', c_ushort), + ('manufacturer', c_wchar_p), + ('product', c_wchar_p), + ('usage_page', c_ushort), + ('usage', c_ushort), + ('interface', c_int), + ('next', POINTER(_DeviceInfo)) +] + + +# the tuple object we'll expose when enumerating devices +DeviceInfo = namedtuple('DeviceInfo', [ + 'path', + 'vendor_id', + 'product_id', + 'serial', + 'release', + 'manufacturer', + 'product', + 'interface']) + + +# create a DeviceInfo tuple from a hid_device object +def _DevInfoTuple(hid_device): + return DeviceInfo( + path=str(hid_device.path), + vendor_id=hex(hid_device.vendor_id)[2:], + product_id=hex(hid_device.product_id)[2:], + serial=str(hid_device.serial) if hid_device.serial else None, + release=hex(hid_device.release)[2:], + manufacturer=str(hid_device.manufacturer), + product=str(hid_device.product), + interface=hid_device.interface) + + +# +# set-up arguments and return types for each hidapi function +# + +_hidapi.hid_init.argtypes = None +_hidapi.hid_init.restype = c_int + +_hidapi.hid_exit.argtypes = None +_hidapi.hid_exit.restype = c_int + +_hidapi.hid_enumerate.argtypes = [ c_ushort, c_ushort ] +_hidapi.hid_enumerate.restype = POINTER(_DeviceInfo) + +_hidapi.hid_free_enumeration.argtypes = [ POINTER(_DeviceInfo) ] +_hidapi.hid_free_enumeration.restype = None + +_hidapi.hid_open.argtypes = [ c_ushort, c_ushort, c_wchar_p ] +_hidapi.hid_open.restype = c_void_p + +_hidapi.hid_open_path.argtypes = [ c_char_p ] +_hidapi.hid_open_path.restype = c_void_p # POINTER(_hid_device) + +_hidapi.hid_close.argtypes = [ c_void_p ] +_hidapi.hid_close.restype = None + +_hidapi.hid_write.argtypes = [ c_void_p, c_char_p, c_size_t ] +_hidapi.hid_write.restype = c_int + +# _hidapi.hid_read.argtypes = [ c_void_p, c_char_p, c_size_t ] +# _hidapi.hid_read.restype = c_int + +_hidapi.hid_read_timeout.argtypes = [ c_void_p, c_char_p, c_size_t, c_int ] +_hidapi.hid_read_timeout.restype = c_int + +# _hidapi.hid_set_nonblocking.argtypes = [ c_void_p, c_int ] +# _hidapi.hid_set_nonblocking.restype = c_int + +_hidapi.hid_send_feature_report.argtypes = [ c_void_p, c_char_p, c_size_t ] +_hidapi.hid_send_feature_report.restype = c_int + +_hidapi.hid_get_feature_report.argtypes = [ c_void_p, c_char_p, c_size_t ] +_hidapi.hid_get_feature_report.restype = c_int + +_hidapi.hid_get_manufacturer_string.argtypes = [ c_void_p, c_wchar_p, c_size_t ] +_hidapi.hid_get_manufacturer_string.restype = c_int + +_hidapi.hid_get_product_string.argtypes = [ c_void_p, c_wchar_p, c_size_t ] +_hidapi.hid_get_product_string.restype = c_int + +_hidapi.hid_get_serial_number_string.argtypes = [ c_void_p, c_wchar_p, c_size_t ] +_hidapi.hid_get_serial_number_string.restype = c_int + +# _hidapi.hid_get_indexed_string.argtypes = [ c_void_p, c_int, c_wchar_p, c_size_t ] +# _hidapi.hid_get_indexed_string.restype = c_int + +# _hidapi.hid_error.argtypes = [ c_void_p ] +# _hidapi.hid_error.restype = c_wchar_p + + +# +# exposed API +# docstrings mostly copied from hidapi.h +# + + +def init(): + """Initialize the HIDAPI library. + + This function initializes the HIDAPI library. Calling it is not + strictly necessary, as it will be called automatically by + hid_enumerate() and any of the hid_open_*() functions if it is + needed. This function should be called at the beginning of + execution however, if there is a chance of HIDAPI handles + being opened by different threads simultaneously. + + :returns: True if successful. + """ + return _hidapi.hid_init() == 0 + + +def exit(): + """Finalize the HIDAPI library. + + This function frees all of the static data associated with + HIDAPI. It should be called at the end of execution to avoid + memory leaks. + + :returns: True if successful. + """ + return _hidapi.hid_exit() == 0 + + +def enumerate(vendor_id=None, product_id=None, interface_number=None): + """Enumerate the HID Devices. + + List all the HID devices attached to the system, optionally filtering by + vendor_id, product_id, and/or interface_number. + + :returns: a list of matching DeviceInfo tuples. + """ + results = [] + + devices = _hidapi.hid_enumerate(vendor_id, product_id) + d = devices + while d: + if interface_number is None or interface_number == d.contents.interface: + results.append(_DevInfoTuple(d.contents)) + d = d.contents.next + + if devices: + _hidapi.hid_free_enumeration(devices) + + return results + + +def open(vendor_id, product_id, serial=None): + """Open a HID device using a Vendor ID, Product ID and optionally a serial number. + + If no serial_number is provided, the first device with the specified ids is opened. + + :returns: an opaque device handle, or None. + """ + return _hidapi.hid_open(vendor_id, product_id, serial) or None + + +def open_path(device_path): + """Open a HID device by its path name. + + :param device_path: the path of a DeviceInfo tuple returned by enumerate(). + + :returns: an opaque device handle, or None. + """ + return _hidapi.hid_open_path(device_path) or None + + +def close(device_handle): + """Close a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + """ + _hidapi.hid_close(device_handle) + + +def write(device_handle, data): + """Write an Output report to a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + :param data: the data bytes to send including the report number as the first byte. + + The first byte of data[] must contain the Report ID. For + devices which only support a single report, this must be set + to 0x0. The remaining bytes contain the report data. Since + the Report ID is mandatory, calls to hid_write() will always + contain one more byte than the report contains. For example, + if a hid report is 16 bytes long, 17 bytes must be passed to + hid_write(), the Report ID (or 0x0, for devices with a + single report), followed by the report data (16 bytes). In + this example, the length passed in would be 17. + + write() will send the data on the first OUT endpoint, if + one exists. If it does not, it will send the data through + the Control Endpoint (Endpoint 0). + + :returns: True if the write was successful. + """ + bytes_written = _hidapi.hid_write(device_handle, c_char_p(data), len(data)) + return bytes_written > -1 + + +def read(device_handle, bytes_count, timeout_ms=-1): + """Read an Input report from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + :param bytes_count: maximum number of bytes to read. + :param timeout_ms: can be -1 (default) to wait for data indefinitely, 0 to + read whatever is in the device's input buffer, or a positive integer to + wait that many milliseconds. + + Input reports are returned to the host through the INTERRUPT IN endpoint. + The first byte will contain the Report number if the device uses numbered + reports. + + :returns: the bytes read, or None if a timeout was reached. + """ + out_buffer = create_string_buffer('\x00' * (bytes_count + 1)) + bytes_read = _hidapi.hid_read_timeout(device_handle, out_buffer, bytes_count, timeout_ms) + if bytes_read > -1: + return out_buffer[:bytes_read] + + +def send_feature_report(device_handle, data, report_number=None): + """Send a Feature report to the device. + + :param device_handle: a device handle returned by open() or open_path(). + :param data: the data bytes to send including the report number as the first byte. + :param report_number: if set, it is sent as the first byte with the data. + + Feature reports are sent over the Control endpoint as a + Set_Report transfer. The first byte of data[] must + contain the Report ID. For devices which only support a + single report, this must be set to 0x0. The remaining bytes + contain the report data. Since the Report ID is mandatory, + calls to send_feature_report() will always contain one + more byte than the report contains. For example, if a hid + report is 16 bytes long, 17 bytes must be passed to + send_feature_report(): the Report ID (or 0x0, for + devices which do not use numbered reports), followed by the + report data (16 bytes). + + :returns: True if the report was successfully written to the device. + """ + if report_number is not None: + data = chr(report_number) + data + bytes_written = _hidapi.hid_send_feature_report(device_handle, c_char_p(data), len(data)) + return bytes_written > -1 + + +def get_feature_report(device_handle, bytes_count, report_number=None): + """Get a feature report from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + :param bytes_count: how many bytes to read. + :param report_number: if set, it is sent as the report number. + + :returns: the feature report data. + """ + out_buffer = create_string_buffer('\x00' * (bytes_count + 2)) + if report_number is not None: + out_buffer[0] = chr(report_number) + bytes_read = _hidapi.hid_get_feature_report(device_handle, out_buffer, bytes_count) + if bytes_read > -1: + return out_buffer[:bytes_read] + + +def _read_wchar(func, device_handle, index=None): + _BUFFER_SIZE = 64 + buf = create_unicode_buffer('\x00' * _BUFFER_SIZE) + if index is None: + ok = func(device_handle, buf, _BUFFER_SIZE) + else: + ok = func(device_handle, index, buf, _BUFFER_SIZE) + if ok == 0: + return buf.value + + +def get_manufacturer(device_handle): + """Get the Manufacturer String from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + """ + return _read_wchar(_hidapi.hid_get_manufacturer_string, device_handle) + + +def get_product(device_handle): + """Get the Product String from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + """ + return _read_wchar(_hidapi.hid_get_product_string, device_handle) + + +def get_serial(device_handle): + """Get the serial number from a HID device. + + :param device_handle: a device handle returned by open() or open_path(). + """ + serial = _read_wchar(_hidapi.hid_get_serial_number_string, device_handle) + if serial is not None: + return ''.join(hex(ord(c)) for c in serial) + + +# def get_indexed_string(device_handle, index): +# """ +# :param device_handle: a device handle returned by open() or open_path(). +# """ +# return _read_wchar(_hidapi.hid_get_indexed_string, device_handle, index) diff --git a/logitech/unifying_receiver.py b/logitech/unifying_receiver.py new file mode 100644 index 00000000..e08ffca6 --- /dev/null +++ b/logitech/unifying_receiver.py @@ -0,0 +1,310 @@ +"""A few functions to deal with the Logitech Universal Receiver. + +Uses the HID api exposed through hidapi.py. +Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. + +References: +http://julien.danjou.info/blog/2012/logitech-k750-linux-support +http://6xq.net/git/lars/lshidpp.git/plain/doc/logitech_hidpp_2.0_specification_draft_2012-06-04.pdf +""" + +# import logging +from . import hidapi + + +_TIMEOUT = 1000 + + +class NoReceiver(Exception): + """May be thrown when trying to talk through a previously connected + receiver that is no longer available (either because it was physically + disconnected or some other reason).""" + pass + + +FEATURE_ROOT = '\x00\x00' +FEATURE_GET_FEATURE_SET = '\x00\x01' +FEATURE_GET_FIRMWARE = '\x00\x03' +FEATURE_GET_NAME = '\x00\x05' +FEATURE_GET_BATTERY = '\x10\x00' +FEATURE_GET_REPROGRAMMABLE_KEYS = '\x1B\x00' +FEATURE_GET_WIRELESS_STATUS = '\x1D\x4B' +FEATURE_UNKNOWN_1 = '\x1D\xF3' +FEATURE_UNKNOWN_2 = '\x40\xA0' +FEATURE_UNKNOWN_3 = '\x41\x00' +FEATURE_GET_SOLAR_CHARGE = '\x43\x01' +FEATURE_UNKNOWN_4 = '\x45\x20' + + +DEVICE_TYPES = ( "Keyboard", "Remote Control", "NUMPAD", "Mouse", "Touchpad", "Trackball", "Presenter", "Receiver" ) + +_DEVICE_FEATURES = {} + + +def _write(receiver, device, data): + # just in case + # hidapi.read(receiver, 128, 0) + data = '\x10' + chr(device) + data + # print "w[", data.encode("hex"), "]", + return hidapi.write(receiver, data) + + +def _read(receiver, device, timeout=_TIMEOUT): + data = hidapi.read(receiver, 128, timeout) + if data is None: + print "r(None)" + return None + + if not data: + # print "r[ ]" + return "" + + # print "r[", data.encode("hex"), "]", + # if len(data) < 7: + # print "short", len(data), + + # if ord(data[0]) == 0x20: + # # no idea what it does, not in any specs + # return _read(receiver, device) + + if ord(data[1]) == 0: + # print "no device", + return _read(receiver, device) + + if ord(data[1]) != device: + # print "wrong device", + return _read(receiver, device) + + # print "" + return data + + +def _get_feature_index(receiver, device, feature_id): + if device not in _DEVICE_FEATURES: + _DEVICE_FEATURES[device] = [ 0 ] * 0x10 + pass + elif feature_id in _DEVICE_FEATURES[device]: + return _DEVICE_FEATURES[device].index(feature_id) + + if not _write(receiver, device, FEATURE_ROOT + feature_id + '\x00'): + # print "write failed, closing receiver" + close(receiver) + raise NoReceiver() + + while True: + reply = _read(receiver, device) + if not reply: + break + + if reply[2:4] != FEATURE_ROOT: + # ignore + continue + + # only return active and supported features + if ord(reply[4]) and ord(reply[5]) & 0xA0 == 0: + index = ord(reply[4]) + _DEVICE_FEATURES[device][index] = feature_id + return index + + # huh? + return 0 + + +def _request(receiver, device, feature_id, function='\x00', param1='\x00', param2='\x00', param3='\x00', reply_function=None): + feature_index = _get_feature_index(receiver, device, feature_id) + if not feature_index or feature_index == -1: + return None + + feature_index = chr(feature_index) + if not _write(receiver, device, feature_index + function + param1 + param2 + param3): + # print "write failed, closing receiver" + close(receiver) + raise NoReceiver() + + def _read_reply(receiver, device, attempts=2): + reply = _read(receiver, device) + if not reply: + if attempts > 0: + return _read_reply(receiver, device, attempts - 1) + return None + + if reply[0] == '\x10' and reply[2] == '\x8F': + # invalid device + return None + + if reply[0] == '\x11' and reply[2] == feature_index: + if reply[3] == reply_function if reply_function else function: + return reply + + if reply[0] == '\x11': + return _read_reply(receiver, device, attempts - 1) + + return _read_reply(receiver, device) + + +def _get_feature_set(receiver, device): + features = [ 0 ] * 0x10 + reply = _request(receiver, device, FEATURE_GET_FEATURE_SET) + if reply: + for index in range(1, 1 + ord(reply[4])): + reply = _request(receiver, device, FEATURE_GET_FEATURE_SET, '\x10', chr(index)) + if reply: + features[index] = reply[4:6].upper() + # print "feature", reply[4:6].encode('hex'), "index", index + + return features + + +_PING_DEVICE = '\x10\x00\x00\x10\x00\x00\xAA' + + +def open(): + """Gets the HID device handle for the Unifying Receiver. + + It is assumed a single receiver is connected to the machine. If more than + one are present, the first one found will be returned. + + :returns: an opaque device handle if a receiver is found, or None. + """ + # USB ids for (Logitech, Unifying Receiver) + for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2): + # print "checking", rawdevice, + receiver = hidapi.open_path(rawdevice.path) + if not receiver: + # could be a permissions problem + # in any case, unreachable + # print "failed to open" + continue + + # ping on a device id we know to be invalid + hidapi.write(receiver, _PING_DEVICE) + + # if this is the right hidraw device, we'll receive a 'bad subdevice' + # otherwise, the read should produce nothing + reply = hidapi.read(receiver, 32, 200) + if reply: + # print "r[", reply.encode("hex"), "]", + if reply == '\x01\x00\x00\x00\x00\x00\x00\x00': + # no idea what this is + # print "nope" + pass + elif reply[:4] == "\x10\x00\x8F\x00": + # print "found" + return receiver + # print "unknown" + else: + # print "no reply" + pass + hidapi.close(receiver) + + +def close(receiver): + """Closes a HID device handle obtained with open().""" + if receiver: + try: + hidapi.close(receiver) + # print "closed", receiver + return True + except: + pass + return False + + +def ping(receiver, device): + # print "ping", device, + if not _write(receiver, device, _PING_DEVICE[2:]): + # print "write failed", + return False + + reply = _read(receiver, device) + if not reply: + # print "no data", + return False + + # 10018f00100900 + if ord(reply[0]) == 0x10 and ord(reply[2]) == 0x8F: + # print "invalid", + return False + + # 110100100200aa00000000000000000000000000 + if ord(reply[0]) == 0x11 and reply[2:4] == "\x00\x10" and reply[6] == "\xAA": + return True + + return False + + +def get_name(receiver, device): + reply = _request(receiver, device, FEATURE_GET_NAME) + if reply: + charcount = ord(reply[4]) + name = '' + index = 0 + while len(name) < charcount: + reply = _request(receiver, device, FEATURE_GET_NAME, '\x10', chr(index)) + if reply: + name += reply[4:4 + charcount - index] + index = len(name) + else: + break + return name + + +def get_type(receiver, device): + reply = _request(receiver, device, FEATURE_GET_NAME, '\x20') + if reply: + return DEVICE_TYPES[ord(reply[4])] + + +def get_firmware_version(receiver, device, firmware_type=0): + reply = _request(receiver, device, FEATURE_GET_FIRMWARE, '\x10', chr(firmware_type)) + if reply: + return '%s %s.%s' % (reply[5:8], reply[8:10].encode('hex'), reply[10:12].encode('hex')) + + +def get_battery_level(receiver, device): + reply = _request(receiver, device, FEATURE_GET_BATTERY) + if reply: + return ( ord(reply[4]), ord(reply[5]), ord(reply[6]) ) + + +def get_reprogrammable_keys(receiver, device): + count = _request(receiver, device, FEATURE_GET_REPROGRAMMABLE_KEYS) + if count: + keys = [] + for index in range(ord(count[4])): + key = _request(receiver, device, FEATURE_GET_REPROGRAMMABLE_KEYS, '\x10', chr(index)) + keys.append( key[4:6], keys[6:8], ord(key[8]) ) + return keys + + +def get_solar_charge(receiver, device): + reply = _request(receiver, device, FEATURE_GET_SOLAR_CHARGE, '\x03', '\x78', '\x01', reply_function='\x10') + if reply: + charge = ord(reply[4]) + lux = ord(reply[5]) << 8 | ord(reply[6]) + # lux = int(round(((255 * ord(reply[5])) + ord(reply[6])) / 538.0, 2) * 100) + return (charge, lux) + + +def find_device(receiver, match_device_type=None, match_name=None): + """Gets the device number for the first device matching. + + The device type and name are case-insensitive. + """ + # Apparently a receiver supports up to 6 devices. + for device in range(1, 7): + if ping(receiver, device): + if device not in _DEVICE_FEATURES: + _DEVICE_FEATURES[device] = _get_feature_set(receiver, device) + # print get_reprogrammable_keys(receiver, device) + # d_firmware = get_firmware_version(receiver, device) + # print "device", device, "[", d_name, "/", d_type, "] firmware", d_firmware, "features", _DEVICE_FEATURES[device] + if match_device_type: + d_type = get_type(receiver, device) + if d_type is None or match_device_type.lower() != d_type.lower(): + continue + if match_name: + d_name = get_name(receiver, device) + if d_name is None or match_name.lower() != d_name.lower(): + continue + return device diff --git a/rules.d/99-logitech-unifying-receiver.rules b/rules.d/99-logitech-unifying-receiver.rules new file mode 100644 index 00000000..50d7e965 --- /dev/null +++ b/rules.d/99-logitech-unifying-receiver.rules @@ -0,0 +1,38 @@ +# Copied from https://github.com/signal11/hidapi, and modified + +# This is a sample udev file for HIDAPI devices which changes the permissions +# to 0660 and group to plugdev for a specified device on Linux systems. + +# Make sure the plugdev group exists on your system and your user is a member +# before applying these rules. + + +# If you are using the libusb implementation of hidapi (hid-libusb.c), then +# use something like the following line, substituting the VID and PID with +# those of your device. Note that for kernels before 2.6.24, you will need +# to substitute "usb" with "usb_device". It shouldn't hurt to use two lines +# (one each way) for compatibility with older systems. + +# HIDAPI/libusb +SUBSYSTEM=="usb", ATTR{idVendor}=="046d", ATTR{idProduct}=="c52b", GROUP="plugdev", MODE="0660" + +# If you are using the hidraw implementation, then do something like the +# following, substituting the VID and PID with your device. Busnum 1 is USB. + +# HIDAPI/hidraw +KERNEL=="hidraw*", ATTRS{idVendor}=="046d", ATTRS{idProduct}=="c52b", GROUP="plugdev", MODE="0660" + +# Once done, optionally rename this file for your device, and drop it into +# /etc/udev/rules.d and unplug and re-plug your device. This is all that is +# necessary to see the new permissions. Udev does not have to be restarted. + +# Note that the hexadecimal values for VID and PID are case sensitive and +# must be lower case. + +# If you think permissions of 0666 are too loose, then see: +# http://reactivated.net/writing_udev_rules.html for more information on finer +# grained permission setting. For example, it might be sufficient to just +# set the group or user owner for specific devices (for example the plugdev +# group on some systems). + +# vim: ft=udevrules diff --git a/rules.d/99-logitech.rules b/rules.d/99-logitech.rules deleted file mode 100644 index 780e82bb..00000000 --- a/rules.d/99-logitech.rules +++ /dev/null @@ -1,7 +0,0 @@ -# ... ID 046d:c52b Logitech, Inc. Unifying Receiver -# -# Creates symlink from /dev/solar_keyboard => /dev/bus... and sets -# appropriate device node permissions. -SUBSYSTEM=="usb", ATTR{idProduct}=="c52b", SYMLINK+="solar_keyboard", GROUP="users", OWNER="noah" - -# vim: ft=udevrules