solaar app functional now

This commit is contained in:
Daniel Pavel 2012-09-27 14:55:38 +03:00
parent ebe8320f2e
commit 05560d8af4
24 changed files with 1577 additions and 218 deletions

View File

@ -0,0 +1,24 @@
#
#
#
from . import k750
from .constants import *
_REQUEST_STATUS_FUNCTIONS = {
k750.NAME : k750.request_status,
}
def request_status(devinfo, listener):
if devinfo.name in _REQUEST_STATUS_FUNCTIONS:
return _REQUEST_STATUS_FUNCTIONS[devinfo.name](devinfo, listener)
_PROCESS_EVENT_FUNCTIONS = {
k750.NAME : k750.process_event,
}
def process_event(devinfo, listener, data):
if devinfo.name in _PROCESS_EVENT_FUNCTIONS:
return _PROCESS_EVENT_FUNCTIONS[devinfo.name](devinfo, listener, data)

View File

@ -0,0 +1,21 @@
#
#
#
DEVICE_STATUS = type('DEVICE_STATUS', (),
dict(
UNKNOWN=None,
UNAVAILABLE=-1,
CONNECTED=0,
ACTIVE=1,
))
from collections import defaultdict
DEVICE_STATUS_NAME = defaultdict(lambda x: None)
DEVICE_STATUS_NAME[DEVICE_STATUS.UNAVAILABLE] = 'not available'
DEVICE_STATUS_NAME[DEVICE_STATUS.CONNECTED] = 'connected'
del defaultdict

View File

@ -0,0 +1,74 @@
#
# Functions that are specific to the K750 solar keyboard.
#
import logging
from ..unifying_receiver import api as _api
from .constants import *
#
#
#
NAME = 'Wireless Solar Keyboard K750'
_STATUS_NAMES = ('excellent', 'good', 'okay', 'poor')
_CHARGE_LIMITS = (75, 40, 20, -1)
_LIGHTING_LIMITS = (450, 310, 190, -1)
#
#
#
def _trigger_solar_charge_events(receiver, devinfo):
return _api.request(receiver, devinfo.number,
feature=_api.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01',
features_array=devinfo.features_array)
def _charge_status(data):
charge = ord(data[2])
lux = (ord(data[3]) << 8) + ord(data[4])
for i in range(0, len(_CHARGE_LIMITS)):
if charge >= _CHARGE_LIMITS[i]:
charge_index = i
break
if lux == 0:
return 0x10 << charge_index, '\n\tCharge %d%% (%s)' % (charge, _STATUS_NAMES[charge_index])
for i in range(0, len(_CHARGE_LIMITS)):
if lux > _LIGHTING_LIMITS[i]:
lighting_index = i
break
return 0x10 << charge_index, '\n\tCharge %d%% (%s), Lighting %s (%d lux)' % (
charge, _STATUS_NAMES[charge_index], _STATUS_NAMES[lighting_index], lux)
def request_status(devinfo, listener):
reply = listener.request(_trigger_solar_charge_events, devinfo)
if reply is None:
return DEVICE_STATUS.UNAVAILABLE
def process_event(devinfo, listener, data):
if data[:2] == b'\x05\x00':
# wireless device status
if data[2:5] == b'\x01\x01\x01':
logging.debug("Keyboard just started")
return DEVICE_STATUS.CONNECTED
elif data[:2] == b'\x09\x00' and data[7:11] == b'GOOD':
return _charge_status(data)
elif data[:2] == b'\x09\x10' and data[7:11] == b'GOOD':
return _charge_status(data)
elif data[:2] == b'\x09\x20':
logging.debug("Solar key pressed")
if _trigger_solar_charge_events(listener.receiver, devinfo) is None:
return DEVICE_STATUS.UNAVAILABLE
return _charge_status(data)

View File

