made full package out of unifying_receiver, added some tests

This commit is contained in:
Daniel Pavel 2012-09-25 13:49:24 +03:00
parent 2c5a3b0ed2
commit cd3ffcca81
15 changed files with 1074 additions and 119 deletions

View File

@ -1,3 +1 @@
# pass
__all__ = ['unifying_receiver']

View File

@ -0,0 +1,37 @@
"""Low-level interface for devices connected through a Logitech Universal
Receiver (UR).
Uses the HID api exposed through hidapi.py, a Python thin layer over a native
implementation.
Incomplete. Based on a bit of documentation, trial-and-error, and guesswork.
Strongly recommended to use these functions from a single thread; calling
multiple functions from different threads has a high chance of mixing the
replies and causing apparent failures.
Basic order of operations is:
- open() to obtain a UR handle
- request() to make a feature call to one of the devices attached to the UR
- close() to close the UR handle
References:
http://julien.danjou.info/blog/2012/logitech-k750-linux-support
http://6xq.net/git/lars/lshidpp.git/plain/doc/
"""
#
# Logging set-up.
# Add a new logging level for tracing low-level writes and reads.
#
import logging
_l = logging.getLogger('unifying_receiver')
_LOG_LEVEL = 5
_l.setLevel(_LOG_LEVEL)
from .constants import *
from .exceptions import *
from .api import *

View File

