updated lib and application to work both python 2 and 3

This commit is contained in:
Daniel Pavel 2012-09-27 20:35:59 +03:00
parent 216c9eafe7
commit 4da3c09949
15 changed files with 121 additions and 1216 deletions

View File

@ -20,6 +20,7 @@ __version__ = '0.2-hidapi-0.7.0'
import ctypes as _C
import struct
#
@ -81,13 +82,13 @@ del namedtuple
# create a DeviceInfo tuple from a hid_device object
def _makeDeviceInfo(native_device_info):
return DeviceInfo(
path=str(native_device_info.path),
path=native_device_info.path,
vendor_id=hex(native_device_info.vendor_id)[2:],
product_id=hex(native_device_info.product_id)[2:],
serial=str(native_device_info.serial) if native_device_info.serial else None,
serial=native_device_info.serial if native_device_info.serial else None,
release=hex(native_device_info.release)[2:],
manufacturer=str(native_device_info.manufacturer),
product=str(native_device_info.product),
manufacturer=native_device_info.manufacturer,
product=native_device_info.product,
interface=native_device_info.interface)
@ -307,7 +308,7 @@ def send_feature_report(device_handle, data, report_number=None):
:returns: ``True`` if the report was successfully written to the device.
"""
if report_number is not None:
data = chr(report_number) + data
data = struct.pack('!B', report_number) + data
bytes_written = _native.hid_send_feature_report(device_handle, _C.c_char_p(data), len(data))
return bytes_written > -1
@ -323,7 +324,7 @@ def get_feature_report(device_handle, bytes_count, report_number=None):
"""
out_buffer = _C.create_string_buffer('\x00' * (bytes_count + 2))
if report_number is not None:
out_buffer[0] = chr(report_number)
out_buffer[0] = struct.pack('!B', report_number)
bytes_read = _native.hid_get_feature_report(device_handle, out_buffer, bytes_count)
if bytes_read > -1:
return out_buffer[:bytes_read]

View File

@ -3,6 +3,7 @@
#
import logging
import struct
from ..unifying_receiver import api as _api
from .constants import *
@ -32,7 +33,7 @@ _LIGHTING_LIMITS = (450, 310, 190, -1)
def _charge_status(data):
charge = ord(data[2])
charge, lux = struct.unpack('!BH', data[2:5])
for i in range(0, len(_CHARGE_LIMITS)):
if charge >= _CHARGE_LIMITS[i]:
@ -40,7 +41,6 @@ def _charge_status(data):
break
text = 'Charge %d%% (%s)' % (charge, _STATUS_NAMES[charge_index])
lux = (ord(data[3]) << 8) + ord(data[4])
if lux > 0:
for i in range(0, len(_CHARGE_LIMITS)):
if lux > _LIGHTING_LIMITS[i]:

View File

@ -3,6 +3,8 @@
#
import logging
import struct
from binascii import hexlify
from .constants import *
from .exceptions import *
@ -95,10 +97,10 @@ def request(handle, device, feature, function=b'\x00', params=b'', features_arra
_l.log(_LOG_LEVEL, "(%d,%d) no features array available", handle, device)
return None
if feature in features_array:
feature_index = chr(features_array.index(feature))
feature_index = struct.pack('!B', features_array.index(feature))
if feature_index is None:
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, hexlify(feature), FEATURE_NAME(feature))
raise FeatureNotSupported(device, feature)
return base.request(handle, device, feature_index + function, params)
@ -121,13 +123,13 @@ def ping(handle, device):
if reply_device != device:
# oops
_l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, reply_data.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, hexlify(reply_data))
_unhandled_publish(reply_code, reply_device, reply_data)
return _status(base.read(handle))
if (reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4] == ping_marker):
if (reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4:5] == ping_marker):
# ping ok
_l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, reply_data.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, hexlify(reply_data))
return True
if (reply_code == 0x10 and reply_data[:2] == b'\x8F\x00'):
@ -136,15 +138,15 @@ def ping(handle, device):
return False
if (reply_code == 0x11 and reply_data[:2] == b'\x09\x00' and len(reply_data) == 18 and reply_data[7:11] == b'GOOD'):
# some devices may reply with a SOLAR_STATUS event before the
# some devices may reply with a SOLAR_CHARGE event before the
# ping_ok reply, especially right after the device connected to the
# receiver
_l.log(_LOG_LEVEL, "(%d,%d) ping: solar status %s", handle, device, reply_data.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) ping: solar status %s", handle, device, hexlify(reply_data))
_unhandled_publish(reply_code, reply_device, reply_data)
return _status(base.read(handle))
# ugh
_l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device", handle, device, reply)
_l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device: %d=[%s]", handle, device, reply[0], hexlify(reply[2]))
_unhandled_publish(reply_code, reply_device, reply_data)
return None
@ -209,30 +211,30 @@ def get_feature_index(handle, device, feature):
:returns: An int, or ``None`` if the feature is not available.
"""
_l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
_l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, hexlify(feature), FEATURE_NAME(feature))
if len(feature) != 2:
raise ValueError("invalid feature <%s>: it must be a two-byte string" % (feature.encode(hex)))
raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature)
# FEATURE.ROOT should always be available for any attached devices
reply = base.request(handle, device, FEATURE.ROOT, feature)
if reply:
# only consider active and supported features
feature_index = ord(reply[0])
feature_index = ord(reply[0:1])
if feature_index:
feature_flags = ord(reply[1]) & 0xE0
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, feature.encode('hex'), FEATURE_NAME(feature), feature_index, feature_flags)
feature_flags = ord(reply[1:2]) & 0xE0
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, hexlify(feature), FEATURE_NAME(feature), feature_index, feature_flags)
if feature_flags == 0:
return feature_index
if feature_flags & 0x80:
_l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, hexlify(feature), FEATURE_NAME(feature))
if feature_flags & 0x40:
_l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, hexlify(feature), FEATURE_NAME(feature))
if feature_flags & 0x20:
_l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, hexlify(feature), FEATURE_NAME(feature))
raise FeatureNotSupported(device, feature)
else:
_l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, feature.encode('hex'), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, hexlify(feature), FEATURE_NAME(feature))
raise FeatureNotSupported(device, feature)
@ -250,7 +252,7 @@ def get_device_features(handle, device):
if not fs_index:
_l.warn("(%d,%d) FEATURE_SET not available", handle, device)
return None
fs_index = fs_index[0]
fs_index = fs_index[:1]
# For debugging purposes, query all the available features on the device,
# even if unknown.
@ -263,18 +265,18 @@ def get_device_features(handle, device):
_l.log(_LOG_LEVEL, "(%d,%d) no features available?!", handle, device)
return None
features_count = ord(features_count[0])
features_count = ord(features_count[:1])
_l.log(_LOG_LEVEL, "(%d,%d) found %d features", handle, device, features_count)
# a device may have a maximum of 15 features, other than FEATURE.ROOT
features = [None] * 0x10
for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index
feature = base.request(handle, device, fs_index + b'\x10', chr(index))
feature = base.request(handle, device, fs_index + b'\x10', struct.pack('!B', index))
if feature:
feature = feature[0:2].upper()
features[index] = feature
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, feature.encode('hex'), FEATURE_NAME(feature), index)
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, hexlify(feature), FEATURE_NAME(feature), index)
return None if all(c == None for c in features) else features
@ -282,34 +284,35 @@ def get_device_features(handle, device):
def get_device_firmware(handle, device, features_array=None):
"""Reads a device's firmware info.
Returns an list of tuples [ (firmware_type, firmware_version, ...), ... ],
ordered by firmware layer.
:returns: a list of FirmwareInfo tuples, ordered by firmware layer.
"""
fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array)
if fw_count:
fw_count = ord(fw_count[0])
fw_count = ord(fw_count[:1])
fw = []
for index in range(0, fw_count):
fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=chr(index), features_array=features_array)
index = struct.pack('!B', index)
fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=index, features_array=features_array)
if fw_info:
fw_level = ord(fw_info[0]) & 0x0F
fw_level = ord(fw_info[:1]) & 0x0F
if fw_level == 0 or fw_level == 1:
fw_type = FIRMWARE_TYPES[fw_level]
name = str(fw_info[1:4])
version = ( str((ord(fw_info[4]) & 0xF0) >> 4) +
str(ord(fw_info[4]) & 0x0F) +
name, = struct.unpack('!3s', fw_info[1:4])
name = name.decode('ascii')
version = ( chr(0x30 + (ord(fw_info[4:5]) >> 4)) +
chr(0x30 + (ord(fw_info[4:5]) & 0x0F)) +
'.' +
str((ord(fw_info[5]) & 0xF0) >> 4) +
str(ord(fw_info[5]) & 0x0F))
build = (ord(fw_info[6]) << 8) + ord(fw_info[7])
extras = fw_info[9:].rstrip('\x00')
chr(0x30 + (ord(fw_info[5:6]) >> 4)) +
chr(0x30 + (ord(fw_info[5:6]) & 0x0F)))
build, = struct.unpack('!H', fw_info[6:8])
extras = fw_info[9:].rstrip(b'\x00')
if extras:
fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build, extras=extras)
else:
fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build)
elif fw_level == 2:
fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPES[2], version=ord(fw_info[1]))
fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPES[2], version=ord(fw_info[1:2]))
else:
fw_info = _makeFirmwareInfo(level=fw_level, type=FIRMWARE_TYPES[-1])
@ -327,7 +330,7 @@ def get_device_type(handle, device, features_array=None):
"""
d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array)
if d_type:
d_type = ord(d_type[0])
d_type = ord(d_type[:1])
_l.log(_LOG_LEVEL, "(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPES[d_type])
return DEVICE_TYPES[d_type]
@ -340,15 +343,16 @@ def get_device_name(handle, device, features_array=None):
"""
name_length = request(handle, device, FEATURE.NAME, features_array=features_array)
if name_length:
name_length = ord(name_length[0])
name_length = ord(name_length[:1])
d_name = ''
d_name = b''
while len(d_name) < name_length:
name_index = len(d_name)
name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', params=chr(name_index), features_array=features_array)
name_index = struct.pack('!B', len(d_name))
name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', params=name_index, features_array=features_array)
name_fragment = name_fragment[:name_length - len(d_name)]
d_name += name_fragment
d_name = d_name.decode('ascii')
_l.log(_LOG_LEVEL, "(%d,%d) device name %s", handle, device, d_name)
return d_name
@ -360,8 +364,6 @@ def get_device_battery_level(handle, device, features_array=None):
"""
battery = request(handle, device, FEATURE.BATTERY, features_array=features_array)
if battery:
discharge = ord(battery[0])
dischargeNext = ord(battery[1])
status = ord(battery[2])
discharge, dischargeNext, status = struct.unpack('!BBB', battery[:3])
_l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status])
return (discharge, dischargeNext, status)