@ -3,9 +3,6 @@
#
import logging
_LOG_LEVEL = 5
_l = logging.getLogger('logitech.unifying_receiver.api')
_l.setLevel(_LOG_LEVEL)
from .constants import *
from .exceptions import *
@ -13,20 +10,40 @@ from . import base
from .unhandled import _publish as _unhandled_publish
_LOG_LEVEL = 5
_l = logging.getLogger('logitech.unifying_receiver.api')
#
#
#
from collections import namedtuple
"""Tuple returned by list_devices and find_device_by_name."""
AttachedDeviceInfo = namedtuple('AttachedDeviceInfo', [
'number',
'type',
'name',
'firmware',
'features_array'])
"""Firmware information."""
FirmwareInfo = namedtuple('FirmwareInfo', [
'level',
'type',
'name',
'version',
'build',
'extras'])
def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=None):
return FirmwareInfo(level, type, name, version, build, extras)
del namedtuple
#
#
#
@ -142,17 +159,14 @@ def find_device_by_name(handle, device_name):
:returns: an AttachedDeviceInfo tuple, or ``None``.
"""
_l.log(_LOG_LEVEL, "(%d:,) searching for device '%s'", handle, device_name)
_l.log(_LOG_LEVEL, "(%d,) searching for device '%s'", handle, device_name)
for device in range(1, 1 + base.MAX_ATTACHED_DEVICES):
features_array = get_device_features(handle, device)
if features_array:
d_name = get_device_name(handle, device, features_array)
if d_name == device_name:
d_type = get_device_type(handle, device, features_array)
device_info = AttachedDeviceInfo(device, d_type, d_name, features_array)
_l.log(_LOG_LEVEL, "(%d:,%d) found device %s", handle, device, device_info)
return device_info
return get_device_info(handle, device, device_name=d_name, features_array=features_array)
def list_devices(handle):
@ -160,22 +174,36 @@ def list_devices(handle):
:returns: a list of AttachedDeviceInfo tuples.
"""
_l.log(_LOG_LEVEL, "(%d:,) listing all devices", handle)
_l.log(_LOG_LEVEL, "(%d,) listing all devices", handle)
devices = []
for device in range(1, 1 + base.MAX_ATTACHED_DEVICES):
features_array = get_device_features(handle, device)
if features_array:
d_type = get_device_type(handle, device, features_array)
d_name = get_device_name(handle, device, features_array)
device_info = AttachedDeviceInfo(device, d_type, d_name, features_array)
_l.log(_LOG_LEVEL, "(%d:,%d) found device %s", handle, device, device_info)
devices.append(device_info)
devices.append(get_device_info(handle, device, features_array=features_array))
return devices
def get_device_info(handle, device, device_name=None, features_array=None):
"""Gets the complete info for a device.
:returns: an AttachedDeviceInfo tuple, or ``None``.
"""
if features_array is None:
features_array = get_device_features(handle, device)
if features_array is None:
return None
d_type = get_device_type(handle, device, features_array)
d_name = get_device_name(handle, device, features_array) if device_name is None else device_name
d_firmware = get_device_firmware(handle, device, features_array)
devinfo = AttachedDeviceInfo(device, d_type, d_name, d_firmware, features_array)
_l.log(_LOG_LEVEL, "(%d,%d) found device %s", handle, device, devinfo)
return devinfo
def get_feature_index(handle, device, feature):
"""Reads the index of a device's feature.
@ -265,28 +293,28 @@ def get_device_firmware(handle, device, features_array=None):
for index in range(0, fw_count):
fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', params=chr(index), features_array=features_array)
if fw_info:
fw_type = ord(fw_info[0]) & 0x0F
if fw_type == 0 or fw_type == 1:
prefix = str(fw_info[1:4])
fw_level = ord(fw_info[0]) & 0x0F
if fw_level == 0 or fw_level == 1:
fw_type = FIRMWARE_TYPES[fw_level]
name = str(fw_info[1:4])
version = ( str((ord(fw_info[4]) & 0xF0) >> 4) +
str(ord(fw_info[4]) & 0x0F) +
'.' +
str((ord(fw_info[5]) & 0xF0) >> 4) +
str(ord(fw_info[5]) & 0x0F))
name = prefix + ' ' + version
build = (ord(fw_info[6]) << 8) + ord(fw_info[7])
if build:
name += ' b' + str(build)
extras = fw_info[9:].rstrip('\x00')
_l.log(_LOG_LEVEL, "(%d:%d) firmware %d = %s %s extras=%s", handle, device, fw_type, FIRMWARE_TYPES[fw_type], name, extras.encode('hex'))
fw.append((fw_type, name, build, extras))
elif fw_type == 2:
version = ord(fw_info[1])
_l.log(_LOG_LEVEL, "(%d:%d) firmware 2 = Hardware v%x", handle, device, version)
fw.append((2, version))
if extras:
fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build, extras=extras)
else:
fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build)
elif fw_level == 2:
fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPES[2], version=ord(fw_info[1]))
else:
_l.log(_LOG_LEVEL, "(%d:%d) firmware other", handle, device)
fw.append((fw_type, ))
fw_info = _makeFirmwareInfo(level=fw_level, type=FIRMWARE_TYPES[-1])
fw.append(fw_info)
_l.log(_LOG_LEVEL, "(%d:%d) firmware %s", handle, device, fw_info)
return fw

View File

@ -4,9 +4,6 @@
#
import logging
_LOG_LEVEL = 4
_l = logging.getLogger('logitech.unifying_receiver.base')
_l.setLevel(_LOG_LEVEL)
from .constants import *
from .exceptions import *
@ -15,6 +12,10 @@ from . import unhandled as _unhandled
import hidapi as _hid
_LOG_LEVEL = 4
_l = logging.getLogger('logitech.unifying_receiver.base')
#
# These values are defined by the Logitech documentation.
# Overstepping these boundaries will only produce log warnings.

View File

@ -0,0 +1,78 @@
#
#
#
import logging
import threading
import time
from . import base
from .exceptions import *
_LOG_LEVEL = 6
_l = logging.getLogger('logitech.unifying_receiver.listener')
_EVENT_TIMEOUT = 100
_IDLE_SLEEP = 1000.0 / 1000.0
class EventsListener(threading.Thread):
def __init__(self, receiver, callback):
super(EventsListener, self).__init__(name='Unifying_Receiver_Listener_' + str(receiver))
self.receiver = receiver
self.callback = callback
self.task = None
self.task_processing = threading.Lock()
self.task_reply = None
self.task_done = threading.Event()
self.active = False
def run(self):
_l.log(_LOG_LEVEL, "(%d) starting", self.receiver)
self.active = True
while self.active:
# _l.log(_LOG_LEVEL, "(%d) reading next event", self.receiver)
event = base.read(self.receiver, _EVENT_TIMEOUT)
if event:
_l.log(_LOG_LEVEL, "(%d) got event %s", self.receiver, event)
self.callback.__call__(*event)
elif self.task is None:
# _l.log(_LOG_LEVEL, "(%d) idle sleep", self.receiver)
time.sleep(_IDLE_SLEEP)
else:
self.task_reply = self._make_request(*self.task)
self.task_done.set()
def stop(self):
_l.log(_LOG_LEVEL, "(%d) stopping", self.receiver)
self.active = False
self.join()
def request(self, api_function, *args, **kwargs):
# _l.log(_LOG_LEVEL, "(%d) request '%s' with %s, %s", self.receiver, api_function.__name__, args, kwargs)
self.task_processing.acquire()
self.task_done.clear()
self.task = (api_function, args, kwargs)
self.task_done.wait()
reply = self.task_reply
self.task = self.task_reply = None
self.task_processing.release()
# _l.log(_LOG_LEVEL, "(%d) request '%s' => [%s]", self.receiver, api_function.__name__, reply.encode('hex'))
if isinstance(reply, Exception):
raise reply
return reply
def _make_request(self, api_function, args, kwargs):
_l.log(_LOG_LEVEL, "(%d) calling '%s' with %s, %s", self.receiver, api_function.__name__, args, kwargs)
try:
return api_function.__call__(self.receiver, *args, **kwargs)
except NoReceiver as nr:
self.task_reply = nr
self.active = False
except Exception as e:
self.task_reply = e

View File

@ -15,6 +15,7 @@ class Test_UR_API(unittest.TestCase):
cls.handle = None
cls.device = None
cls.features_array = None
cls.device_info = None
@classmethod
def tearDownClass(cls):
@ -22,6 +23,7 @@ class Test_UR_API(unittest.TestCase):
api.close(cls.handle)
cls.device = None
cls.features_array = None
cls.device_info = None
def test_00_open_receiver(self):
Test_UR_API.handle = api.open()
@ -92,8 +94,10 @@ class Test_UR_API(unittest.TestCase):
self.fail("no feature set available")
d_firmware = api.get_device_firmware(self.handle, self.device, self.features_array)
self.assertIsNotNone(d_firmware, "failed to get device type")
self.assertGreater(len(d_firmware), 0, "empty device type")
self.assertIsNotNone(d_firmware, "failed to get device firmware")
self.assertGreater(len(d_firmware), 0, "device reported no firmware")
for fw in d_firmware:
self.assertIsInstance(fw, api.FirmwareInfo)
def test_52_get_device_type(self):
if self.handle is None:
@ -119,6 +123,19 @@ class Test_UR_API(unittest.TestCase):
self.assertIsNotNone(d_name, "failed to read device name")
self.assertGreater(len(d_name), 0, "empty device name")
def test_59_get_device_info(self):
if self.handle is None:
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
device_info = api.get_device_info(self.handle, self.device, features_array=self.features_array)
self.assertIsNotNone(device_info, "failed to read full device info")
self.assertIsInstance(device_info, api.AttachedDeviceInfo)
Test_UR_API.device_info = device_info
def test_60_get_battery_level(self):
if self.handle is None:
self.fail("No receiver found")
@ -131,7 +148,7 @@ class Test_UR_API(unittest.TestCase):
battery = api.get_device_battery_level(self.handle, self.device, self.features_array)
self.assertIsNotNone(battery, "failed to read battery level")
except FeatureNotSupported:
self.fail("BATTERY feature not supported by device " + str(self.device))
self.fail("FEATURE.BATTERY not supported by device " + str(self.device) + ": " + str(self.device_info))
def test_70_list_devices(self):
if self.handle is None:

View File

@ -0,0 +1,363 @@
"""Low-level interface for devices connected through a Logitech Universal
Receiver (UR).
Uses the HID api exposed through hidapi.py.
Incomplete. Based on a bit of documentation, trial-and-error, and guesswork.
In the context of this API, 'device' is the number (1..6 according to the
documentation) of the device attached to the UR.
References:
http://julien.danjou.info/blog/2012/logitech-k750-linux-support
http://6xq.net/git/lars/lshidpp.git/plain/doc/
"""
#
# Logging set-up.
# Add a new logging level for tracing low-level writes and reads.
#
import logging
import threading
from . import hidapi
LOG_LEVEL = 1
def _urll_trace(self, msg, *args):
if self.isEnabledFor(LOG_LEVEL):
args = (None if x is None
else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x)
else x
for x in args)
self.log(LOG_LEVEL, msg, *args)
logging.addLevelName(LOG_LEVEL, 'trace1')
logging.Logger.trace1 = _urll_trace
_log = logging.getLogger('logitech.ur_lowlevel')
_log.setLevel(LOG_LEVEL)
#
#
#
"""Possible features available on a Logitech device.
A particular device might not support all these features, and may support other
unknown features as well.
"""
FEATURE = type('FEATURE', (),
dict(
ROOT=b'\x00\x00',
FEATURE_SET=b'\x00\x01',
FIRMWARE=b'\x00\x03',
NAME=b'\x00\x05',
BATTERY=b'\x10\x00',
REPROGRAMMABLE_KEYS=b'\x1B\x00',
WIRELESS_STATUS=b'\x1D\x4B',
# UNKNOWN_1=b'\x1D\xF3',
# UNKNOWN_2=b'\x40\xA0',
# UNKNOWN_3=b'\x41\x00',
SOLAR_CHARGE=b'\x43\x01',
# UNKNOWN_4=b'\x45\x20',
))
"""Possible types of devices connected to an UR."""
DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse",
"Touchpad", "Trackball", "Presenter", "Receiver")
"""Default timeout on read (in ms)."""
DEFAULT_TIMEOUT = 1000
"""Minimum size of a reply data packet."""
_MIN_REPLY_SIZE = 7
"""Maximum size of a reply data packet."""
_MAX_REPLY_SIZE = 32
class NoReceiver(Exception):
"""May be raised when trying to talk through a previously connected
receiver that is no longer available."""
pass
#
#
#
class Receiver(threading.Thread):
def __init__(self, handle, path, timeout=DEFAULT_TIMEOUT):
super(Receiver, self).__init__(name='Unifying_Receiver_' + path)
self.handle = handle
self.path = path
self.timeout = timeout
self.read_data = None
self.data_available = threading.Event()
self.devices = {}
self.hooks = {}
self.active = True
self.start()
def __del__(self):
self.close()
def close(self):
self.active = False
try:
hidapi.close(self.handle)
_log.trace1("|%s:| closed", self.path)
return True
except Exception as e:
_log.warn("|%s:| closing: %s", self.path, e)
self.hooks = None
self.devices = None
def run(self):
while self.active:
data = hidapi.read(self.handle, _MAX_REPLY_SIZE, self.timeout)
if self.active and data:
_log.trace1("|%s|*| => r[%s]", self.path, data)
if len(data) < _MIN_REPLY_SIZE:
_log.trace1("|%s|*| => r[%s] short read", self.path, data)
if len(data) > _MAX_REPLY_SIZE:
_log.trace1("|%s|*| => r[%s] long read", self.path, data)
if not self._dispatch_to_hooks(data):
self.read_data = data
self.data_available.set()
def _dispatch_to_hooks(self, data):
if data[0] == b'\x11':
for key in self.hooks:
if key == data[1:3]:
self.hooks[key].__call__(data[3:])
return True
def set_hook(self, device, feature_index, function=b'\x00', callback=None):
key = '%c%s%c' % (device, feature_index, function)
if callback is None:
if key in self.hooks:
del self.hooks[key]
else:
self.hooks[key] = callback
return True
def _write(self, device, data):
wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data))
if hidapi.write(self.handle, wdata):
_log.trace1("|%s|%d| <= w[%s]", self.path, device, wdata)
return True
else:
_log.trace1("|%s|%d| <= w[%s] failed ", self.path, device, wdata)
raise NoReceiver()
def _read(self, device, feature_index=None, function=None, timeout=DEFAULT_TIMEOUT):
while True:
self.data_available.wait()
data = self.data
self.data_available.clear()
if data[1] == chr(device):
if feature_index is None or data[2] == feature_index:
if function is None or data[3] == function:
return data
_log.trace1("|%s:| ignoring read data [%s]", self.path, data)
def _request(self, device, feature_index, function=b'\x00', data=b''):
self._write(device, feature_index + function + data)
return self._read(device, feature_index, function)
def _request_direct(self, device, feature_index, function=b'\x00', data=b''):
self._write(device, feature_index + function + data)
while True:
data = hidapi.read(self.handle, _MAX_REPLY_SIZE, self.timeout)
if not data:
continue
if data[1] == chr(device) and data[2] == feature_index and data[3] == function:
return data
def ping(self, device):
"""Pings a device to check if it is attached to the UR.
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
"""
if self._write(device, b'\x00\x10\x00\x00\xAA'):
while True:
reply = self._read(device, timeout=DEFAULT_TIMEOUT*3)
# ping ok
if reply[0] == b'\0x11' and reply[1] == chr(device):
if reply[2:4] == b'\x00\x10' and reply[6] == b'\xAA':
_log.trace1("|%s|%d| ping: ok %s", self.path, device, reply[2])
return True
# ping failed
if reply[0] == b'\0x10' and reply[1] == chr(device):
if reply[2:4] == b'\x8F\x00':
_log.trace1("|%s|%d| ping: device not present", self.path, device)
return False
_log.trace1("|%s|%d| ping: unknown reply", self.path, device, reply)
def scan_devices(self):
for device in range(1, 7):
self.get_device(device)
return self.devices.values()
def get_device(self, device, query=True):
if device in self.devices:
value = self.devices[device]
_log.trace1("|%s:%d| device info %s", self.path, device, value)
return value
if query and self.ping(device):
d_type = self.get_type(device)
d_name = self.get_name(device)
features_array = self._get_features(device)
value = (d_type, d_name, features_array)
self.devices[device] = value
_log.trace1("|%s:%d| device info %s", self.path, device, value)
return value
_log.trace1("|%s:%d| device not found", self.path, device)
def _get_feature_index(self, device, feature):
"""Reads the index of a device's feature.
:returns: An int, or None if the feature is not available.
"""
_log.trace1("|%s|%d| get feature index <%s>", self.path, device, feature)
reply = self._request(device, b'\x00', b'\x00', feature)
# only consider active and supported features
if ord(reply[4]) and ord(reply[5]) & 0xA0 == 0:
_log.trace1("|%s|%d| feature <%s> has index %s", self.path, device, feature, reply[4])
return ord(reply[4])
_log.trace1("|%s|%d| feature <%s> not available", self.path, device, feature)
def _get_features(self, device):
"""Returns an array of feature ids.
Their position in the array is the index to be used when accessing that
feature on the device.
Only call this function in the initial set-up of the device, because
other messages and events not related to querying the feature set
will be ignored.
"""
_log.trace1("|%s|%d| get device features", self.path, device)
# get the index of the FEATURE_SET
fs_index = self._get_feature_index(device, FEATURE.FEATURE_SET)
fs_index = chr(fs_index)
# Query all the available features on the device, even if unknown.
# get the number of active features the device has
features_count = self._request(device, fs_index)
features_count = ord(features_count[4])
_log.trace1("|%s|%d| found %d features", self.path, device, features_count)
# a device may have a maximum of 15 features
features = [None] * 0x10
for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index
feature = self._request(device, fs_index, b'\x10', chr(index))
features[index] = feature[4:6].upper()
_log.trace1("|%s|%d| feature <%s> at index %d", self.path, device, features[index], index)
return None if all(c is None for c in features) else features
def get_type(self, device):
if device in self.devices:
return self.devices[device][0]
dnt_index = self._get_feature_index(device, FEATURE.NAME)
dnt_index = chr(dnt_index)
d_type = self._request(device, dnt_index, b'\x20')
d_type = ord(d_type[4])
return DEVICE_TYPES[d_type]
def get_name(self, device):
if device in self.devices:
return self.devices[device][1]
dnt_index = self._get_feature_index(device, FEATURE.NAME)
dnt_index = chr(dnt_index)
self._write(device, dnt_index)
name_length = self._read(device, dnt_index, b'\x00')
name_length = ord(name_length[4])
d_name = ''
while len(d_name) < name_length:
name_index = len(d_name)
name_fragment = self._request(device, dnt_index, b'\x10', chr(name_index))
name_fragment = name_fragment[:name_length - len(d_name)]
d_name += name_fragment
return d_name
def open():
"""Opens the first Logitech UR found attached to the machine.
:returns: A Receiver object for the found receiver, or ``None``.
"""
# USB ids for (Logitech, Unifying Receiver)
# interface 2 if the actual receiver interface
for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2):
_log.trace1("checking %s", rawdevice)
receiver = hidapi.open_path(rawdevice.path)
if not receiver:
# could be a file permissions issue
# in any case, unreachable
_log.trace1("[%s] open failed", rawdevice.path)
continue
_log.trace1("[%s] receiver handle %d", rawdevice.path, receiver)
# ping on device id 0 (always an error)
hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA')
# if this is the right hidraw device, we'll receive a 'bad subdevice'
# otherwise, the read should produce nothing
reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT)
if reply:
_log.trace1("[%s] receiver %d exploratory ping reply [%s]", rawdevice.path, receiver, reply)
if reply[:4] == b'\x10\x00\x8F\x00':
# 'device 0 unreachable' is the expected reply from a valid receiver handle
_log.trace1("[%s] success: found receiver with handle %d", rawdevice.path, receiver)
return Receiver(receiver, rawdevice.path)
if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00':
# no idea what this is, but it comes up occasionally
_log.trace1("[%s] receiver %d mistery reply", rawdevice.path, receiver)
else:
_log.trace1("[%s] receiver %d unknown reply", rawdevice.path, receiver)
else:
_log.trace1("[%s] receiver %d no reply", rawdevice.path, receiver)
pass
# ignore
hidapi.close(receiver)

216
lib/logitech/ur_queue2.py Normal file
View File

@ -0,0 +1,216 @@
"""A few functions to deal with the Logitech Universal Receiver.
It is assumed a single UR device is attached to the machine.
Uses hidapi.
"""
import logging
import threading
from . import ur_lowlevel as urll
from urll import FEATURE
_log = logging.getLogger('logitech.ur')
_log.setLevel(logging.DEBUG)
# class NoDevice(Exception):
# """May be thrown when trying to talk through a previously present device
# that is no longer available."""
# pass
class _EventQueue(threading.Thread):
def __init__(self, receiver, timeout=urll.DEFAULT_TIMEOUT):
super(_EventQueue, self).__init__()
self.daemon = True
self.receiver = receiver
self.timeout = timeout
self.active = True
def stop(self):
self.active = False
self.join()
def run(self):
while self.active:
data = urll.read(self.receiver.handle, self.timeout)
if not self.active:
# in case the queue has been stopped while reading
break
if data:
self.receiver._dispatch(*data)
class Receiver:
def __init__(self, path, handle=None, timeout=urll.DEFAULT_TIMEOUT):
self.path = path
self.handle = handle
self.DEVICE_FEATURES = {}
self.hooks = {}
self.event_queue = _EventQueue(self.handle, timeout)
self.event_queue.start()
def close(self):
self.event_queue.stop()
self.event_queue = None
urll.close(self.handle)
self.handle = None
self.hooks = {}
self.DEVICE_FEATURES = {}
def ping(self, device):
reply = self.event_queue.req()
if not urll.write(self.handle, device, '\x00\x10\x00\x00\xAA'):
# print "write failed",
return False
reply = urll.read(self.handle, device)
if not reply:
# print "no data",
return False
# 10018f00100900
if ord(reply[0]) == 0x10:
if ord(reply[2]) == 0x8F:
# print "invalid",
return False
# 110100100200aa00000000000000000000000000
if ord(reply[0]) == 0x11:
if reply[2:4] == "\x00\x10" and reply[6] == "\xAA":
# success
return True
# print "unknown"
return False
def hook(self, device, feature, function=None, callback=None):
features = self.DEVICE_FEATURES[device]
if feature not in features:
raise Exception("feature " + feature + " not supported by device")
feature_index = features.index(feature)
key = (device, feature_index, function, callback)
if key not in self.hooks:
self.hooks[key] = []
if callback is None:
if callback in self.hooks[key]:
self.hooks[key].remove(callback)
else:
self.hooks[key].append(callback)
def _dispatch(self, status, device, data):
_log.debug("incoming event %2x:%2x:%s", status, device, data.encode('hex'))
dispatched = False
for (key, callback) in self.hooks.items():
if key[0] == device and key[1] == ord(data[0]):
if key[2] is not None and key[2] == data[1] & 0xFF:
callback.__call__(data)
if not dispatched:
_log.debug("ignored incoming event %2x:%2x:%s",
status, device, data.encode('hex'))
def _request(self, device, data=''):
if urll.write(self.handler, device, data):
pass
def find_device(self, device_type=None, name=None):
"""Gets the device number for the first device matching.
The device type and name are case-insensitive.
"""
# Apparently a receiver supports up to 6 devices.
for device in range(1, 7):
if self.ping(device):
if device not in self.DEVICE_FEATURES:
self.DEVICE_FEATURES[device] = \
urll.get_device_features(self.handle, device)
# print get_reprogrammable_keys(receiver, device)
# d_firmware = get_firmware_version(receiver, device)
# print "device", device, "[", d_name, "/", d_type, "]"
# print "firmware", d_firmware, "features", _DEVICE_FEATURES[device]
if device_type:
d_type = self.get_type(device)
if d_type is None or device_type.lower() != d_type.lower():
continue
if name:
d_name = self.get_name(device)
if d_name is None or name.lower() != d_name.lower():
continue
return device
def get_type(self, device):
reply = self._request(device, FEATURE.GET_NAME, '\x20')
if reply:
return DEVICE_TYPES[ord(reply[2][2])]
def get_name(self, device):
reply = self._request(device, FEATURE.GET_NAME)
if reply:
charcount = ord(reply[4])
name = ''
index = 0
while len(name) < charcount:
reply = self._request(device, FEATURE.NAME, '\x10', chr(index))
if reply:
name += reply[4:4 + charcount - index]
index = len(name)
else:
break
return name
def get_firmware_version(self, device, firmware_type=0):
reply = self._request(device,
FEATURE.FIRMWARE, '\x10', chr(firmware_type))
if reply:
return '%s %s.%s' % (reply[5:8],
reply[8:10].encode('hex'), reply[10:12].encode('hex'))
def get_battery_level(self, device):
reply = self._request(device, FEATURE.BATTERY)
if reply:
return (ord(reply[4]), ord(reply[5]), ord(reply[6]))
def get_reprogrammable_keys(self, device):
count = self._request(device, FEATURE.REPROGRAMMABLE_KEYS)
if count:
keys = []
for index in range(ord(count[4])):
key = self._request(device,
FEATURE.REPROGRAMMABLE_KEYS, '\x10', chr(index))
keys.append(key[4:6], keys[6:8], ord(key[8]))
return keys
def get_solar_charge(self, device):
reply = self._request(device, FEATURE.SOLAR_CHARGE,
'\x03', '\x78', '\x01', reply_function='\x10')
if reply:
charge = ord(reply[4])
lux = ord(reply[5]) << 8 | ord(reply[6])
# lux = int(round(((255 * ord(reply[5])) + ord(reply[6])) / 538.0, 2) * 100)
return (charge, lux)
#
#
#
def get():
"""Gets a Receiver object for the Unifying Receiver connected to the machine.
It is assumed a single receiver is connected to the machine. If more than
one are present, the first one found will be returned.
:returns: a Receiver object, or None.
"""
receiver = urll.open()
if receiver:
return Receiver(*receiver)