@ -0,0 +1,279 @@
#
# Logitech Unifying Receiver API.
#
import logging
_LOG_LEVEL = 5
_l = logging.getLogger('logitech.unifying_receiver.api')
_l.setLevel(_LOG_LEVEL)
from .constants import *
from .exceptions import *
from . import base
from .unhandled import _publish as _unhandled_publish
def open():
"""Opens the first Logitech UR found attached to the machine.
:returns: An open file handle for the found receiver, or ``None``.
"""
for rawdevice in base.list_receiver_devices():
_l.log(_LOG_LEVEL, "checking %s", rawdevice)
receiver = base.try_open(rawdevice.path)
if receiver:
return receiver
return None
"""Closes a HID device handle."""
close = base.close
def request(handle, device, feature, function=b'\x00', params=b'', features_array=None):
"""Makes a feature call to the device, and returns the reply data.
Basically a write() followed by (possibly multiple) reads, until a reply
matching the called feature is received. In theory the UR will always reply
to feature call; otherwise this function will wait indefinitely.
Incoming data packets not matching the feature and function will be
delivered to the unhandled hook (if any), and ignored.
The optional ``features_array`` parameter is a cached result of the
get_device_features function for this device, necessary to find the feature
index. If the ``features_arrary`` is not provided, one will be obtained by
manually calling get_device_features before making the request call proper.
:raises FeatureNotSupported: if the device does not support the feature.
"""
feature_index = None
if feature == FEATURE.ROOT:
feature_index = b'\x00'
else:
if features_array is None:
features_array = get_device_features(handle, device)
if features_array is None:
_l.log(_LOG_LEVEL, "(%d,%d) no features array available", handle, device)
return None
if feature in features_array:
feature_index = chr(features_array.index(feature))
if feature_index is None:
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
raise FeatureNotSupported(device, feature)
return base.request(handle, device, feature_index + function, params)
def ping(handle, device):
"""Pings a device number to check if it is attached to the UR.
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
"""
ping_marker = b'\xAA'
def _status(reply):
if not reply:
return None
reply_code, reply_device, reply_data = reply
if reply_device != device:
# oops
_l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, reply_data.encode('hex'))
_unhandled_publish(reply_code, reply_device, reply_data)
return _status(base.read(handle))
if (reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4] == ping_marker):
# ping ok
_l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, reply_data.encode('hex'))
return True
if (reply_code == 0x10 and reply_data[:2] == b'\x8F\x00'):
# ping failed
_l.log(_LOG_LEVEL, "(%d,%d) ping: device not present", handle, device)
return False
if (reply_code == 0x11 and reply_data[:2] == b'\x09\x00' and len(reply_data) == 18 and reply_data[7:11] == b'GOOD'):
# some devices may reply with a SOLAR_STATUS event before the
# ping_ok reply, especially right after the device connected to the
# receiver
_l.log(_LOG_LEVEL, "(%d,%d) ping: solar status %s", handle, device, reply_data.encode('hex'))
_unhandled_publish(reply_code, reply_device, reply_data)
return _status(base.read(handle))
# ugh
_l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device", handle, device, reply)
_unhandled_publish(reply_code, reply_device, reply_data)
return None
_l.log(_LOG_LEVEL, "(%d,%d) pinging", handle, device)
base.write(handle, device, b'\x00\x10\x00\x00' + ping_marker)
# pings may take a while to reply success
return _status(base.read(handle, base.DEFAULT_TIMEOUT * 3))
def get_feature_index(handle, device, feature):
"""Reads the index of a device's feature.
:returns: An int, or ``None`` if the feature is not available.
"""
_l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
if len(feature) != 2:
raise ValueError("invalid feature <%s>: it must be a two-byte string" % (feature.encode(hex)))
# FEATURE.ROOT should always be available for any attached devices
reply = base.request(handle, device, FEATURE.ROOT, feature)
if reply:
# only consider active and supported features
feature_index = ord(reply[0])
if feature_index:
feature_flags = ord(reply[1]) & 0xE0
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, feature.encode('hex'), FEATURE_NAME(feature), feature_index, feature_flags)
if feature_flags == 0:
return feature_index
if feature_flags & 0x80:
_l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
if feature_flags & 0x40:
_l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
if feature_flags & 0x20:
_l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
raise FeatureNotSupported(device, feature)
else:
_l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
raise FeatureNotSupported(device, feature)
def get_device_features(handle, device):
"""Returns an array of feature ids.
Their position in the array is the index to be used when requesting that
feature on the device.
"""
_l.log(_LOG_LEVEL, "(%d,%d) get device features", handle, device)
# get the index of the FEATURE_SET
# FEATURE.ROOT should always be available for all devices
fs_index = base.request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET)
if not fs_index:
_l.warn("(%d,%d) FEATURE_SET not available", handle, device)
return None
fs_index = fs_index[0]
# For debugging purposes, query all the available features on the device,
# even if unknown.
# get the number of active features the device has
features_count = base.request(handle, device, fs_index + b'\x00')
if not features_count:
# this can happen if the device disappeard since the fs_index request
# otherwise we should get at least a count of 1 (the FEATURE_SET we've just used above)
_l.log(_LOG_LEVEL, "(%d,%d) no features available?!", handle, device)
return None
features_count = ord(features_count[0])
_l.log(_LOG_LEVEL, "(%d,%d) found %d features", handle, device, features_count)
# a device may have a maximum of 15 features, other than FEATURE.ROOT
features = [None] * 0x10
for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index
feature = base.request(handle, device, fs_index + b'\x10', chr(index))
if feature:
feature = feature[0:2].upper()
features[index] = feature
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, feature.encode('hex'), FEATURE_NAME(feature), index)
return None if all(c == None for c in features) else features
def get_device_firmware(handle, device, features_array=None):
"""Reads a device's firmware info.
Returns an list of tuples [ (firmware_type, firmware_version, ...), ... ],
ordered by firmware layer.
"""
fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array)
if fw_count:
fw_count = ord(fw_count[0])
fw = []
for index in range(0, fw_count):
fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=chr(index), features_array=features_array)
if fw_info:
fw_type = ord(fw_info[0]) & 0x0F
if fw_type == 0 or fw_type == 1:
prefix = str(fw_info[1:4])
version = ( str((ord(fw_info[4]) & 0xF0) >> 4) +
str(ord(fw_info[4]) & 0x0F) +
'.' +
str((ord(fw_info[5]) & 0xF0) >> 4) +
str(ord(fw_info[5]) & 0x0F))
name = prefix + ' ' + version
build = 256 * ord(fw_info[6]) + ord(fw_info[7])
if build:
name += ' b' + str(build)
extras = fw_info[9:].rstrip('\x00')
_l.log(_LOG_LEVEL, "(%d:%d) firmware %d = %s %s extras=%s", handle, device, fw_type, FIRMWARE_TYPES[fw_type], name, extras.encode('hex'))
fw.append((fw_type, name, build, extras))
elif fw_type == 2:
version = ord(fw_info[1])
_l.log(_LOG_LEVEL, "(%d:%d) firmware 2 = Hardware v%x", handle, device, version)
fw.append((2, version))
else:
_l.log(_LOG_LEVEL, "(%d:%d) firmware other", handle, device)
fw.append((fw_type, ))
return fw
def get_device_type(handle, device, features_array=None):
"""Reads a device's type.
:see DEVICE_TYPES:
:returns: a string describing the device type, or ``None`` if the device is
not available or does not support the ``NAME`` feature.
"""
d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array)
if d_type:
d_type = ord(d_type[0])
_l.log(_LOG_LEVEL, "(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPES[d_type])
return DEVICE_TYPES[d_type]
def get_device_name(handle, device, features_array=None):
"""Reads a device's name.
:returns: a string with the device name, or ``None`` if the device is not
available or does not support the ``NAME`` feature.
"""
name_length = request(handle, device, FEATURE.NAME, features_array=features_array)
if name_length:
name_length = ord(name_length[0])
d_name = ''
while len(d_name) < name_length:
name_index = len(d_name)
name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', params=chr(name_index), features_array=features_array)
name_fragment = name_fragment[:name_length - len(d_name)]
d_name += name_fragment
_l.log(_LOG_LEVEL, "(%d,%d) device name %s", handle, device, d_name)
return d_name
def get_device_battery_level(handle, device, features_array=None):
"""Reads a device's battery level.
"""
battery = request(handle, device, FEATURE.BATTERY, features_array=features_array)
if battery:
discharge = ord(battery[0])
dischargeNext = ord(battery[1])
status = ord(battery[2])
_l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status])
return (discharge, dischargeNext, status)

