From cd3ffcca8168fcbca7d0f515587c8e3fa0d2179b Mon Sep 17 00:00:00 2001 From: Daniel Pavel Date: Tue, 25 Sep 2012 13:49:24 +0300 Subject: [PATCH] made full package out of unifying_receiver, added some tests --- logitech/__init__.py | 2 - logitech/unifying_receiver/__init__.py | 37 +++ logitech/unifying_receiver/api.py | 279 ++++++++++++++++++ logitech/unifying_receiver/base.py | 240 +++++++++++++++ logitech/unifying_receiver/constants.py | 71 +++++ logitech/unifying_receiver/exceptions.py | 38 +++ logitech/unifying_receiver/tests/__init__.py | 3 + .../unifying_receiver/tests/test_00_hidapi.py | 17 ++ .../tests/test_10_constants.py | 34 +++ .../unifying_receiver/tests/test_30_base.py | 188 ++++++++++++ .../unifying_receiver/tests/test_50_api.py | 117 ++++++++ logitech/unifying_receiver/unhandled.py | 40 +++ logitech/ur_lowlevel_test.py | 112 ------- test.sh | 5 - unittest.sh | 10 + 15 files changed, 1074 insertions(+), 119 deletions(-) create mode 100644 logitech/unifying_receiver/__init__.py create mode 100644 logitech/unifying_receiver/api.py create mode 100644 logitech/unifying_receiver/base.py create mode 100644 logitech/unifying_receiver/constants.py create mode 100644 logitech/unifying_receiver/exceptions.py create mode 100644 logitech/unifying_receiver/tests/__init__.py create mode 100644 logitech/unifying_receiver/tests/test_00_hidapi.py create mode 100644 logitech/unifying_receiver/tests/test_10_constants.py create mode 100644 logitech/unifying_receiver/tests/test_30_base.py create mode 100644 logitech/unifying_receiver/tests/test_50_api.py create mode 100644 logitech/unifying_receiver/unhandled.py delete mode 100644 logitech/ur_lowlevel_test.py delete mode 100755 test.sh create mode 100755 unittest.sh diff --git a/logitech/__init__.py b/logitech/__init__.py index a2d36019..cfe18a79 100644 --- a/logitech/__init__.py +++ b/logitech/__init__.py @@ -1,3 +1 @@ # pass - -__all__ = ['unifying_receiver'] diff --git a/logitech/unifying_receiver/__init__.py b/logitech/unifying_receiver/__init__.py new file mode 100644 index 00000000..271d2b50 --- /dev/null +++ b/logitech/unifying_receiver/__init__.py @@ -0,0 +1,37 @@ +"""Low-level interface for devices connected through a Logitech Universal +Receiver (UR). + +Uses the HID api exposed through hidapi.py, a Python thin layer over a native +implementation. + +Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. + +Strongly recommended to use these functions from a single thread; calling +multiple functions from different threads has a high chance of mixing the +replies and causing apparent failures. + +Basic order of operations is: + - open() to obtain a UR handle + - request() to make a feature call to one of the devices attached to the UR + - close() to close the UR handle + +References: +http://julien.danjou.info/blog/2012/logitech-k750-linux-support +http://6xq.net/git/lars/lshidpp.git/plain/doc/ +""" + + +# +# Logging set-up. +# Add a new logging level for tracing low-level writes and reads. +# + +import logging +_l = logging.getLogger('unifying_receiver') +_LOG_LEVEL = 5 +_l.setLevel(_LOG_LEVEL) + + +from .constants import * +from .exceptions import * +from .api import * diff --git a/logitech/unifying_receiver/api.py b/logitech/unifying_receiver/api.py new file mode 100644 index 00000000..31873fa0 --- /dev/null +++ b/logitech/unifying_receiver/api.py @@ -0,0 +1,279 @@ +# +# Logitech Unifying Receiver API. +# + +import logging +_LOG_LEVEL = 5 +_l = logging.getLogger('logitech.unifying_receiver.api') +_l.setLevel(_LOG_LEVEL) + +from .constants import * +from .exceptions import * +from . import base +from .unhandled import _publish as _unhandled_publish + + +def open(): + """Opens the first Logitech UR found attached to the machine. + + :returns: An open file handle for the found receiver, or ``None``. + """ + for rawdevice in base.list_receiver_devices(): + _l.log(_LOG_LEVEL, "checking %s", rawdevice) + + receiver = base.try_open(rawdevice.path) + if receiver: + return receiver + + return None + + +"""Closes a HID device handle.""" +close = base.close + + +def request(handle, device, feature, function=b'\x00', params=b'', features_array=None): + """Makes a feature call to the device, and returns the reply data. + + Basically a write() followed by (possibly multiple) reads, until a reply + matching the called feature is received. In theory the UR will always reply + to feature call; otherwise this function will wait indefinitely. + + Incoming data packets not matching the feature and function will be + delivered to the unhandled hook (if any), and ignored. + + The optional ``features_array`` parameter is a cached result of the + get_device_features function for this device, necessary to find the feature + index. If the ``features_arrary`` is not provided, one will be obtained by + manually calling get_device_features before making the request call proper. + + :raises FeatureNotSupported: if the device does not support the feature. + """ + + feature_index = None + if feature == FEATURE.ROOT: + feature_index = b'\x00' + else: + if features_array is None: + features_array = get_device_features(handle, device) + if features_array is None: + _l.log(_LOG_LEVEL, "(%d,%d) no features array available", handle, device) + return None + if feature in features_array: + feature_index = chr(features_array.index(feature)) + + if feature_index is None: + _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + raise FeatureNotSupported(device, feature) + + return base.request(handle, device, feature_index + function, params) + + +def ping(handle, device): + """Pings a device number to check if it is attached to the UR. + + :returns: True if the device is connected to the UR, False if the device is + not attached, None if no conclusive reply is received. + """ + + ping_marker = b'\xAA' + + def _status(reply): + if not reply: + return None + + reply_code, reply_device, reply_data = reply + + if reply_device != device: + # oops + _l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, reply_data.encode('hex')) + _unhandled_publish(reply_code, reply_device, reply_data) + return _status(base.read(handle)) + + if (reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4] == ping_marker): + # ping ok + _l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, reply_data.encode('hex')) + return True + + if (reply_code == 0x10 and reply_data[:2] == b'\x8F\x00'): + # ping failed + _l.log(_LOG_LEVEL, "(%d,%d) ping: device not present", handle, device) + return False + + if (reply_code == 0x11 and reply_data[:2] == b'\x09\x00' and len(reply_data) == 18 and reply_data[7:11] == b'GOOD'): + # some devices may reply with a SOLAR_STATUS event before the + # ping_ok reply, especially right after the device connected to the + # receiver + _l.log(_LOG_LEVEL, "(%d,%d) ping: solar status %s", handle, device, reply_data.encode('hex')) + _unhandled_publish(reply_code, reply_device, reply_data) + return _status(base.read(handle)) + + # ugh + _l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device", handle, device, reply) + _unhandled_publish(reply_code, reply_device, reply_data) + return None + + _l.log(_LOG_LEVEL, "(%d,%d) pinging", handle, device) + base.write(handle, device, b'\x00\x10\x00\x00' + ping_marker) + # pings may take a while to reply success + return _status(base.read(handle, base.DEFAULT_TIMEOUT * 3)) + + +def get_feature_index(handle, device, feature): + """Reads the index of a device's feature. + + :returns: An int, or ``None`` if the feature is not available. + """ + _l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + if len(feature) != 2: + raise ValueError("invalid feature <%s>: it must be a two-byte string" % (feature.encode(hex))) + + # FEATURE.ROOT should always be available for any attached devices + reply = base.request(handle, device, FEATURE.ROOT, feature) + if reply: + # only consider active and supported features + feature_index = ord(reply[0]) + if feature_index: + feature_flags = ord(reply[1]) & 0xE0 + _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, feature.encode('hex'), FEATURE_NAME(feature), feature_index, feature_flags) + if feature_flags == 0: + return feature_index + + if feature_flags & 0x80: + _l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + if feature_flags & 0x40: + _l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + if feature_flags & 0x20: + _l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + raise FeatureNotSupported(device, feature) + else: + _l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + raise FeatureNotSupported(device, feature) + + +def get_device_features(handle, device): + """Returns an array of feature ids. + + Their position in the array is the index to be used when requesting that + feature on the device. + """ + _l.log(_LOG_LEVEL, "(%d,%d) get device features", handle, device) + + # get the index of the FEATURE_SET + # FEATURE.ROOT should always be available for all devices + fs_index = base.request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET) + if not fs_index: + _l.warn("(%d,%d) FEATURE_SET not available", handle, device) + return None + fs_index = fs_index[0] + + # For debugging purposes, query all the available features on the device, + # even if unknown. + + # get the number of active features the device has + features_count = base.request(handle, device, fs_index + b'\x00') + if not features_count: + # this can happen if the device disappeard since the fs_index request + # otherwise we should get at least a count of 1 (the FEATURE_SET we've just used above) + _l.log(_LOG_LEVEL, "(%d,%d) no features available?!", handle, device) + return None + + features_count = ord(features_count[0]) + _l.log(_LOG_LEVEL, "(%d,%d) found %d features", handle, device, features_count) + + # a device may have a maximum of 15 features, other than FEATURE.ROOT + features = [None] * 0x10 + for index in range(1, 1 + features_count): + # for each index, get the feature residing at that index + feature = base.request(handle, device, fs_index + b'\x10', chr(index)) + if feature: + feature = feature[0:2].upper() + features[index] = feature + _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, feature.encode('hex'), FEATURE_NAME(feature), index) + + return None if all(c == None for c in features) else features + + +def get_device_firmware(handle, device, features_array=None): + """Reads a device's firmware info. + + Returns an list of tuples [ (firmware_type, firmware_version, ...), ... ], + ordered by firmware layer. + """ + fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array) + if fw_count: + fw_count = ord(fw_count[0]) + + fw = [] + for index in range(0, fw_count): + fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=chr(index), features_array=features_array) + if fw_info: + fw_type = ord(fw_info[0]) & 0x0F + if fw_type == 0 or fw_type == 1: + prefix = str(fw_info[1:4]) + version = ( str((ord(fw_info[4]) & 0xF0) >> 4) + + str(ord(fw_info[4]) & 0x0F) + + '.' + + str((ord(fw_info[5]) & 0xF0) >> 4) + + str(ord(fw_info[5]) & 0x0F)) + name = prefix + ' ' + version + build = 256 * ord(fw_info[6]) + ord(fw_info[7]) + if build: + name += ' b' + str(build) + extras = fw_info[9:].rstrip('\x00') + _l.log(_LOG_LEVEL, "(%d:%d) firmware %d = %s %s extras=%s", handle, device, fw_type, FIRMWARE_TYPES[fw_type], name, extras.encode('hex')) + fw.append((fw_type, name, build, extras)) + elif fw_type == 2: + version = ord(fw_info[1]) + _l.log(_LOG_LEVEL, "(%d:%d) firmware 2 = Hardware v%x", handle, device, version) + fw.append((2, version)) + else: + _l.log(_LOG_LEVEL, "(%d:%d) firmware other", handle, device) + fw.append((fw_type, )) + return fw + + +def get_device_type(handle, device, features_array=None): + """Reads a device's type. + + :see DEVICE_TYPES: + :returns: a string describing the device type, or ``None`` if the device is + not available or does not support the ``NAME`` feature. + """ + d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array) + if d_type: + d_type = ord(d_type[0]) + _l.log(_LOG_LEVEL, "(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPES[d_type]) + return DEVICE_TYPES[d_type] + + +def get_device_name(handle, device, features_array=None): + """Reads a device's name. + + :returns: a string with the device name, or ``None`` if the device is not + available or does not support the ``NAME`` feature. + """ + name_length = request(handle, device, FEATURE.NAME, features_array=features_array) + if name_length: + name_length = ord(name_length[0]) + + d_name = '' + while len(d_name) < name_length: + name_index = len(d_name) + name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', params=chr(name_index), features_array=features_array) + name_fragment = name_fragment[:name_length - len(d_name)] + d_name += name_fragment + + _l.log(_LOG_LEVEL, "(%d,%d) device name %s", handle, device, d_name) + return d_name + +def get_device_battery_level(handle, device, features_array=None): + """Reads a device's battery level. + """ + battery = request(handle, device, FEATURE.BATTERY, features_array=features_array) + if battery: + discharge = ord(battery[0]) + dischargeNext = ord(battery[1]) + status = ord(battery[2]) + _l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status]) + return (discharge, dischargeNext, status) diff --git a/logitech/unifying_receiver/base.py b/logitech/unifying_receiver/base.py new file mode 100644 index 00000000..2b260d75 --- /dev/null +++ b/logitech/unifying_receiver/base.py @@ -0,0 +1,240 @@ +# +# Base low-level functions used by the API proper. +# Unlikely to be used directly unless you're expanding the API. +# + +import logging +_LOG_LEVEL = 4 +_l = logging.getLogger('logitech.unifying_receiver.base') +_l.setLevel(_LOG_LEVEL) + +from .constants import * +from .exceptions import * + +from . import unhandled as _unhandled +import hidapi as _hid + + +# +# These values are defined by the Logitech documentation. +# Overstepping these boundaries will only produce log warnings. +# + + +"""Minimim lenght of a feature call packet.""" +_MIN_CALL_SIZE = 7 + + +"""Maximum lenght of a feature call packet.""" +_MAX_CALL_SIZE = 20 + + +"""Minimum size of a feature reply packet.""" +_MIN_REPLY_SIZE = _MIN_CALL_SIZE + + +"""Maximum size of a feature reply packet.""" +_MAX_REPLY_SIZE = _MAX_CALL_SIZE + + +"""Default timeout on read (in ms).""" +DEFAULT_TIMEOUT = 1000 + + +# +# +# + +def list_receiver_devices(): + """List all the Linux devices exposed by the UR attached to the machine.""" + # (Vendor ID, Product ID) = ('Logitech', 'Unifying Receiver') + # interface 2 if the actual receiver interface + return _hid.enumerate(0x046d, 0xc52b, 2) + + +def try_open(path): + """Checks if the given device path points to the right UR device. + + :param path: the Linux device path. + + The UR physical device may expose multiple linux devices with the same + interface, so we have to check for the right one. At this moment the only + way to distinguish betheen them is to do a test ping on an invalid + (attached) device number (i.e., 0), expecting a 'ping failed' reply. + + :returns: an open receiver handle if this is the right Linux device, or + ``None``. + """ + receiver_handle = _hid.open_path(path) + if receiver_handle is None: + # could be a file permissions issue (did you add the udev rules?) + # in any case, unreachable + _l.log(_LOG_LEVEL, "[%s] open failed", path) + return None + + _l.log(_LOG_LEVEL, "[%s] receiver handle (%d,)", path, receiver_handle) + # ping on device id 0 (always an error) + _hid.write(receiver_handle, b'\x10\x00\x00\x10\x00\x00\xAA') + + # if this is the right hidraw device, we'll receive a 'bad device' from the UR + # otherwise, the read should produce nothing + reply = _hid.read(receiver_handle, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT) + if reply: + if reply[:4] == b'\x10\x00\x8F\x00': + # 'device 0 unreachable' is the expected reply from a valid receiver handle + _l.log(_LOG_LEVEL, "[%s] success: handle (%d,)", path, receiver_handle) + return receiver_handle + + # any other replies are ignored, and will assume this is the wrong receiver device + if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': + # no idea what this is, but it comes up occasionally + _l.log(_LOG_LEVEL, "[%s] (%d,) mistery reply [%s]", path, receiver_handle, reply.encode('hex')) + else: + _l.log(_LOG_LEVEL, "[%s] (%d,) unknown reply [%s]", path, receiver_handle, reply.encode('hex')) + else: + _l.log(_LOG_LEVEL, "[%s] (%d,) no reply", path, receiver_handle) + + close(receiver_handle) + return None + + +def close(handle): + """Closes a HID device handle.""" + if handle: + try: + _hid.close(handle) + _l.log(_LOG_LEVEL, "(%d,) closed", handle) + return True + except: + _l.exception("(%d,) closing", handle) + + return False + + +# def write(handle, device, feature_index, function=b'\x00', param1=b'\x00', param2=b'\x00', param3=b'\x00'): +# """Write a feature call to the receiver. +# +# :param handle: UR handle obtained with open(). +# :param device: attached device number. +# :param feature_index: index in the device's feature array. +# """ +# if type(feature_index) == int: +# feature_index = chr(feature_index) +# data = feature_index + function + param1 + param2 + param3 +# return _write(handle, device, data) + + +def write(handle, device, data): + """Writes some data to a certain device. + + :param handle: an open UR handle. + :param device: attached device number. + :param data: data to send, up to 5 bytes. + + The first two (required) bytes of data must be the feature index for the + device, and a function code for that feature. + + :raises NoReceiver: if the receiver is no longer available, i.e. has + been physically removed from the machine, or the kernel driver has been + unloaded. + """ + wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data)) + _l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, wdata.encode('hex')) + if len(wdata) < _MIN_CALL_SIZE: + _l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, wdata.encode('hex'), len(wdata)) + if len(wdata) > _MAX_CALL_SIZE: + _l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, wdata.encode('hex'), len(wdata)) + if not _hid.write(handle, wdata): + _l.warn("(%d,%d) write failed, assuming receiver no longer available", handle, device) + raise NoReceiver() + + +def read(handle, timeout=DEFAULT_TIMEOUT): + """Read some data from the receiver. Usually called after a write (feature + call), to get the reply. + + :param handle: an open UR handle. + :param timeout: read timeout on the UR handle. + + If any data was read in the given timeout, returns a tuple of + (reply_code, device, message data). The reply code should be ``0x11`` for a + successful feature call, or ``0x10`` to indicate some error, e.g. the device + is no longer available. + """ + data = _hid.read(handle, _MAX_REPLY_SIZE * 2, timeout) + if data: + _l.log(_LOG_LEVEL, "(%d,*) => r[%s]", handle, data.encode('hex')) + if len(data) < _MIN_REPLY_SIZE: + _l.warn("(%d,*) => r[%s] read packet too short: %d bytes", handle, data.encode('hex'), len(data)) + if len(data) > _MAX_REPLY_SIZE: + _l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, data.encode('hex'), len(data)) + return ord(data[0]), ord(data[1]), data[2:] + else: + _l.log(_LOG_LEVEL, "(%d,*) => r[]", handle) + + +def request(handle, device, feature_index_function, params=b'', features_array=None): + """Makes a feature call device and waits for a matching reply. + + This function will skip all incoming messages and events not related to the + device we're requesting for, or the feature specified in the initial + request; it will also wait for a matching reply indefinitely. + + :param handle: an open UR handle. + :param device: attached device number. + :param feature_index_function: a two-byte string of (feature_index, feature_function). + :param params: parameters for the feature call, 3 to 16 bytes. + :param features_array: optional features array for the device, only used to + fill the FeatureCallError exception if one occurs. + :returns: the reply data packet, or ``None`` if the device is no longer + available. + :raisees FeatureCallError: if the feature call replied with an error. + """ + _l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, feature_index_function.encode('hex'), params.encode('hex')) + if len(feature_index_function) != 2: + raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % feature_index_function.encode('hex')) + + write(handle, device, feature_index_function + params) + while True: + reply = read(handle) + + if not reply: + # keep waiting... + continue + + reply_code, reply_device, reply_data = reply + + if reply_device != device: + # this message not for the device we're interested in + _l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, reply_device, reply.encode('hex')) + # worst case scenario, this is a reply for a concurrent request + # on this receiver + _unhandled._publish(reply_code, reply_device, reply_data) + continue + + if reply_code == 0x10 and reply_data[0] == b'\x8F' and reply_data[1:2] == feature_index_function: + # device not present + _l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, feature_index_function.encode('hex'), reply_data.encode('hex')) + return None + + if reply_code == 0x10 and reply_data[0] == b'\x8F': + # device not present + _l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, reply_data.encode('hex')) + return None + + if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function: + # an error returned from the device + error_code = ord(reply_data[3]) + _l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error, _ERROR_NAME(error_code), reply_data.encode('hex')) + feature_index = ord(feature_index_function[0]) + feature_function = feature_index_function[1].encode('hex') + feature = None if features_array is None else features_array[feature_index] + raise FeatureCallError(device, feature, feature_index, feature_function, error_code, reply_data) + + if reply_code == 0x11 and reply_data[:2] == feature_index_function: + # a matching reply + _l.log(_LOG_LEVEL, "(%d,%d) matched reply with data [%s]", handle, device, reply_data[2:].encode('hex')) + return reply_data[2:] + + _l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, reply_data[:2].encode('hex'), feature_index_function.encode('hex')) + _unhandled._publish(reply_code, reply_device, reply_data) diff --git a/logitech/unifying_receiver/constants.py b/logitech/unifying_receiver/constants.py new file mode 100644 index 00000000..ff3be916 --- /dev/null +++ b/logitech/unifying_receiver/constants.py @@ -0,0 +1,71 @@ +# +# Constants used by the rest of the API. +# + + +"""Possible features available on a Logitech device. + +A particular device might not support all these features, and may support other +unknown features as well. +""" +FEATURE = type('FEATURE', (), + dict( + ROOT=b'\x00\x00', + FEATURE_SET=b'\x00\x01', + FIRMWARE=b'\x00\x03', + NAME=b'\x00\x05', + BATTERY=b'\x10\x00', + REPROGRAMMABLE_KEYS=b'\x1B\x00', + WIRELESS_STATUS=b'\x1D\x4B', + # declared by the K750 keyboard, no documentation found so far + SOLAR_CHARGE=b'\x43\x01', + # declared by the K750 keyboard, no documentation found so far + # UNKNOWN_1DF3=b'\x1D\xF3', + # UNKNOWN_40A0=b'\x40\xA0', + # UNKNOWN_4100=b'\x41\x00', + # UNKNOWN_4520=b'\x45\x20', + )) + + +"""Feature names indexed by feature id.""" +_FEATURE_NAMES = { + FEATURE.ROOT: 'ROOT', + FEATURE.FEATURE_SET: 'FEATURE_SET', + FEATURE.FIRMWARE: 'FIRMWARE', + FEATURE.NAME: 'NAME', + FEATURE.BATTERY: 'BATTERY', + FEATURE.REPROGRAMMABLE_KEYS: 'REPROGRAMMABLE_KEYS', + FEATURE.WIRELESS_STATUS: 'WIRELESS_STATUS', + FEATURE.SOLAR_CHARGE: 'SOLAR_CHARGE', + } +def FEATURE_NAME(feature_code): + if feature_code is None: + return None + if feature_code in _FEATURE_NAMES: + return _FEATURE_NAMES[feature_code] + return 'UNKNOWN_' + feature_code.encode('hex') + + +"""Possible types of devices connected to an UR.""" +DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse", + "Touchpad", "Trackball", "Presenter", "Receiver") + + +"""Names of different firmware levels possible, ordered from top to bottom.""" +FIRMWARE_TYPES = ("Main (HID)", "Bootloader", "Hardware", "Other") + + +"""Names for possible battery status values.""" +BATTERY_STATUSES = ("Discharging (in use)", "Recharging", "Almost full", "Full", + "Slow recharge", "Invalid battery", "Thermal error", + "Charging error") + + +"""Names for error codes.""" +_ERROR_NAMES = ("Ok", "Unknown", "Invalid argument", "Out of range", + "Hardware error", "Logitech internal", "Invalid feature index", + "Invalid function", "Busy", "Unsupported") +def ERROR_NAME(error_code): + if error_code < len(_ERROR_NAMES): + return _ERROR_NAMES[error_code] + return 'Unknown Error' diff --git a/logitech/unifying_receiver/exceptions.py b/logitech/unifying_receiver/exceptions.py new file mode 100644 index 00000000..8a2708d2 --- /dev/null +++ b/logitech/unifying_receiver/exceptions.py @@ -0,0 +1,38 @@ +# +# Exceptions that may be raised by this API. +# + + +from .constants import FEATURE_NAME as _FEATURE_NAME +from .constants import ERROR_NAME as _ERROR_NAME + + +class NoReceiver(Exception): + """May be raised when trying to talk through a previously connected + receiver that is no longer available. Should only happen if the receiver is + physically disconnected from the machine, or its kernel driver module is + unloaded.""" + pass + + +class FeatureNotSupported(Exception): + """Raised when trying to request a feature not supported by the device.""" + def __init__(self, device, feature): + super(FeatureNotSupported, self).__init__(device, feature, _FEATURE_NAME(feature)) + self.device = device + self.feature = feature + self.feature_name = _FEATURE_NAME(feature) + + +class FeatureCallError(Exception): + """Raised if the device replied to a feature call with an error.""" + def __init__(self, device, feature, feature_index, feature_function, error_code, data=None): + super(FeatureCallError, self).__init__(device, feature, feature_index, feature_function, error_code, _ERROR_NAME(error_code)) + self.device = device + self.feature = feature + self.feature_name = _FEATURE_NAME(feature) + self.feature_index = feature_index + self.feature_function = feature_function + self.error_code = error_code + self.error_string = _ERROR_NAME(error_code) + self.data = data diff --git a/logitech/unifying_receiver/tests/__init__.py b/logitech/unifying_receiver/tests/__init__.py new file mode 100644 index 00000000..7c4bc79e --- /dev/null +++ b/logitech/unifying_receiver/tests/__init__.py @@ -0,0 +1,3 @@ +# +# Tests for the logitech.unifying_receiver package. +# diff --git a/logitech/unifying_receiver/tests/test_00_hidapi.py b/logitech/unifying_receiver/tests/test_00_hidapi.py new file mode 100644 index 00000000..2f22ec63 --- /dev/null +++ b/logitech/unifying_receiver/tests/test_00_hidapi.py @@ -0,0 +1,17 @@ +# +# test loading the hidapi library +# + +import logging +import unittest + + +class Test_Import_HIDAPI(unittest.TestCase): + def test_00_import_hidapi(self): + import hidapi + self.assertIsNotNone(hidapi) + logging.info("hidapi loaded native implementation %s", hidapi._native._name) + + +if __name__ == '__main__': + unittest.main() diff --git a/logitech/unifying_receiver/tests/test_10_constants.py b/logitech/unifying_receiver/tests/test_10_constants.py new file mode 100644 index 00000000..1fcd37f0 --- /dev/null +++ b/logitech/unifying_receiver/tests/test_10_constants.py @@ -0,0 +1,34 @@ +# +# +# + +import unittest + +from logitech.unifying_receiver import constants + + +class Test_UR_Constants(unittest.TestCase): + + def test_10_feature_names(self): + self.assertIsNone(constants.FEATURE_NAME(None)) + for code in range(0x0000, 0x10000): + feature = chr((code & 0xFF00) >> 8) + chr(code & 0x00FF) + name = constants.FEATURE_NAME(feature) + self.assertIsNotNone(name) + if name.startswith('UNKNOWN_'): + self.assertEquals(code, int(name[8:], 16)) + else: + self.assertTrue(hasattr(constants.FEATURE, name)) + self.assertEquals(feature, getattr(constants.FEATURE, name)) + + def test_20_error_names(self): + for code in range(0x00, 0x100): + name = constants.ERROR_NAME(code) + self.assertIsNotNone(name) + if code > 9: + self.assertEquals(name, 'Unknown Error') + else: + self.assertEquals(code, constants._ERROR_NAMES.index(name)) + +if __name__ == '__main__': + unittest.main() diff --git a/logitech/unifying_receiver/tests/test_30_base.py b/logitech/unifying_receiver/tests/test_30_base.py new file mode 100644 index 00000000..cc1961f8 --- /dev/null +++ b/logitech/unifying_receiver/tests/test_30_base.py @@ -0,0 +1,188 @@ +# +# +# + +import unittest + +from logitech.unifying_receiver import base +from logitech.unifying_receiver.exceptions import * +from logitech.unifying_receiver.constants import * +from logitech.unifying_receiver.unhandled import * + + +class Test_UR_Base(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.ur_available = False + cls.handle = None + cls.device = None + + @classmethod + def tearDownClass(cls): + if cls.handle: + base.close(cls.handle) + cls.ur_available = False + cls.handle = None + cls.device = None + + def test_10_list_receiver_devices(self): + rawdevices = base.list_receiver_devices() + self.assertIsNotNone(rawdevices, "list_receiver_devices returned None") + self.assertIsInstance(rawdevices, list, "list_receiver_devices should have returned a list") + Test_UR_Base.ur_available = len(rawdevices) > 0 + + def test_20_try_open(self): + if not self.ur_available: + self.fail("No receiver found") + + for rawdevice in base.list_receiver_devices(): + handle = base.try_open(rawdevice.path) + if handle is None: + continue + + self.assertIsInstance(handle, int, "try_open should have returned an int") + + if Test_UR_Base.handle is None: + Test_UR_Base.handle = handle + else: + base.close(handle) + base.close(Test_UR_Base.handle) + Test_UR_Base.handle = None + self.fail("try_open found multiple valid receiver handles") + + self.assertIsNotNone(self.handle, "no valid receiver handles found") + + def test_25_ping_device_zero(self): + if self.handle is None: + self.fail("No receiver found") + + w = base.write(self.handle, 0, b'\x00\x10\x00\x00\xAA') + self.assertIsNone(w, "write should have returned None") + reply = base.read(self.handle, base.DEFAULT_TIMEOUT * 3) + self.assertIsNotNone(reply, "None reply for ping") + self.assertIsInstance(reply, tuple, "read should have returned a tuple") + + reply_code, reply_device, reply_data = reply + self.assertEquals(reply_device, 0, "got ping reply for valid device") + self.assertGreater(len(reply_data), 4, "ping reply has wrong length: " + reply_data.encode('hex')) + if reply_code == 0x10: + # ping fail + self.assertEquals(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: " + reply_data.encode('hex')) + elif reply_code == 0x11: + self.fail("Got valid ping from device 0") + else: + self.fail("ping got bad reply code: " + reply) + + def test_30_ping_all_devices(self): + if self.handle is None: + self.fail("No receiver found") + + devices = [] + + for device in range(1, 7): + w = base.write(self.handle, device, b'\x00\x10\x00\x00\xAA') + self.assertIsNone(w, "write should have returned None") + reply = base.read(self.handle, base.DEFAULT_TIMEOUT * 3) + self.assertIsNotNone(reply, "None reply for ping") + self.assertIsInstance(reply, tuple, "read should have returned a tuple") + + reply_code, reply_device, reply_data = reply + self.assertEquals(reply_device, device, "ping reply for wrong device") + self.assertGreater(len(reply_data), 4, "ping reply has wrong length: " + reply_data.encode('hex')) + if reply_code == 0x10: + # ping fail + self.assertEquals(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: " + reply_data.encode('hex')) + elif reply_code == 0x11: + # ping ok + self.assertEquals(reply_data[:2], b'\x00\x10', "0x11 reply with unknown reply data: " + reply_data.encode('hex')) + self.assertEquals(reply_data[4], b'\xAA') + devices.append(device) + else: + self.fail("ping got bad reply code: " + reply) + + if devices: + Test_UR_Base.device = devices[0] + + def test_50_request_bad_device(self): + if self.handle is None: + self.fail("No receiver found") + + device = 1 if self.device is None else self.device + 1 + reply = base.request(self.handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET) + self.assertIsNone(reply, "request returned valid reply") + + def test_52_request_root_no_feature(self): + if self.handle is None: + self.fail("No receiver found") + if self.device is None: + self.fail("No devices attached") + + reply = base.request(self.handle, self.device, FEATURE.ROOT) + self.assertIsNotNone(reply, "request returned None reply") + self.assertEquals(reply[:2], b'\x00\x00', "request returned for wrong feature id") + + def test_55_request_root_feature_set(self): + if self.handle is None: + self.fail("No receiver found") + if self.device is None: + self.fail("No devices attached") + + reply = base.request(self.handle, self.device, FEATURE.ROOT, FEATURE.FEATURE_SET) + self.assertIsNotNone(reply, "request returned None reply") + index = ord(reply[0]) + self.assertGreater(index, 0, "FEATURE_SET not available on device " + str(self.device)) + + def test_57_request_ignore_undhandled(self): + if self.handle is None: + self.fail("No receiver found") + if self.device is None: + self.fail("No devices attached") + + fs_index = base.request(self.handle, self.device, FEATURE.ROOT, FEATURE.FEATURE_SET) + self.assertIsNotNone(fs_index) + fs_index = fs_index[0] + self.assertGreater(fs_index, 0) + + global received_unhandled + received_unhandled = None + + def _unhandled(code, device, data): + self.assertIsNotNone(code) + self.assertIsInstance(code, int) + self.assertIsNotNone(device) + self.assertIsInstance(device, int) + self.assertIsNotNone(data) + self.assertIsInstance(data, str) + global received_unhandled + received_unhandled = (code, device, data) + + # set_unhandled_hook(_unhandled) + base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET) + reply = base.request(self.handle, self.device, fs_index + b'\x00') + self.assertIsNotNone(reply, "request returned None reply") + self.assertNotEquals(reply[0], b'\x00') + # self.assertIsNotNone(received_unhandled, "extra message not received by unhandled hook") + + received_unhandled = None + # set_unhandled_hook() + base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET) + reply = base.request(self.handle, self.device, fs_index + b'\x00') + self.assertIsNotNone(reply, "request returned None reply") + self.assertNotEquals(reply[0], b'\x00') + self.assertIsNone(received_unhandled) + + del received_unhandled + + # def test_90_receiver_missing(self): + # if self.handle is None: + # self.fail("No receiver found") + # + # logging.warn("remove the receiver in 5 seconds or this test will fail") + # import time + # time.sleep(5) + # with self.assertRaises(NoReceiver): + # self.test_30_ping_all_devices() + + +if __name__ == '__main__': + unittest.main() diff --git a/logitech/unifying_receiver/tests/test_50_api.py b/logitech/unifying_receiver/tests/test_50_api.py new file mode 100644 index 00000000..c90d1bae --- /dev/null +++ b/logitech/unifying_receiver/tests/test_50_api.py @@ -0,0 +1,117 @@ +# +# +# + +import unittest + +from logitech.unifying_receiver import api +from logitech.unifying_receiver.exceptions import * +from logitech.unifying_receiver.constants import * + + +class Test_UR_API(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.handle = None + cls.device = None + cls.features_array = None + + @classmethod + def tearDownClass(cls): + if cls.handle: + api.close(cls.handle) + cls.device = None + cls.features_array = None + + def test_00_open_receiver(self): + Test_UR_API.handle = api.open() + if self.handle is None: + self.fail("No receiver found") + + def test_05_ping_device_zero(self): + ok = api.ping(self.handle, 0) + self.assertIsNotNone(ok, "invalid ping reply") + self.assertFalse(ok, "device zero replied") + + def test_10_ping_all_devices(self): + devices = [] + + for device in range(1, 7): + ok = api.ping(self.handle, device) + self.assertIsNotNone(ok, "invalid ping reply") + if ok: + devices.append(device) + + if devices: + Test_UR_API.device = devices[0] + + def test_30_get_feature_index(self): + if self.device is None: + self.fail("Found no devices attached.") + + fs_index = api.get_feature_index(self.handle, self.device, FEATURE.FEATURE_SET) + self.assertIsNotNone(fs_index, "feature FEATURE_SET not available") + self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index)) + + def test_31_bad_feature(self): + if self.device is None: + self.fail("Found no devices attached.") + + reply = api.request(self.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF') + self.assertIsNotNone(reply, "invalid reply") + self.assertEquals(reply[:5], b'\x00' * 5, "invalid reply") + + def test_40_get_device_features(self): + if self.device is None: + self.fail("Found no devices attached.") + + features = api.get_device_features(self.handle, self.device) + self.assertIsNotNone(features, "failed to read features array") + self.assertIn(FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available") + # cache this to simplify next tests + Test_UR_API.features_array = features + + def test_50_get_device_firmware(self): + if self.device is None: + self.fail("Found no devices attached.") + if self.features_array is None: + self.fail("no feature set available") + + d_firmware = api.get_device_firmware(self.handle, self.device, self.features_array) + self.assertIsNotNone(d_firmware, "failed to get device type") + self.assertGreater(len(d_firmware), 0, "empty device type") + + def test_52_get_device_type(self): + if self.device is None: + self.fail("Found no devices attached.") + if self.features_array is None: + self.fail("no feature set available") + + d_type = api.get_device_type(self.handle, self.device, self.features_array) + self.assertIsNotNone(d_type, "failed to get device type") + self.assertGreater(len(d_type), 0, "empty device type") + + def test_55_get_device_name(self): + if self.device is None: + self.fail("Found no devices attached.") + if self.features_array is None: + self.fail("no feature set available") + + d_name = api.get_device_name(self.handle, self.device, self.features_array) + self.assertIsNotNone(d_name, "failed to read device name") + self.assertGreater(len(d_name), 0, "empty device name") + + def test_60_get_battery_level(self): + if self.device is None: + self.fail("Found no devices attached.") + if self.features_array is None: + self.fail("no feature set available") + + try: + battery = api.get_device_battery_level(self.handle, self.device, self.features_array) + self.assertIsNotNone(battery, "failed to read battery level") + except FeatureNotSupported: + self.fail("BATTERY feature not supported by device " + str(self.device)) + +if __name__ == '__main__': + unittest.main() diff --git a/logitech/unifying_receiver/unhandled.py b/logitech/unifying_receiver/unhandled.py new file mode 100644 index 00000000..ec8ebaf7 --- /dev/null +++ b/logitech/unifying_receiver/unhandled.py @@ -0,0 +1,40 @@ +# +# Optional hook for unhandled data packets received while talking to the UR. +# These are usually broadcast events received from the attached devices. +# + +import logging +_l = logging.getLogger('logitech.unifying_receiver.unhandled') + + +def _logging_unhandled_hook(reply_code, device, data): + """Default unhandled hook, logs the reply as DEBUG.""" + _l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, data.encode('hex')) + + +_unhandled_hook = _logging_unhandled_hook + + +def _publish(reply_code, device, data): + """Delivers a reply to the unhandled hook, if any.""" + if _unhandled_hook is not None: + _unhandled_hook.__call__(reply_code, device, data) + + +def set_unhandled_hook(hook=None): + """Sets the function that will be called on unhandled incoming events. + + The hook must be a function with the signature: ``_(int, int, str)``, where + the parameters are: (reply code, device number, data). + + This hook will only be called by the request() function, when it receives + replies that do not match the requested feature call. As such, it is not + suitable for intercepting broadcast events from the device (e.g. special + keys being pressed, battery charge events, etc), at least not in a timely + manner. However, these events *may* be delivered here if they happen while + doing a feature call to the device. + + The default implementation logs the unhandled reply as DEBUG. + """ + global _unhandled_hook + _unhandled_hook = hook diff --git a/logitech/ur_lowlevel_test.py b/logitech/ur_lowlevel_test.py deleted file mode 100644 index c1ebc531..00000000 --- a/logitech/ur_lowlevel_test.py +++ /dev/null @@ -1,112 +0,0 @@ -import unittest -import logging - -logging.root.addHandler(logging.FileHandler('test.log', mode='w')) -logging.root.setLevel(1) - -from . import ur_lowlevel as urll - - -class TestLUR(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.handle = urll.open() - cls.device = None - cls.features_array = None - - @classmethod - def tearDownClass(cls): - cls.device = None - cls.features_array = None - if cls.handle: - urll.close(cls.handle) - - def setUp(self): - if self.handle is None: - self.skipTest("Logitech Unifying Receiver not found") - - def first_device(self): - if TestLUR.device is None: - for device in range(1, 7): - ok = urll.ping(self.handle, device) - self.assertIsNotNone(ok, "invalid ping reply") - if ok: - TestLUR.device = device - return device - self.skipTest("No attached device found") - else: - return TestLUR.device - - def test_00_ping_device_zero(self): - ok = urll.ping(self.handle, 0) - self.assertIsNotNone(ok, "invalid ping reply") - self.assertFalse(ok, "device zero replied") - - def test_10_ping_all_devices(self): - devices = [] - for device in range(1, 7): - ok = urll.ping(self.handle, device) - self.assertIsNotNone(ok, "invalid ping reply") - if ok: - devices.append(device) - # if devices: - # print "found", len(devices), "device(s)", devices - # else: - # print "no devices found" - - def test_30_root_feature(self): - device = self.first_device() - fs_index = urll.get_feature_index(self.handle, device, urll.FEATURE.FEATURE_SET) - self.assertIsNotNone(fs_index, "feature FEATURE_SET not available") - self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index)) - - def test_31_bad_feature(self): - device = self.first_device() - reply = urll._request(self.handle, device, urll.FEATURE.ROOT, b'\xFF\xFF') - self.assertIsNotNone(reply, "invalid reply") - self.assertEquals(reply[:5], b'\x00' * 5, "invalid reply") - - def test_40_features(self): - device = self.first_device() - features = urll.get_device_features(self.handle, device) - self.assertIsNotNone(features, "failed to read features array") - self.assertIn(urll.FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available") - # cache this to simplify next tests - TestLUR.features_array = features - - def test_50_device_type(self): - device = self.first_device() - if not TestLUR.features_array: - self.skipTest("no feature set available") - - d_type = urll.request(self.handle, device, urll.FEATURE.NAME, function=b'\x20', features_array=TestLUR.features_array) - self.assertIsNotNone(d_type, "no device type for " + str(device)) - d_type = ord(d_type[0]) - self.assertGreaterEqual(d_type, 0, "negative device type " + str(d_type)) - self.assertLess(d_type, len(urll.DEVICE_TYPES[d_type]), "unknown device type " + str(d_type)) - print "device", device, "type", urll.DEVICE_TYPES[d_type], - - def test_55_device_name(self): - device = self.first_device() - if not TestLUR.features_array: - self.skipTest("no feature set available") - - d_name_length = urll.request(self.handle, device, urll.FEATURE.NAME, features_array=TestLUR.features_array) - self.assertIsNotNone(d_name_length, "no device name length for " + str(device)) - self.assertTrue(d_name_length > 0, "zero device name length for " + str(device)) - d_name_length = ord(d_name_length[0]) - - d_name = '' - while len(d_name) < d_name_length: - name_index = len(d_name) - name_fragment = urll.request(self.handle, device, urll.FEATURE.NAME, function=b'\x10', data=chr(name_index), features_array=TestLUR.features_array) - self.assertIsNotNone(name_fragment, "no device name fragment " + str(device) + " @" + str(name_index)) - name_fragment = name_fragment[:d_name_length - len(d_name)] - self.assertNotEqual(name_fragment[0], b'\x00', "empty fragment " + str(device) + " @" + str(name_index)) - d_name += name_fragment - self.assertEquals(len(d_name), d_name_length) - print "device", device, "name", d_name, - - -if __name__ == '__main__': - unittest.main() diff --git a/test.sh b/test.sh deleted file mode 100755 index dad374f4..00000000 --- a/test.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/sh - -cd `dirname "$0"` -rm -f test.log -python -m unittest discover -v -p '*_test.py' diff --git a/unittest.sh b/unittest.sh new file mode 100755 index 00000000..26fc6624 --- /dev/null +++ b/unittest.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +cd `dirname "$0"` + +export LD_LIBRARY_PATH=$PWD/lib +export PYTHONPATH=$PWD/lib +export PYTHONDONTWRITEBYTECODE=true +export PYTHONWARNINGS=all + +python -m unittest discover -v "$@"