533
lib/logitech/ur_test3.py Normal file
View File

@ -0,0 +1,533 @@
"""Low-level interface for devices connected through a Logitech Universal
Receiver (UR).
Uses the HID api exposed through hidapi.py.
Incomplete. Based on a bit of documentation, trial-and-error, and guesswork.
Strongly recommended to use these functions from a single thread; calling
multiple functions from different threads has a high chance of mixing the
replies and causing apparent failures.
In the context of this API, 'handle' is the open handle of UR attached to
the machine, and 'device' is the number (1..6 according to the documentation)
of the device attached to the UR.
References:
http://julien.danjou.info/blog/2012/logitech-k750-linux-support
http://6xq.net/git/lars/lshidpp.git/plain/doc/
"""
#
# Logging set-up.
# Add a new logging level for tracing low-level writes and reads.
#
import logging
LOG_LEVEL = 1
def _urll_trace(self, msg, *args):
if self.isEnabledFor(LOG_LEVEL):
args = (None if x is None
else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x)
else x
for x in args)
self.log(LOG_LEVEL, msg, *args)
logging.addLevelName(LOG_LEVEL, 'TRACE1')
logging.Logger.trace1 = _urll_trace
_l = logging.getLogger('ur_lowlevel')
_l.setLevel(LOG_LEVEL)
#
#
#
"""Possible features available on a Logitech device.
A particular device might not support all these features, and may support other
unknown features as well.
"""
FEATURE = type('FEATURE', (),
dict(
ROOT=b'\x00\x00',
FEATURE_SET=b'\x00\x01',
FIRMWARE=b'\x00\x03',
NAME=b'\x00\x05',
BATTERY=b'\x10\x00',
REPROGRAMMABLE_KEYS=b'\x1B\x00',
WIRELESS_STATUS=b'\x1D\x4B',
# UNKNOWN_1=b'\x1D\xF3',
# UNKNOWN_2=b'\x40\xA0',
# UNKNOWN_3=b'\x41\x00',
SOLAR_CHARGE=b'\x43\x01',
# UNKNOWN_4=b'\x45\x20',
))
FEATURE_NAMES = { FEATURE.ROOT: 'ROOT',
FEATURE.FEATURE_SET: 'FEATURE_SET',
FEATURE.FIRMWARE: 'FIRMWARE',
FEATURE.NAME: 'NAME',
FEATURE.BATTERY: 'BATTERY',
FEATURE.REPROGRAMMABLE_KEYS: 'REPROGRAMMABLE_KEYS',
FEATURE.WIRELESS_STATUS: 'WIRELESS_STATUS',
FEATURE.SOLAR_CHARGE: 'SOLAR_CHARGE',
}
"""Possible types of devices connected to an UR."""
DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse",
"Touchpad", "Trackball", "Presenter", "Receiver")
FIRMWARE_TYPES = ("Main (HID)", "Bootloader", "Hardware", "Other")
BATTERY_STATUSES = ("Discharging (in use)", "Recharging", "Almost full", "Full",
"Slow recharge", "Invalid battery", "Thermal error",
"Charging error")
ERROR_CODES = ("Ok", "Unknown", "Invalid argument", "Out of range",
"Hardware error", "Logitech internal", "Invalid feature index",
"Invalid function", "Busy", "Usupported")
"""Default timeout on read (in ms)."""
DEFAULT_TIMEOUT = 1000
"""Minimum size of a reply data packet."""
_MIN_REPLY_SIZE = 7
"""Maximum size of a reply data packet."""
_MAX_REPLY_SIZE = 32
#
# Exceptions that may be raised by this API.
#
class NoReceiver(Exception):
"""May be raised when trying to talk through a previously connected
receiver that is no longer available."""
pass
class FeatureNotSupported(Exception):
"""Raised when trying to request a feature not supported by the device."""
def __init__(self, device, feature):
super(FeatureNotSupported, self).__init__(device, feature, FEATURE_NAMES[feature])
self.device = device
self.feature = feature
self.feature_name = FEATURE_NAMES[feature]
#
#
#
def _default_event_hook(reply_code, device, data):
_l.trace1("EVENT_HOOK (,%d) code %d status [%s]", device, reply_code, data)
"""A function that will be called on incoming events.
It must be a function with the signature: ``_(int, int, str)``, where the
parameters are: (reply code, device number, data).
This function will be called by the request() function, when it receives replies
that do not match the write ca
"""
event_hook = _default_event_hook
def _publish_event(reply_code, device, data):
if event_hook is not None:
event_hook.__call__(reply_code, device, data)
#
# Low-level functions.
#
from . import hidapi
def open():
"""Opens the first Logitech UR found attached to the machine.
:returns: An open file handle for the found receiver, or ``None``.
"""
# USB ids for (Logitech, Unifying Receiver)
# interface 2 if the actual receiver interface
for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2):
_l.trace1("checking %s", rawdevice)
receiver = hidapi.open_path(rawdevice.path)
if not receiver:
# could be a file permissions issue
# in any case, unreachable
_l.trace1("[%s] open failed", rawdevice.path)
continue
_l.trace1("[%s] receiver handle (%d,)", rawdevice.path, receiver)
# ping on device id 0 (always an error)
hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA')
# if this is the right hidraw device, we'll receive a 'bad subdevice'
# otherwise, the read should produce nothing
reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT)
if reply:
if reply[:4] == b'\x10\x00\x8F\x00':
# 'device 0 unreachable' is the expected reply from a valid receiver handle
_l.trace1("[%s] success: handle (%d,)", rawdevice.path, receiver)
return receiver
if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00':
# no idea what this is, but it comes up occasionally
_l.trace1("[%s] (%d,) mistery reply [%s]", rawdevice.path, receiver, reply)
else:
_l.trace1("[%s] (%d,) unknown reply [%s]", rawdevice.path, receiver, reply)
else:
_l.trace1("[%s] (%d,) no reply", rawdevice.path, receiver)
# ignore
close(receiver)
# hidapi.close(receiver)
return None
def close(handle):
"""Closes a HID device handle."""
if handle:
try:
hidapi.close(handle)
_l.trace1("(%d,) closed", handle)
return True
except Exception as e:
_l.debug("(%d,) closing: %s", handle, e)
return False
def write(handle, device, feature_index, function=b'\x00', param1=b'\x00', param2=b'\x00', param3=b'\x00'):
"""Write a feature call to the receiver.
:param handle: UR handle obtained with open().
:param device: attached device number
:param feature_index: index in the
"""
if type(feature_index) == int:
feature_index = chr(feature_index)
data = feature_index + function + param1 + param2 + param3
return _write(handle, device, data)
def _write(handle, device, data):
"""Writes some data to a certain device.
The first two (required) bytes of data must be the feature index for the
device, and a function code for that feature.
If the receiver is no longer available (e.g. has been physically removed
from the machine), raises NoReceiver.
"""
wdata = b'\x10' + chr(device) + data + b'\x00' * (5 - len(data))
_l.trace1("(%d,%d) <= w[%s]", handle, device, wdata)
# return hidapi.write(handle, wdata)
if not hidapi.write(handle, wdata):
_l.trace1("(%d,%d) write failed, assuming receiver has been removed", handle, device)
raise NoReceiver()
def read(handle, timeout=DEFAULT_TIMEOUT):
"""Read some data from the receiver. Usually called after a write (feature
call), to get the reply.
If any data was read in the given timeout, returns a tuple of
(reply_code, device, message data). The reply code should be ``0x11`` for a
successful feature call, or ``0x10`` to indicate some error, e.g. the device
is no longer available.
"""
data = hidapi.read(handle, _MAX_REPLY_SIZE, timeout)
if data:
_l.trace1("(%d,*) => r[%s]", handle, data)
if len(data) < _MIN_REPLY_SIZE:
_l.trace1("(%d,*) => r[%s] read short reply", handle, data)
if len(data) > _MAX_REPLY_SIZE:
_l.trace1("(%d,*) => r[%s] read long reply", handle, data)
return ord(data[0]), ord(data[1]), data[2:]
else:
_l.trace1("(%d,*) => r[]", handle)
def request(handle, device, feature, function=b'\x00', data=b'', features_array=None):
"""Makes a feature call to the device, and returns the reply data.
Basically a write() followed by (possibly multiple) reads, until a reply
matching the called feature is received. In theory the UR will always reply
to feature call; otherwise this function will wait indefinetly.
Incoming data packets not matching the feature and function will be
delivered to the event_hook (if any), and then ignored.
The optional ``features_array`` parameter is a cached result of the
get_device_features function for this device, necessary to find the feature
index. If the ``features_arrary`` is not provided, one will be obtained by
calling get_device_features.
If the feature is not supported, returns None.
"""
if features_array is None:
features_array = get_device_features(handle, device)
if features_array is None:
_l.trace1("(%d,%d) no features array available", handle, device)
return None
if feature in features_array:
feature_index = chr(features_array.index(feature))
return _request(handle, device, feature_index + function, data)
else:
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAMES[feature])
raise FeatureNotSupported(device, feature)
def _request(handle, device, feature_function, data=b''):
"""Makes a feature call device and waits for a matching reply.
Only call this in the initial set-up of the device.
This function will skip all incoming messages and events not related to the
device we're requesting for, or the feature specified in the initial
request; it will also wait for a matching reply indefinetly.
:param feature_function: a two-byte string of (feature_index, function).
:param data: additional data to send, up to 5 bytes.
:returns:
"""
_l.trace1("(%d,%d) request feature %s data %s", handle, device, feature_function, data)
_write(handle, device, feature_function + data)
while True:
reply = read(handle)
if not reply:
# keep waiting...
continue
if reply[1] != device:
# this message not for the device we're interested in
_l.trace1("(%d,%d) request reply for unexpected device %s", handle, device, reply)
_publish_event(*reply)
continue
if reply[0] == 0x10 and reply[2][0] == b'\x8F':
# device not present
_l.trace1("(%d,%d) request ping failed %s", handle, device, reply)
return None
if reply[0] == 0x11 and reply[2][0] == b'\xFF' and reply[2][1:3] == feature_function:
# an error returned from the device
error = ord(reply[2][3])
_l.trace1("(%d,%d) request feature call error %d = %s: %s", handle, device, error, ERROR_CODES[error], reply)
return None
if reply[0] == 0x11 and reply[2][:2] == feature_function:
# a matching reply
_l.trace1("(%d,%d) matched reply with data [%s]", handle, device, reply[2][2:])
return reply[2][2:]
_l.trace1("(%d,%d) unmatched reply %s (expected %s)", handle, device, reply[2][:2], feature_function)
_publish_event(*reply)
def ping(handle, device):
"""Pings a device to check if it is attached to the UR.
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
"""
def _status(reply):
if not reply:
return None
if reply[1] != device:
# oops
_l.trace1("(%d,%d) ping: reply for another device: %s", handle, device, reply)
_publish_event(*reply)
return _status(read(handle))
if (reply[0] == 0x11 and reply[2][:2] == b'\x00\x10' and reply[2][4] == b'\xAA'):
# ping ok
_l.trace1("(%d,%d) ping: ok %s", handle, device, reply[2])
return True
if (reply[0] == 0x10 and reply[2][:2] == b'\x8F\x00'):
# ping failed
_l.trace1("(%d,%d) ping: device not present", handle, device)
return False
if (reply[0] == 0x11 and reply[2][:2] == b'\x09\x00' and reply[2][7:11] == b'GOOD'):
# some devices may reply with a SOLAR_STATUS packet before the
# ping_ok reply, especially right after the device connected to the
# receiver
_l.trace1("(%d,%d) ping: solar status %s", handle, device, reply[2])
_publish_event(*reply)
return _status(read(handle))
# ugh
_l.trace1("(%d,%d) ping: unknown reply for this device", handle, device, reply)
_publish_event(*reply)
return None
_l.trace1("(%d,%d) pinging", handle, device)
_write(handle, device, b'\x00\x10\x00\x00\xAA')
return _status(read(handle, DEFAULT_TIMEOUT * 3))
def get_feature_index(handle, device, feature):
"""Reads the index of a device's feature.
:returns: An int, or ``None`` if the feature is not available.
"""
_l.trace1("(%d,%d) get feature index <%s>", handle, device, feature.encode('hex'))
feature_index = _request(handle, device, FEATURE.ROOT, feature)
if feature_index:
# only consider active and supported features
if ord(feature_index[0]) and ord(feature_index[1]) & 0xA0 == 0:
_l.trace1("(%d,%d) feature <%s> index %s", handle, device, feature.encode('hex'), feature_index[0])
return ord(feature_index[0])
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, feature.encode('hex'), FEATURE_NAMES[feature])
raise FeatureNotSupported(device, feature)
def get_device_features(handle, device):
"""Returns an array of feature ids.
Their position in the array is the index to be used when accessing that
feature on the device.
Only call this function in the initial set-up of the device, because
other messages and events not related to querying the feature set
will be ignored.
"""
_l.trace1("(%d,%d) get device features", handle, device)
# get the index of the FEATURE_SET
fs_index = _request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET)
if not fs_index:
_l.trace1("(%d,%d) FEATURE_SET not available", handle, device)
return None
fs_index = fs_index[0]
# For debugging purposes, query all the available features on the device,
# even if unknown.
# get the number of active features the device has
features_count = _request(handle, device, fs_index + b'\x00')
if not features_count:
# this can happen if the device disappeard since the fs_index call
_l.trace1("(%d,%d) no features available?!", handle, device)
return None
features_count = ord(features_count[0])
# a device may have a maximum of 15 features
features = [None] * 0x10
_l.trace1("(%d,%d) found %d features", handle, device, features_count)
for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index
feature = _request(handle, device, fs_index + b'\x10', chr(index))
if feature:
features[index] = feature[0:2].upper()
_l.trace1("(%d,%d) feature <%s> at index %d", handle, device, features[index].encode('hex'), index)
return None if all(c == None for c in features) else features
def get_device_firmware(handle, device, features_array=None):
"""Reads a device's firmware info.
Returns an list of tuples [ (firmware_type, firmware_version, ...), ... ],
ordered by firmware layer.
"""
fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array)
if fw_count:
fw_count = ord(fw_count[0])
fw = []
for index in range(0, fw_count):
fw_info = request(handle, device, FEATURE.FIRMWARE, function=b'\x10', data=chr(index), features_array=features_array)
if fw_info:
fw_type = ord(fw_info[0]) & 0x0F
if fw_type == 0 or fw_type == 1:
prefix = str(fw_info[1:4])
version = ( str((ord(fw_info[4]) & 0xF0) >> 4) +
str(ord(fw_info[4]) & 0x0F) +
'.' +
str((ord(fw_info[5]) & 0xF0) >> 4) +
str(ord(fw_info[5]) & 0x0F))
name = prefix + ' ' + version
build = 256 * ord(fw_info[6]) + ord(fw_info[7])
if build:
name += ' b' + str(build)
extras = fw_info[9:].rstrip('\x00')
_l.trace1("(%d:%d) firmware %d = %s %s extras=%s", handle, device, fw_type, FIRMWARE_TYPES[fw_type], name, extras.encode('hex'))
fw.append((fw_type, name, build, extras))
elif fw_type == 2:
version = ord(fw_info[1])
_l.trace1("(%d:%d) firmware 2 = Hardware v%x", handle, device, version)
fw.append((2, version))
else:
_l.trace1("(%d:%d) firmware other", handle, device)
fw.append((fw_type, ))
return fw
def get_device_type(handle, device, features_array=None):
"""Reads a device's type.
:see DEVICE_TYPES:
:returns: a string describing the device type, or ``None`` if the device is
not available or does not support the ``NAME`` feature.
"""
d_type = request(handle, device, FEATURE.NAME, function=b'\x20', features_array=features_array)
if d_type:
d_type = ord(d_type[0])
_l.trace1("(%d,%d) device type %d = %s", handle, device, d_type, DEVICE_TYPES[d_type])
return DEVICE_TYPES[d_type]
def get_device_name(handle, device, features_array=None):
"""Reads a device's name.
:returns: a string with the device name, or ``None`` if the device is not
available or does not support the ``NAME`` feature.
"""
name_length = request(handle, device, FEATURE.NAME, features_array=features_array)
if name_length:
name_length = ord(name_length[0])
d_name = ''
while len(d_name) < name_length:
name_index = len(d_name)
name_fragment = request(handle, device, FEATURE.NAME, function=b'\x10', data=chr(name_index), features_array=features_array)
name_fragment = name_fragment[:name_length - len(d_name)]
d_name += name_fragment
_l.trace1("(%d,%d) device name %s", handle, device, d_name)
return d_name
def get_device_battery_level(handle, device, features_array=None):
"""Reads a device's battery level.
"""
battery = request(handle, device, FEATURE.BATTERY, features_array=features_array)
if battery:
discharge = ord(battery[0])
dischargeNext = ord(battery[1])
status = ord(battery[2])
_l.trace1("(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status])
return (discharge, dischargeNext, status)

6
lib/unittest.sh Executable file
View File

@ -0,0 +1,6 @@
#!/bin/sh
cd `dirname "$0"`
export LD_LIBRARY_PATH=$PWD
exec python -Qnew -m unittest discover -v "$@"

View File

@ -4,6 +4,5 @@ cd `dirname "$0"`
export LD_LIBRARY_PATH=$PWD/lib
export PYTHONPATH=$PWD/lib
export PYTHONWARNINGS=all
exec python -OO -tt -u -3 solar.py "$@"
exec python -OO -Qnew solaar.py "$@"

181
solaar.py Normal file
View File

@ -0,0 +1,181 @@
#!/usr/bin/env python
import logging
logging.basicConfig(level=logging.DEBUG)
logging.captureWarnings(True)
import time
import threading
from gi.repository import GObject
from gi.repository import Gtk
from logitech.unifying_receiver import api as ur
from logitech.unifying_receiver.listener import EventsListener
from logitech.devices import *
#
# A few constants
#
APP_TITLE = 'Solaar'
_STATUS_TIMEOUT = 31 # seconds
_ICON_UPDATE_SLEEP = 13 # seconds
#
#
#
try:
import notify2
notify2.init(APP_TITLE)
def notify_desktop(status_code, text):
notification = notify2.Notification(APP_TITLE, text)
notification.show()
except ImportError:
def notify_desktop(status_code, text):
pass
#
#
#
class StatusThread(threading.Thread):
def __init__(self, status_icon):
super(StatusThread, self).__init__(name='StatusThread')
self.daemon = True
self.status_icon = status_icon
StatusThread.listener = None
StatusThread.devices = {}
StatusThread.statuses = {}
def run(self):
while True:
if self.listener is None:
receiver = ur.open()
if receiver:
for devinfo in ur.list_devices(receiver):
self.devices[devinfo.number] = devinfo
self.listener = EventsListener(receiver, self.events_callback)
logging.info("started events listener %s", self.listener)
self.listener.start()
elif not self.listener.active:
logging.info("stopped events listener %s", self.listener)
self.listener = None
self.devices.clear()
self.statuses.clear()
update_icon = True
if self.listener and self.devices:
update_icon &= self.update_statuses()
if update_icon:
GObject.idle_add(self.update_status_icon)
time.sleep(_ICON_UPDATE_SLEEP)
def update_statuses(self):
updated = False
for devinfo in self.devices.values():
if devinfo.number not in self.statuses:
self.statuses[devinfo.number] = [0, None, None]
last_status_time = self.statuses[devinfo.number][0]
if time.time() - last_status_time > _STATUS_TIMEOUT:
status = request_status(devinfo, self.listener)
updated |= self.device_status_changed(devinfo, status)
return updated
def events_callback(self, code, device, data):
updated = False
if device in self.devices:
devinfo = self.devices[device]
if code == 0x10 and data[0] == 'b\x8F':
updated = True
self.device_status_changed(devinfo, DEVICE_STATUS.UNAVAILABLE)
elif code == 0x11:
status = process_event(devinfo, self.listener, data)
updated |= self.device_status_changed(devinfo, status)
else:
logging.warn("unknown event code %02x", code)
elif device:
logging.debug("got event (%d, %d, %s) for new device", code, device, data.encode('hex'))
devinfo = ur.get_device_info(self.listener.receiver, device)
if devinfo:
self.devices[device] = devinfo
self.statuses[device] = [0, None, None]
else:
logging.warn("got event (%d, %d, %s) for unknown device", code, device, data.encode('hex'))
else:
logging.warn("don't know how to handle event (%d, %d, %s)", code, device, data.encode('hex'))
if updated:
GObject.idle_add(self.update_status_icon)
def device_status_changed(self, devinfo, status):
if status is None or devinfo.number not in self.statuses:
return False
if type(status) == int:
status_code = status
status_text = DEVICE_STATUS_NAME[status_code]
else:
status_code = status[0]
status_text = DEVICE_STATUS_NAME[status_code] if status[1] is None else status[1]
device_status = self.statuses[devinfo.number]
old_status_code = device_status[1]
device_status[0] = time.time()
device_status[1] = status_code
device_status[2] = status_text
if old_status_code != status_code:
logging.debug("device status changed from %s => %s: %s", old_status_code, status_code, status_text)
notify_desktop(status_code, devinfo.name + ' ' + status_text)
return True
def update_status_icon(self):
if self.listener:
all_statuses = []
for d in self.devices:
devinfo = self.devices[d]
status_text = self.statuses[d][2]
if status_text:
all_statuses.append(devinfo.name + ' ' + status_text)
else:
all_statuses.append(devinfo.name + ' present')
if all_statuses:
tooltip = '\n'.join(all_statuses)
else:
tooltip = 'No devices attached.'
else:
tooltip = 'Unifying Receiver not found.'
# logging.debug("tooltip %s", tooltip)
self.status_icon.set_tooltip_text(tooltip)
if __name__ == '__main__':
status_icon = Gtk.StatusIcon.new_from_file('images/icon.png')
status_icon.set_title(APP_TITLE)
status_icon.set_name(APP_TITLE)
status_icon.set_tooltip_text('Initializing...')
status_icon.connect('popup_menu', Gtk.main_quit)
GObject.threads_init()
StatusThread(status_icon).start()
Gtk.main()

172
solar.py
View File

@ -1,172 +0,0 @@
#!/usr/bin/env python
import logging
logging.basicConfig(level=1)
logging.captureWarnings(True)
import time
import threading
import subprocess
from collections import namedtuple
from gi.repository import GObject
from gi.repository import Gtk
from logitech.unifying_receiver import api as ur
#
# A few constants
#
KEYBOARD_NAME = 'Wireless Solar Keyboard K750'
TITLE = 'Solar [K750]'
NOTIFY_DESKTOP = True
OK = 0
NO_RECEIVER = 1
NO_K750 = 2
NO_STATUS = 3
SLEEP = (10, 5, 5, 15)
TEXT = ('K750 keyboard connected',
'Logitech Unifying Receiver not detected',
'K750 keyboard not detected',
'K750 keyboard not responding')
# ICON_NAMES = (
# 'status_good', 'status_attention',
# 'status_attention', 'status_warning'
# )
CHARGE_LUX_TEXT = 'Charge: %d%% Lux: %d'
K750_Status = namedtuple('K750_Status',
['receiver', 'device', 'status', 'charge', 'lux'])
#
#
#
# _unhandled_queue = []
# def unhandled_messages_hook(code, device, data):
# if len(_unhandled_queue) > 32:
# del _unhandled_queue[:]
# _unhandled_queue.append((code, device, data))
# from logitech.unifying_receiver import unhandled
# unhandled.set_unhandled_hook(unhandled_messages_hook)
#
#
#
def notify_desktop(status_code, text):
global NOTIFY_DESKTOP
if NOTIFY_DESKTOP:
try:
subprocess.call(('notify-send', '-u', 'low', TITLE, text))
except OSError:
NOTIFY_DESKTOP = False
def update_status_icon(status_icon, status_changed, k750):
print "update status", status_changed, k750
text = TEXT[k750.status]
if k750.status == OK:
text += '\n' + (CHARGE_LUX_TEXT % (k750.charge, k750.lux))
# print text
status_icon.set_tooltip_text(text)
if status_changed:
notify_desktop(k750.status, text)
def read_charge(receiver, device):
status, charge, lux = NO_RECEIVER, -1, -1
if receiver is None:
receiver = ur.open()
device = None
if receiver and not device:
try:
device = ur.find_device_by_name(receiver, KEYBOARD_NAME)
except ur.NoReceiver:
receiver = None
if receiver and device:
feature_solar_index = device.features_array.index(ur.FEATURE.SOLAR_CHARGE)
event = None
for i in range(0, 20):
next_event = ur.base.read(receiver, ur.base.DEFAULT_TIMEOUT * 2 // i if i > 0 else ur.base.DEFAULT_TIMEOUT * 3)
if not next_event:
break
if next_event[1] == device.number:
if next_event[0] == 0x10 and next_event[2][0] == b'\x8F':
event = next_event
elif next_event[0] == 0x11 and next_event[2][0] == chr(feature_solar_index) and next_event[2][7:11] == b'GOOD':
if next_event[2][1] == b'\x10':
event = next_event
elif next_event[2][1] == b'\x00' or next_event[2][1] == b'\x20':
event = next_event
if event is None:
try:
reply = ur.request(receiver, device.number, ur.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01', features_array=device.features_array)
if reply is None:
status = NO_K750
device = None
else:
return read_charge(receiver, device)
except ur.NoReceiver:
receiver = None
device = None
else:
if event[1] == 0x10:
status = NO_K750
device = None
else:
status = OK
charge = ord(event[2][2])
if event[2][1] == b'\x10':
lux = (ord(event[2][3]) << 8) + ord(event[2][4])
return K750_Status(receiver, device, status, charge, lux)
class StatusThread(threading.Thread):
def __init__(self, status_icon):
super(StatusThread, self).__init__()
self.daemon = True
self.status_icon = status_icon
def run(self):
last_status = NO_RECEIVER
k750 = K750_Status(None, None, NO_RECEIVER, 0, 0)
while True:
k750 = read_charge(k750.receiver, k750.device)
status_changed = k750.status != last_status
GObject.idle_add(update_status_icon, self.status_icon, status_changed, k750)
last_status = k750.status
time.sleep(SLEEP[k750.status])
if __name__ == "__main__":
status_icon = Gtk.StatusIcon.new_from_file('images/icon.png')
status_icon.set_title(TITLE)
status_icon.set_name(TITLE)
status_icon.set_tooltip_text('Initializing...')
status_icon.connect("popup_menu", Gtk.main_quit)
GObject.threads_init()
StatusThread(status_icon).start()
Gtk.main()

View File

@ -1,10 +0,0 @@
#!/bin/sh
cd `dirname "$0"`
export LD_LIBRARY_PATH=$PWD/lib
export PYTHONPATH=$PWD/lib
export PYTHONDONTWRITEBYTECODE=true
export PYTHONWARNINGS=all
exec python -m unittest discover -v "$@"