View File

@ -0,0 +1,240 @@
#
# Base low-level functions used by the API proper.
# Unlikely to be used directly unless you're expanding the API.
#
import logging
_LOG_LEVEL = 4
_l = logging.getLogger('logitech.unifying_receiver.base')
_l.setLevel(_LOG_LEVEL)
from .constants import *
from .exceptions import *
from . import unhandled as _unhandled
import hidapi as _hid
#
# These values are defined by the Logitech documentation.
# Overstepping these boundaries will only produce log warnings.
#
"""Minimim lenght of a feature call packet."""
_MIN_CALL_SIZE = 7
"""Maximum lenght of a feature call packet."""
_MAX_CALL_SIZE = 20
"""Minimum size of a feature reply packet."""
_MIN_REPLY_SIZE = _MIN_CALL_SIZE
"""Maximum size of a feature reply packet."""
_MAX_REPLY_SIZE = _MAX_CALL_SIZE
"""Default timeout on read (in ms)."""
DEFAULT_TIMEOUT = 1000
#
#
#
def list_receiver_devices():
"""List all the Linux devices exposed by the UR attached to the machine."""
# (Vendor ID, Product ID) = ('Logitech', 'Unifying Receiver')
# interface 2 if the actual receiver interface
return _hid.enumerate(0x046d, 0xc52b, 2)
def try_open(path):
"""Checks if the given device path points to the right UR device.
:param path: the Linux device path.
The UR physical device may expose multiple linux devices with the same
interface, so we have to check for the right one. At this moment the only
way to distinguish betheen them is to do a test ping on an invalid
(attached) device number (i.e., 0), expecting a 'ping failed' reply.
:returns: an open receiver handle if this is the right Linux device, or
``None``.
"""
receiver_handle = _hid.open_path(path)
if receiver_handle is None:
# could be a file permissions issue (did you add the udev rules?)
# in any case, unreachable
_l.log(_LOG_LEVEL, "[%s] open failed", path)
return None
_l.log(_LOG_LEVEL, "[%s] receiver handle (%d,)", path, receiver_handle)
# ping on device id 0 (always an error)
_hid.write(receiver_handle, b'\x10\x00\x00\x10\x00\x00\xAA')
# if this is the right hidraw device, we'll receive a 'bad device' from the UR
# otherwise, the read should produce nothing
reply = _hid.read(receiver_handle, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT)
if reply:
if reply[:4] == b'\x10\x00\x8F\x00':
# 'device 0 unreachable' is the expected reply from a valid receiver handle
_l.log(_LOG_LEVEL, "[%s] success: handle (%d,)", path, receiver_handle)
return receiver_handle
# any other replies are ignored, and will assume this is the wrong receiver device
if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00':
# no idea what this is, but it comes up occasionally
_l.log(_LOG_LEVEL, "[%s] (%d,) mistery reply [%s]", path, receiver_handle, reply.encode('hex'))
else:
_l.log(_LOG_LEVEL, "[%s] (%d,) unknown reply [%s]", path, receiver_handle, reply.encode('hex'))
else:
_l.log(_LOG_LEVEL, "[%s] (%d,) no reply", path, receiver_handle)
close(receiver_handle)
return None
def close(handle):
"""Closes a HID device handle."""
if handle:
try:
_hid.close(handle)
_l.log(_LOG_LEVEL, "(%d,) closed", handle)
return True
except:
_l.exception("(%d,) closing", handle)
return False
# def write(handle, device, feature_index, function=b'\x00', param1=b'\x00', param2=b'\x00', param3=b'\x00'):
# """Write a feature call to the receiver.
#
# :param handle: UR handle obtained with open().
# :param device: attached device number.
# :param feature_index: index in the device's feature array.
# """
# if type(feature_index) == int:
# feature_index = chr(feature_index)
# data = feature_index + function + param1 + param2 + param3
# return _write(handle, device, data)
def write(handle, device, data):
"""Writes some data to a certain device.
:param handle: an open UR handle.
:param device: attached device number.
:param data: data to send, up to 5 bytes.
The first two (required) bytes of data must be the feature index for the
device, and a function code for that feature.
:raises NoReceiver: if the receiver is no longer available, i.e. has
been physically removed from the machine, or the kernel driver has been
unloaded.
"""
wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data))
_l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, wdata.encode('hex'))
if len(wdata) < _MIN_CALL_SIZE:
_l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, wdata.encode('hex'), len(wdata))
if len(wdata) > _MAX_CALL_SIZE:
_l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, wdata.encode('hex'), len(wdata))
if not _hid.write(handle, wdata):
_l.warn("(%d,%d) write failed, assuming receiver no longer available", handle, device)
raise NoReceiver()
def read(handle, timeout=DEFAULT_TIMEOUT):
"""Read some data from the receiver. Usually called after a write (feature
call), to get the reply.
:param handle: an open UR handle.
:param timeout: read timeout on the UR handle.
If any data was read in the given timeout, returns a tuple of
(reply_code, device, message data). The reply code should be ``0x11`` for a
successful feature call, or ``0x10`` to indicate some error, e.g. the device
is no longer available.
"""
data = _hid.read(handle, _MAX_REPLY_SIZE * 2, timeout)
if data:
_l.log(_LOG_LEVEL, "(%d,*) => r[%s]", handle, data.encode('hex'))
if len(data) < _MIN_REPLY_SIZE:
_l.warn("(%d,*) => r[%s] read packet too short: %d bytes", handle, data.encode('hex'), len(data))
if len(data) > _MAX_REPLY_SIZE:
_l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, data.encode('hex'), len(data))
return ord(data[0]), ord(data[1]), data[2:]
else:
_l.log(_LOG_LEVEL, "(%d,*) => r[]", handle)
def request(handle, device, feature_index_function, params=b'', features_array=None):
"""Makes a feature call device and waits for a matching reply.
This function will skip all incoming messages and events not related to the
device we're requesting for, or the feature specified in the initial
request; it will also wait for a matching reply indefinitely.
:param handle: an open UR handle.
:param device: attached device number.
:param feature_index_function: a two-byte string of (feature_index, feature_function).
:param params: parameters for the feature call, 3 to 16 bytes.
:param features_array: optional features array for the device, only used to
fill the FeatureCallError exception if one occurs.
:returns: the reply data packet, or ``None`` if the device is no longer
available.
:raisees FeatureCallError: if the feature call replied with an error.
"""
_l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, feature_index_function.encode('hex'), params.encode('hex'))
if len(feature_index_function) != 2:
raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % feature_index_function.encode('hex'))
write(handle, device, feature_index_function + params)
while True:
reply = read(handle)
if not reply:
# keep waiting...
continue
reply_code, reply_device, reply_data = reply
if reply_device != device:
# this message not for the device we're interested in
_l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, reply_device, reply.encode('hex'))
# worst case scenario, this is a reply for a concurrent request
# on this receiver
_unhandled._publish(reply_code, reply_device, reply_data)
continue
if reply_code == 0x10 and reply_data[0] == b'\x8F' and reply_data[1:2] == feature_index_function:
# device not present
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, feature_index_function.encode('hex'), reply_data.encode('hex'))
return None
if reply_code == 0x10 and reply_data[0] == b'\x8F':
# device not present
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, reply_data.encode('hex'))
return None
if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function:
# an error returned from the device
error_code = ord(reply_data[3])
_l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error, _ERROR_NAME(error_code), reply_data.encode('hex'))
feature_index = ord(feature_index_function[0])
feature_function = feature_index_function[1].encode('hex')
feature = None if features_array is None else features_array[feature_index]
raise FeatureCallError(device, feature, feature_index, feature_function, error_code, reply_data)
if reply_code == 0x11 and reply_data[:2] == feature_index_function:
# a matching reply
_l.log(_LOG_LEVEL, "(%d,%d) matched reply with data [%s]", handle, device, reply_data[2:].encode('hex'))
return reply_data[2:]
_l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, reply_data[:2].encode('hex'), feature_index_function.encode('hex'))
_unhandled._publish(reply_code, reply_device, reply_data)

