diff --git a/lib/hidapi.py b/lib/hidapi.py index cee24562..e6b4e110 100644 --- a/lib/hidapi.py +++ b/lib/hidapi.py @@ -20,6 +20,7 @@ __version__ = '0.2-hidapi-0.7.0' import ctypes as _C +import struct # @@ -81,13 +82,13 @@ del namedtuple # create a DeviceInfo tuple from a hid_device object def _makeDeviceInfo(native_device_info): return DeviceInfo( - path=str(native_device_info.path), + path=native_device_info.path, vendor_id=hex(native_device_info.vendor_id)[2:], product_id=hex(native_device_info.product_id)[2:], - serial=str(native_device_info.serial) if native_device_info.serial else None, + serial=native_device_info.serial if native_device_info.serial else None, release=hex(native_device_info.release)[2:], - manufacturer=str(native_device_info.manufacturer), - product=str(native_device_info.product), + manufacturer=native_device_info.manufacturer, + product=native_device_info.product, interface=native_device_info.interface) @@ -307,7 +308,7 @@ def send_feature_report(device_handle, data, report_number=None): :returns: ``True`` if the report was successfully written to the device. """ if report_number is not None: - data = chr(report_number) + data + data = struct.pack('!B', report_number) + data bytes_written = _native.hid_send_feature_report(device_handle, _C.c_char_p(data), len(data)) return bytes_written > -1 @@ -323,7 +324,7 @@ def get_feature_report(device_handle, bytes_count, report_number=None): """ out_buffer = _C.create_string_buffer('\x00' * (bytes_count + 2)) if report_number is not None: - out_buffer[0] = chr(report_number) + out_buffer[0] = struct.pack('!B', report_number) bytes_read = _native.hid_get_feature_report(device_handle, out_buffer, bytes_count) if bytes_read > -1: return out_buffer[:bytes_read] diff --git a/lib/logitech/devices/k750.py b/lib/logitech/devices/k750.py index 92f5a8be..e500f925 100644 --- a/lib/logitech/devices/k750.py +++ b/lib/logitech/devices/k750.py @@ -3,6 +3,7 @@ # import logging +import struct from ..unifying_receiver import api as _api from .constants import * @@ -32,7 +33,7 @@ _LIGHTING_LIMITS = (450, 310, 190, -1) def _charge_status(data): - charge = ord(data[2]) + charge, lux = struct.unpack('!BH', data[2:5]) for i in range(0, len(_CHARGE_LIMITS)): if charge >= _CHARGE_LIMITS[i]: @@ -40,7 +41,6 @@ def _charge_status(data): break text = 'Charge %d%% (%s)' % (charge, _STATUS_NAMES[charge_index]) - lux = (ord(data[3]) << 8) + ord(data[4]) if lux > 0: for i in range(0, len(_CHARGE_LIMITS)): if lux > _LIGHTING_LIMITS[i]: diff --git a/lib/logitech/unifying_receiver/api.py b/lib/logitech/unifying_receiver/api.py index 35f3c60a..d396dcf7 100644 --- a/lib/logitech/unifying_receiver/api.py +++ b/lib/logitech/unifying_receiver/api.py @@ -3,6 +3,8 @@ # import logging +import struct +from binascii import hexlify from .constants import * from .exceptions import * @@ -95,10 +97,10 @@ def request(handle, device, feature, function=b'\x00', params=b'', features_arra _l.log(_LOG_LEVEL, "(%d,%d) no features array available", handle, device) return None if feature in features_array: - feature_index = chr(features_array.index(feature)) + feature_index = struct.pack('!B', features_array.index(feature)) if feature_index is None: - _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, hexlify(feature), FEATURE_NAME(feature)) raise FeatureNotSupported(device, feature) return base.request(handle, device, feature_index + function, params) @@ -121,13 +123,13 @@ def ping(handle, device): if reply_device != device: # oops - _l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, reply_data.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) ping: reply for another device %d: %s", handle, device, reply_device, hexlify(reply_data)) _unhandled_publish(reply_code, reply_device, reply_data) return _status(base.read(handle)) - if (reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4] == ping_marker): + if (reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4:5] == ping_marker): # ping ok - _l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, reply_data.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) ping: ok [%s]", handle, device, hexlify(reply_data)) return True if (reply_code == 0x10 and reply_data[:2] == b'\x8F\x00'): @@ -136,15 +138,15 @@ def ping(handle, device): return False if (reply_code == 0x11 and reply_data[:2] == b'\x09\x00' and len(reply_data) == 18 and reply_data[7:11] == b'GOOD'): - # some devices may reply with a SOLAR_STATUS event before the + # some devices may reply with a SOLAR_CHARGE event before the # ping_ok reply, especially right after the device connected to the # receiver - _l.log(_LOG_LEVEL, "(%d,%d) ping: solar status %s", handle, device, reply_data.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) ping: solar status %s", handle, device, hexlify(reply_data)) _unhandled_publish(reply_code, reply_device, reply_data) return _status(base.read(handle)) # ugh - _l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device", handle, device, reply) + _l.log(_LOG_LEVEL, "(%d,%d) ping: unknown reply for this device: %d=[%s]", handle, device, reply[0], hexlify(reply[2])) _unhandled_publish(reply_code, reply_device, reply_data) return None @@ -209,30 +211,30 @@ def get_feature_index(handle, device, feature): :returns: An int, or ``None`` if the feature is not available. """ - _l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + _l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, hexlify(feature), FEATURE_NAME(feature)) if len(feature) != 2: - raise ValueError("invalid feature <%s>: it must be a two-byte string" % (feature.encode(hex))) + raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature) # FEATURE.ROOT should always be available for any attached devices reply = base.request(handle, device, FEATURE.ROOT, feature) if reply: # only consider active and supported features - feature_index = ord(reply[0]) + feature_index = ord(reply[0:1]) if feature_index: - feature_flags = ord(reply[1]) & 0xE0 - _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, feature.encode('hex'), FEATURE_NAME(feature), feature_index, feature_flags) + feature_flags = ord(reply[1:2]) & 0xE0 + _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, hexlify(feature), FEATURE_NAME(feature), feature_index, feature_flags) if feature_flags == 0: return feature_index if feature_flags & 0x80: - _l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + _l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, hexlify(feature), FEATURE_NAME(feature)) if feature_flags & 0x40: - _l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + _l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, hexlify(feature), FEATURE_NAME(feature)) if feature_flags & 0x20: - _l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + _l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, hexlify(feature), FEATURE_NAME(feature)) raise FeatureNotSupported(device, feature) else: - _l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, feature.encode('hex'), FEATURE_NAME(feature)) + _l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, hexlify(feature), FEATURE_NAME(feature)) raise FeatureNotSupported(device, feature) @@ -250,7 +252,7 @@ def get_device_features(handle, device): if not fs_index: _l.warn("(%d,%d) FEATURE_SET not available", handle, device) return None - fs_index = fs_index[0] + fs_index = fs_index[:1] # For debugging purposes, query all the available features on the device, # even if unknown. @@ -263,18 +265,18 @@ def get_device_features(handle, device): _l.log(_LOG_LEVEL, "(%d,%d) no features available?!", handle, device) return None - features_count = ord(features_count[0]) + features_count = ord(features_count[:1]) _l.log(_LOG_LEVEL, "(%d,%d) found %d features", handle, device, features_count) # a device may have a maximum of 15 features, other than FEATURE.ROOT features = [None] * 0x10 for index in range(1, 1 + features_count): # for each index, get the feature residing at that index - feature = base.request(handle, device, fs_index + b'\x10', chr(index)) + feature = base.request(handle, device, fs_index + b'\x10', struct.pack('!B', index)) if feature: feature = feature[0:2].upper() features[index] = feature - _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, feature.encode('hex'), FEATURE_NAME(feature), index) + _l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, hexlify(feature), FEATURE_NAME(feature), index) return None if all(c == None for c in features) else features @@ -282,34 +284,35 @@ def get_device_features(handle, device): def get_device_firmware(handle, device, features_array=None): """Reads a device's firmware info. - Returns an list of tuples [ (firmware_type, firmware_version, ...), ... ], - ordered by firmware layer. + :returns: a list of FirmwareInfo tuples, ordered by firmware layer. """ fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array) if fw_count: - fw_count = ord(fw_count[0]) + fw_count = ord(fw_count[:1]) fw = [] for index in range(0, fw_count): - fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=chr(index), features_array=features_array) + index = struct.pack('!B', index) + fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=index, features_array=features_array) if fw_info: - fw_level = ord(fw_info[0]) & 0x0F + fw_level = ord(fw_info[:1]) & 0x0F if fw_level == 0 or fw_level == 1: fw_type = FIRMWARE_TYPES[fw_level] - name = str(fw_info[1:4]) - version = ( str((ord(fw_info[4]) & 0xF0) >> 4) + - str(ord(fw_info[4]) & 0x0F) + + name, = struct.unpack('!3s', fw_info[1:4]) + name = name.decode('ascii') + version = ( chr(0x30 + (ord(fw_info[4:5]) >> 4)) + + chr(0x30 + (ord(fw_info[4:5]) & 0x0F)) + '.' + - str((ord(fw_info[5]) & 0xF0) >> 4) + - str(ord(fw_info[5]) & 0x0F)) - build = (ord(fw_info[6]) << 8) + ord(fw_info[7]) - extras = fw_info[9:].rstrip('\x00') + chr(0x30 + (ord(fw_info[5:6]) >> 4)) + + chr(0x30 + (ord(fw_info[5:6]) & 0x0F))) + build, = struct.unpack('!H', fw_info[6:8]) + extras = fw_info[9:].rstrip(b'\x00') if extras: fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build, extras=extras) else: fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build) elif fw_level == 2: - fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPES[2], version=ord(fw_info[1])) + fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPES[2], version=ord(fw_info[1:2])) else: fw_info = _makeFirmwareInfo(level=fw_level, type=FIRMWARE_TYPES[-1]) @@ -327,7 +330,7 @@ def get_device_type(handle, device, features_array=None): """ d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array) if d_type: - d_type = ord(d_type[0]) + d_type = ord(d_type[:1]) _l.log(_LOG_LEVEL, "(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPES[d_type]) return DEVICE_TYPES[d_type] @@ -340,15 +343,16 @@ def get_device_name(handle, device, features_array=None): """ name_length = request(handle, device, FEATURE.NAME, features_array=features_array) if name_length: - name_length = ord(name_length[0]) + name_length = ord(name_length[:1]) - d_name = '' + d_name = b'' while len(d_name) < name_length: - name_index = len(d_name) - name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', params=chr(name_index), features_array=features_array) + name_index = struct.pack('!B', len(d_name)) + name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', params=name_index, features_array=features_array) name_fragment = name_fragment[:name_length - len(d_name)] d_name += name_fragment + d_name = d_name.decode('ascii') _l.log(_LOG_LEVEL, "(%d,%d) device name %s", handle, device, d_name) return d_name @@ -360,8 +364,6 @@ def get_device_battery_level(handle, device, features_array=None): """ battery = request(handle, device, FEATURE.BATTERY, features_array=features_array) if battery: - discharge = ord(battery[0]) - dischargeNext = ord(battery[1]) - status = ord(battery[2]) + discharge, dischargeNext, status = struct.unpack('!BBB', battery[:3]) _l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status]) return (discharge, dischargeNext, status) diff --git a/lib/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py index 83a2480d..9f34388c 100644 --- a/lib/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -4,6 +4,8 @@ # import logging +import struct +from binascii import hexlify from .constants import * from .exceptions import * @@ -50,6 +52,7 @@ MAX_ATTACHED_DEVICES = 6 # # + def list_receiver_devices(): """List all the Linux devices exposed by the UR attached to the machine.""" # (Vendor ID, Product ID) = ('Logitech', 'Unifying Receiver') @@ -93,9 +96,9 @@ def try_open(path): # any other replies are ignored, and will assume this is the wrong receiver device if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': # no idea what this is, but it comes up occasionally - _l.log(_LOG_LEVEL, "[%s] (%d,) mistery reply [%s]", path, receiver_handle, reply.encode('hex')) + _l.log(_LOG_LEVEL, "[%s] (%d,) mistery reply [%s]", path, receiver_handle, hexlify(reply)) else: - _l.log(_LOG_LEVEL, "[%s] (%d,) unknown reply [%s]", path, receiver_handle, reply.encode('hex')) + _l.log(_LOG_LEVEL, "[%s] (%d,) unknown reply [%s]", path, receiver_handle, hexlify(reply)) else: _l.log(_LOG_LEVEL, "[%s] (%d,) no reply", path, receiver_handle) @@ -143,12 +146,16 @@ def write(handle, device, data): been physically removed from the machine, or the kernel driver has been unloaded. The handle will be closed automatically. """ - wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data)) - _l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, wdata.encode('hex')) + if len(data) < _MIN_CALL_SIZE - 2: + data += b'\x00' * (_MIN_CALL_SIZE - 2 - len(data)) + elif len(data) > _MIN_CALL_SIZE - 2: + data += b'\x00' * (_MAX_CALL_SIZE - 2 - len(data)) + wdata = struct.pack('!BB', 0x10, device) + data + _l.log(_LOG_LEVEL, "(%d,%d) <= w[%s]", handle, device, hexlify(wdata)) if len(wdata) < _MIN_CALL_SIZE: - _l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, wdata.encode('hex'), len(wdata)) + _l.warn("(%d:%d) <= w[%s] call packet too short: %d bytes", handle, device, hexlify(wdata), len(wdata)) if len(wdata) > _MAX_CALL_SIZE: - _l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, wdata.encode('hex'), len(wdata)) + _l.warn("(%d:%d) <= w[%s] call packet too long: %d bytes", handle, device, hexlify(wdata), len(wdata)) if not _hid.write(handle, wdata): _l.warn("(%d,%d) write failed, assuming receiver no longer available", handle, device) close(handle) @@ -178,12 +185,13 @@ def read(handle, timeout=DEFAULT_TIMEOUT): raise NoReceiver if data: - _l.log(_LOG_LEVEL, "(%d,*) => r[%s]", handle, data.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,*) => r[%s]", handle, hexlify(data)) if len(data) < _MIN_REPLY_SIZE: - _l.warn("(%d,*) => r[%s] read packet too short: %d bytes", handle, data.encode('hex'), len(data)) + _l.warn("(%d,*) => r[%s] read packet too short: %d bytes", handle, hexlify(data), len(data)) if len(data) > _MAX_REPLY_SIZE: - _l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, data.encode('hex'), len(data)) - return ord(data[0]), ord(data[1]), data[2:] + _l.warn("(%d,*) => r[%s] read packet too long: %d bytes", handle, hexlify(data), len(data)) + code, device = struct.unpack('!BB', data[:2]) + return code, device, data[2:] _l.log(_LOG_LEVEL, "(%d,*) => r[]", handle) @@ -204,9 +212,9 @@ def request(handle, device, feature_index_function, params=b'', features_array=N available. :raisees FeatureCallError: if the feature call replied with an error. """ - _l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, feature_index_function.encode('hex'), params.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) request {%s} params [%s]", handle, device, hexlify(feature_index_function), hexlify(params)) if len(feature_index_function) != 2: - raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % feature_index_function.encode('hex')) + raise ValueError('invalid feature_index_function {%s}: it must be a two-byte string' % hexlify(feature_index_function)) write(handle, device, feature_index_function + params) while True: @@ -220,35 +228,35 @@ def request(handle, device, feature_index_function, params=b'', features_array=N if reply_device != device: # this message not for the device we're interested in - _l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, reply_device, reply_data.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) request got reply for unexpected device %d: [%s]", handle, device, hexlify(reply_device), hexlify(reply_data)) # worst case scenario, this is a reply for a concurrent request # on this receiver _unhandled._publish(reply_code, reply_device, reply_data) continue - if reply_code == 0x10 and reply_data[0] == b'\x8F' and reply_data[1:2] == feature_index_function: + if reply_code == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:2] == feature_index_function: # device not present - _l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, feature_index_function.encode('hex'), reply_data.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) request ping failed on {%s} call: [%s]", handle, device, hexlify(feature_index_function), hexlify(reply_data)) return None - if reply_code == 0x10 and reply_data[0] == b'\x8F': + if reply_code == 0x10 and reply_data[:1] == b'\x8F': # device not present - _l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, reply_data.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) request ping failed: [%s]", handle, device, hexlify(reply_data)) return None if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function: # an error returned from the device error_code = ord(reply_data[3]) - _l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error_code, ERROR_NAME(error_code), reply_data.encode('hex')) - feature_index = ord(feature_index_function[0]) - feature_function = feature_index_function[1].encode('hex') + _l.warn("(%d,%d) request feature call error %d = %s: %s", handle, device, error_code, ERROR_NAME(error_code), hexlify(reply_data)) + feature_index = ord(feature_index_function[:1]) + feature_function = feature_index_function[1:2] feature = None if features_array is None else features_array[feature_index] raise FeatureCallError(device, feature, feature_index, feature_function, error_code, reply_data) if reply_code == 0x11 and reply_data[:2] == feature_index_function: # a matching reply - _l.log(_LOG_LEVEL, "(%d,%d) matched reply with data [%s]", handle, device, reply_data[2:].encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) matched reply with data [%s]", handle, device, hexlify(reply_data[2:])) return reply_data[2:] - _l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, reply_data[:2].encode('hex'), feature_index_function.encode('hex')) + _l.log(_LOG_LEVEL, "(%d,%d) unmatched reply {%s} (expected {%s})", handle, device, hexlify(reply_data[:2]), hexlify(feature_index_function)) _unhandled._publish(reply_code, reply_device, reply_data) diff --git a/lib/logitech/unifying_receiver/constants.py b/lib/logitech/unifying_receiver/constants.py index ff3be916..47e0b2f5 100644 --- a/lib/logitech/unifying_receiver/constants.py +++ b/lib/logitech/unifying_receiver/constants.py @@ -43,7 +43,7 @@ def FEATURE_NAME(feature_code): return None if feature_code in _FEATURE_NAMES: return _FEATURE_NAMES[feature_code] - return 'UNKNOWN_' + feature_code.encode('hex') + return 'UNKNOWN_%s' % feature_code """Possible types of devices connected to an UR.""" diff --git a/lib/logitech/unifying_receiver/listener.py b/lib/logitech/unifying_receiver/listener.py index 1b32f187..bcc9a3b1 100644 --- a/lib/logitech/unifying_receiver/listener.py +++ b/lib/logitech/unifying_receiver/listener.py @@ -5,6 +5,7 @@ import logging import threading from time import sleep +from binascii import hexlify from . import base from .exceptions import * @@ -31,7 +32,7 @@ class EventsListener(threading.Thread): API calls in the events callback. """ def __init__(self, receiver, events_callback): - super(EventsListener, self).__init__(name='Unifying_Receiver_Listener_' + str(receiver)) + super(EventsListener, self).__init__(name='Unifying_Receiver_Listener_' + hex(receiver)) self.daemon = True self.receiver = receiver @@ -87,7 +88,7 @@ class EventsListener(threading.Thread): self.task = self.task_reply = None self.task_processing.release() - # _l.log(_LOG_LEVEL, "(%d) request '%s' => [%s]", self.receiver, api_function.__name__, reply.encode('hex')) + # _l.log(_LOG_LEVEL, "(%d) request '%s' => [%s]", self.receiver, api_function.__name__, hexlify(reply)) if isinstance(reply, Exception): raise reply return reply diff --git a/lib/logitech/unifying_receiver/tests/test_10_constants.py b/lib/logitech/unifying_receiver/tests/test_10_constants.py index 1fcd37f0..c889d9fc 100644 --- a/lib/logitech/unifying_receiver/tests/test_10_constants.py +++ b/lib/logitech/unifying_receiver/tests/test_10_constants.py @@ -3,6 +3,7 @@ # import unittest +import struct from logitech.unifying_receiver import constants @@ -12,23 +13,23 @@ class Test_UR_Constants(unittest.TestCase): def test_10_feature_names(self): self.assertIsNone(constants.FEATURE_NAME(None)) for code in range(0x0000, 0x10000): - feature = chr((code & 0xFF00) >> 8) + chr(code & 0x00FF) + feature = struct.pack('!H', code) name = constants.FEATURE_NAME(feature) self.assertIsNotNone(name) if name.startswith('UNKNOWN_'): - self.assertEquals(code, int(name[8:], 16)) + self.assertEqual(code, struct.unpack('!H', feature)[0]) else: self.assertTrue(hasattr(constants.FEATURE, name)) - self.assertEquals(feature, getattr(constants.FEATURE, name)) + self.assertEqual(feature, getattr(constants.FEATURE, name)) def test_20_error_names(self): for code in range(0x00, 0x100): name = constants.ERROR_NAME(code) self.assertIsNotNone(name) if code > 9: - self.assertEquals(name, 'Unknown Error') + self.assertEqual(name, 'Unknown Error') else: - self.assertEquals(code, constants._ERROR_NAMES.index(name)) + self.assertEqual(code, constants._ERROR_NAMES.index(name)) if __name__ == '__main__': unittest.main() diff --git a/lib/logitech/unifying_receiver/tests/test_30_base.py b/lib/logitech/unifying_receiver/tests/test_30_base.py index 0c63f26c..a5811449 100644 --- a/lib/logitech/unifying_receiver/tests/test_30_base.py +++ b/lib/logitech/unifying_receiver/tests/test_30_base.py @@ -3,6 +3,7 @@ # import unittest +from binascii import hexlify from logitech.unifying_receiver import base from logitech.unifying_receiver.exceptions import * @@ -63,11 +64,11 @@ class Test_UR_Base(unittest.TestCase): self.assertIsInstance(reply, tuple, "read should have returned a tuple") reply_code, reply_device, reply_data = reply - self.assertEquals(reply_device, 0, "got ping reply for valid device") - self.assertGreater(len(reply_data), 4, "ping reply has wrong length: " + reply_data.encode('hex')) + self.assertEqual(reply_device, 0, "got ping reply for valid device") + self.assertGreater(len(reply_data), 4, "ping reply has wrong length: %s" % hexlify(reply_data)) if reply_code == 0x10: # ping fail - self.assertEquals(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: " + reply_data.encode('hex')) + self.assertEqual(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: %s" % hexlify(reply_data)) elif reply_code == 0x11: self.fail("Got valid ping from device 0") else: @@ -87,15 +88,15 @@ class Test_UR_Base(unittest.TestCase): self.assertIsInstance(reply, tuple, "read should have returned a tuple") reply_code, reply_device, reply_data = reply - self.assertEquals(reply_device, device, "ping reply for wrong device") - self.assertGreater(len(reply_data), 4, "ping reply has wrong length: " + reply_data.encode('hex')) + self.assertEqual(reply_device, device, "ping reply for wrong device") + self.assertGreater(len(reply_data), 4, "ping reply has wrong length: %s" % hexlify(reply_data)) if reply_code == 0x10: # ping fail - self.assertEquals(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: " + reply_data.encode('hex')) + self.assertEqual(reply_data[:3], b'\x8F\x00\x10', "0x10 reply with unknown reply data: %s" % hexlify(reply_data)) elif reply_code == 0x11: # ping ok - self.assertEquals(reply_data[:2], b'\x00\x10', "0x11 reply with unknown reply data: " + reply_data.encode('hex')) - self.assertEquals(reply_data[4], b'\xAA') + self.assertEqual(reply_data[:2], b'\x00\x10', "0x11 reply with unknown reply data: %s" % hexlify(reply_data)) + self.assertEqual(reply_data[4:5], b'\xAA') devices.append(device) else: self.fail("ping got bad reply code: " + reply) @@ -119,7 +120,7 @@ class Test_UR_Base(unittest.TestCase): reply = base.request(self.handle, self.device, FEATURE.ROOT) self.assertIsNotNone(reply, "request returned None reply") - self.assertEquals(reply[:2], b'\x00\x00', "request returned for wrong feature id") + self.assertEqual(reply[:2], b'\x00\x00', "request returned for wrong feature id") def test_55_request_root_feature_set(self): if self.handle is None: @@ -129,8 +130,8 @@ class Test_UR_Base(unittest.TestCase): reply = base.request(self.handle, self.device, FEATURE.ROOT, FEATURE.FEATURE_SET) self.assertIsNotNone(reply, "request returned None reply") - index = ord(reply[0]) - self.assertGreater(index, 0, "FEATURE_SET not available on device " + str(self.device)) + index = reply[:1] + self.assertGreater(index, b'\x00', "FEATURE_SET not available on device " + str(self.device)) def test_57_request_ignore_undhandled(self): if self.handle is None: @@ -140,8 +141,8 @@ class Test_UR_Base(unittest.TestCase): fs_index = base.request(self.handle, self.device, FEATURE.ROOT, FEATURE.FEATURE_SET) self.assertIsNotNone(fs_index) - fs_index = fs_index[0] - self.assertGreater(fs_index, 0) + fs_index = fs_index[:1] + self.assertGreater(fs_index, b'\x00') global received_unhandled received_unhandled = None @@ -160,7 +161,7 @@ class Test_UR_Base(unittest.TestCase): base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET) reply = base.request(self.handle, self.device, fs_index + b'\x00') self.assertIsNotNone(reply, "request returned None reply") - self.assertNotEquals(reply[0], b'\x00') + self.assertNotEquals(reply[:1], b'\x00') # self.assertIsNotNone(received_unhandled, "extra message not received by unhandled hook") received_unhandled = None @@ -168,7 +169,7 @@ class Test_UR_Base(unittest.TestCase): base.write(self.handle, self.device, FEATURE.ROOT + FEATURE.FEATURE_SET) reply = base.request(self.handle, self.device, fs_index + b'\x00') self.assertIsNotNone(reply, "request returned None reply") - self.assertNotEquals(reply[0], b'\x00') + self.assertNotEquals(reply[:1], b'\x00') self.assertIsNone(received_unhandled) del received_unhandled diff --git a/lib/logitech/unifying_receiver/tests/test_50_api.py b/lib/logitech/unifying_receiver/tests/test_50_api.py index 1a1721b1..8abeb6ad 100644 --- a/lib/logitech/unifying_receiver/tests/test_50_api.py +++ b/lib/logitech/unifying_receiver/tests/test_50_api.py @@ -71,7 +71,7 @@ class Test_UR_API(unittest.TestCase): reply = api.request(self.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF') self.assertIsNotNone(reply, "invalid reply") - self.assertEquals(reply[:5], b'\x00' * 5, "invalid reply") + self.assertEqual(reply[:5], b'\x00' * 5, "invalid reply") def test_40_get_device_features(self): if self.handle is None: @@ -171,7 +171,7 @@ class Test_UR_API(unittest.TestCase): all_devices = api.list_devices(self.handle) for device_info in all_devices: device = api.find_device_by_name(self.handle, device_info.name) - self.assertEquals(device, device_info) + self.assertEqual(device, device_info) if __name__ == '__main__': unittest.main() diff --git a/lib/logitech/unifying_receiver/unhandled.py b/lib/logitech/unifying_receiver/unhandled.py index ec8ebaf7..acc1cf15 100644 --- a/lib/logitech/unifying_receiver/unhandled.py +++ b/lib/logitech/unifying_receiver/unhandled.py @@ -4,12 +4,14 @@ # import logging +from binascii import hexlify + _l = logging.getLogger('logitech.unifying_receiver.unhandled') def _logging_unhandled_hook(reply_code, device, data): """Default unhandled hook, logs the reply as DEBUG.""" - _l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, data.encode('hex')) + _l.debug("UNHANDLED (,%d) code 0x%02x data [%s]", device, reply_code, hexlify(data)) _unhandled_hook = _logging_unhandled_hook diff --git a/lib/logitech/ur_eventqueue.py b/lib/logitech/ur_eventqueue.py deleted file mode 100644 index 1628d820..00000000 --- a/lib/logitech/ur_eventqueue.py +++ /dev/null @@ -1,363 +0,0 @@ -"""Low-level interface for devices connected through a Logitech Universal -Receiver (UR). - -Uses the HID api exposed through hidapi.py. -Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. - -In the context of this API, 'device' is the number (1..6 according to the -documentation) of the device attached to the UR. - -References: -http://julien.danjou.info/blog/2012/logitech-k750-linux-support -http://6xq.net/git/lars/lshidpp.git/plain/doc/ -""" - - -# -# Logging set-up. -# Add a new logging level for tracing low-level writes and reads. -# - -import logging -import threading - -from . import hidapi - - -LOG_LEVEL = 1 - -def _urll_trace(self, msg, *args): - if self.isEnabledFor(LOG_LEVEL): - args = (None if x is None - else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x) - else x - for x in args) - self.log(LOG_LEVEL, msg, *args) - -logging.addLevelName(LOG_LEVEL, 'trace1') -logging.Logger.trace1 = _urll_trace -_log = logging.getLogger('logitech.ur_lowlevel') -_log.setLevel(LOG_LEVEL) - - -# -# -# - - -"""Possible features available on a Logitech device. - -A particular device might not support all these features, and may support other -unknown features as well. -""" -FEATURE = type('FEATURE', (), - dict( - ROOT=b'\x00\x00', - FEATURE_SET=b'\x00\x01', - FIRMWARE=b'\x00\x03', - NAME=b'\x00\x05', - BATTERY=b'\x10\x00', - REPROGRAMMABLE_KEYS=b'\x1B\x00', - WIRELESS_STATUS=b'\x1D\x4B', - # UNKNOWN_1=b'\x1D\xF3', - # UNKNOWN_2=b'\x40\xA0', - # UNKNOWN_3=b'\x41\x00', - SOLAR_CHARGE=b'\x43\x01', - # UNKNOWN_4=b'\x45\x20', - )) - - -"""Possible types of devices connected to an UR.""" -DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse", - "Touchpad", "Trackball", "Presenter", "Receiver") - - -"""Default timeout on read (in ms).""" -DEFAULT_TIMEOUT = 1000 - - -"""Minimum size of a reply data packet.""" -_MIN_REPLY_SIZE = 7 - - -"""Maximum size of a reply data packet.""" -_MAX_REPLY_SIZE = 32 - -class NoReceiver(Exception): - """May be raised when trying to talk through a previously connected - receiver that is no longer available.""" - pass - - -# -# -# - - -class Receiver(threading.Thread): - def __init__(self, handle, path, timeout=DEFAULT_TIMEOUT): - super(Receiver, self).__init__(name='Unifying_Receiver_' + path) - self.handle = handle - self.path = path - self.timeout = timeout - - self.read_data = None - self.data_available = threading.Event() - - self.devices = {} - self.hooks = {} - - self.active = True - self.start() - - def __del__(self): - self.close() - - def close(self): - self.active = False - - try: - hidapi.close(self.handle) - _log.trace1("|%s:| closed", self.path) - return True - except Exception as e: - _log.warn("|%s:| closing: %s", self.path, e) - - self.hooks = None - self.devices = None - - def run(self): - while self.active: - data = hidapi.read(self.handle, _MAX_REPLY_SIZE, self.timeout) - if self.active and data: - _log.trace1("|%s|*| => r[%s]", self.path, data) - if len(data) < _MIN_REPLY_SIZE: - _log.trace1("|%s|*| => r[%s] short read", self.path, data) - if len(data) > _MAX_REPLY_SIZE: - _log.trace1("|%s|*| => r[%s] long read", self.path, data) - if not self._dispatch_to_hooks(data): - self.read_data = data - self.data_available.set() - - def _dispatch_to_hooks(self, data): - if data[0] == b'\x11': - for key in self.hooks: - if key == data[1:3]: - self.hooks[key].__call__(data[3:]) - return True - - def set_hook(self, device, feature_index, function=b'\x00', callback=None): - key = '%c%s%c' % (device, feature_index, function) - if callback is None: - if key in self.hooks: - del self.hooks[key] - else: - self.hooks[key] = callback - return True - - def _write(self, device, data): - wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data)) - if hidapi.write(self.handle, wdata): - _log.trace1("|%s|%d| <= w[%s]", self.path, device, wdata) - return True - else: - _log.trace1("|%s|%d| <= w[%s] failed ", self.path, device, wdata) - raise NoReceiver() - - def _read(self, device, feature_index=None, function=None, timeout=DEFAULT_TIMEOUT): - while True: - self.data_available.wait() - data = self.data - self.data_available.clear() - - if data[1] == chr(device): - if feature_index is None or data[2] == feature_index: - if function is None or data[3] == function: - return data - - _log.trace1("|%s:| ignoring read data [%s]", self.path, data) - - def _request(self, device, feature_index, function=b'\x00', data=b''): - self._write(device, feature_index + function + data) - return self._read(device, feature_index, function) - - def _request_direct(self, device, feature_index, function=b'\x00', data=b''): - self._write(device, feature_index + function + data) - while True: - data = hidapi.read(self.handle, _MAX_REPLY_SIZE, self.timeout) - - if not data: - continue - - if data[1] == chr(device) and data[2] == feature_index and data[3] == function: - return data - - - def ping(self, device): - """Pings a device to check if it is attached to the UR. - - :returns: True if the device is connected to the UR, False if the device is - not attached, None if no conclusive reply is received. - """ - if self._write(device, b'\x00\x10\x00\x00\xAA'): - while True: - reply = self._read(device, timeout=DEFAULT_TIMEOUT*3) - - # ping ok - if reply[0] == b'\0x11' and reply[1] == chr(device): - if reply[2:4] == b'\x00\x10' and reply[6] == b'\xAA': - _log.trace1("|%s|%d| ping: ok %s", self.path, device, reply[2]) - return True - - # ping failed - if reply[0] == b'\0x10' and reply[1] == chr(device): - if reply[2:4] == b'\x8F\x00': - _log.trace1("|%s|%d| ping: device not present", self.path, device) - return False - - _log.trace1("|%s|%d| ping: unknown reply", self.path, device, reply) - - def scan_devices(self): - for device in range(1, 7): - self.get_device(device) - - return self.devices.values() - - def get_device(self, device, query=True): - if device in self.devices: - value = self.devices[device] - _log.trace1("|%s:%d| device info %s", self.path, device, value) - return value - - if query and self.ping(device): - d_type = self.get_type(device) - d_name = self.get_name(device) - features_array = self._get_features(device) - value = (d_type, d_name, features_array) - self.devices[device] = value - _log.trace1("|%s:%d| device info %s", self.path, device, value) - return value - - _log.trace1("|%s:%d| device not found", self.path, device) - - def _get_feature_index(self, device, feature): - """Reads the index of a device's feature. - - :returns: An int, or None if the feature is not available. - """ - _log.trace1("|%s|%d| get feature index <%s>", self.path, device, feature) - reply = self._request(device, b'\x00', b'\x00', feature) - # only consider active and supported features - if ord(reply[4]) and ord(reply[5]) & 0xA0 == 0: - _log.trace1("|%s|%d| feature <%s> has index %s", self.path, device, feature, reply[4]) - return ord(reply[4]) - - _log.trace1("|%s|%d| feature <%s> not available", self.path, device, feature) - - def _get_features(self, device): - """Returns an array of feature ids. - - Their position in the array is the index to be used when accessing that - feature on the device. - - Only call this function in the initial set-up of the device, because - other messages and events not related to querying the feature set - will be ignored. - """ - _log.trace1("|%s|%d| get device features", self.path, device) - - # get the index of the FEATURE_SET - fs_index = self._get_feature_index(device, FEATURE.FEATURE_SET) - fs_index = chr(fs_index) - - # Query all the available features on the device, even if unknown. - - # get the number of active features the device has - features_count = self._request(device, fs_index) - features_count = ord(features_count[4]) - _log.trace1("|%s|%d| found %d features", self.path, device, features_count) - - # a device may have a maximum of 15 features - features = [None] * 0x10 - for index in range(1, 1 + features_count): - # for each index, get the feature residing at that index - feature = self._request(device, fs_index, b'\x10', chr(index)) - features[index] = feature[4:6].upper() - _log.trace1("|%s|%d| feature <%s> at index %d", self.path, device, features[index], index) - - return None if all(c is None for c in features) else features - - def get_type(self, device): - if device in self.devices: - return self.devices[device][0] - - dnt_index = self._get_feature_index(device, FEATURE.NAME) - dnt_index = chr(dnt_index) - d_type = self._request(device, dnt_index, b'\x20') - d_type = ord(d_type[4]) - return DEVICE_TYPES[d_type] - - def get_name(self, device): - if device in self.devices: - return self.devices[device][1] - - dnt_index = self._get_feature_index(device, FEATURE.NAME) - dnt_index = chr(dnt_index) - self._write(device, dnt_index) - name_length = self._read(device, dnt_index, b'\x00') - name_length = ord(name_length[4]) - - d_name = '' - while len(d_name) < name_length: - name_index = len(d_name) - name_fragment = self._request(device, dnt_index, b'\x10', chr(name_index)) - name_fragment = name_fragment[:name_length - len(d_name)] - d_name += name_fragment - - return d_name - - - -def open(): - """Opens the first Logitech UR found attached to the machine. - - :returns: A Receiver object for the found receiver, or ``None``. - """ - # USB ids for (Logitech, Unifying Receiver) - # interface 2 if the actual receiver interface - for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2): - - _log.trace1("checking %s", rawdevice) - receiver = hidapi.open_path(rawdevice.path) - if not receiver: - # could be a file permissions issue - # in any case, unreachable - _log.trace1("[%s] open failed", rawdevice.path) - continue - - _log.trace1("[%s] receiver handle %d", rawdevice.path, receiver) - # ping on device id 0 (always an error) - hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA') - - # if this is the right hidraw device, we'll receive a 'bad subdevice' - # otherwise, the read should produce nothing - reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT) - if reply: - _log.trace1("[%s] receiver %d exploratory ping reply [%s]", rawdevice.path, receiver, reply) - - if reply[:4] == b'\x10\x00\x8F\x00': - # 'device 0 unreachable' is the expected reply from a valid receiver handle - _log.trace1("[%s] success: found receiver with handle %d", rawdevice.path, receiver) - return Receiver(receiver, rawdevice.path) - - if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': - # no idea what this is, but it comes up occasionally - _log.trace1("[%s] receiver %d mistery reply", rawdevice.path, receiver) - else: - _log.trace1("[%s] receiver %d unknown reply", rawdevice.path, receiver) - else: - _log.trace1("[%s] receiver %d no reply", rawdevice.path, receiver) - pass - - # ignore - hidapi.close(receiver) diff --git a/lib/logitech/ur_queue2.py b/lib/logitech/ur_queue2.py deleted file mode 100644 index 62f26fbe..00000000 --- a/lib/logitech/ur_queue2.py +++ /dev/null @@ -1,216 +0,0 @@ -"""A few functions to deal with the Logitech Universal Receiver. - -It is assumed a single UR device is attached to the machine. - -Uses hidapi. -""" - -import logging -import threading - -from . import ur_lowlevel as urll -from urll import FEATURE - - -_log = logging.getLogger('logitech.ur') -_log.setLevel(logging.DEBUG) - - -# class NoDevice(Exception): -# """May be thrown when trying to talk through a previously present device -# that is no longer available.""" -# pass - - -class _EventQueue(threading.Thread): - def __init__(self, receiver, timeout=urll.DEFAULT_TIMEOUT): - super(_EventQueue, self).__init__() - self.daemon = True - self.receiver = receiver - self.timeout = timeout - self.active = True - - def stop(self): - self.active = False - self.join() - - def run(self): - while self.active: - data = urll.read(self.receiver.handle, self.timeout) - if not self.active: - # in case the queue has been stopped while reading - break - if data: - self.receiver._dispatch(*data) - - -class Receiver: - def __init__(self, path, handle=None, timeout=urll.DEFAULT_TIMEOUT): - self.path = path - self.handle = handle - - self.DEVICE_FEATURES = {} - self.hooks = {} - - self.event_queue = _EventQueue(self.handle, timeout) - self.event_queue.start() - - def close(self): - self.event_queue.stop() - self.event_queue = None - - urll.close(self.handle) - self.handle = None - - self.hooks = {} - self.DEVICE_FEATURES = {} - - def ping(self, device): - reply = self.event_queue.req() - if not urll.write(self.handle, device, '\x00\x10\x00\x00\xAA'): - # print "write failed", - return False - - reply = urll.read(self.handle, device) - if not reply: - # print "no data", - return False - - # 10018f00100900 - if ord(reply[0]) == 0x10: - if ord(reply[2]) == 0x8F: - # print "invalid", - return False - - # 110100100200aa00000000000000000000000000 - if ord(reply[0]) == 0x11: - if reply[2:4] == "\x00\x10" and reply[6] == "\xAA": - # success - return True - - # print "unknown" - return False - - def hook(self, device, feature, function=None, callback=None): - features = self.DEVICE_FEATURES[device] - if feature not in features: - raise Exception("feature " + feature + " not supported by device") - - feature_index = features.index(feature) - key = (device, feature_index, function, callback) - if key not in self.hooks: - self.hooks[key] = [] - if callback is None: - if callback in self.hooks[key]: - self.hooks[key].remove(callback) - else: - self.hooks[key].append(callback) - - def _dispatch(self, status, device, data): - _log.debug("incoming event %2x:%2x:%s", status, device, data.encode('hex')) - dispatched = False - for (key, callback) in self.hooks.items(): - if key[0] == device and key[1] == ord(data[0]): - if key[2] is not None and key[2] == data[1] & 0xFF: - callback.__call__(data) - - if not dispatched: - _log.debug("ignored incoming event %2x:%2x:%s", - status, device, data.encode('hex')) - - def _request(self, device, data=''): - if urll.write(self.handler, device, data): - pass - - def find_device(self, device_type=None, name=None): - """Gets the device number for the first device matching. - - The device type and name are case-insensitive. - """ - # Apparently a receiver supports up to 6 devices. - for device in range(1, 7): - if self.ping(device): - if device not in self.DEVICE_FEATURES: - self.DEVICE_FEATURES[device] = \ - urll.get_device_features(self.handle, device) - # print get_reprogrammable_keys(receiver, device) - # d_firmware = get_firmware_version(receiver, device) - # print "device", device, "[", d_name, "/", d_type, "]" - # print "firmware", d_firmware, "features", _DEVICE_FEATURES[device] - if device_type: - d_type = self.get_type(device) - if d_type is None or device_type.lower() != d_type.lower(): - continue - if name: - d_name = self.get_name(device) - if d_name is None or name.lower() != d_name.lower(): - continue - return device - - def get_type(self, device): - reply = self._request(device, FEATURE.GET_NAME, '\x20') - if reply: - return DEVICE_TYPES[ord(reply[2][2])] - - def get_name(self, device): - reply = self._request(device, FEATURE.GET_NAME) - if reply: - charcount = ord(reply[4]) - name = '' - index = 0 - while len(name) < charcount: - reply = self._request(device, FEATURE.NAME, '\x10', chr(index)) - if reply: - name += reply[4:4 + charcount - index] - index = len(name) - else: - break - return name - - def get_firmware_version(self, device, firmware_type=0): - reply = self._request(device, - FEATURE.FIRMWARE, '\x10', chr(firmware_type)) - if reply: - return '%s %s.%s' % (reply[5:8], - reply[8:10].encode('hex'), reply[10:12].encode('hex')) - - def get_battery_level(self, device): - reply = self._request(device, FEATURE.BATTERY) - if reply: - return (ord(reply[4]), ord(reply[5]), ord(reply[6])) - - def get_reprogrammable_keys(self, device): - count = self._request(device, FEATURE.REPROGRAMMABLE_KEYS) - if count: - keys = [] - for index in range(ord(count[4])): - key = self._request(device, - FEATURE.REPROGRAMMABLE_KEYS, '\x10', chr(index)) - keys.append(key[4:6], keys[6:8], ord(key[8])) - return keys - - def get_solar_charge(self, device): - reply = self._request(device, FEATURE.SOLAR_CHARGE, - '\x03', '\x78', '\x01', reply_function='\x10') - if reply: - charge = ord(reply[4]) - lux = ord(reply[5]) << 8 | ord(reply[6]) - # lux = int(round(((255 * ord(reply[5])) + ord(reply[6])) / 538.0, 2) * 100) - return (charge, lux) - - -# -# -# - -def get(): - """Gets a Receiver object for the Unifying Receiver connected to the machine. - - It is assumed a single receiver is connected to the machine. If more than - one are present, the first one found will be returned. - - :returns: a Receiver object, or None. - """ - receiver = urll.open() - if receiver: - return Receiver(*receiver) diff --git a/lib/logitech/ur_test3.py b/lib/logitech/ur_test3.py deleted file mode 100644 index fcedd278..00000000 --- a/lib/logitech/ur_test3.py +++ /dev/null @@ -1,533 +0,0 @@ -"""Low-level interface for devices connected through a Logitech Universal -Receiver (UR). - -Uses the HID api exposed through hidapi.py. -Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. - -Strongly recommended to use these functions from a single thread; calling -multiple functions from different threads has a high chance of mixing the -replies and causing apparent failures. - -In the context of this API, 'handle' is the open handle of UR attached to -the machine, and 'device' is the number (1..6 according to the documentation) -of the device attached to the UR. - -References: -http://julien.danjou.info/blog/2012/logitech-k750-linux-support -http://6xq.net/git/lars/lshidpp.git/plain/doc/ -""" - - -# -# Logging set-up. -# Add a new logging level for tracing low-level writes and reads. -# - -import logging - -LOG_LEVEL = 1 - -def _urll_trace(self, msg, *args): - if self.isEnabledFor(LOG_LEVEL): - args = (None if x is None - else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x) - else x - for x in args) - self.log(LOG_LEVEL, msg, *args) - -logging.addLevelName(LOG_LEVEL, 'TRACE1') -logging.Logger.trace1 = _urll_trace -_l = logging.getLogger('ur_lowlevel') -_l.setLevel(LOG_LEVEL) - - -# -# -# - - -"""Possible features available on a Logitech device. - -A particular device might not support all these features, and may support other -unknown features as well. -""" -FEATURE = type('FEATURE', (), - dict( - ROOT=b'\x00\x00', - FEATURE_SET=b'\x00\x01', - FIRMWARE=b'\x00\x03', - NAME=b'\x00\x05', - BATTERY=b'\x10\x00', - REPROGRAMMABLE_KEYS=b'\x1B\x00', - WIRELESS_STATUS=b'\x1D\x4B', - # UNKNOWN_1=b'\x1D\xF3', - # UNKNOWN_2=b'\x40\xA0', - # UNKNOWN_3=b'\x41\x00', - SOLAR_CHARGE=b'\x43\x01', - # UNKNOWN_4=b'\x45\x20', - )) -FEATURE_NAMES = { FEATURE.ROOT: 'ROOT', - FEATURE.FEATURE_SET: 'FEATURE_SET', - FEATURE.FIRMWARE: 'FIRMWARE', - FEATURE.NAME: 'NAME', - FEATURE.BATTERY: 'BATTERY', - FEATURE.REPROGRAMMABLE_KEYS: 'REPROGRAMMABLE_KEYS', - FEATURE.WIRELESS_STATUS: 'WIRELESS_STATUS', - FEATURE.SOLAR_CHARGE: 'SOLAR_CHARGE', - } - - -"""Possible types of devices connected to an UR.""" -DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse", - "Touchpad", "Trackball", "Presenter", "Receiver") - - -FIRMWARE_TYPES = ("Main (HID)", "Bootloader", "Hardware", "Other") - -BATTERY_STATUSES = ("Discharging (in use)", "Recharging", "Almost full", "Full", - "Slow recharge", "Invalid battery", "Thermal error", - "Charging error") - -ERROR_CODES = ("Ok", "Unknown", "Invalid argument", "Out of range", - "Hardware error", "Logitech internal", "Invalid feature index", - "Invalid function", "Busy", "Usupported") - -"""Default timeout on read (in ms).""" -DEFAULT_TIMEOUT = 1000 - - -"""Minimum size of a reply data packet.""" -_MIN_REPLY_SIZE = 7 - - -"""Maximum size of a reply data packet.""" -_MAX_REPLY_SIZE = 32 - - -# -# Exceptions that may be raised by this API. -# - -class NoReceiver(Exception): - """May be raised when trying to talk through a previously connected - receiver that is no longer available.""" - pass - -class FeatureNotSupported(Exception): - """Raised when trying to request a feature not supported by the device.""" - def __init__(self, device, feature): - super(FeatureNotSupported, self).__init__(device, feature, FEATURE_NAMES[feature]) - self.device = device - self.feature = feature - self.feature_name = FEATURE_NAMES[feature] - - -# -# -# - - -def _default_event_hook(reply_code, device, data): - _l.trace1("EVENT_HOOK (,%d) code %d status [%s]", device, reply_code, data) - - -"""A function that will be called on incoming events. - -It must be a function with the signature: ``_(int, int, str)``, where the -parameters are: (reply code, device number, data). - -This function will be called by the request() function, when it receives replies -that do not match the write ca -""" -event_hook = _default_event_hook - -def _publish_event(reply_code, device, data): - if event_hook is not None: - event_hook.__call__(reply_code, device, data) - - -# -# Low-level functions. -# - - -from . import hidapi - - -def open(): - """Opens the first Logitech UR found attached to the machine. - - :returns: An open file handle for the found receiver, or ``None``. - """ - # USB ids for (Logitech, Unifying Receiver) - # interface 2 if the actual receiver interface - for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2): - - _l.trace1("checking %s", rawdevice) - receiver = hidapi.open_path(rawdevice.path) - if not receiver: - # could be a file permissions issue - # in any case, unreachable - _l.trace1("[%s] open failed", rawdevice.path) - continue - - _l.trace1("[%s] receiver handle (%d,)", rawdevice.path, receiver) - # ping on device id 0 (always an error) - hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA') - - # if this is the right hidraw device, we'll receive a 'bad subdevice' - # otherwise, the read should produce nothing - reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT) - if reply: - if reply[:4] == b'\x10\x00\x8F\x00': - # 'device 0 unreachable' is the expected reply from a valid receiver handle - _l.trace1("[%s] success: handle (%d,)", rawdevice.path, receiver) - return receiver - - if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': - # no idea what this is, but it comes up occasionally - _l.trace1("[%s] (%d,) mistery reply [%s]", rawdevice.path, receiver, reply) - else: - _l.trace1("[%s] (%d,) unknown reply [%s]", rawdevice.path, receiver, reply) - else: - _l.trace1("[%s] (%d,) no reply", rawdevice.path, receiver) - - # ignore - close(receiver) - # hidapi.close(receiver) - - return None - - -def close(handle): - """Closes a HID device handle.""" - if handle: - try: - hidapi.close(handle) - _l.trace1("(%d,) closed", handle) - return True - except Exception as e: - _l.debug("(%d,) closing: %s", handle, e) - - return False - - -def write(handle, device, feature_index, function=b'\x00', param1=b'\x00', param2=b'\x00', param3=b'\x00'): - """Write a feature call to the receiver. - - :param handle: UR handle obtained with open(). - :param device: attached device number - :param feature_index: index in the - """ - if type(feature_index) == int: - feature_index = chr(feature_index) - data = feature_index + function + param1 + param2 + param3 - return _write(handle, device, data) - - -def _write(handle, device, data): - """Writes some data to a certain device. - - The first two (required) bytes of data must be the feature index for the - device, and a function code for that feature. - - If the receiver is no longer available (e.g. has been physically removed - from the machine), raises NoReceiver. - """ - wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data)) - _l.trace1("(%d,%d) <= w[%s]", handle, device, wdata) - # return hidapi.write(handle, wdata) - if not hidapi.write(handle, wdata): - _l.trace1("(%d,%d) write failed, assuming receiver has been removed", handle, device) - raise NoReceiver() - - -def read(handle, timeout=DEFAULT_TIMEOUT): - """Read some data from the receiver. Usually called after a write (feature - call), to get the reply. - - If any data was read in the given timeout, returns a tuple of - (reply_code, device, message data). The reply code should be ``0x11`` for a - successful feature call, or ``0x10`` to indicate some error, e.g. the device - is no longer available. - """ - data = hidapi.read(handle, _MAX_REPLY_SIZE, timeout) - if data: - _l.trace1("(%d,*) => r[%s]", handle, data) - if len(data) < _MIN_REPLY_SIZE: - _l.trace1("(%d,*) => r[%s] read short reply", handle, data) - if len(data) > _MAX_REPLY_SIZE: - _l.trace1("(%d,*) => r[%s] read long reply", handle, data) - return ord(data[0]), ord(data[1]), data[2:] - else: - _l.trace1("(%d,*) => r[]", handle) - - -def request(handle, device, feature, function=b'\x00', data=b'', features_array=None): - """Makes a feature call to the device, and returns the reply data. - - Basically a write() followed by (possibly multiple) reads, until a reply - matching the called feature is received. In theory the UR will always reply - to feature call; otherwise this function will wait indefinetly. - - Incoming data packets not matching the feature and function will be - delivered to the event_hook (if any), and then ignored. - - The optional ``features_array`` parameter is a cached result of the - get_device_features function for this device, necessary to find the feature - index. If the ``features_arrary`` is not provided, one will be obtained by - calling get_device_features. - - If the feature is not supported, returns None. - """ - if features_array is None: - features_array = get_device_features(handle, device) - if features_array is None: - _l.trace1("(%d,%d) no features array available", handle, device) - return None - - if feature in features_array: - feature_index = chr(features_array.index(feature)) - return _request(handle, device, feature_index + function, data) - else: - _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAMES[feature]) - raise FeatureNotSupported(device, feature) - - - -def _request(handle, device, feature_function, data=b''): - """Makes a feature call device and waits for a matching reply. - - Only call this in the initial set-up of the device. - - This function will skip all incoming messages and events not related to the - device we're requesting for, or the feature specified in the initial - request; it will also wait for a matching reply indefinetly. - - :param feature_function: a two-byte string of (feature_index, function). - :param data: additional data to send, up to 5 bytes. - :returns: - """ - _l.trace1("(%d,%d) request feature %s data %s", handle, device, feature_function, data) - _write(handle, device, feature_function + data) - while True: - reply = read(handle) - - if not reply: - # keep waiting... - continue - - if reply[1] != device: - # this message not for the device we're interested in - _l.trace1("(%d,%d) request reply for unexpected device %s", handle, device, reply) - _publish_event(*reply) - continue - - if reply[0] == 0x10 and reply[2][0] == b'\x8F': - # device not present - _l.trace1("(%d,%d) request ping failed %s", handle, device, reply) - return None - - if reply[0] == 0x11 and reply[2][0] == b'\xFF' and reply[2][1:3] == feature_function: - # an error returned from the device - error = ord(reply[2][3]) - _l.trace1("(%d,%d) request feature call error %d = %s: %s", handle, device, error, ERROR_CODES[error], reply) - return None - - if reply[0] == 0x11 and reply[2][:2] == feature_function: - # a matching reply - _l.trace1("(%d,%d) matched reply with data [%s]", handle, device, reply[2][2:]) - return reply[2][2:] - - _l.trace1("(%d,%d) unmatched reply %s (expected %s)", handle, device, reply[2][:2], feature_function) - _publish_event(*reply) - - -def ping(handle, device): - """Pings a device to check if it is attached to the UR. - - :returns: True if the device is connected to the UR, False if the device is - not attached, None if no conclusive reply is received. - """ - def _status(reply): - if not reply: - return None - - if reply[1] != device: - # oops - _l.trace1("(%d,%d) ping: reply for another device: %s", handle, device, reply) - _publish_event(*reply) - return _status(read(handle)) - - if (reply[0] == 0x11 and reply[2][:2] == b'\x00\x10' and reply[2][4] == b'\xAA'): - # ping ok - _l.trace1("(%d,%d) ping: ok %s", handle, device, reply[2]) - return True - - if (reply[0] == 0x10 and reply[2][:2] == b'\x8F\x00'): - # ping failed - _l.trace1("(%d,%d) ping: device not present", handle, device) - return False - - if (reply[0] == 0x11 and reply[2][:2] == b'\x09\x00' and reply[2][7:11] == b'GOOD'): - # some devices may reply with a SOLAR_STATUS packet before the - # ping_ok reply, especially right after the device connected to the - # receiver - _l.trace1("(%d,%d) ping: solar status %s", handle, device, reply[2]) - _publish_event(*reply) - return _status(read(handle)) - - # ugh - _l.trace1("(%d,%d) ping: unknown reply for this device", handle, device, reply) - _publish_event(*reply) - return None - - _l.trace1("(%d,%d) pinging", handle, device) - _write(handle, device, b'\x00\x10\x00\x00\xAA') - return _status(read(handle, DEFAULT_TIMEOUT * 3)) - - -def get_feature_index(handle, device, feature): - """Reads the index of a device's feature. - - :returns: An int, or ``None`` if the feature is not available. - """ - _l.trace1("(%d,%d) get feature index <%s>", handle, device, feature.encode('hex')) - feature_index = _request(handle, device, FEATURE.ROOT, feature) - if feature_index: - # only consider active and supported features - if ord(feature_index[0]) and ord(feature_index[1]) & 0xA0 == 0: - _l.trace1("(%d,%d) feature <%s> index %s", handle, device, feature.encode('hex'), feature_index[0]) - return ord(feature_index[0]) - - _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAMES[feature]) - raise FeatureNotSupported(device, feature) - - -def get_device_features(handle, device): - """Returns an array of feature ids. - - Their position in the array is the index to be used when accessing that - feature on the device. - - Only call this function in the initial set-up of the device, because - other messages and events not related to querying the feature set - will be ignored. - """ - _l.trace1("(%d,%d) get device features", handle, device) - - # get the index of the FEATURE_SET - fs_index = _request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET) - if not fs_index: - _l.trace1("(%d,%d) FEATURE_SET not available", handle, device) - return None - fs_index = fs_index[0] - - # For debugging purposes, query all the available features on the device, - # even if unknown. - - # get the number of active features the device has - features_count = _request(handle, device, fs_index + b'\x00') - if not features_count: - # this can happen if the device disappeard since the fs_index call - _l.trace1("(%d,%d) no features available?!", handle, device) - return None - features_count = ord(features_count[0]) - - # a device may have a maximum of 15 features - features = [None] * 0x10 - _l.trace1("(%d,%d) found %d features", handle, device, features_count) - - for index in range(1, 1 + features_count): - # for each index, get the feature residing at that index - feature = _request(handle, device, fs_index + b'\x10', chr(index)) - if feature: - features[index] = feature[0:2].upper() - _l.trace1("(%d,%d) feature <%s> at index %d", handle, device, features[index].encode('hex'), index) - - return None if all(c == None for c in features) else features - - -def get_device_firmware(handle, device, features_array=None): - """Reads a device's firmware info. - - Returns an list of tuples [ (firmware_type, firmware_version, ...), ... ], - ordered by firmware layer. - """ - fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array) - if fw_count: - fw_count = ord(fw_count[0]) - - fw = [] - for index in range(0, fw_count): - fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', data=chr(index), features_array=features_array) - if fw_info: - fw_type = ord(fw_info[0]) & 0x0F - if fw_type == 0 or fw_type == 1: - prefix = str(fw_info[1:4]) - version = ( str((ord(fw_info[4]) & 0xF0) >> 4) + - str(ord(fw_info[4]) & 0x0F) + - '.' + - str((ord(fw_info[5]) & 0xF0) >> 4) + - str(ord(fw_info[5]) & 0x0F)) - name = prefix + ' ' + version - build = 256 * ord(fw_info[6]) + ord(fw_info[7]) - if build: - name += ' b' + str(build) - extras = fw_info[9:].rstrip('\x00') - _l.trace1("(%d:%d) firmware %d = %s %s extras=%s", handle, device, fw_type, FIRMWARE_TYPES[fw_type], name, extras.encode('hex')) - fw.append((fw_type, name, build, extras)) - elif fw_type == 2: - version = ord(fw_info[1]) - _l.trace1("(%d:%d) firmware 2 = Hardware v%x", handle, device, version) - fw.append((2, version)) - else: - _l.trace1("(%d:%d) firmware other", handle, device) - fw.append((fw_type, )) - return fw - - -def get_device_type(handle, device, features_array=None): - """Reads a device's type. - - :see DEVICE_TYPES: - :returns: a string describing the device type, or ``None`` if the device is - not available or does not support the ``NAME`` feature. - """ - d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array) - if d_type: - d_type = ord(d_type[0]) - _l.trace1("(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPES[d_type]) - return DEVICE_TYPES[d_type] - - -def get_device_name(handle, device, features_array=None): - """Reads a device's name. - - :returns: a string with the device name, or ``None`` if the device is not - available or does not support the ``NAME`` feature. - """ - name_length = request(handle, device, FEATURE.NAME, features_array=features_array) - if name_length: - name_length = ord(name_length[0]) - - d_name = '' - while len(d_name) < name_length: - name_index = len(d_name) - name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', data=chr(name_index), features_array=features_array) - name_fragment = name_fragment[:name_length - len(d_name)] - d_name += name_fragment - - _l.trace1("(%d,%d) device name %s", handle, device, d_name) - return d_name - -def get_device_battery_level(handle, device, features_array=None): - """Reads a device's battery level. - """ - battery = request(handle, device, FEATURE.BATTERY, features_array=features_array) - if battery: - discharge = ord(battery[0]) - dischargeNext = ord(battery[1]) - status = ord(battery[2]) - _l.trace1("(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status]) - return (discharge, dischargeNext, status) diff --git a/lib/unittest.sh b/lib/unittest.sh index 3f582e3d..1f633540 100755 --- a/lib/unittest.sh +++ b/lib/unittest.sh @@ -3,4 +3,4 @@ cd `dirname "$0"` export LD_LIBRARY_PATH=$PWD -exec python -Qnew -m unittest discover -v "$@" +exec python -m unittest discover -v "$@" diff --git a/solaar.py b/solaar.py index 558c70b1..fc44d7b3 100644 --- a/solaar.py +++ b/solaar.py @@ -17,6 +17,7 @@ from logitech.devices import * # A few constants # + APP_TITLE = 'Solaar' UNIFYING_RECEIVER = 'Unifying Receiver' NO_DEVICES = 'No devices attached.' @@ -158,15 +159,15 @@ class StatusThread(threading.Thread): else: logging.warn("unknown event code %02x", code) elif device: - logging.debug("got event (%d, %d, %s) for new device", code, device, data.encode('hex')) + logging.debug("got event (%d, %d, %s) for new device", code, device, data) devinfo = ur.get_device_info(self.listener.receiver, device) if devinfo: self.devices[device] = devinfo self.statuses[device] = [0, None, None] else: - logging.warn("got event (%d, %d, %s) for unknown device", code, device, data.encode('hex')) + logging.warn("got event (%d, %d, %s) for unknown device", code, device, data) else: - logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data.encode('hex')) + logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data) if updated: GObject.idle_add(self.update_status_icon) @@ -202,7 +203,7 @@ class StatusThread(threading.Thread): devinfo = self.devices[d] status_text = self.statuses[d][2] if status_text: - all_statuses.append(unichr(0x274a) + ' ' + devinfo.name + '\n\t' + status_text) + all_statuses.append(devinfo.name + '\n\t' + status_text) else: all_statuses.append(devinfo.name) @@ -218,7 +219,7 @@ class StatusThread(threading.Thread): if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=6) logging.captureWarnings(True) status_icon = Gtk.StatusIcon.new_from_file('images/' + UNIFYING_RECEIVER + '.png')