View File

@ -4,6 +4,8 @@
#
import logging
import struct
from binascii import hexlify
from .constants import *
from .exceptions import *
@ -50,6 +52,7 @@ MAX_ATTACHED_DEVICES = 6
#
#
def list_receiver_devices():
"""List all the Linux devices exposed by the UR attached to the machine."""
# (Vendor ID, Product ID) = ('Logitech', 'Unifying Receiver')
@ -93,9 +96,9 @@ def try_open(path):
# any other replies are ignored, and will assume this is the wrong receiver device
if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00':
# no idea what this is, but it comes up occasionally
_l.log(_LOG_LEVEL, "[%s] (%d,) mistery reply [%s]", path, receiver_handle, reply.encode('hex'))
_l.log(_LOG_LEVEL, "[%s] (%d,) mistery reply [%s]", path, receiver_handle, hexlify(reply))
else:
_l.log(_LOG_LEVEL, "[%s] (%d,) unknown reply [%s]", path, receiver_handle, reply.encode('hex'))
_l.log(_LOG_LEVEL, "[%s] (%d,) unknown reply [%s]", path, receiver_handle, hexlify(reply))
else:
_l.log(_LOG_LEVEL, "[%s] (%d,) no reply", path, receiver_handle)
@ -143,12 +146,16 @@ def write(handle, device, data):
been physically removed from the machine, or the kernel driver has been
unloaded. The handle will be closed automatically.
"""
wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data))
_l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, wdata.encode('hex'))
if len(data) < _MIN_CALL_SIZE - 2:
data += b'\x00' * (_MIN_CALL_SIZE - 2 - len(data))
elif len(data) > _MIN_CALL_SIZE - 2:
data += b'\x00' * (_MAX_CALL_SIZE - 2 - len(data))
wdata = struct.pack('!BB', 0x10, device) + data
_l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, hexlify(wdata))
if len(wdata) < _MIN_CALL_SIZE:
_l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, wdata.encode('hex'), len(wdata))
_l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, hexlify(wdata), len(wdata))
if len(wdata) > _MAX_CALL_SIZE:
_l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, wdata.encode('hex'), len(wdata))
_l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, hexlify(wdata), len(wdata))
if not _hid.write(handle, wdata):
_l.warn("(%d,%d) write failed, assuming receiver no longer available", handle, device)
close(handle)
@ -178,12 +185,13 @@ def read(handle, timeout=DEFAULT_TIMEOUT):
raise NoReceiver
if data:
_l.log(_LOG_LEVEL, "(%d,*) => r[%s]", handle, data.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,*) => r[%s]", handle, hexlify(data))
if len(data) < _MIN_REPLY_SIZE:
_l.warn("(%d,*) => r[%s] read packet too short: %d bytes", handle, data.encode('hex'), len(data))
_l.warn("(%d,*) => r[%s] read packet too short: %d bytes", handle, hexlify(data), len(data))
if len(data) > _MAX_REPLY_SIZE:
_l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, data.encode('hex'), len(data))
return ord(data[0]), ord(data[1]), data[2:]
_l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, hexlify(data), len(data))
code, device = struct.unpack('!BB', data[:2])
return code, device, data[2:]
_l.log(_LOG_LEVEL, "(%d,*) => r[]", handle)
@ -204,9 +212,9 @@ def request(handle, device, feature_index_function, params=b'', features_array=N
available.
:raisees FeatureCallError: if the feature call replied with an error.
"""
_l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, feature_index_function.encode('hex'), params.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, hexlify(feature_index_function), hexlify(params))
if len(feature_index_function) != 2:
raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % feature_index_function.encode('hex'))
raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % hexlify(feature_index_function))
write(handle, device, feature_index_function + params)
while True:
@ -220,35 +228,35 @@ def request(handle, device, feature_index_function, params=b'', features_array=N
if reply_device != device:
# this message not for the device we're interested in
_l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, reply_device, reply_data.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, hexlify(reply_device), hexlify(reply_data))
# worst case scenario, this is a reply for a concurrent request
# on this receiver
_unhandled._publish(reply_code, reply_device, reply_data)
continue
if reply_code == 0x10 and reply_data[0] == b'\x8F' and reply_data[1:2] == feature_index_function:
if reply_code == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:2] == feature_index_function:
# device not present
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, feature_index_function.encode('hex'), reply_data.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, hexlify(feature_index_function), hexlify(reply_data))
return None
if reply_code == 0x10 and reply_data[0] == b'\x8F':
if reply_code == 0x10 and reply_data[:1] == b'\x8F':
# device not present
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, reply_data.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, hexlify(reply_data))
return None
if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function:
# an error returned from the device
error_code = ord(reply_data[3])
_l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error_code, ERROR_NAME(error_code), reply_data.encode('hex'))
feature_index = ord(feature_index_function[0])
feature_function = feature_index_function[1].encode('hex')
_l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error_code, ERROR_NAME(error_code), hexlify(reply_data))
feature_index = ord(feature_index_function[:1])
feature_function = feature_index_function[1:2]
feature = None if features_array is None else features_array[feature_index]
raise FeatureCallError(device, feature, feature_index, feature_function, error_code, reply_data)
if reply_code == 0x11 and reply_data[:2] == feature_index_function:
# a matching reply
_l.log(_LOG_LEVEL, "(%d,%d) matched reply with data [%s]", handle, device, reply_data[2:].encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) matched reply with data [%s]", handle, device, hexlify(reply_data[2:]))
return reply_data[2:]
_l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, reply_data[:2].encode('hex'), feature_index_function.encode('hex'))
_l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, hexlify(reply_data[:2]), hexlify(feature_index_function))
_unhandled._publish(reply_code, reply_device, reply_data)