View File

@ -0,0 +1,71 @@
#
# Constants used by the rest of the API.
#
"""Possible features available on a Logitech device.
A particular device might not support all these features, and may support other
unknown features as well.
"""
FEATURE = type('FEATURE', (),
dict(
ROOT=b'\x00\x00',
FEATURE_SET=b'\x00\x01',
FIRMWARE=b'\x00\x03',
NAME=b'\x00\x05',
BATTERY=b'\x10\x00',
REPROGRAMMABLE_KEYS=b'\x1B\x00',
WIRELESS_STATUS=b'\x1D\x4B',
# declared by the K750 keyboard, no documentation found so far
SOLAR_CHARGE=b'\x43\x01',
# declared by the K750 keyboard, no documentation found so far
# UNKNOWN_1DF3=b'\x1D\xF3',
# UNKNOWN_40A0=b'\x40\xA0',
# UNKNOWN_4100=b'\x41\x00',
# UNKNOWN_4520=b'\x45\x20',
))
"""Feature names indexed by feature id."""
_FEATURE_NAMES = {
FEATURE.ROOT: 'ROOT',
FEATURE.FEATURE_SET: 'FEATURE_SET',
FEATURE.FIRMWARE: 'FIRMWARE',
FEATURE.NAME: 'NAME',
FEATURE.BATTERY: 'BATTERY',
FEATURE.REPROGRAMMABLE_KEYS: 'REPROGRAMMABLE_KEYS',
FEATURE.WIRELESS_STATUS: 'WIRELESS_STATUS',
FEATURE.SOLAR_CHARGE: 'SOLAR_CHARGE',
}
def FEATURE_NAME(feature_code):
if feature_code is None:
return None
if feature_code in _FEATURE_NAMES:
return _FEATURE_NAMES[feature_code]
return 'UNKNOWN_' + feature_code.encode('hex')
"""Possible types of devices connected to an UR."""
DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse",
"Touchpad", "Trackball", "Presenter", "Receiver")
"""Names of different firmware levels possible, ordered from top to bottom."""
FIRMWARE_TYPES = ("Main (HID)", "Bootloader", "Hardware", "Other")
"""Names for possible battery status values."""
BATTERY_STATUSES = ("Discharging (in use)", "Recharging", "Almost full", "Full",
"Slow recharge", "Invalid battery", "Thermal error",
"Charging error")
"""Names for error codes."""
_ERROR_NAMES = ("Ok", "Unknown", "Invalid argument", "Out of range",
"Hardware error", "Logitech internal", "Invalid feature index",
"Invalid function", "Busy", "Unsupported")
def ERROR_NAME(error_code):
if error_code < len(_ERROR_NAMES):
return _ERROR_NAMES[error_code]
return 'Unknown Error'

