diff --git a/logitech/__init__.py b/lib/logitech/__init__.py similarity index 100% rename from logitech/__init__.py rename to lib/logitech/__init__.py diff --git a/lib/logitech/devices/__init__.py b/lib/logitech/devices/__init__.py new file mode 100644 index 00000000..9f8d5287 --- /dev/null +++ b/lib/logitech/devices/__init__.py @@ -0,0 +1,24 @@ +# +# +# + +from . import k750 +from .constants import * + + +_REQUEST_STATUS_FUNCTIONS = { + k750.NAME : k750.request_status, + } + +def request_status(devinfo, listener): + if devinfo.name in _REQUEST_STATUS_FUNCTIONS: + return _REQUEST_STATUS_FUNCTIONS[devinfo.name](devinfo, listener) + + +_PROCESS_EVENT_FUNCTIONS = { + k750.NAME : k750.process_event, + } + +def process_event(devinfo, listener, data): + if devinfo.name in _PROCESS_EVENT_FUNCTIONS: + return _PROCESS_EVENT_FUNCTIONS[devinfo.name](devinfo, listener, data) diff --git a/lib/logitech/devices/constants.py b/lib/logitech/devices/constants.py new file mode 100644 index 00000000..67355626 --- /dev/null +++ b/lib/logitech/devices/constants.py @@ -0,0 +1,21 @@ +# +# +# + + +DEVICE_STATUS = type('DEVICE_STATUS', (), + dict( + UNKNOWN=None, + UNAVAILABLE=-1, + CONNECTED=0, + ACTIVE=1, + )) + + +from collections import defaultdict + +DEVICE_STATUS_NAME = defaultdict(lambda x: None) +DEVICE_STATUS_NAME[DEVICE_STATUS.UNAVAILABLE] = 'not available' +DEVICE_STATUS_NAME[DEVICE_STATUS.CONNECTED] = 'connected' + +del defaultdict diff --git a/lib/logitech/devices/k750.py b/lib/logitech/devices/k750.py new file mode 100644 index 00000000..2c2c50f4 --- /dev/null +++ b/lib/logitech/devices/k750.py @@ -0,0 +1,74 @@ +# +# Functions that are specific to the K750 solar keyboard. +# + +import logging + +from ..unifying_receiver import api as _api +from .constants import * + + +# +# +# + +NAME = 'Wireless Solar Keyboard K750' + +_STATUS_NAMES = ('excellent', 'good', 'okay', 'poor') + +_CHARGE_LIMITS = (75, 40, 20, -1) +_LIGHTING_LIMITS = (450, 310, 190, -1) + +# +# +# + + +def _trigger_solar_charge_events(receiver, devinfo): + return _api.request(receiver, devinfo.number, + feature=_api.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01', + features_array=devinfo.features_array) + + +def _charge_status(data): + charge = ord(data[2]) + lux = (ord(data[3]) << 8) + ord(data[4]) + + for i in range(0, len(_CHARGE_LIMITS)): + if charge >= _CHARGE_LIMITS[i]: + charge_index = i + break + + if lux == 0: + return 0x10 << charge_index, '\n\tCharge %d%% (%s)' % (charge, _STATUS_NAMES[charge_index]) + + for i in range(0, len(_CHARGE_LIMITS)): + if lux > _LIGHTING_LIMITS[i]: + lighting_index = i + break + + return 0x10 << charge_index, '\n\tCharge %d%% (%s), Lighting %s (%d lux)' % ( + charge, _STATUS_NAMES[charge_index], _STATUS_NAMES[lighting_index], lux) + + +def request_status(devinfo, listener): + reply = listener.request(_trigger_solar_charge_events, devinfo) + if reply is None: + return DEVICE_STATUS.UNAVAILABLE + + +def process_event(devinfo, listener, data): + if data[:2] == b'\x05\x00': + # wireless device status + if data[2:5] == b'\x01\x01\x01': + logging.debug("Keyboard just started") + return DEVICE_STATUS.CONNECTED + elif data[:2] == b'\x09\x00' and data[7:11] == b'GOOD': + return _charge_status(data) + elif data[:2] == b'\x09\x10' and data[7:11] == b'GOOD': + return _charge_status(data) + elif data[:2] == b'\x09\x20': + logging.debug("Solar key pressed") + if _trigger_solar_charge_events(listener.receiver, devinfo) is None: + return DEVICE_STATUS.UNAVAILABLE + return _charge_status(data) diff --git a/logitech/unifying_receiver/__init__.py b/lib/logitech/unifying_receiver/__init__.py similarity index 100% rename from logitech/unifying_receiver/__init__.py rename to lib/logitech/unifying_receiver/__init__.py diff --git a/logitech/unifying_receiver/api.py b/lib/logitech/unifying_receiver/api.py similarity index 84% rename from logitech/unifying_receiver/api.py rename to lib/logitech/unifying_receiver/api.py index 864161e9..fd37d053 100644 --- a/logitech/unifying_receiver/api.py +++ b/lib/logitech/unifying_receiver/api.py @@ -3,9 +3,6 @@ # import logging -_LOG_LEVEL = 5 -_l = logging.getLogger('logitech.unifying_receiver.api') -_l.setLevel(_LOG_LEVEL) from .constants import * from .exceptions import * @@ -13,20 +10,40 @@ from . import base from .unhandled import _publish as _unhandled_publish +_LOG_LEVEL = 5 +_l = logging.getLogger('logitech.unifying_receiver.api') + + # # # from collections import namedtuple + """Tuple returned by list_devices and find_device_by_name.""" AttachedDeviceInfo = namedtuple('AttachedDeviceInfo', [ 'number', 'type', 'name', + 'firmware', 'features_array']) + +"""Firmware information.""" +FirmwareInfo = namedtuple('FirmwareInfo', [ + 'level', + 'type', + 'name', + 'version', + 'build', + 'extras']) + +def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=None): + return FirmwareInfo(level, type, name, version, build, extras) + del namedtuple + # # # @@ -142,17 +159,14 @@ def find_device_by_name(handle, device_name): :returns: an AttachedDeviceInfo tuple, or ``None``. """ - _l.log(_LOG_LEVEL, "(%d:,) searching for device '%s'", handle, device_name) + _l.log(_LOG_LEVEL, "(%d,) searching for device '%s'", handle, device_name) for device in range(1, 1 + base.MAX_ATTACHED_DEVICES): features_array = get_device_features(handle, device) if features_array: d_name = get_device_name(handle, device, features_array) if d_name == device_name: - d_type = get_device_type(handle, device, features_array) - device_info = AttachedDeviceInfo(device, d_type, d_name, features_array) - _l.log(_LOG_LEVEL, "(%d:,%d) found device %s", handle, device, device_info) - return device_info + return get_device_info(handle, device, device_name=d_name, features_array=features_array) def list_devices(handle): @@ -160,22 +174,36 @@ def list_devices(handle): :returns: a list of AttachedDeviceInfo tuples. """ - _l.log(_LOG_LEVEL, "(%d:,) listing all devices", handle) + _l.log(_LOG_LEVEL, "(%d,) listing all devices", handle) devices = [] for device in range(1, 1 + base.MAX_ATTACHED_DEVICES): features_array = get_device_features(handle, device) if features_array: - d_type = get_device_type(handle, device, features_array) - d_name = get_device_name(handle, device, features_array) - device_info = AttachedDeviceInfo(device, d_type, d_name, features_array) - _l.log(_LOG_LEVEL, "(%d:,%d) found device %s", handle, device, device_info) - devices.append(device_info) + devices.append(get_device_info(handle, device, features_array=features_array)) return devices +def get_device_info(handle, device, device_name=None, features_array=None): + """Gets the complete info for a device. + + :returns: an AttachedDeviceInfo tuple, or ``None``. + """ + if features_array is None: + features_array = get_device_features(handle, device) + if features_array is None: + return None + + d_type = get_device_type(handle, device, features_array) + d_name = get_device_name(handle, device, features_array) if device_name is None else device_name + d_firmware = get_device_firmware(handle, device, features_array) + devinfo = AttachedDeviceInfo(device, d_type, d_name, d_firmware, features_array) + _l.log(_LOG_LEVEL, "(%d,%d) found device %s", handle, device, devinfo) + return devinfo + + def get_feature_index(handle, device, feature): """Reads the index of a device's feature. @@ -265,28 +293,28 @@ def get_device_firmware(handle, device, features_array=None): for index in range(0, fw_count): fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=chr(index), features_array=features_array) if fw_info: - fw_type = ord(fw_info[0]) & 0x0F - if fw_type == 0 or fw_type == 1: - prefix = str(fw_info[1:4]) + fw_level = ord(fw_info[0]) & 0x0F + if fw_level == 0 or fw_level == 1: + fw_type = FIRMWARE_TYPES[fw_level] + name = str(fw_info[1:4]) version = ( str((ord(fw_info[4]) & 0xF0) >> 4) + str(ord(fw_info[4]) & 0x0F) + '.' + str((ord(fw_info[5]) & 0xF0) >> 4) + str(ord(fw_info[5]) & 0x0F)) - name = prefix + ' ' + version build = (ord(fw_info[6]) << 8) + ord(fw_info[7]) - if build: - name += ' b' + str(build) extras = fw_info[9:].rstrip('\x00') - _l.log(_LOG_LEVEL, "(%d:%d) firmware %d = %s %s extras=%s", handle, device, fw_type, FIRMWARE_TYPES[fw_type], name, extras.encode('hex')) - fw.append((fw_type, name, build, extras)) - elif fw_type == 2: - version = ord(fw_info[1]) - _l.log(_LOG_LEVEL, "(%d:%d) firmware 2 = Hardware v%x", handle, device, version) - fw.append((2, version)) + if extras: + fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build, extras=extras) + else: + fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build) + elif fw_level == 2: + fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPES[2], version=ord(fw_info[1])) else: - _l.log(_LOG_LEVEL, "(%d:%d) firmware other", handle, device) - fw.append((fw_type, )) + fw_info = _makeFirmwareInfo(level=fw_level, type=FIRMWARE_TYPES[-1]) + + fw.append(fw_info) + _l.log(_LOG_LEVEL, "(%d:%d) firmware %s", handle, device, fw_info) return fw diff --git a/logitech/unifying_receiver/base.py b/lib/logitech/unifying_receiver/base.py similarity index 99% rename from logitech/unifying_receiver/base.py rename to lib/logitech/unifying_receiver/base.py index 5a58a9e5..f2aa5c0a 100644 --- a/logitech/unifying_receiver/base.py +++ b/lib/logitech/unifying_receiver/base.py @@ -4,9 +4,6 @@ # import logging -_LOG_LEVEL = 4 -_l = logging.getLogger('logitech.unifying_receiver.base') -_l.setLevel(_LOG_LEVEL) from .constants import * from .exceptions import * @@ -15,6 +12,10 @@ from . import unhandled as _unhandled import hidapi as _hid +_LOG_LEVEL = 4 +_l = logging.getLogger('logitech.unifying_receiver.base') + + # # These values are defined by the Logitech documentation. # Overstepping these boundaries will only produce log warnings. diff --git a/logitech/unifying_receiver/constants.py b/lib/logitech/unifying_receiver/constants.py similarity index 100% rename from logitech/unifying_receiver/constants.py rename to lib/logitech/unifying_receiver/constants.py diff --git a/logitech/unifying_receiver/exceptions.py b/lib/logitech/unifying_receiver/exceptions.py similarity index 100% rename from logitech/unifying_receiver/exceptions.py rename to lib/logitech/unifying_receiver/exceptions.py diff --git a/lib/logitech/unifying_receiver/listener.py b/lib/logitech/unifying_receiver/listener.py new file mode 100644 index 00000000..8a99bea4 --- /dev/null +++ b/lib/logitech/unifying_receiver/listener.py @@ -0,0 +1,78 @@ +# +# +# + +import logging +import threading +import time + +from . import base +from .exceptions import * + + +_LOG_LEVEL = 6 +_l = logging.getLogger('logitech.unifying_receiver.listener') + +_EVENT_TIMEOUT = 100 +_IDLE_SLEEP = 1000.0 / 1000.0 + + +class EventsListener(threading.Thread): + def __init__(self, receiver, callback): + super(EventsListener, self).__init__(name='Unifying_Receiver_Listener_' + str(receiver)) + + self.receiver = receiver + self.callback = callback + + self.task = None + self.task_processing = threading.Lock() + + self.task_reply = None + self.task_done = threading.Event() + + self.active = False + + def run(self): + _l.log(_LOG_LEVEL, "(%d) starting", self.receiver) + self.active = True + while self.active: + # _l.log(_LOG_LEVEL, "(%d) reading next event", self.receiver) + event = base.read(self.receiver, _EVENT_TIMEOUT) + if event: + _l.log(_LOG_LEVEL, "(%d) got event %s", self.receiver, event) + self.callback.__call__(*event) + elif self.task is None: + # _l.log(_LOG_LEVEL, "(%d) idle sleep", self.receiver) + time.sleep(_IDLE_SLEEP) + else: + self.task_reply = self._make_request(*self.task) + self.task_done.set() + + def stop(self): + _l.log(_LOG_LEVEL, "(%d) stopping", self.receiver) + self.active = False + self.join() + + def request(self, api_function, *args, **kwargs): + # _l.log(_LOG_LEVEL, "(%d) request '%s' with %s, %s", self.receiver, api_function.__name__, args, kwargs) + self.task_processing.acquire() + self.task_done.clear() + self.task = (api_function, args, kwargs) + self.task_done.wait() + reply = self.task_reply + self.task = self.task_reply = None + self.task_processing.release() + # _l.log(_LOG_LEVEL, "(%d) request '%s' => [%s]", self.receiver, api_function.__name__, reply.encode('hex')) + if isinstance(reply, Exception): + raise reply + return reply + + def _make_request(self, api_function, args, kwargs): + _l.log(_LOG_LEVEL, "(%d) calling '%s' with %s, %s", self.receiver, api_function.__name__, args, kwargs) + try: + return api_function.__call__(self.receiver, *args, **kwargs) + except NoReceiver as nr: + self.task_reply = nr + self.active = False + except Exception as e: + self.task_reply = e diff --git a/logitech/unifying_receiver/tests/__init__.py b/lib/logitech/unifying_receiver/tests/__init__.py similarity index 100% rename from logitech/unifying_receiver/tests/__init__.py rename to lib/logitech/unifying_receiver/tests/__init__.py diff --git a/logitech/unifying_receiver/tests/test_00_hidapi.py b/lib/logitech/unifying_receiver/tests/test_00_hidapi.py similarity index 100% rename from logitech/unifying_receiver/tests/test_00_hidapi.py rename to lib/logitech/unifying_receiver/tests/test_00_hidapi.py diff --git a/logitech/unifying_receiver/tests/test_10_constants.py b/lib/logitech/unifying_receiver/tests/test_10_constants.py similarity index 100% rename from logitech/unifying_receiver/tests/test_10_constants.py rename to lib/logitech/unifying_receiver/tests/test_10_constants.py diff --git a/logitech/unifying_receiver/tests/test_30_base.py b/lib/logitech/unifying_receiver/tests/test_30_base.py similarity index 100% rename from logitech/unifying_receiver/tests/test_30_base.py rename to lib/logitech/unifying_receiver/tests/test_30_base.py diff --git a/logitech/unifying_receiver/tests/test_50_api.py b/lib/logitech/unifying_receiver/tests/test_50_api.py similarity index 84% rename from logitech/unifying_receiver/tests/test_50_api.py rename to lib/logitech/unifying_receiver/tests/test_50_api.py index 07274d42..1a1721b1 100644 --- a/logitech/unifying_receiver/tests/test_50_api.py +++ b/lib/logitech/unifying_receiver/tests/test_50_api.py @@ -15,6 +15,7 @@ class Test_UR_API(unittest.TestCase): cls.handle = None cls.device = None cls.features_array = None + cls.device_info = None @classmethod def tearDownClass(cls): @@ -22,6 +23,7 @@ class Test_UR_API(unittest.TestCase): api.close(cls.handle) cls.device = None cls.features_array = None + cls.device_info = None def test_00_open_receiver(self): Test_UR_API.handle = api.open() @@ -92,8 +94,10 @@ class Test_UR_API(unittest.TestCase): self.fail("no feature set available") d_firmware = api.get_device_firmware(self.handle, self.device, self.features_array) - self.assertIsNotNone(d_firmware, "failed to get device type") - self.assertGreater(len(d_firmware), 0, "empty device type") + self.assertIsNotNone(d_firmware, "failed to get device firmware") + self.assertGreater(len(d_firmware), 0, "device reported no firmware") + for fw in d_firmware: + self.assertIsInstance(fw, api.FirmwareInfo) def test_52_get_device_type(self): if self.handle is None: @@ -119,6 +123,19 @@ class Test_UR_API(unittest.TestCase): self.assertIsNotNone(d_name, "failed to read device name") self.assertGreater(len(d_name), 0, "empty device name") + def test_59_get_device_info(self): + if self.handle is None: + self.fail("No receiver found") + if self.device is None: + self.fail("Found no devices attached.") + if self.features_array is None: + self.fail("no feature set available") + + device_info = api.get_device_info(self.handle, self.device, features_array=self.features_array) + self.assertIsNotNone(device_info, "failed to read full device info") + self.assertIsInstance(device_info, api.AttachedDeviceInfo) + Test_UR_API.device_info = device_info + def test_60_get_battery_level(self): if self.handle is None: self.fail("No receiver found") @@ -131,7 +148,7 @@ class Test_UR_API(unittest.TestCase): battery = api.get_device_battery_level(self.handle, self.device, self.features_array) self.assertIsNotNone(battery, "failed to read battery level") except FeatureNotSupported: - self.fail("BATTERY feature not supported by device " + str(self.device)) + self.fail("FEATURE.BATTERY not supported by device " + str(self.device) + ": " + str(self.device_info)) def test_70_list_devices(self): if self.handle is None: diff --git a/logitech/unifying_receiver/unhandled.py b/lib/logitech/unifying_receiver/unhandled.py similarity index 100% rename from logitech/unifying_receiver/unhandled.py rename to lib/logitech/unifying_receiver/unhandled.py diff --git a/lib/logitech/ur_eventqueue.py b/lib/logitech/ur_eventqueue.py new file mode 100644 index 00000000..1628d820 --- /dev/null +++ b/lib/logitech/ur_eventqueue.py @@ -0,0 +1,363 @@ +"""Low-level interface for devices connected through a Logitech Universal +Receiver (UR). + +Uses the HID api exposed through hidapi.py. +Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. + +In the context of this API, 'device' is the number (1..6 according to the +documentation) of the device attached to the UR. + +References: +http://julien.danjou.info/blog/2012/logitech-k750-linux-support +http://6xq.net/git/lars/lshidpp.git/plain/doc/ +""" + + +# +# Logging set-up. +# Add a new logging level for tracing low-level writes and reads. +# + +import logging +import threading + +from . import hidapi + + +LOG_LEVEL = 1 + +def _urll_trace(self, msg, *args): + if self.isEnabledFor(LOG_LEVEL): + args = (None if x is None + else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x) + else x + for x in args) + self.log(LOG_LEVEL, msg, *args) + +logging.addLevelName(LOG_LEVEL, 'trace1') +logging.Logger.trace1 = _urll_trace +_log = logging.getLogger('logitech.ur_lowlevel') +_log.setLevel(LOG_LEVEL) + + +# +# +# + + +"""Possible features available on a Logitech device. + +A particular device might not support all these features, and may support other +unknown features as well. +""" +FEATURE = type('FEATURE', (), + dict( + ROOT=b'\x00\x00', + FEATURE_SET=b'\x00\x01', + FIRMWARE=b'\x00\x03', + NAME=b'\x00\x05', + BATTERY=b'\x10\x00', + REPROGRAMMABLE_KEYS=b'\x1B\x00', + WIRELESS_STATUS=b'\x1D\x4B', + # UNKNOWN_1=b'\x1D\xF3', + # UNKNOWN_2=b'\x40\xA0', + # UNKNOWN_3=b'\x41\x00', + SOLAR_CHARGE=b'\x43\x01', + # UNKNOWN_4=b'\x45\x20', + )) + + +"""Possible types of devices connected to an UR.""" +DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse", + "Touchpad", "Trackball", "Presenter", "Receiver") + + +"""Default timeout on read (in ms).""" +DEFAULT_TIMEOUT = 1000 + + +"""Minimum size of a reply data packet.""" +_MIN_REPLY_SIZE = 7 + + +"""Maximum size of a reply data packet.""" +_MAX_REPLY_SIZE = 32 + +class NoReceiver(Exception): + """May be raised when trying to talk through a previously connected + receiver that is no longer available.""" + pass + + +# +# +# + + +class Receiver(threading.Thread): + def __init__(self, handle, path, timeout=DEFAULT_TIMEOUT): + super(Receiver, self).__init__(name='Unifying_Receiver_' + path) + self.handle = handle + self.path = path + self.timeout = timeout + + self.read_data = None + self.data_available = threading.Event() + + self.devices = {} + self.hooks = {} + + self.active = True + self.start() + + def __del__(self): + self.close() + + def close(self): + self.active = False + + try: + hidapi.close(self.handle) + _log.trace1("|%s:| closed", self.path) + return True + except Exception as e: + _log.warn("|%s:| closing: %s", self.path, e) + + self.hooks = None + self.devices = None + + def run(self): + while self.active: + data = hidapi.read(self.handle, _MAX_REPLY_SIZE, self.timeout) + if self.active and data: + _log.trace1("|%s|*| => r[%s]", self.path, data) + if len(data) < _MIN_REPLY_SIZE: + _log.trace1("|%s|*| => r[%s] short read", self.path, data) + if len(data) > _MAX_REPLY_SIZE: + _log.trace1("|%s|*| => r[%s] long read", self.path, data) + if not self._dispatch_to_hooks(data): + self.read_data = data + self.data_available.set() + + def _dispatch_to_hooks(self, data): + if data[0] == b'\x11': + for key in self.hooks: + if key == data[1:3]: + self.hooks[key].__call__(data[3:]) + return True + + def set_hook(self, device, feature_index, function=b'\x00', callback=None): + key = '%c%s%c' % (device, feature_index, function) + if callback is None: + if key in self.hooks: + del self.hooks[key] + else: + self.hooks[key] = callback + return True + + def _write(self, device, data): + wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data)) + if hidapi.write(self.handle, wdata): + _log.trace1("|%s|%d| <= w[%s]", self.path, device, wdata) + return True + else: + _log.trace1("|%s|%d| <= w[%s] failed ", self.path, device, wdata) + raise NoReceiver() + + def _read(self, device, feature_index=None, function=None, timeout=DEFAULT_TIMEOUT): + while True: + self.data_available.wait() + data = self.data + self.data_available.clear() + + if data[1] == chr(device): + if feature_index is None or data[2] == feature_index: + if function is None or data[3] == function: + return data + + _log.trace1("|%s:| ignoring read data [%s]", self.path, data) + + def _request(self, device, feature_index, function=b'\x00', data=b''): + self._write(device, feature_index + function + data) + return self._read(device, feature_index, function) + + def _request_direct(self, device, feature_index, function=b'\x00', data=b''): + self._write(device, feature_index + function + data) + while True: + data = hidapi.read(self.handle, _MAX_REPLY_SIZE, self.timeout) + + if not data: + continue + + if data[1] == chr(device) and data[2] == feature_index and data[3] == function: + return data + + + def ping(self, device): + """Pings a device to check if it is attached to the UR. + + :returns: True if the device is connected to the UR, False if the device is + not attached, None if no conclusive reply is received. + """ + if self._write(device, b'\x00\x10\x00\x00\xAA'): + while True: + reply = self._read(device, timeout=DEFAULT_TIMEOUT*3) + + # ping ok + if reply[0] == b'\0x11' and reply[1] == chr(device): + if reply[2:4] == b'\x00\x10' and reply[6] == b'\xAA': + _log.trace1("|%s|%d| ping: ok %s", self.path, device, reply[2]) + return True + + # ping failed + if reply[0] == b'\0x10' and reply[1] == chr(device): + if reply[2:4] == b'\x8F\x00': + _log.trace1("|%s|%d| ping: device not present", self.path, device) + return False + + _log.trace1("|%s|%d| ping: unknown reply", self.path, device, reply) + + def scan_devices(self): + for device in range(1, 7): + self.get_device(device) + + return self.devices.values() + + def get_device(self, device, query=True): + if device in self.devices: + value = self.devices[device] + _log.trace1("|%s:%d| device info %s", self.path, device, value) + return value + + if query and self.ping(device): + d_type = self.get_type(device) + d_name = self.get_name(device) + features_array = self._get_features(device) + value = (d_type, d_name, features_array) + self.devices[device] = value + _log.trace1("|%s:%d| device info %s", self.path, device, value) + return value + + _log.trace1("|%s:%d| device not found", self.path, device) + + def _get_feature_index(self, device, feature): + """Reads the index of a device's feature. + + :returns: An int, or None if the feature is not available. + """ + _log.trace1("|%s|%d| get feature index <%s>", self.path, device, feature) + reply = self._request(device, b'\x00', b'\x00', feature) + # only consider active and supported features + if ord(reply[4]) and ord(reply[5]) & 0xA0 == 0: + _log.trace1("|%s|%d| feature <%s> has index %s", self.path, device, feature, reply[4]) + return ord(reply[4]) + + _log.trace1("|%s|%d| feature <%s> not available", self.path, device, feature) + + def _get_features(self, device): + """Returns an array of feature ids. + + Their position in the array is the index to be used when accessing that + feature on the device. + + Only call this function in the initial set-up of the device, because + other messages and events not related to querying the feature set + will be ignored. + """ + _log.trace1("|%s|%d| get device features", self.path, device) + + # get the index of the FEATURE_SET + fs_index = self._get_feature_index(device, FEATURE.FEATURE_SET) + fs_index = chr(fs_index) + + # Query all the available features on the device, even if unknown. + + # get the number of active features the device has + features_count = self._request(device, fs_index) + features_count = ord(features_count[4]) + _log.trace1("|%s|%d| found %d features", self.path, device, features_count) + + # a device may have a maximum of 15 features + features = [None] * 0x10 + for index in range(1, 1 + features_count): + # for each index, get the feature residing at that index + feature = self._request(device, fs_index, b'\x10', chr(index)) + features[index] = feature[4:6].upper() + _log.trace1("|%s|%d| feature <%s> at index %d", self.path, device, features[index], index) + + return None if all(c is None for c in features) else features + + def get_type(self, device): + if device in self.devices: + return self.devices[device][0] + + dnt_index = self._get_feature_index(device, FEATURE.NAME) + dnt_index = chr(dnt_index) + d_type = self._request(device, dnt_index, b'\x20') + d_type = ord(d_type[4]) + return DEVICE_TYPES[d_type] + + def get_name(self, device): + if device in self.devices: + return self.devices[device][1] + + dnt_index = self._get_feature_index(device, FEATURE.NAME) + dnt_index = chr(dnt_index) + self._write(device, dnt_index) + name_length = self._read(device, dnt_index, b'\x00') + name_length = ord(name_length[4]) + + d_name = '' + while len(d_name) < name_length: + name_index = len(d_name) + name_fragment = self._request(device, dnt_index, b'\x10', chr(name_index)) + name_fragment = name_fragment[:name_length - len(d_name)] + d_name += name_fragment + + return d_name + + + +def open(): + """Opens the first Logitech UR found attached to the machine. + + :returns: A Receiver object for the found receiver, or ``None``. + """ + # USB ids for (Logitech, Unifying Receiver) + # interface 2 if the actual receiver interface + for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2): + + _log.trace1("checking %s", rawdevice) + receiver = hidapi.open_path(rawdevice.path) + if not receiver: + # could be a file permissions issue + # in any case, unreachable + _log.trace1("[%s] open failed", rawdevice.path) + continue + + _log.trace1("[%s] receiver handle %d", rawdevice.path, receiver) + # ping on device id 0 (always an error) + hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA') + + # if this is the right hidraw device, we'll receive a 'bad subdevice' + # otherwise, the read should produce nothing + reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT) + if reply: + _log.trace1("[%s] receiver %d exploratory ping reply [%s]", rawdevice.path, receiver, reply) + + if reply[:4] == b'\x10\x00\x8F\x00': + # 'device 0 unreachable' is the expected reply from a valid receiver handle + _log.trace1("[%s] success: found receiver with handle %d", rawdevice.path, receiver) + return Receiver(receiver, rawdevice.path) + + if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': + # no idea what this is, but it comes up occasionally + _log.trace1("[%s] receiver %d mistery reply", rawdevice.path, receiver) + else: + _log.trace1("[%s] receiver %d unknown reply", rawdevice.path, receiver) + else: + _log.trace1("[%s] receiver %d no reply", rawdevice.path, receiver) + pass + + # ignore + hidapi.close(receiver) diff --git a/lib/logitech/ur_queue2.py b/lib/logitech/ur_queue2.py new file mode 100644 index 00000000..62f26fbe --- /dev/null +++ b/lib/logitech/ur_queue2.py @@ -0,0 +1,216 @@ +"""A few functions to deal with the Logitech Universal Receiver. + +It is assumed a single UR device is attached to the machine. + +Uses hidapi. +""" + +import logging +import threading + +from . import ur_lowlevel as urll +from urll import FEATURE + + +_log = logging.getLogger('logitech.ur') +_log.setLevel(logging.DEBUG) + + +# class NoDevice(Exception): +# """May be thrown when trying to talk through a previously present device +# that is no longer available.""" +# pass + + +class _EventQueue(threading.Thread): + def __init__(self, receiver, timeout=urll.DEFAULT_TIMEOUT): + super(_EventQueue, self).__init__() + self.daemon = True + self.receiver = receiver + self.timeout = timeout + self.active = True + + def stop(self): + self.active = False + self.join() + + def run(self): + while self.active: + data = urll.read(self.receiver.handle, self.timeout) + if not self.active: + # in case the queue has been stopped while reading + break + if data: + self.receiver._dispatch(*data) + + +class Receiver: + def __init__(self, path, handle=None, timeout=urll.DEFAULT_TIMEOUT): + self.path = path + self.handle = handle + + self.DEVICE_FEATURES = {} + self.hooks = {} + + self.event_queue = _EventQueue(self.handle, timeout) + self.event_queue.start() + + def close(self): + self.event_queue.stop() + self.event_queue = None + + urll.close(self.handle) + self.handle = None + + self.hooks = {} + self.DEVICE_FEATURES = {} + + def ping(self, device): + reply = self.event_queue.req() + if not urll.write(self.handle, device, '\x00\x10\x00\x00\xAA'): + # print "write failed", + return False + + reply = urll.read(self.handle, device) + if not reply: + # print "no data", + return False + + # 10018f00100900 + if ord(reply[0]) == 0x10: + if ord(reply[2]) == 0x8F: + # print "invalid", + return False + + # 110100100200aa00000000000000000000000000 + if ord(reply[0]) == 0x11: + if reply[2:4] == "\x00\x10" and reply[6] == "\xAA": + # success + return True + + # print "unknown" + return False + + def hook(self, device, feature, function=None, callback=None): + features = self.DEVICE_FEATURES[device] + if feature not in features: + raise Exception("feature " + feature + " not supported by device") + + feature_index = features.index(feature) + key = (device, feature_index, function, callback) + if key not in self.hooks: + self.hooks[key] = [] + if callback is None: + if callback in self.hooks[key]: + self.hooks[key].remove(callback) + else: + self.hooks[key].append(callback) + + def _dispatch(self, status, device, data): + _log.debug("incoming event %2x:%2x:%s", status, device, data.encode('hex')) + dispatched = False + for (key, callback) in self.hooks.items(): + if key[0] == device and key[1] == ord(data[0]): + if key[2] is not None and key[2] == data[1] & 0xFF: + callback.__call__(data) + + if not dispatched: + _log.debug("ignored incoming event %2x:%2x:%s", + status, device, data.encode('hex')) + + def _request(self, device, data=''): + if urll.write(self.handler, device, data): + pass + + def find_device(self, device_type=None, name=None): + """Gets the device number for the first device matching. + + The device type and name are case-insensitive. + """ + # Apparently a receiver supports up to 6 devices. + for device in range(1, 7): + if self.ping(device): + if device not in self.DEVICE_FEATURES: + self.DEVICE_FEATURES[device] = \ + urll.get_device_features(self.handle, device) + # print get_reprogrammable_keys(receiver, device) + # d_firmware = get_firmware_version(receiver, device) + # print "device", device, "[", d_name, "/", d_type, "]" + # print "firmware", d_firmware, "features", _DEVICE_FEATURES[device] + if device_type: + d_type = self.get_type(device) + if d_type is None or device_type.lower() != d_type.lower(): + continue + if name: + d_name = self.get_name(device) + if d_name is None or name.lower() != d_name.lower(): + continue + return device + + def get_type(self, device): + reply = self._request(device, FEATURE.GET_NAME, '\x20') + if reply: + return DEVICE_TYPES[ord(reply[2][2])] + + def get_name(self, device): + reply = self._request(device, FEATURE.GET_NAME) + if reply: + charcount = ord(reply[4]) + name = '' + index = 0 + while len(name) < charcount: + reply = self._request(device, FEATURE.NAME, '\x10', chr(index)) + if reply: + name += reply[4:4 + charcount - index] + index = len(name) + else: + break + return name + + def get_firmware_version(self, device, firmware_type=0): + reply = self._request(device, + FEATURE.FIRMWARE, '\x10', chr(firmware_type)) + if reply: + return '%s %s.%s' % (reply[5:8], + reply[8:10].encode('hex'), reply[10:12].encode('hex')) + + def get_battery_level(self, device): + reply = self._request(device, FEATURE.BATTERY) + if reply: + return (ord(reply[4]), ord(reply[5]), ord(reply[6])) + + def get_reprogrammable_keys(self, device): + count = self._request(device, FEATURE.REPROGRAMMABLE_KEYS) + if count: + keys = [] + for index in range(ord(count[4])): + key = self._request(device, + FEATURE.REPROGRAMMABLE_KEYS, '\x10', chr(index)) + keys.append(key[4:6], keys[6:8], ord(key[8])) + return keys + + def get_solar_charge(self, device): + reply = self._request(device, FEATURE.SOLAR_CHARGE, + '\x03', '\x78', '\x01', reply_function='\x10') + if reply: + charge = ord(reply[4]) + lux = ord(reply[5]) << 8 | ord(reply[6]) + # lux = int(round(((255 * ord(reply[5])) + ord(reply[6])) / 538.0, 2) * 100) + return (charge, lux) + + +# +# +# + +def get(): + """Gets a Receiver object for the Unifying Receiver connected to the machine. + + It is assumed a single receiver is connected to the machine. If more than + one are present, the first one found will be returned. + + :returns: a Receiver object, or None. + """ + receiver = urll.open() + if receiver: + return Receiver(*receiver) diff --git a/lib/logitech/ur_test3.py b/lib/logitech/ur_test3.py new file mode 100644 index 00000000..fcedd278 --- /dev/null +++ b/lib/logitech/ur_test3.py @@ -0,0 +1,533 @@ +"""Low-level interface for devices connected through a Logitech Universal +Receiver (UR). + +Uses the HID api exposed through hidapi.py. +Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. + +Strongly recommended to use these functions from a single thread; calling +multiple functions from different threads has a high chance of mixing the +replies and causing apparent failures. + +In the context of this API, 'handle' is the open handle of UR attached to +the machine, and 'device' is the number (1..6 according to the documentation) +of the device attached to the UR. + +References: +http://julien.danjou.info/blog/2012/logitech-k750-linux-support +http://6xq.net/git/lars/lshidpp.git/plain/doc/ +""" + + +# +# Logging set-up. +# Add a new logging level for tracing low-level writes and reads. +# + +import logging + +LOG_LEVEL = 1 + +def _urll_trace(self, msg, *args): + if self.isEnabledFor(LOG_LEVEL): + args = (None if x is None + else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x) + else x + for x in args) + self.log(LOG_LEVEL, msg, *args) + +logging.addLevelName(LOG_LEVEL, 'TRACE1') +logging.Logger.trace1 = _urll_trace +_l = logging.getLogger('ur_lowlevel') +_l.setLevel(LOG_LEVEL) + + +# +# +# + + +"""Possible features available on a Logitech device. + +A particular device might not support all these features, and may support other +unknown features as well. +""" +FEATURE = type('FEATURE', (), + dict( + ROOT=b'\x00\x00', + FEATURE_SET=b'\x00\x01', + FIRMWARE=b'\x00\x03', + NAME=b'\x00\x05', + BATTERY=b'\x10\x00', + REPROGRAMMABLE_KEYS=b'\x1B\x00', + WIRELESS_STATUS=b'\x1D\x4B', + # UNKNOWN_1=b'\x1D\xF3', + # UNKNOWN_2=b'\x40\xA0', + # UNKNOWN_3=b'\x41\x00', + SOLAR_CHARGE=b'\x43\x01', + # UNKNOWN_4=b'\x45\x20', + )) +FEATURE_NAMES = { FEATURE.ROOT: 'ROOT', + FEATURE.FEATURE_SET: 'FEATURE_SET', + FEATURE.FIRMWARE: 'FIRMWARE', + FEATURE.NAME: 'NAME', + FEATURE.BATTERY: 'BATTERY', + FEATURE.REPROGRAMMABLE_KEYS: 'REPROGRAMMABLE_KEYS', + FEATURE.WIRELESS_STATUS: 'WIRELESS_STATUS', + FEATURE.SOLAR_CHARGE: 'SOLAR_CHARGE', + } + + +"""Possible types of devices connected to an UR.""" +DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse", + "Touchpad", "Trackball", "Presenter", "Receiver") + + +FIRMWARE_TYPES = ("Main (HID)", "Bootloader", "Hardware", "Other") + +BATTERY_STATUSES = ("Discharging (in use)", "Recharging", "Almost full", "Full", + "Slow recharge", "Invalid battery", "Thermal error", + "Charging error") + +ERROR_CODES = ("Ok", "Unknown", "Invalid argument", "Out of range", + "Hardware error", "Logitech internal", "Invalid feature index", + "Invalid function", "Busy", "Usupported") + +"""Default timeout on read (in ms).""" +DEFAULT_TIMEOUT = 1000 + + +"""Minimum size of a reply data packet.""" +_MIN_REPLY_SIZE = 7 + + +"""Maximum size of a reply data packet.""" +_MAX_REPLY_SIZE = 32 + + +# +# Exceptions that may be raised by this API. +# + +class NoReceiver(Exception): + """May be raised when trying to talk through a previously connected + receiver that is no longer available.""" + pass + +class FeatureNotSupported(Exception): + """Raised when trying to request a feature not supported by the device.""" + def __init__(self, device, feature): + super(FeatureNotSupported, self).__init__(device, feature, FEATURE_NAMES[feature]) + self.device = device + self.feature = feature + self.feature_name = FEATURE_NAMES[feature] + + +# +# +# + + +def _default_event_hook(reply_code, device, data): + _l.trace1("EVENT_HOOK (,%d) code %d status [%s]", device, reply_code, data) + + +"""A function that will be called on incoming events. + +It must be a function with the signature: ``_(int, int, str)``, where the +parameters are: (reply code, device number, data). + +This function will be called by the request() function, when it receives replies +that do not match the write ca +""" +event_hook = _default_event_hook + +def _publish_event(reply_code, device, data): + if event_hook is not None: + event_hook.__call__(reply_code, device, data) + + +# +# Low-level functions. +# + + +from . import hidapi + + +def open(): + """Opens the first Logitech UR found attached to the machine. + + :returns: An open file handle for the found receiver, or ``None``. + """ + # USB ids for (Logitech, Unifying Receiver) + # interface 2 if the actual receiver interface + for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2): + + _l.trace1("checking %s", rawdevice) + receiver = hidapi.open_path(rawdevice.path) + if not receiver: + # could be a file permissions issue + # in any case, unreachable + _l.trace1("[%s] open failed", rawdevice.path) + continue + + _l.trace1("[%s] receiver handle (%d,)", rawdevice.path, receiver) + # ping on device id 0 (always an error) + hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA') + + # if this is the right hidraw device, we'll receive a 'bad subdevice' + # otherwise, the read should produce nothing + reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT) + if reply: + if reply[:4] == b'\x10\x00\x8F\x00': + # 'device 0 unreachable' is the expected reply from a valid receiver handle + _l.trace1("[%s] success: handle (%d,)", rawdevice.path, receiver) + return receiver + + if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': + # no idea what this is, but it comes up occasionally + _l.trace1("[%s] (%d,) mistery reply [%s]", rawdevice.path, receiver, reply) + else: + _l.trace1("[%s] (%d,) unknown reply [%s]", rawdevice.path, receiver, reply) + else: + _l.trace1("[%s] (%d,) no reply", rawdevice.path, receiver) + + # ignore + close(receiver) + # hidapi.close(receiver) + + return None + + +def close(handle): + """Closes a HID device handle.""" + if handle: + try: + hidapi.close(handle) + _l.trace1("(%d,) closed", handle) + return True + except Exception as e: + _l.debug("(%d,) closing: %s", handle, e) + + return False + + +def write(handle, device, feature_index, function=b'\x00', param1=b'\x00', param2=b'\x00', param3=b'\x00'): + """Write a feature call to the receiver. + + :param handle: UR handle obtained with open(). + :param device: attached device number + :param feature_index: index in the + """ + if type(feature_index) == int: + feature_index = chr(feature_index) + data = feature_index + function + param1 + param2 + param3 + return _write(handle, device, data) + + +def _write(handle, device, data): + """Writes some data to a certain device. + + The first two (required) bytes of data must be the feature index for the + device, and a function code for that feature. + + If the receiver is no longer available (e.g. has been physically removed + from the machine), raises NoReceiver. + """ + wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data)) + _l.trace1("(%d,%d) <= w[%s]", handle, device, wdata) + # return hidapi.write(handle, wdata) + if not hidapi.write(handle, wdata): + _l.trace1("(%d,%d) write failed, assuming receiver has been removed", handle, device) + raise NoReceiver() + + +def read(handle, timeout=DEFAULT_TIMEOUT): + """Read some data from the receiver. Usually called after a write (feature + call), to get the reply. + + If any data was read in the given timeout, returns a tuple of + (reply_code, device, message data). The reply code should be ``0x11`` for a + successful feature call, or ``0x10`` to indicate some error, e.g. the device + is no longer available. + """ + data = hidapi.read(handle, _MAX_REPLY_SIZE, timeout) + if data: + _l.trace1("(%d,*) => r[%s]", handle, data) + if len(data) < _MIN_REPLY_SIZE: + _l.trace1("(%d,*) => r[%s] read short reply", handle, data) + if len(data) > _MAX_REPLY_SIZE: + _l.trace1("(%d,*) => r[%s] read long reply", handle, data) + return ord(data[0]), ord(data[1]), data[2:] + else: + _l.trace1("(%d,*) => r[]", handle) + + +def request(handle, device, feature, function=b'\x00', data=b'', features_array=None): + """Makes a feature call to the device, and returns the reply data. + + Basically a write() followed by (possibly multiple) reads, until a reply + matching the called feature is received. In theory the UR will always reply + to feature call; otherwise this function will wait indefinetly. + + Incoming data packets not matching the feature and function will be + delivered to the event_hook (if any), and then ignored. + + The optional ``features_array`` parameter is a cached result of the + get_device_features function for this device, necessary to find the feature + index. If the ``features_arrary`` is not provided, one will be obtained by + calling get_device_features. + + If the feature is not supported, returns None. + """ + if features_array is None: + features_array = get_device_features(handle, device) + if features_array is None: + _l.trace1("(%d,%d) no features array available", handle, device) + return None + + if feature in features_array: + feature_index = chr(features_array.index(feature)) + return _request(handle, device, feature_index + function, data) + else: + _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAMES[feature]) + raise FeatureNotSupported(device, feature) + + + +def _request(handle, device, feature_function, data=b''): + """Makes a feature call device and waits for a matching reply. + + Only call this in the initial set-up of the device. + + This function will skip all incoming messages and events not related to the + device we're requesting for, or the feature specified in the initial + request; it will also wait for a matching reply indefinetly. + + :param feature_function: a two-byte string of (feature_index, function). + :param data: additional data to send, up to 5 bytes. + :returns: + """ + _l.trace1("(%d,%d) request feature %s data %s", handle, device, feature_function, data) + _write(handle, device, feature_function + data) + while True: + reply = read(handle) + + if not reply: + # keep waiting... + continue + + if reply[1] != device: + # this message not for the device we're interested in + _l.trace1("(%d,%d) request reply for unexpected device %s", handle, device, reply) + _publish_event(*reply) + continue + + if reply[0] == 0x10 and reply[2][0] == b'\x8F': + # device not present + _l.trace1("(%d,%d) request ping failed %s", handle, device, reply) + return None + + if reply[0] == 0x11 and reply[2][0] == b'\xFF' and reply[2][1:3] == feature_function: + # an error returned from the device + error = ord(reply[2][3]) + _l.trace1("(%d,%d) request feature call error %d = %s: %s", handle, device, error, ERROR_CODES[error], reply) + return None + + if reply[0] == 0x11 and reply[2][:2] == feature_function: + # a matching reply + _l.trace1("(%d,%d) matched reply with data [%s]", handle, device, reply[2][2:]) + return reply[2][2:] + + _l.trace1("(%d,%d) unmatched reply %s (expected %s)", handle, device, reply[2][:2], feature_function) + _publish_event(*reply) + + +def ping(handle, device): + """Pings a device to check if it is attached to the UR. + + :returns: True if the device is connected to the UR, False if the device is + not attached, None if no conclusive reply is received. + """ + def _status(reply): + if not reply: + return None + + if reply[1] != device: + # oops + _l.trace1("(%d,%d) ping: reply for another device: %s", handle, device, reply) + _publish_event(*reply) + return _status(read(handle)) + + if (reply[0] == 0x11 and reply[2][:2] == b'\x00\x10' and reply[2][4] == b'\xAA'): + # ping ok + _l.trace1("(%d,%d) ping: ok %s", handle, device, reply[2]) + return True + + if (reply[0] == 0x10 and reply[2][:2] == b'\x8F\x00'): + # ping failed + _l.trace1("(%d,%d) ping: device not present", handle, device) + return False + + if (reply[0] == 0x11 and reply[2][:2] == b'\x09\x00' and reply[2][7:11] == b'GOOD'): + # some devices may reply with a SOLAR_STATUS packet before the + # ping_ok reply, especially right after the device connected to the + # receiver + _l.trace1("(%d,%d) ping: solar status %s", handle, device, reply[2]) + _publish_event(*reply) + return _status(read(handle)) + + # ugh + _l.trace1("(%d,%d) ping: unknown reply for this device", handle, device, reply) + _publish_event(*reply) + return None + + _l.trace1("(%d,%d) pinging", handle, device) + _write(handle, device, b'\x00\x10\x00\x00\xAA') + return _status(read(handle, DEFAULT_TIMEOUT * 3)) + + +def get_feature_index(handle, device, feature): + """Reads the index of a device's feature. + + :returns: An int, or ``None`` if the feature is not available. + """ + _l.trace1("(%d,%d) get feature index <%s>", handle, device, feature.encode('hex')) + feature_index = _request(handle, device, FEATURE.ROOT, feature) + if feature_index: + # only consider active and supported features + if ord(feature_index[0]) and ord(feature_index[1]) & 0xA0 == 0: + _l.trace1("(%d,%d) feature <%s> index %s", handle, device, feature.encode('hex'), feature_index[0]) + return ord(feature_index[0]) + + _l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAMES[feature]) + raise FeatureNotSupported(device, feature) + + +def get_device_features(handle, device): + """Returns an array of feature ids. + + Their position in the array is the index to be used when accessing that + feature on the device. + + Only call this function in the initial set-up of the device, because + other messages and events not related to querying the feature set + will be ignored. + """ + _l.trace1("(%d,%d) get device features", handle, device) + + # get the index of the FEATURE_SET + fs_index = _request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET) + if not fs_index: + _l.trace1("(%d,%d) FEATURE_SET not available", handle, device) + return None + fs_index = fs_index[0] + + # For debugging purposes, query all the available features on the device, + # even if unknown. + + # get the number of active features the device has + features_count = _request(handle, device, fs_index + b'\x00') + if not features_count: + # this can happen if the device disappeard since the fs_index call + _l.trace1("(%d,%d) no features available?!", handle, device) + return None + features_count = ord(features_count[0]) + + # a device may have a maximum of 15 features + features = [None] * 0x10 + _l.trace1("(%d,%d) found %d features", handle, device, features_count) + + for index in range(1, 1 + features_count): + # for each index, get the feature residing at that index + feature = _request(handle, device, fs_index + b'\x10', chr(index)) + if feature: + features[index] = feature[0:2].upper() + _l.trace1("(%d,%d) feature <%s> at index %d", handle, device, features[index].encode('hex'), index) + + return None if all(c == None for c in features) else features + + +def get_device_firmware(handle, device, features_array=None): + """Reads a device's firmware info. + + Returns an list of tuples [ (firmware_type, firmware_version, ...), ... ], + ordered by firmware layer. + """ + fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array) + if fw_count: + fw_count = ord(fw_count[0]) + + fw = [] + for index in range(0, fw_count): + fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', data=chr(index), features_array=features_array) + if fw_info: + fw_type = ord(fw_info[0]) & 0x0F + if fw_type == 0 or fw_type == 1: + prefix = str(fw_info[1:4]) + version = ( str((ord(fw_info[4]) & 0xF0) >> 4) + + str(ord(fw_info[4]) & 0x0F) + + '.' + + str((ord(fw_info[5]) & 0xF0) >> 4) + + str(ord(fw_info[5]) & 0x0F)) + name = prefix + ' ' + version + build = 256 * ord(fw_info[6]) + ord(fw_info[7]) + if build: + name += ' b' + str(build) + extras = fw_info[9:].rstrip('\x00') + _l.trace1("(%d:%d) firmware %d = %s %s extras=%s", handle, device, fw_type, FIRMWARE_TYPES[fw_type], name, extras.encode('hex')) + fw.append((fw_type, name, build, extras)) + elif fw_type == 2: + version = ord(fw_info[1]) + _l.trace1("(%d:%d) firmware 2 = Hardware v%x", handle, device, version) + fw.append((2, version)) + else: + _l.trace1("(%d:%d) firmware other", handle, device) + fw.append((fw_type, )) + return fw + + +def get_device_type(handle, device, features_array=None): + """Reads a device's type. + + :see DEVICE_TYPES: + :returns: a string describing the device type, or ``None`` if the device is + not available or does not support the ``NAME`` feature. + """ + d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array) + if d_type: + d_type = ord(d_type[0]) + _l.trace1("(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPES[d_type]) + return DEVICE_TYPES[d_type] + + +def get_device_name(handle, device, features_array=None): + """Reads a device's name. + + :returns: a string with the device name, or ``None`` if the device is not + available or does not support the ``NAME`` feature. + """ + name_length = request(handle, device, FEATURE.NAME, features_array=features_array) + if name_length: + name_length = ord(name_length[0]) + + d_name = '' + while len(d_name) < name_length: + name_index = len(d_name) + name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', data=chr(name_index), features_array=features_array) + name_fragment = name_fragment[:name_length - len(d_name)] + d_name += name_fragment + + _l.trace1("(%d,%d) device name %s", handle, device, d_name) + return d_name + +def get_device_battery_level(handle, device, features_array=None): + """Reads a device's battery level. + """ + battery = request(handle, device, FEATURE.BATTERY, features_array=features_array) + if battery: + discharge = ord(battery[0]) + dischargeNext = ord(battery[1]) + status = ord(battery[2]) + _l.trace1("(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status]) + return (discharge, dischargeNext, status) diff --git a/lib/unittest.sh b/lib/unittest.sh new file mode 100755 index 00000000..3f582e3d --- /dev/null +++ b/lib/unittest.sh @@ -0,0 +1,6 @@ +#!/bin/sh + +cd `dirname "$0"` +export LD_LIBRARY_PATH=$PWD + +exec python -Qnew -m unittest discover -v "$@" diff --git a/solar b/solaar similarity index 57% rename from solar rename to solaar index 231e2324..37c85b0b 100755 --- a/solar +++ b/solaar @@ -4,6 +4,5 @@ cd `dirname "$0"` export LD_LIBRARY_PATH=$PWD/lib export PYTHONPATH=$PWD/lib -export PYTHONWARNINGS=all -exec python -OO -tt -u -3 solar.py "$@" +exec python -OO -Qnew solaar.py "$@" diff --git a/solaar.py b/solaar.py new file mode 100644 index 00000000..8b1f7112 --- /dev/null +++ b/solaar.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python + +import logging +logging.basicConfig(level=logging.DEBUG) +logging.captureWarnings(True) + +import time +import threading + +from gi.repository import GObject +from gi.repository import Gtk + +from logitech.unifying_receiver import api as ur +from logitech.unifying_receiver.listener import EventsListener + +from logitech.devices import * + + +# +# A few constants +# + +APP_TITLE = 'Solaar' + +_STATUS_TIMEOUT = 31 # seconds +_ICON_UPDATE_SLEEP = 13 # seconds + + +# +# +# + + +try: + import notify2 + notify2.init(APP_TITLE) + def notify_desktop(status_code, text): + notification = notify2.Notification(APP_TITLE, text) + notification.show() +except ImportError: + def notify_desktop(status_code, text): + pass + + +# +# +# + + +class StatusThread(threading.Thread): + def __init__(self, status_icon): + super(StatusThread, self).__init__(name='StatusThread') + self.daemon = True + self.status_icon = status_icon + + StatusThread.listener = None + StatusThread.devices = {} + StatusThread.statuses = {} + + def run(self): + while True: + if self.listener is None: + receiver = ur.open() + if receiver: + for devinfo in ur.list_devices(receiver): + self.devices[devinfo.number] = devinfo + self.listener = EventsListener(receiver, self.events_callback) + logging.info("started events listener %s", self.listener) + self.listener.start() + elif not self.listener.active: + logging.info("stopped events listener %s", self.listener) + self.listener = None + self.devices.clear() + self.statuses.clear() + + update_icon = True + if self.listener and self.devices: + update_icon &= self.update_statuses() + + if update_icon: + GObject.idle_add(self.update_status_icon) + + time.sleep(_ICON_UPDATE_SLEEP) + + def update_statuses(self): + updated = False + + for devinfo in self.devices.values(): + if devinfo.number not in self.statuses: + self.statuses[devinfo.number] = [0, None, None] + + last_status_time = self.statuses[devinfo.number][0] + if time.time() - last_status_time > _STATUS_TIMEOUT: + status = request_status(devinfo, self.listener) + updated |= self.device_status_changed(devinfo, status) + + return updated + + def events_callback(self, code, device, data): + updated = False + + if device in self.devices: + devinfo = self.devices[device] + if code == 0x10 and data[0] == 'b\x8F': + updated = True + self.device_status_changed(devinfo, DEVICE_STATUS.UNAVAILABLE) + elif code == 0x11: + status = process_event(devinfo, self.listener, data) + updated |= self.device_status_changed(devinfo, status) + else: + logging.warn("unknown event code %02x", code) + elif device: + logging.debug("got event (%d, %d, %s) for new device", code, device, data.encode('hex')) + devinfo = ur.get_device_info(self.listener.receiver, device) + if devinfo: + self.devices[device] = devinfo + self.statuses[device] = [0, None, None] + else: + logging.warn("got event (%d, %d, %s) for unknown device", code, device, data.encode('hex')) + else: + logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data.encode('hex')) + + if updated: + GObject.idle_add(self.update_status_icon) + + def device_status_changed(self, devinfo, status): + if status is None or devinfo.number not in self.statuses: + return False + + if type(status) == int: + status_code = status + status_text = DEVICE_STATUS_NAME[status_code] + else: + status_code = status[0] + status_text = DEVICE_STATUS_NAME[status_code] if status[1] is None else status[1] + + device_status = self.statuses[devinfo.number] + old_status_code = device_status[1] + + device_status[0] = time.time() + device_status[1] = status_code + device_status[2] = status_text + + if old_status_code != status_code: + logging.debug("device status changed from %s => %s: %s", old_status_code, status_code, status_text) + notify_desktop(status_code, devinfo.name + ' ' + status_text) + + return True + + def update_status_icon(self): + if self.listener: + all_statuses = [] + for d in self.devices: + devinfo = self.devices[d] + status_text = self.statuses[d][2] + if status_text: + all_statuses.append(devinfo.name + ' ' + status_text) + else: + all_statuses.append(devinfo.name + ' present') + + if all_statuses: + tooltip = '\n'.join(all_statuses) + else: + tooltip = 'No devices attached.' + else: + tooltip = 'Unifying Receiver not found.' + + # logging.debug("tooltip %s", tooltip) + self.status_icon.set_tooltip_text(tooltip) + + +if __name__ == '__main__': + status_icon = Gtk.StatusIcon.new_from_file('images/icon.png') + status_icon.set_title(APP_TITLE) + status_icon.set_name(APP_TITLE) + status_icon.set_tooltip_text('Initializing...') + status_icon.connect('popup_menu', Gtk.main_quit) + + GObject.threads_init() + StatusThread(status_icon).start() + Gtk.main() diff --git a/solar.py b/solar.py deleted file mode 100644 index da6ccdac..00000000 --- a/solar.py +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python - -import logging -logging.basicConfig(level=1) -logging.captureWarnings(True) - -import time -import threading -import subprocess -from collections import namedtuple - -from gi.repository import GObject -from gi.repository import Gtk - -from logitech.unifying_receiver import api as ur - - -# -# A few constants -# - -KEYBOARD_NAME = 'Wireless Solar Keyboard K750' -TITLE = 'Solar [K750]' - -NOTIFY_DESKTOP = True - -OK = 0 -NO_RECEIVER = 1 -NO_K750 = 2 -NO_STATUS = 3 - -SLEEP = (10, 5, 5, 15) -TEXT = ('K750 keyboard connected', - 'Logitech Unifying Receiver not detected', - 'K750 keyboard not detected', - 'K750 keyboard not responding') -# ICON_NAMES = ( -# 'status_good', 'status_attention', -# 'status_attention', 'status_warning' -# ) -CHARGE_LUX_TEXT = 'Charge: %d%% Lux: %d' - - -K750_Status = namedtuple('K750_Status', - ['receiver', 'device', 'status', 'charge', 'lux']) - - -# -# -# - - -# _unhandled_queue = [] - -# def unhandled_messages_hook(code, device, data): -# if len(_unhandled_queue) > 32: -# del _unhandled_queue[:] -# _unhandled_queue.append((code, device, data)) - -# from logitech.unifying_receiver import unhandled -# unhandled.set_unhandled_hook(unhandled_messages_hook) - - -# -# -# - - -def notify_desktop(status_code, text): - global NOTIFY_DESKTOP - if NOTIFY_DESKTOP: - try: - subprocess.call(('notify-send', '-u', 'low', TITLE, text)) - except OSError: - NOTIFY_DESKTOP = False - - -def update_status_icon(status_icon, status_changed, k750): - print "update status", status_changed, k750 - - text = TEXT[k750.status] - if k750.status == OK: - text += '\n' + (CHARGE_LUX_TEXT % (k750.charge, k750.lux)) - # print text - status_icon.set_tooltip_text(text) - - if status_changed: - notify_desktop(k750.status, text) - - -def read_charge(receiver, device): - status, charge, lux = NO_RECEIVER, -1, -1 - - if receiver is None: - receiver = ur.open() - device = None - - if receiver and not device: - try: - device = ur.find_device_by_name(receiver, KEYBOARD_NAME) - except ur.NoReceiver: - receiver = None - - if receiver and device: - feature_solar_index = device.features_array.index(ur.FEATURE.SOLAR_CHARGE) - - event = None - for i in range(0, 20): - next_event = ur.base.read(receiver, ur.base.DEFAULT_TIMEOUT * 2 // i if i > 0 else ur.base.DEFAULT_TIMEOUT * 3) - if not next_event: - break - if next_event[1] == device.number: - if next_event[0] == 0x10 and next_event[2][0] == b'\x8F': - event = next_event - elif next_event[0] == 0x11 and next_event[2][0] == chr(feature_solar_index) and next_event[2][7:11] == b'GOOD': - if next_event[2][1] == b'\x10': - event = next_event - elif next_event[2][1] == b'\x00' or next_event[2][1] == b'\x20': - event = next_event - - if event is None: - try: - reply = ur.request(receiver, device.number, ur.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01', features_array=device.features_array) - if reply is None: - status = NO_K750 - device = None - else: - return read_charge(receiver, device) - except ur.NoReceiver: - receiver = None - device = None - else: - if event[1] == 0x10: - status = NO_K750 - device = None - else: - status = OK - charge = ord(event[2][2]) - if event[2][1] == b'\x10': - lux = (ord(event[2][3]) << 8) + ord(event[2][4]) - - return K750_Status(receiver, device, status, charge, lux) - - -class StatusThread(threading.Thread): - def __init__(self, status_icon): - super(StatusThread, self).__init__() - self.daemon = True - self.status_icon = status_icon - - def run(self): - last_status = NO_RECEIVER - k750 = K750_Status(None, None, NO_RECEIVER, 0, 0) - - while True: - k750 = read_charge(k750.receiver, k750.device) - status_changed = k750.status != last_status - GObject.idle_add(update_status_icon, self.status_icon, status_changed, k750) - last_status = k750.status - time.sleep(SLEEP[k750.status]) - - -if __name__ == "__main__": - status_icon = Gtk.StatusIcon.new_from_file('images/icon.png') - status_icon.set_title(TITLE) - status_icon.set_name(TITLE) - status_icon.set_tooltip_text('Initializing...') - status_icon.connect("popup_menu", Gtk.main_quit) - - GObject.threads_init() - StatusThread(status_icon).start() - Gtk.main() diff --git a/unittest.sh b/unittest.sh deleted file mode 100755 index 84e82a5b..00000000 --- a/unittest.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/sh - -cd `dirname "$0"` - -export LD_LIBRARY_PATH=$PWD/lib -export PYTHONPATH=$PWD/lib -export PYTHONDONTWRITEBYTECODE=true -export PYTHONWARNINGS=all - -exec python -m unittest discover -v "$@"