View File

@ -43,7 +43,7 @@ def FEATURE_NAME(feature_code):
return None
if feature_code in _FEATURE_NAMES:
return _FEATURE_NAMES[feature_code]
return 'UNKNOWN_' + feature_code.encode('hex')
return 'UNKNOWN_%s' % feature_code
"""Possible types of devices connected to an UR."""

View File

@ -5,6 +5,7 @@
import logging
import threading
from time import sleep
from binascii import hexlify
from . import base
from .exceptions import *
@ -31,7 +32,7 @@ class EventsListener(threading.Thread):
API calls in the events callback.
"""
def __init__(self, receiver, events_callback):
super(EventsListener, self).__init__(name='Unifying_Receiver_Listener_' + str(receiver))
super(EventsListener, self).__init__(name='Unifying_Receiver_Listener_' + hex(receiver))
self.daemon = True
self.receiver = receiver
@ -87,7 +88,7 @@ class EventsListener(threading.Thread):
self.task = self.task_reply = None
self.task_processing.release()
# _l.log(_LOG_LEVEL, "(%d) request '%s' => [%s]", self.receiver, api_function.__name__, reply.encode('hex'))
# _l.log(_LOG_LEVEL, "(%d) request '%s' => [%s]", self.receiver, api_function.__name__, hexlify(reply))
if isinstance(reply, Exception):
raise reply
return reply

View File

@ -3,6 +3,7 @@
#
import unittest
import struct
from logitech.unifying_receiver import constants
@ -12,23 +13,23 @@ class Test_UR_Constants(unittest.TestCase):
def test_10_feature_names(self):
self.assertIsNone(constants.FEATURE_NAME(None))
for code in range(0x0000, 0x10000):
feature = chr((code & 0xFF00) >> 8) + chr(code & 0x00FF)
feature = struct.pack('!H', code)
name = constants.FEATURE_NAME(feature)
self.assertIsNotNone(name)
if name.startswith('UNKNOWN_'):
self.assertEquals(code, int(name[8:], 16))
self.assertEqual(code, struct.unpack('!H', feature)[0])
else:
self.assertTrue(hasattr(constants.FEATURE, name))
self.assertEquals(feature, getattr(constants.FEATURE, name))
self.assertEqual(feature, getattr(constants.FEATURE, name))
def test_20_error_names(self):
for code in range(0x00, 0x100):
name = constants.ERROR_NAME(code)
self.assertIsNotNone(name)
if code > 9:
self.assertEquals(name, 'Unknown Error')
self.assertEqual(name, 'Unknown Error')
else:
self.assertEquals(code, constants._ERROR_NAMES.index(name))
self.assertEqual(code, constants._ERROR_NAMES.index(name))
if __name__ == '__main__':
unittest.main()

View File

@ -3,6 +3,7 @@
#
import unittest
from binascii import hexlify
from logitech.unifying_receiver import base
from logitech.unifying_receiver.exceptions import *
@ -63,11 +64,11 @@ class Test_UR_Base(unittest.TestCase):
self.assertIsInstance(reply, tuple, "read should have returned a tuple")
reply_code, reply_device, reply_data = reply
self.assertEquals(reply_device, 0, "got ping reply for valid device")
self.assertGreater(len(reply_data), 4, "ping reply has wrong length: " + reply_data.encode('hex'))
self.assertEqual(reply_device, 0, "got ping reply for valid device")
self.assertGreater(len(reply_data), 4, "ping reply has wrong length: %s" % hexlify(reply_data))
if reply_code == 0x10:
# ping fail
self.assertEquals(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: " + reply_data.encode('hex'))
self.assertEqual(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: %s" % hexlify(reply_data))
elif reply_code == 0x11:
self.fail("Got valid ping from device 0")
else:
@ -87,15 +88,15 @@ class Test_UR_Base(unittest.TestCase):
self.assertIsInstance(reply, tuple, "read should have returned a tuple")
reply_code, reply_device, reply_data = reply
self.assertEquals(reply_device, device, "ping reply for wrong device")
self.assertGreater(len(reply_data), 4, "ping reply has wrong length: " + reply_data.encode('hex'))
self.assertEqual(reply_device, device, "ping reply for wrong device")
self.assertGreater(len(reply_data), 4, "ping reply has wrong length: %s" % hexlify(reply_data))
if reply_code == 0x10:
# ping fail
self.assertEquals(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: " + reply_data.encode('hex'))
self.assertEqual(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: %s" % hexlify(reply_data))
elif reply_code == 0x11:
# ping ok
self.assertEquals(reply_data[:2], b'\x00\x10', "0x11 reply with unknown reply data: " + reply_data.encode('hex'))
self.assertEquals(reply_data[4], b'\xAA')
self.assertEqual(reply_data[:2], b'\x00\x10', "0x11 reply with unknown reply data: %s" % hexlify(reply_data))
self.assertEqual(reply_data[4:5], b'\xAA')
devices.append(device)
else:
self.fail("ping got bad reply code: " + reply)
@ -119,7 +120,7 @@ class Test_UR_Base(unittest.TestCase):
reply = base.request(self.handle, self.device, FEATURE.ROOT)
self.assertIsNotNone(reply, "request returned None reply")
self.assertEquals(reply[:2], b'\x00\x00', "request returned for wrong feature id")
self.assertEqual(reply[:2], b'\x00\x00', "request returned for wrong feature id")
def test_55_request_root_feature_set(self):
if self.handle is None:
@ -129,8 +130,8 @@ class Test_UR_Base(unittest.TestCase):
reply = base.request(self.handle, self.device, FEATURE.ROOT, FEATURE.FEATURE_SET)
self.assertIsNotNone(reply, "request returned None reply")
index = ord(reply[0])
self.assertGreater(index, 0, "FEATURE_SET not available on device " + str(self.device))
index = reply[:1]
self.assertGreater(index, b'\x00', "FEATURE_SET not available on device " + str(self.device))
def test_57_request_ignore_undhandled(self):
if self.handle is None:
@ -140,8 +141,8 @@ class Test_UR_Base(unittest.TestCase):
fs_index = base.request(self.handle, self.device, FEATURE.ROOT, FEATURE.FEATURE_SET)
self.assertIsNotNone(fs_index)
fs_index = fs_index[0]
self.assertGreater(fs_index, 0)
fs_index = fs_index[:1]
self.assertGreater(fs_index, b'\x00')
global received_unhandled
received_unhandled = None
@ -160,7 +161,7 @@ class Test_UR_Base(unittest.TestCase):
base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET)
reply = base.request(self.handle, self.device, fs_index + b'\x00')
self.assertIsNotNone(reply, "request returned None reply")
self.assertNotEquals(reply[0], b'\x00')
self.assertNotEquals(reply[:1], b'\x00')
# self.assertIsNotNone(received_unhandled, "extra message not received by unhandled hook")
received_unhandled = None
@ -168,7 +169,7 @@ class Test_UR_Base(unittest.TestCase):
base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET)
reply = base.request(self.handle, self.device, fs_index + b'\x00')
self.assertIsNotNone(reply, "request returned None reply")
self.assertNotEquals(reply[0], b'\x00')
self.assertNotEquals(reply[:1], b'\x00')
self.assertIsNone(received_unhandled)
del received_unhandled

View File

@ -71,7 +71,7 @@ class Test_UR_API(unittest.TestCase):
reply = api.request(self.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF')
self.assertIsNotNone(reply, "invalid reply")
self.assertEquals(reply[:5], b'\x00' * 5, "invalid reply")
self.assertEqual(reply[:5], b'\x00' * 5, "invalid reply")
def test_40_get_device_features(self):
if self.handle is None:
@ -171,7 +171,7 @@ class Test_UR_API(unittest.TestCase):
all_devices = api.list_devices(self.handle)
for device_info in all_devices:
device = api.find_device_by_name(self.handle, device_info.name)
self.assertEquals(device, device_info)
self.assertEqual(device, device_info)
if __name__ == '__main__':
unittest.main()

View File

@ -4,12 +4,14 @@
#
import logging
from binascii import hexlify
_l = logging.getLogger('logitech.unifying_receiver.unhandled')
def _logging_unhandled_hook(reply_code, device, data):
"""Default unhandled hook, logs the reply as DEBUG."""
_l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, data.encode('hex'))
_l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, hexlify(data))
_unhandled_hook = _logging_unhandled_hook

View File

@ -1,363 +0,0 @@
"""Low-level interface for devices connected through a Logitech Universal
Receiver (UR).
Uses the HID api exposed through hidapi.py.
Incomplete. Based on a bit of documentation, trial-and-error, and guesswork.
In the context of this API, 'device' is the number (1..6 according to the
documentation) of the device attached to the UR.
References:
http://julien.danjou.info/blog/2012/logitech-k750-linux-support
http://6xq.net/git/lars/lshidpp.git/plain/doc/
"""
#
# Logging set-up.
# Add a new logging level for tracing low-level writes and reads.
#
import logging
import threading
from . import hidapi
LOG_LEVEL = 1
def _urll_trace(self, msg, *args):
if self.isEnabledFor(LOG_LEVEL):
args = (None if x is None
else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x)
else x
for x in args)
self.log(LOG_LEVEL, msg, *args)
logging.addLevelName(LOG_LEVEL, 'trace1')
logging.Logger.trace1 = _urll_trace
_log = logging.getLogger('logitech.ur_lowlevel')
_log.setLevel(LOG_LEVEL)
#
#
#
"""Possible features available on a Logitech device.
A particular device might not support all these features, and may support other
unknown features as well.
"""
FEATURE = type('FEATURE', (),
dict(
ROOT=b'\x00\x00',
FEATURE_SET=b'\x00\x01',
FIRMWARE=b'\x00\x03',
NAME=b'\x00\x05',
BATTERY=b'\x10\x00',
REPROGRAMMABLE_KEYS=b'\x1B\x00',
WIRELESS_STATUS=b'\x1D\x4B',
# UNKNOWN_1=b'\x1D\xF3',
# UNKNOWN_2=b'\x40\xA0',
# UNKNOWN_3=b'\x41\x00',
SOLAR_CHARGE=b'\x43\x01',
# UNKNOWN_4=b'\x45\x20',
))
"""Possible types of devices connected to an UR."""
DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse",
"Touchpad", "Trackball", "Presenter", "Receiver")
"""Default timeout on read (in ms)."""
DEFAULT_TIMEOUT = 1000
"""Minimum size of a reply data packet."""
_MIN_REPLY_SIZE = 7
"""Maximum size of a reply data packet."""
_MAX_REPLY_SIZE = 32
class NoReceiver(Exception):
"""May be raised when trying to talk through a previously connected
receiver that is no longer available."""
pass
#
#
#
class Receiver(threading.Thread):
def __init__(self, handle, path, timeout=DEFAULT_TIMEOUT):
super(Receiver, self).__init__(name='Unifying_Receiver_' + path)
self.handle = handle
self.path = path
self.timeout = timeout
self.read_data = None
self.data_available = threading.Event()
self.devices = {}
self.hooks = {}
self.active = True
self.start()
def __del__(self):
self.close()
def close(self):
self.active = False
try:
hidapi.close(self.handle)
_log.trace1("|%s:| closed", self.path)
return True
except Exception as e:
_log.warn("|%s:| closing: %s", self.path, e)
self.hooks = None
self.devices = None
def run(self):
while self.active:
data = hidapi.read(self.handle, _MAX_REPLY_SIZE, self.timeout)
if self.active and data:
_log.trace1("|%s|*| => r[%s]", self.path, data)
if len(data) < _MIN_REPLY_SIZE:
_log.trace1("|%s|*| => r[%s] short read", self.path, data)
if len(data) > _MAX_REPLY_SIZE:
_log.trace1("|%s|*| => r[%s] long read", self.path, data)
if not self._dispatch_to_hooks(data):
self.read_data = data
self.data_available.set()
def _dispatch_to_hooks(self, data):
if data[0] == b'\x11':
for key in self.hooks:
if key == data[1:3]:
self.hooks[key].__call__(data[3:])
return True
def set_hook(self, device, feature_index, function=b'\x00', callback=None):
key = '%c%s%c' % (device, feature_index, function)
if callback is None:
if key in self.hooks:
del self.hooks[key]
else:
self.hooks[key] = callback
return True
def _write(self, device, data):
wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data))
if hidapi.write(self.handle, wdata):
_log.trace1("|%s|%d| <= w[%s]", self.path, device, wdata)
return True
else:
_log.trace1("|%s|%d| <= w[%s] failed ", self.path, device, wdata)
raise NoReceiver()
def _read(self, device, feature_index=None, function=None, timeout=DEFAULT_TIMEOUT):
while True:
self.data_available.wait()
data = self.data
self.data_available.clear()
if data[1] == chr(device):
if feature_index is None or data[2] == feature_index:
if function is None or data[3] == function:
return data
_log.trace1("|%s:| ignoring read data [%s]", self.path, data)
def _request(self, device, feature_index, function=b'\x00', data=b''):
self._write(device, feature_index + function + data)
return self._read(device, feature_index, function)
def _request_direct(self, device, feature_index, function=b'\x00', data=b''):
self._write(device, feature_index + function + data)
while True:
data = hidapi.read(self.handle, _MAX_REPLY_SIZE, self.timeout)
if not data:
continue
if data[1] == chr(device) and data[2] == feature_index and data[3] == function:
return data
def ping(self, device):
"""Pings a device to check if it is attached to the UR.
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
"""
if self._write(device, b'\x00\x10\x00\x00\xAA'):
while True:
reply = self._read(device, timeout=DEFAULT_TIMEOUT*3)
# ping ok
if reply[0] == b'\0x11' and reply[1] == chr(device):
if reply[2:4] == b'\x00\x10' and reply[6] == b'\xAA':
_log.trace1("|%s|%d| ping: ok %s", self.path, device, reply[2])
return True
# ping failed
if reply[0] == b'\0x10' and reply[1] == chr(device):
if reply[2:4] == b'\x8F\x00':
_log.trace1("|%s|%d| ping: device not present", self.path, device)
return False
_log.trace1("|%s|%d| ping: unknown reply", self.path, device, reply)
def scan_devices(self):
for device in range(1, 7):
self.get_device(device)
return self.devices.values()
def get_device(self, device, query=True):
if device in self.devices:
value = self.devices[device]
_log.trace1("|%s:%d| device info %s", self.path, device, value)
return value
if query and self.ping(device):
d_type = self.get_type(device)
d_name = self.get_name(device)
features_array = self._get_features(device)
value = (d_type, d_name, features_array)
self.devices[device] = value
_log.trace1("|%s:%d| device info %s", self.path, device, value)
return value
_log.trace1("|%s:%d| device not found", self.path, device)
def _get_feature_index(self, device, feature):
"""Reads the index of a device's feature.
:returns: An int, or None if the feature is not available.
"""
_log.trace1("|%s|%d| get feature index <%s>", self.path, device, feature)
reply = self._request(device, b'\x00', b'\x00', feature)
# only consider active and supported features
if ord(reply[4]) and ord(reply[5]) & 0xA0 == 0:
_log.trace1("|%s|%d| feature <%s> has index %s", self.path, device, feature, reply[4])
return ord(reply[4])
_log.trace1("|%s|%d| feature <%s> not available", self.path, device, feature)
def _get_features(self, device):
"""Returns an array of feature ids.
Their position in the array is the index to be used when accessing that
feature on the device.
Only call this function in the initial set-up of the device, because
other messages and events not related to querying the feature set
will be ignored.
"""
_log.trace1("|%s|%d| get device features", self.path, device)
# get the index of the FEATURE_SET
fs_index = self._get_feature_index(device, FEATURE.FEATURE_SET)
fs_index = chr(fs_index)
# Query all the available features on the device, even if unknown.
# get the number of active features the device has
features_count = self._request(device, fs_index)
features_count = ord(features_count[4])
_log.trace1("|%s|%d| found %d features", self.path, device, features_count)
# a device may have a maximum of 15 features
features = [None] * 0x10
for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index
feature = self._request(device, fs_index, b'\x10', chr(index))
features[index] = feature[4:6].upper()
_log.trace1("|%s|%d| feature <%s> at index %d", self.path, device, features[index], index)
return None if all(c is None for c in features) else features
def get_type(self, device):
if device in self.devices:
return self.devices[device][0]
dnt_index = self._get_feature_index(device, FEATURE.NAME)
dnt_index = chr(dnt_index)
d_type = self._request(device, dnt_index, b'\x20')
d_type = ord(d_type[4])
return DEVICE_TYPES[d_type]
def get_name(self, device):
if device in self.devices:
return self.devices[device][1]
dnt_index = self._get_feature_index(device, FEATURE.NAME)
dnt_index = chr(dnt_index)
self._write(device, dnt_index)
name_length = self._read(device, dnt_index, b'\x00')
name_length = ord(name_length[4])
d_name = ''
while len(d_name) < name_length:
name_index = len(d_name)
name_fragment = self._request(device, dnt_index, b'\x10', chr(name_index))
name_fragment = name_fragment[:name_length - len(d_name)]
d_name += name_fragment
return d_name
def open():
"""Opens the first Logitech UR found attached to the machine.
:returns: A Receiver object for the found receiver, or ``None``.
"""
# USB ids for (Logitech, Unifying Receiver)
# interface 2 if the actual receiver interface
for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2):
_log.trace1("checking %s", rawdevice)
receiver = hidapi.open_path(rawdevice.path)
if not receiver:
# could be a file permissions issue
# in any case, unreachable
_log.trace1("[%s] open failed", rawdevice.path)
continue
_log.trace1("[%s] receiver handle %d", rawdevice.path, receiver)
# ping on device id 0 (always an error)
hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA')
# if this is the right hidraw device, we'll receive a 'bad subdevice'
# otherwise, the read should produce nothing
reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT)
if reply:
_log.trace1("[%s] receiver %d exploratory ping reply [%s]", rawdevice.path, receiver, reply)
if reply[:4] == b'\x10\x00\x8F\x00':
# 'device 0 unreachable' is the expected reply from a valid receiver handle
_log.trace1("[%s] success: found receiver with handle %d", rawdevice.path, receiver)
return Receiver(receiver, rawdevice.path)
if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00':
# no idea what this is, but it comes up occasionally
_log.trace1("[%s] receiver %d mistery reply", rawdevice.path, receiver)
else:
_log.trace1("[%s] receiver %d unknown reply", rawdevice.path, receiver)
else:
_log.trace1("[%s] receiver %d no reply", rawdevice.path, receiver)
pass
# ignore
hidapi.close(receiver)

View File

@ -1,216 +0,0 @@
"""A few functions to deal with the Logitech Universal Receiver.
It is assumed a single UR device is attached to the machine.
Uses hidapi.
"""
import logging
import threading
from . import ur_lowlevel as urll
from urll import FEATURE
_log = logging.getLogger('logitech.ur')
_log.setLevel(logging.DEBUG)
# class NoDevice(Exception):
# """May be thrown when trying to talk through a previously present device
# that is no longer available."""
# pass
class _EventQueue(threading.Thread):
def __init__(self, receiver, timeout=urll.DEFAULT_TIMEOUT):
super(_EventQueue, self).__init__()
self.daemon = True
self.receiver = receiver
self.timeout = timeout
self.active = True
def stop(self):
self.active = False
self.join()
def run(self):
while self.active:
data = urll.read(self.receiver.handle, self.timeout)
if not self.active:
# in case the queue has been stopped while reading
break
if data:
self.receiver._dispatch(*data)
class Receiver:
def __init__(self, path, handle=None, timeout=urll.DEFAULT_TIMEOUT):
self.path = path
self.handle = handle
self.DEVICE_FEATURES = {}
self.hooks = {}
self.event_queue = _EventQueue(self.handle, timeout)
self.event_queue.start()
def close(self):
self.event_queue.stop()
self.event_queue = None
urll.close(self.handle)
self.handle = None
self.hooks = {}
self.DEVICE_FEATURES = {}
def ping(self, device):
reply = self.event_queue.req()
if not urll.write(self.handle, device, '\x00\x10\x00\x00\xAA'):
# print "write failed",
return False
reply = urll.read(self.handle, device)
if not reply:
# print "no data",
return False
# 10018f00100900
if ord(reply[0]) == 0x10:
if ord(reply[2]) == 0x8F:
# print "invalid",
return False
# 110100100200aa00000000000000000000000000
if ord(reply[0]) == 0x11:
if reply[2:4] == "\x00\x10" and reply[6] == "\xAA":
# success
return True
# print "unknown"
return False
def hook(self, device, feature, function=None, callback=None):
features = self.DEVICE_FEATURES[device]
if feature not in features:
raise Exception("feature " + feature + " not supported by device")
feature_index = features.index(feature)
key = (device, feature_index, function, callback)
if key not in self.hooks:
self.hooks[key] = []
if callback is None:
if callback in self.hooks[key]:
self.hooks[key].remove(callback)
else:
self.hooks[key].append(callback)
def _dispatch(self, status, device, data):
_log.debug("incoming event %2x:%2x:%s", status, device, data.encode('hex'))
dispatched = False
for (key, callback) in self.hooks.items():
if key[0] == device and key[1] == ord(data[0]):
if key[2] is not None and key[2] == data[1] & 0xFF:
callback.__call__(data)
if not dispatched:
_log.debug("ignored incoming event %2x:%2x:%s",
status, device, data.encode('hex'))
def _request(self, device, data=''):
if urll.write(self.handler, device, data):
pass
def find_device(self, device_type=None, name=None):
"""Gets the device number for the first device matching.
The device type and name are case-insensitive.
"""
# Apparently a receiver supports up to 6 devices.
for device in range(1, 7):
if self.ping(device):
if device not in self.DEVICE_FEATURES:
self.DEVICE_FEATURES[device] = \
urll.get_device_features(self.handle, device)
# print get_reprogrammable_keys(receiver, device)
# d_firmware = get_firmware_version(receiver, device)
# print "device", device, "[", d_name, "/", d_type, "]"
# print "firmware", d_firmware, "features", _DEVICE_FEATURES[device]
if device_type:
d_type = self.get_type(device)
if d_type is None or device_type.lower() != d_type.lower():
continue
if name:
d_name = self.get_name(device)
if d_name is None or name.lower() != d_name.lower():
continue
return device
def get_type(self, device):
reply = self._request(device, FEATURE.GET_NAME, '\x20')
if reply:
return DEVICE_TYPES[ord(reply[2][2])]
def get_name(self, device):
reply = self._request(device, FEATURE.GET_NAME)
if reply:
charcount = ord(reply[4])
name = ''
index = 0
while len(name) < charcount:
reply = self._request(device, FEATURE.NAME, '\x10', chr(index))
if reply:
name += reply[4:4 + charcount - index]
index = len(name)
else:
break
return name
def get_firmware_version(self, device, firmware_type=0):
reply = self._request(device,
FEATURE.FIRMWARE, '\x10', chr(firmware_type))
if reply:
return '%s %s.%s' % (reply[5:8],
reply[8:10].encode('hex'), reply[10:12].encode('hex'))
def get_battery_level(self, device):
reply = self._request(device, FEATURE.BATTERY)
if reply:
return (ord(reply[4]), ord(reply[5]), ord(reply[6]))
def get_reprogrammable_keys(self, device):
count = self._request(device, FEATURE.REPROGRAMMABLE_KEYS)
if count:
keys = []
for index in range(ord(count[4])):
key = self._request(device,
FEATURE.REPROGRAMMABLE_KEYS, '\x10', chr(index))
keys.append(key[4:6], keys[6:8], ord(key[8]))
return keys
def get_solar_charge(self, device):
reply = self._request(device, FEATURE.SOLAR_CHARGE,
'\x03', '\x78', '\x01', reply_function='\x10')
if reply:
charge = ord(reply[4])
lux = ord(reply[5]) << 8 | ord(reply[6])
# lux = int(round(((255 * ord(reply[5])) + ord(reply[6])) / 538.0, 2) * 100)
return (charge, lux)
#
#
#
def get():
"""Gets a Receiver object for the Unifying Receiver connected to the machine.
It is assumed a single receiver is connected to the machine. If more than
one are present, the first one found will be returned.
:returns: a Receiver object, or None.
"""
receiver = urll.open()
if receiver:
return Receiver(*receiver)