View File

@ -0,0 +1,38 @@
#
# Exceptions that may be raised by this API.
#
from .constants import FEATURE_NAME as _FEATURE_NAME
from .constants import ERROR_NAME as _ERROR_NAME
class NoReceiver(Exception):
"""May be raised when trying to talk through a previously connected
receiver that is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
unloaded."""
pass
class FeatureNotSupported(Exception):
"""Raised when trying to request a feature not supported by the device."""
def __init__(self, device, feature):
super(FeatureNotSupported, self).__init__(device, feature, _FEATURE_NAME(feature))
self.device = device
self.feature = feature
self.feature_name = _FEATURE_NAME(feature)
class FeatureCallError(Exception):
"""Raised if the device replied to a feature call with an error."""
def __init__(self, device, feature, feature_index, feature_function, error_code, data=None):
super(FeatureCallError, self).__init__(device, feature, feature_index, feature_function, error_code, _ERROR_NAME(error_code))
self.device = device
self.feature = feature
self.feature_name = _FEATURE_NAME(feature)
self.feature_index = feature_index
self.feature_function = feature_function
self.error_code = error_code
self.error_string = _ERROR_NAME(error_code)
self.data = data

View File

@ -0,0 +1,3 @@
#
# Tests for the logitech.unifying_receiver package.
#

View File

@ -0,0 +1,17 @@
#
# test loading the hidapi library
#
import logging
import unittest
class Test_Import_HIDAPI(unittest.TestCase):
def test_00_import_hidapi(self):
import hidapi
self.assertIsNotNone(hidapi)
logging.info("hidapi loaded native implementation %s", hidapi._native._name)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,34 @@
#
#
#
import unittest
from logitech.unifying_receiver import constants
class Test_UR_Constants(unittest.TestCase):
def test_10_feature_names(self):
self.assertIsNone(constants.FEATURE_NAME(None))
for code in range(0x0000, 0x10000):
feature = chr((code & 0xFF00) >> 8) + chr(code & 0x00FF)
name = constants.FEATURE_NAME(feature)
self.assertIsNotNone(name)
if name.startswith('UNKNOWN_'):
self.assertEquals(code, int(name[8:], 16))
else:
self.assertTrue(hasattr(constants.FEATURE, name))
self.assertEquals(feature, getattr(constants.FEATURE, name))
def test_20_error_names(self):
for code in range(0x00, 0x100):
name = constants.ERROR_NAME(code)
self.assertIsNotNone(name)
if code > 9:
self.assertEquals(name, 'Unknown Error')
else:
self.assertEquals(code, constants._ERROR_NAMES.index(name))
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,188 @@
#
#
#
import unittest
from logitech.unifying_receiver import base
from logitech.unifying_receiver.exceptions import *
from logitech.unifying_receiver.constants import *
from logitech.unifying_receiver.unhandled import *
class Test_UR_Base(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.ur_available = False
cls.handle = None
cls.device = None
@classmethod
def tearDownClass(cls):
if cls.handle:
base.close(cls.handle)
cls.ur_available = False
cls.handle = None
cls.device = None
def test_10_list_receiver_devices(self):
rawdevices = base.list_receiver_devices()
self.assertIsNotNone(rawdevices, "list_receiver_devices returned None")
self.assertIsInstance(rawdevices, list, "list_receiver_devices should have returned a list")
Test_UR_Base.ur_available = len(rawdevices) > 0
def test_20_try_open(self):
if not self.ur_available:
self.fail("No receiver found")
for rawdevice in base.list_receiver_devices():
handle = base.try_open(rawdevice.path)
if handle is None:
continue
self.assertIsInstance(handle, int, "try_open should have returned an int")
if Test_UR_Base.handle is None:
Test_UR_Base.handle = handle
else:
base.close(handle)
base.close(Test_UR_Base.handle)
Test_UR_Base.handle = None
self.fail("try_open found multiple valid receiver handles")
self.assertIsNotNone(self.handle, "no valid receiver handles found")
def test_25_ping_device_zero(self):
if self.handle is None:
self.fail("No receiver found")
w = base.write(self.handle, 0, b'\x00\x10\x00\x00\xAA')
self.assertIsNone(w, "write should have returned None")
reply = base.read(self.handle, base.DEFAULT_TIMEOUT * 3)
self.assertIsNotNone(reply, "None reply for ping")
self.assertIsInstance(reply, tuple, "read should have returned a tuple")
reply_code, reply_device, reply_data = reply
self.assertEquals(reply_device, 0, "got ping reply for valid device")
self.assertGreater(len(reply_data), 4, "ping reply has wrong length: " + reply_data.encode('hex'))
if reply_code == 0x10:
# ping fail
self.assertEquals(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: " + reply_data.encode('hex'))
elif reply_code == 0x11:
self.fail("Got valid ping from device 0")
else:
self.fail("ping got bad reply code: " + reply)
def test_30_ping_all_devices(self):
if self.handle is None:
self.fail("No receiver found")
devices = []
for device in range(1, 7):
w = base.write(self.handle, device, b'\x00\x10\x00\x00\xAA')
self.assertIsNone(w, "write should have returned None")
reply = base.read(self.handle, base.DEFAULT_TIMEOUT * 3)
self.assertIsNotNone(reply, "None reply for ping")
self.assertIsInstance(reply, tuple, "read should have returned a tuple")
reply_code, reply_device, reply_data = reply
self.assertEquals(reply_device, device, "ping reply for wrong device")
self.assertGreater(len(reply_data), 4, "ping reply has wrong length: " + reply_data.encode('hex'))
if reply_code == 0x10:
# ping fail
self.assertEquals(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: " + reply_data.encode('hex'))
elif reply_code == 0x11:
# ping ok
self.assertEquals(reply_data[:2], b'\x00\x10', "0x11 reply with unknown reply data: " + reply_data.encode('hex'))
self.assertEquals(reply_data[4], b'\xAA')
devices.append(device)
else:
self.fail("ping got bad reply code: " + reply)
if devices:
Test_UR_Base.device = devices[0]
def test_50_request_bad_device(self):
if self.handle is None:
self.fail("No receiver found")
device = 1 if self.device is None else self.device + 1
reply = base.request(self.handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET)
self.assertIsNone(reply, "request returned valid reply")
def test_52_request_root_no_feature(self):
if self.handle is None:
self.fail("No receiver found")
if self.device is None:
self.fail("No devices attached")
reply = base.request(self.handle, self.device, FEATURE.ROOT)
self.assertIsNotNone(reply, "request returned None reply")
self.assertEquals(reply[:2], b'\x00\x00', "request returned for wrong feature id")
def test_55_request_root_feature_set(self):
if self.handle is None:
self.fail("No receiver found")
if self.device is None:
self.fail("No devices attached")
reply = base.request(self.handle, self.device, FEATURE.ROOT, FEATURE.FEATURE_SET)
self.assertIsNotNone(reply, "request returned None reply")
index = ord(reply[0])
self.assertGreater(index, 0, "FEATURE_SET not available on device " + str(self.device))
def test_57_request_ignore_undhandled(self):
if self.handle is None:
self.fail("No receiver found")
if self.device is None:
self.fail("No devices attached")
fs_index = base.request(self.handle, self.device, FEATURE.ROOT, FEATURE.FEATURE_SET)
self.assertIsNotNone(fs_index)
fs_index = fs_index[0]
self.assertGreater(fs_index, 0)
global received_unhandled
received_unhandled = None
def _unhandled(code, device, data):
self.assertIsNotNone(code)
self.assertIsInstance(code, int)
self.assertIsNotNone(device)
self.assertIsInstance(device, int)
self.assertIsNotNone(data)
self.assertIsInstance(data, str)
global received_unhandled
received_unhandled = (code, device, data)
# set_unhandled_hook(_unhandled)
base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET)
reply = base.request(self.handle, self.device, fs_index + b'\x00')
self.assertIsNotNone(reply, "request returned None reply")
self.assertNotEquals(reply[0], b'\x00')
# self.assertIsNotNone(received_unhandled, "extra message not received by unhandled hook")
received_unhandled = None
# set_unhandled_hook()
base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET)
reply = base.request(self.handle, self.device, fs_index + b'\x00')
self.assertIsNotNone(reply, "request returned None reply")
self.assertNotEquals(reply[0], b'\x00')
self.assertIsNone(received_unhandled)
del received_unhandled
# def test_90_receiver_missing(self):
# if self.handle is None:
# self.fail("No receiver found")
#
# logging.warn("remove the receiver in 5 seconds or this test will fail")
# import time
# time.sleep(5)
# with self.assertRaises(NoReceiver):
# self.test_30_ping_all_devices()
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,117 @@
#
#
#
import unittest
from logitech.unifying_receiver import api
from logitech.unifying_receiver.exceptions import *
from logitech.unifying_receiver.constants import *
class Test_UR_API(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.handle = None
cls.device = None
cls.features_array = None
@classmethod
def tearDownClass(cls):
if cls.handle:
api.close(cls.handle)
cls.device = None
cls.features_array = None
def test_00_open_receiver(self):
Test_UR_API.handle = api.open()
if self.handle is None:
self.fail("No receiver found")
def test_05_ping_device_zero(self):
ok = api.ping(self.handle, 0)
self.assertIsNotNone(ok, "invalid ping reply")
self.assertFalse(ok, "device zero replied")
def test_10_ping_all_devices(self):
devices = []
for device in range(1, 7):
ok = api.ping(self.handle, device)
self.assertIsNotNone(ok, "invalid ping reply")
if ok:
devices.append(device)
if devices:
Test_UR_API.device = devices[0]
def test_30_get_feature_index(self):
if self.device is None:
self.fail("Found no devices attached.")
fs_index = api.get_feature_index(self.handle, self.device, FEATURE.FEATURE_SET)
self.assertIsNotNone(fs_index, "feature FEATURE_SET not available")
self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index))
def test_31_bad_feature(self):
if self.device is None:
self.fail("Found no devices attached.")
reply = api.request(self.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF')
self.assertIsNotNone(reply, "invalid reply")
self.assertEquals(reply[:5], b'\x00' * 5, "invalid reply")
def test_40_get_device_features(self):
if self.device is None:
self.fail("Found no devices attached.")
features = api.get_device_features(self.handle, self.device)
self.assertIsNotNone(features, "failed to read features array")
self.assertIn(FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available")
# cache this to simplify next tests
Test_UR_API.features_array = features
def test_50_get_device_firmware(self):
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
d_firmware = api.get_device_firmware(self.handle, self.device, self.features_array)
self.assertIsNotNone(d_firmware, "failed to get device type")
self.assertGreater(len(d_firmware), 0, "empty device type")
def test_52_get_device_type(self):
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
d_type = api.get_device_type(self.handle, self.device, self.features_array)
self.assertIsNotNone(d_type, "failed to get device type")
self.assertGreater(len(d_type), 0, "empty device type")
def test_55_get_device_name(self):
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
d_name = api.get_device_name(self.handle, self.device, self.features_array)
self.assertIsNotNone(d_name, "failed to read device name")
self.assertGreater(len(d_name), 0, "empty device name")
def test_60_get_battery_level(self):
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
try:
battery = api.get_device_battery_level(self.handle, self.device, self.features_array)
self.assertIsNotNone(battery, "failed to read battery level")
except FeatureNotSupported:
self.fail("BATTERY feature not supported by device " + str(self.device))
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,40 @@
#
# Optional hook for unhandled data packets received while talking to the UR.
# These are usually broadcast events received from the attached devices.
#
import logging
_l = logging.getLogger('logitech.unifying_receiver.unhandled')
def _logging_unhandled_hook(reply_code, device, data):
"""Default unhandled hook, logs the reply as DEBUG."""
_l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, data.encode('hex'))
_unhandled_hook = _logging_unhandled_hook
def _publish(reply_code, device, data):
"""Delivers a reply to the unhandled hook, if any."""
if _unhandled_hook is not None:
_unhandled_hook.__call__(reply_code, device, data)
def set_unhandled_hook(hook=None):
"""Sets the function that will be called on unhandled incoming events.
The hook must be a function with the signature: ``_(int, int, str)``, where
the parameters are: (reply code, device number, data).
This hook will only be called by the request() function, when it receives
replies that do not match the requested feature call. As such, it is not
suitable for intercepting broadcast events from the device (e.g. special
keys being pressed, battery charge events, etc), at least not in a timely
manner. However, these events *may* be delivered here if they happen while
doing a feature call to the device.
The default implementation logs the unhandled reply as DEBUG.
"""
global _unhandled_hook
_unhandled_hook = hook

View File

@ -1,112 +0,0 @@
import unittest
import logging
logging.root.addHandler(logging.FileHandler('test.log', mode='w'))
logging.root.setLevel(1)
from . import ur_lowlevel as urll
class TestLUR(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.handle = urll.open()
cls.device = None
cls.features_array = None
@classmethod
def tearDownClass(cls):
cls.device = None
cls.features_array = None
if cls.handle:
urll.close(cls.handle)
def setUp(self):
if self.handle is None:
self.skipTest("Logitech Unifying Receiver not found")
def first_device(self):
if TestLUR.device is None:
for device in range(1, 7):
ok = urll.ping(self.handle, device)
self.assertIsNotNone(ok, "invalid ping reply")
if ok:
TestLUR.device = device
return device
self.skipTest("No attached device found")
else:
return TestLUR.device
def test_00_ping_device_zero(self):
ok = urll.ping(self.handle, 0)
self.assertIsNotNone(ok, "invalid ping reply")
self.assertFalse(ok, "device zero replied")
def test_10_ping_all_devices(self):
devices = []
for device in range(1, 7):
ok = urll.ping(self.handle, device)
self.assertIsNotNone(ok, "invalid ping reply")
if ok:
devices.append(device)
# if devices:
# print "found", len(devices), "device(s)", devices
# else:
# print "no devices found"
def test_30_root_feature(self):
device = self.first_device()
fs_index = urll.get_feature_index(self.handle, device, urll.FEATURE.FEATURE_SET)
self.assertIsNotNone(fs_index, "feature FEATURE_SET not available")
self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index))
def test_31_bad_feature(self):
device = self.first_device()
reply = urll._request(self.handle, device, urll.FEATURE.ROOT, b'\xFF\xFF')
self.assertIsNotNone(reply, "invalid reply")
self.assertEquals(reply[:5], b'\x00' * 5, "invalid reply")
def test_40_features(self):
device = self.first_device()
features = urll.get_device_features(self.handle, device)
self.assertIsNotNone(features, "failed to read features array")
self.assertIn(urll.FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available")
# cache this to simplify next tests
TestLUR.features_array = features
def test_50_device_type(self):
device = self.first_device()
if not TestLUR.features_array:
self.skipTest("no feature set available")
d_type = urll.request(self.handle, device, urll.FEATURE.NAME, function=b'\x20', features_array=TestLUR.features_array)
self.assertIsNotNone(d_type, "no device type for " + str(device))
d_type = ord(d_type[0])
self.assertGreaterEqual(d_type, 0, "negative device type " + str(d_type))
self.assertLess(d_type, len(urll.DEVICE_TYPES[d_type]), "unknown device type " + str(d_type))
print "device", device, "type", urll.DEVICE_TYPES[d_type],
def test_55_device_name(self):
device = self.first_device()
if not TestLUR.features_array:
self.skipTest("no feature set available")
d_name_length = urll.request(self.handle, device, urll.FEATURE.NAME, features_array=TestLUR.features_array)
self.assertIsNotNone(d_name_length, "no device name length for " + str(device))
self.assertTrue(d_name_length > 0, "zero device name length for " + str(device))
d_name_length = ord(d_name_length[0])
d_name = ''
while len(d_name) < d_name_length:
name_index = len(d_name)
name_fragment = urll.request(self.handle, device, urll.FEATURE.NAME, function=b'\x10', data=chr(name_index), features_array=TestLUR.features_array)
self.assertIsNotNone(name_fragment, "no device name fragment " + str(device) + " @" + str(name_index))
name_fragment = name_fragment[:d_name_length - len(d_name)]
self.assertNotEqual(name_fragment[0], b'\x00', "empty fragment " + str(device) + " @" + str(name_index))
d_name += name_fragment
self.assertEquals(len(d_name), d_name_length)
print "device", device, "name", d_name,
if __name__ == '__main__':
unittest.main()

View File

@ -1,5 +0,0 @@
#!/bin/sh
cd `dirname "$0"`
rm -f test.log
python -m unittest discover -v -p '*_test.py'

10
unittest.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
cd `dirname "$0"`
export LD_LIBRARY_PATH=$PWD/lib
export PYTHONPATH=$PWD/lib
export PYTHONDONTWRITEBYTECODE=true
export PYTHONWARNINGS=all
python -m unittest discover -v "$@"