View File

@ -1,533 +0,0 @@
"""Low-level interface for devices connected through a Logitech Universal
Receiver (UR).
Uses the HID api exposed through hidapi.py.
Incomplete. Based on a bit of documentation, trial-and-error, and guesswork.
Strongly recommended to use these functions from a single thread; calling
multiple functions from different threads has a high chance of mixing the
replies and causing apparent failures.
In the context of this API, 'handle' is the open handle of UR attached to
the machine, and 'device' is the number (1..6 according to the documentation)
of the device attached to the UR.
References:
http://julien.danjou.info/blog/2012/logitech-k750-linux-support
http://6xq.net/git/lars/lshidpp.git/plain/doc/
"""
#
# Logging set-up.
# Add a new logging level for tracing low-level writes and reads.
#
import logging
LOG_LEVEL = 1
def _urll_trace(self, msg, *args):
if self.isEnabledFor(LOG_LEVEL):
args = (None if x is None
else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x)
else x
for x in args)
self.log(LOG_LEVEL, msg, *args)
logging.addLevelName(LOG_LEVEL, 'TRACE1')
logging.Logger.trace1 = _urll_trace
_l = logging.getLogger('ur_lowlevel')
_l.setLevel(LOG_LEVEL)
#
#
#
"""Possible features available on a Logitech device.
A particular device might not support all these features, and may support other
unknown features as well.
"""
FEATURE = type('FEATURE', (),
dict(
ROOT=b'\x00\x00',
FEATURE_SET=b'\x00\x01',
FIRMWARE=b'\x00\x03',
NAME=b'\x00\x05',
BATTERY=b'\x10\x00',
REPROGRAMMABLE_KEYS=b'\x1B\x00',
WIRELESS_STATUS=b'\x1D\x4B',
# UNKNOWN_1=b'\x1D\xF3',
# UNKNOWN_2=b'\x40\xA0',
# UNKNOWN_3=b'\x41\x00',
SOLAR_CHARGE=b'\x43\x01',
# UNKNOWN_4=b'\x45\x20',
))
FEATURE_NAMES = { FEATURE.ROOT: 'ROOT',
FEATURE.FEATURE_SET: 'FEATURE_SET',
FEATURE.FIRMWARE: 'FIRMWARE',
FEATURE.NAME: 'NAME',
FEATURE.BATTERY: 'BATTERY',
FEATURE.REPROGRAMMABLE_KEYS: 'REPROGRAMMABLE_KEYS',
FEATURE.WIRELESS_STATUS: 'WIRELESS_STATUS',
FEATURE.SOLAR_CHARGE: 'SOLAR_CHARGE',
}
"""Possible types of devices connected to an UR."""
DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse",
"Touchpad", "Trackball", "Presenter", "Receiver")
FIRMWARE_TYPES = ("Main (HID)", "Bootloader", "Hardware", "Other")
BATTERY_STATUSES = ("Discharging (in use)", "Recharging", "Almost full", "Full",
"Slow recharge", "Invalid battery", "Thermal error",
"Charging error")
ERROR_CODES = ("Ok", "Unknown", "Invalid argument", "Out of range",
"Hardware error", "Logitech internal", "Invalid feature index",
"Invalid function", "Busy", "Usupported")
"""Default timeout on read (in ms)."""
DEFAULT_TIMEOUT = 1000
"""Minimum size of a reply data packet."""
_MIN_REPLY_SIZE = 7
"""Maximum size of a reply data packet."""
_MAX_REPLY_SIZE = 32
#
# Exceptions that may be raised by this API.
#
class NoReceiver(Exception):
"""May be raised when trying to talk through a previously connected
receiver that is no longer available."""
pass
class FeatureNotSupported(Exception):
"""Raised when trying to request a feature not supported by the device."""
def __init__(self, device, feature):
super(FeatureNotSupported, self).__init__(device, feature, FEATURE_NAMES[feature])
self.device = device
self.feature = feature
self.feature_name = FEATURE_NAMES[feature]
#
#
#
def _default_event_hook(reply_code, device, data):
_l.trace1("EVENT_HOOK (,%d) code %d status [%s]", device, reply_code, data)
"""A function that will be called on incoming events.
It must be a function with the signature: ``_(int, int, str)``, where the
parameters are: (reply code, device number, data).
This function will be called by the request() function, when it receives replies
that do not match the write ca
"""
event_hook = _default_event_hook
def _publish_event(reply_code, device, data):
if event_hook is not None:
event_hook.__call__(reply_code, device, data)
#
# Low-level functions.
#
from . import hidapi
def open():
"""Opens the first Logitech UR found attached to the machine.
:returns: An open file handle for the found receiver, or ``None``.
"""
# USB ids for (Logitech, Unifying Receiver)
# interface 2 if the actual receiver interface
for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2):
_l.trace1("checking %s", rawdevice)
receiver = hidapi.open_path(rawdevice.path)
if not receiver:
# could be a file permissions issue
# in any case, unreachable
_l.trace1("[%s] open failed", rawdevice.path)
continue
_l.trace1("[%s] receiver handle (%d,)", rawdevice.path, receiver)
# ping on device id 0 (always an error)
hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA')
# if this is the right hidraw device, we'll receive a 'bad subdevice'
# otherwise, the read should produce nothing
reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT)
if reply:
if reply[:4] == b'\x10\x00\x8F\x00':
# 'device 0 unreachable' is the expected reply from a valid receiver handle
_l.trace1("[%s] success: handle (%d,)", rawdevice.path, receiver)
return receiver
if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00':
# no idea what this is, but it comes up occasionally
_l.trace1("[%s] (%d,) mistery reply [%s]", rawdevice.path, receiver, reply)
else:
_l.trace1("[%s] (%d,) unknown reply [%s]", rawdevice.path, receiver, reply)
else:
_l.trace1("[%s] (%d,) no reply", rawdevice.path, receiver)
# ignore
close(receiver)
# hidapi.close(receiver)
return None
def close(handle):
"""Closes a HID device handle."""
if handle:
try:
hidapi.close(handle)
_l.trace1("(%d,) closed", handle)
return True
except Exception as e:
_l.debug("(%d,) closing: %s", handle, e)
return False
def write(handle, device, feature_index, function=b'\x00', param1=b'\x00', param2=b'\x00', param3=b'\x00'):
"""Write a feature call to the receiver.
:param handle: UR handle obtained with open().
:param device: attached device number
:param feature_index: index in the
"""
if type(feature_index) == int:
feature_index = chr(feature_index)
data = feature_index + function + param1 + param2 + param3
return _write(handle, device, data)
def _write(handle, device, data):
"""Writes some data to a certain device.
The first two (required) bytes of data must be the feature index for the
device, and a function code for that feature.
If the receiver is no longer available (e.g. has been physically removed
from the machine), raises NoReceiver.
"""
wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data))
_l.trace1("(%d,%d) <= w[%s]", handle, device, wdata)
# return hidapi.write(handle, wdata)
if not hidapi.write(handle, wdata):
_l.trace1("(%d,%d) write failed, assuming receiver has been removed", handle, device)
raise NoReceiver()
def read(handle, timeout=DEFAULT_TIMEOUT):
"""Read some data from the receiver. Usually called after a write (feature
call), to get the reply.
If any data was read in the given timeout, returns a tuple of
(reply_code, device, message data). The reply code should be ``0x11`` for a
successful feature call, or ``0x10`` to indicate some error, e.g. the device
is no longer available.
"""
data = hidapi.read(handle, _MAX_REPLY_SIZE, timeout)
if data:
_l.trace1("(%d,*) => r[%s]", handle, data)
if len(data) < _MIN_REPLY_SIZE:
_l.trace1("(%d,*) => r[%s] read short reply", handle, data)
if len(data) > _MAX_REPLY_SIZE:
_l.trace1("(%d,*) => r[%s] read long reply", handle, data)
return ord(data[0]), ord(data[1]), data[2:]
else:
_l.trace1("(%d,*) => r[]", handle)
def request(handle, device, feature, function=b'\x00', data=b'', features_array=None):
"""Makes a feature call to the device, and returns the reply data.
Basically a write() followed by (possibly multiple) reads, until a reply
matching the called feature is received. In theory the UR will always reply
to feature call; otherwise this function will wait indefinetly.
Incoming data packets not matching the feature and function will be
delivered to the event_hook (if any), and then ignored.
The optional ``features_array`` parameter is a cached result of the
get_device_features function for this device, necessary to find the feature
index. If the ``features_arrary`` is not provided, one will be obtained by
calling get_device_features.
If the feature is not supported, returns None.
"""
if features_array is None:
features_array = get_device_features(handle, device)
if features_array is None:
_l.trace1("(%d,%d) no features array available", handle, device)
return None
if feature in features_array:
feature_index = chr(features_array.index(feature))
return _request(handle, device, feature_index + function, data)
else:
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAMES[feature])
raise FeatureNotSupported(device, feature)
def _request(handle, device, feature_function, data=b''):
"""Makes a feature call device and waits for a matching reply.
Only call this in the initial set-up of the device.
This function will skip all incoming messages and events not related to the
device we're requesting for, or the feature specified in the initial
request; it will also wait for a matching reply indefinetly.
:param feature_function: a two-byte string of (feature_index, function).
:param data: additional data to send, up to 5 bytes.
:returns:
"""
_l.trace1("(%d,%d) request feature %s data %s", handle, device, feature_function, data)
_write(handle, device, feature_function + data)
while True:
reply = read(handle)
if not reply:
# keep waiting...
continue
if reply[1] != device:
# this message not for the device we're interested in
_l.trace1("(%d,%d) request reply for unexpected device %s", handle, device, reply)
_publish_event(*reply)
continue
if reply[0] == 0x10 and reply[2][0] == b'\x8F':
# device not present
_l.trace1("(%d,%d) request ping failed %s", handle, device, reply)
return None
if reply[0] == 0x11 and reply[2][0] == b'\xFF' and reply[2][1:3] == feature_function:
# an error returned from the device
error = ord(reply[2][3])
_l.trace1("(%d,%d) request feature call error %d = %s: %s", handle, device, error, ERROR_CODES[error], reply)
return None
if reply[0] == 0x11 and reply[2][:2] == feature_function:
# a matching reply
_l.trace1("(%d,%d) matched reply with data [%s]", handle, device, reply[2][2:])
return reply[2][2:]
_l.trace1("(%d,%d) unmatched reply %s (expected %s)", handle, device, reply[2][:2], feature_function)
_publish_event(*reply)
def ping(handle, device):
"""Pings a device to check if it is attached to the UR.
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
"""
def _status(reply):
if not reply:
return None
if reply[1] != device:
# oops
_l.trace1("(%d,%d) ping: reply for another device: %s", handle, device, reply)
_publish_event(*reply)
return _status(read(handle))
if (reply[0] == 0x11 and reply[2][:2] == b'\x00\x10' and reply[2][4] == b'\xAA'):
# ping ok
_l.trace1("(%d,%d) ping: ok %s", handle, device, reply[2])
return True
if (reply[0] == 0x10 and reply[2][:2] == b'\x8F\x00'):
# ping failed
_l.trace1("(%d,%d) ping: device not present", handle, device)
return False
if (reply[0] == 0x11 and reply[2][:2] == b'\x09\x00' and reply[2][7:11] == b'GOOD'):
# some devices may reply with a SOLAR_STATUS packet before the
# ping_ok reply, especially right after the device connected to the
# receiver
_l.trace1("(%d,%d) ping: solar status %s", handle, device, reply[2])
_publish_event(*reply)
return _status(read(handle))
# ugh
_l.trace1("(%d,%d) ping: unknown reply for this device", handle, device, reply)
_publish_event(*reply)
return None
_l.trace1("(%d,%d) pinging", handle, device)
_write(handle, device, b'\x00\x10\x00\x00\xAA')
return _status(read(handle, DEFAULT_TIMEOUT * 3))
def get_feature_index(handle, device, feature):
"""Reads the index of a device's feature.
:returns: An int, or ``None`` if the feature is not available.
"""
_l.trace1("(%d,%d) get feature index <%s>", handle, device, feature.encode('hex'))
feature_index = _request(handle, device, FEATURE.ROOT, feature)
if feature_index:
# only consider active and supported features
if ord(feature_index[0]) and ord(feature_index[1]) & 0xA0 == 0:
_l.trace1("(%d,%d) feature <%s> index %s", handle, device, feature.encode('hex'), feature_index[0])
return ord(feature_index[0])
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAMES[feature])
raise FeatureNotSupported(device, feature)
def get_device_features(handle, device):
"""Returns an array of feature ids.
Their position in the array is the index to be used when accessing that
feature on the device.
Only call this function in the initial set-up of the device, because
other messages and events not related to querying the feature set
will be ignored.
"""
_l.trace1("(%d,%d) get device features", handle, device)
# get the index of the FEATURE_SET
fs_index = _request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET)
if not fs_index:
_l.trace1("(%d,%d) FEATURE_SET not available", handle, device)
return None
fs_index = fs_index[0]
# For debugging purposes, query all the available features on the device,
# even if unknown.
# get the number of active features the device has
features_count = _request(handle, device, fs_index + b'\x00')
if not features_count:
# this can happen if the device disappeard since the fs_index call
_l.trace1("(%d,%d) no features available?!", handle, device)
return None
features_count = ord(features_count[0])
# a device may have a maximum of 15 features
features = [None] * 0x10
_l.trace1("(%d,%d) found %d features", handle, device, features_count)
for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index
feature = _request(handle, device, fs_index + b'\x10', chr(index))
if feature:
features[index] = feature[0:2].upper()
_l.trace1("(%d,%d) feature <%s> at index %d", handle, device, features[index].encode('hex'), index)
return None if all(c == None for c in features) else features
def get_device_firmware(handle, device, features_array=None):
"""Reads a device's firmware info.
Returns an list of tuples [ (firmware_type, firmware_version, ...), ... ],
ordered by firmware layer.
"""
fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array)
if fw_count:
fw_count = ord(fw_count[0])
fw = []
for index in range(0, fw_count):
fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', data=chr(index), features_array=features_array)
if fw_info:
fw_type = ord(fw_info[0]) & 0x0F
if fw_type == 0 or fw_type == 1:
prefix = str(fw_info[1:4])
version = ( str((ord(fw_info[4]) & 0xF0) >> 4) +
str(ord(fw_info[4]) & 0x0F) +
'.' +
str((ord(fw_info[5]) & 0xF0) >> 4) +
str(ord(fw_info[5]) & 0x0F))
name = prefix + ' ' + version
build = 256 * ord(fw_info[6]) + ord(fw_info[7])
if build:
name += ' b' + str(build)
extras = fw_info[9:].rstrip('\x00')
_l.trace1("(%d:%d) firmware %d = %s %s extras=%s", handle, device, fw_type, FIRMWARE_TYPES[fw_type], name, extras.encode('hex'))
fw.append((fw_type, name, build, extras))
elif fw_type == 2:
version = ord(fw_info[1])
_l.trace1("(%d:%d) firmware 2 = Hardware v%x", handle, device, version)
fw.append((2, version))
else:
_l.trace1("(%d:%d) firmware other", handle, device)
fw.append((fw_type, ))
return fw
def get_device_type(handle, device, features_array=None):
"""Reads a device's type.
:see DEVICE_TYPES:
:returns: a string describing the device type, or ``None`` if the device is
not available or does not support the ``NAME`` feature.
"""
d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array)
if d_type:
d_type = ord(d_type[0])
_l.trace1("(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPES[d_type])
return DEVICE_TYPES[d_type]
def get_device_name(handle, device, features_array=None):
"""Reads a device's name.
:returns: a string with the device name, or ``None`` if the device is not
available or does not support the ``NAME`` feature.
"""
name_length = request(handle, device, FEATURE.NAME, features_array=features_array)
if name_length:
name_length = ord(name_length[0])
d_name = ''
while len(d_name) < name_length:
name_index = len(d_name)
name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', data=chr(name_index), features_array=features_array)
name_fragment = name_fragment[:name_length - len(d_name)]
d_name += name_fragment
_l.trace1("(%d,%d) device name %s", handle, device, d_name)
return d_name
def get_device_battery_level(handle, device, features_array=None):
"""Reads a device's battery level.
"""
battery = request(handle, device, FEATURE.BATTERY, features_array=features_array)
if battery:
discharge = ord(battery[0])
dischargeNext = ord(battery[1])
status = ord(battery[2])
_l.trace1("(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status])
return (discharge, dischargeNext, status)

View File

@ -3,4 +3,4 @@
cd `dirname "$0"`
export LD_LIBRARY_PATH=$PWD
exec python -Qnew -m unittest discover -v "$@"
exec python -m unittest discover -v "$@"

View File

@ -17,6 +17,7 @@ from logitech.devices import *
# A few constants
#
APP_TITLE = 'Solaar'
UNIFYING_RECEIVER = 'Unifying Receiver'
NO_DEVICES = 'No devices attached.'
@ -158,15 +159,15 @@ class StatusThread(threading.Thread):
else:
logging.warn("unknown event code %02x", code)
elif device:
logging.debug("got event (%d, %d, %s) for new device", code, device, data.encode('hex'))
logging.debug("got event (%d, %d, %s) for new device", code, device, data)
devinfo = ur.get_device_info(self.listener.receiver, device)
if devinfo:
self.devices[device] = devinfo
self.statuses[device] = [0, None, None]
else:
logging.warn("got event (%d, %d, %s) for unknown device", code, device, data.encode('hex'))
logging.warn("got event (%d, %d, %s) for unknown device", code, device, data)
else:
logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data.encode('hex'))
logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data)
if updated:
GObject.idle_add(self.update_status_icon)
@ -202,7 +203,7 @@ class StatusThread(threading.Thread):
devinfo = self.devices[d]
status_text = self.statuses[d][2]
if status_text:
all_statuses.append(unichr(0x274a) + ' ' + devinfo.name + '\n\t' + status_text)
all_statuses.append(devinfo.name + '\n\t' + status_text)
else:
all_statuses.append(devinfo.name)
@ -218,7 +219,7 @@ class StatusThread(threading.Thread):
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=6)
logging.captureWarnings(True)
status_icon = Gtk.StatusIcon.new_from_file('images/' + UNIFYING_RECEIVER + '.png')