some clean-ups in the logitech library

This commit is contained in:
Daniel Pavel 2012-09-28 13:58:17 +03:00
parent 0fe3151051
commit d65c1dbf59
11 changed files with 188 additions and 135 deletions

View File

@ -4,10 +4,10 @@
import logging
logging.basicConfig(level=logging.DEBUG)
import struct
from binascii import hexlify
from logitech.unifying_receiver import api
from logitech.unifying_receiver.constants import *
def scan_devices(receiver):
@ -21,42 +21,37 @@ def scan_devices(receiver):
for fw in devinfo.firmware:
print " %s firmware: %s version %s build %d" % (fw.type, fw.name, fw.version, fw.build)
for index in range(0, len(devinfo.features_array)):
feature = devinfo.features_array[index]
for index in range(0, len(devinfo.features)):
feature = devinfo.features[index]
if feature:
print " Feature %s (%s) available at index %d" % (api.FEATURE_NAME(feature), hexlify(feature), index)
print "~ Feature %s (%s) at index %d" % (FEATURE_NAME[feature], hexlify(feature), index)
if api.FEATURE.REPROGRAMMABLE_KEYS in devinfo.features_array:
keys_count = api.request(receiver, devinfo.number, api.FEATURE.REPROGRAMMABLE_KEYS, features_array=devinfo.features_array)
if keys_count:
keys_count = ord(keys_count[:1])
print " %d reprogrammable keys available" % keys_count
for index in range(0, keys_count):
key_info = api.request(receiver, devinfo.number, api.FEATURE.REPROGRAMMABLE_KEYS,
function=b'\x10', params=struct.pack('!B', index),
features_array=devinfo.features_array)
ctrl_id_indexes, ctrl_task_indexes, flags = struct.unpack('!HHB', key_info[:5])
flag = ''
if flags & 0x10:
flag += ' reprogrammable'
if flags & 0x08:
flag += ' fn-sensitive'
if flags & 0x04:
flag += ' nonstandard'
if flags & 0x02:
flag += ' is-fn'
if flags & 0x01:
flag += ' mse'
print " key %d : %04x %04x %s" % (index, ctrl_id_indexes, ctrl_task_indexes, flag)
if FEATURE.BATTERY in devinfo.features:
discharge, dischargeNext, status = api.get_device_battery_level(receiver, devinfo.number, features_array=devinfo.features)
print " Battery %d charged (next level %d%), status %s" % (discharge, dischargeNext, status)
if FEATURE.REPROGRAMMABLE_KEYS in devinfo.features:
keys = api.get_device_keys(receiver, devinfo.number, features_array=devinfo.features)
if keys is not None and keys:
print " %d reprogrammable keys found" % len(keys)
for k in keys:
flags = ''
if k.flags & KEY_FLAG.REPROGRAMMABLE:
flags += ' reprogrammable'
if k.flags & KEY_FLAG.FN_SENSITIVE:
flags += ' fn-sensitive'
if k.flags & KEY_FLAG.NONSTANDARD:
flags += ' nonstandard'
if k.flags & KEY_FLAG.IS_FN:
flags += ' is-fn'
if k.flags & KEY_FLAG.MSE:
flags += ' mse'
print " %2d: %s => %s :%s" % (k.index, KEY_NAME[k.id], KEY_NAME[k.task], flags)
print "--------"
if __name__ == '__main__':
import argparse
arg_parser = argparse.ArgumentParser()

View File

@ -26,7 +26,7 @@ _LIGHTING_LIMITS = (400, 200, 50, 20, -1)
def _trigger_solar_charge_events(receiver, devinfo):
return _api.request(receiver, devinfo.number,
feature=_api.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01',
features_array=devinfo.features_array)
features_array=devinfo.features)
def _charge_status(data):

View File

@ -6,6 +6,7 @@ import logging
import struct
from binascii import hexlify
from .common import *
from .constants import *
from .exceptions import *
from . import base
@ -19,34 +20,6 @@ _l = logging.getLogger('logitech.unifying_receiver.api')
#
#
from collections import namedtuple
"""Tuple returned by list_devices and find_device_by_name."""
AttachedDeviceInfo = namedtuple('AttachedDeviceInfo', [
'number',
'type',
'name',
'firmware',
'features_array'])
"""Firmware information."""
FirmwareInfo = namedtuple('FirmwareInfo', [
'level',
'type',
'name',
'version',
'build',
'extras'])
def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=None):
return FirmwareInfo(level, type, name, version, build, extras)
del namedtuple
#
#
#
def open():
"""Opens the first Logitech UR found attached to the machine.
@ -97,7 +70,7 @@ def request(handle, device, feature, function=b'\x00', params=b'', features_arra
feature_index = struct.pack('!B', features_array.index(feature))
if feature_index is None:
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, hexlify(feature), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported", handle, device, hexlify(feature), FEATURE_NAME[feature])
raise FeatureNotSupported(device, feature)
return base.request(handle, device, feature_index + function, params)
@ -208,7 +181,7 @@ def get_feature_index(handle, device, feature):
:returns: An int, or ``None`` if the feature is not available.
"""
_l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, hexlify(feature), FEATURE_NAME(feature))
_l.log(_LOG_LEVEL, "(%d,%d) get feature index <%s:%s>", handle, device, hexlify(feature), FEATURE_NAME[feature])
if len(feature) != 2:
raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature)
@ -219,19 +192,19 @@ def get_feature_index(handle, device, feature):
feature_index = ord(reply[0:1])
if feature_index:
feature_flags = ord(reply[1:2]) & 0xE0
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, hexlify(feature), FEATURE_NAME(feature), feature_index, feature_flags)
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> has index %d flags %02x", handle, device, hexlify(feature), FEATURE_NAME[feature], feature_index, feature_flags)
if feature_flags == 0:
return feature_index
if feature_flags & 0x80:
_l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, hexlify(feature), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported: obsolete", handle, device, hexlify(feature), FEATURE_NAME[feature])
if feature_flags & 0x40:
_l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, hexlify(feature), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported: hidden", handle, device, hexlify(feature), FEATURE_NAME[feature])
if feature_flags & 0x20:
_l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, hexlify(feature), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported: Logitech internal", handle, device, hexlify(feature), FEATURE_NAME[feature])
raise FeatureNotSupported(device, feature)
else:
_l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, hexlify(feature), FEATURE_NAME(feature))
_l.warn("(%d,%d) feature <%s:%s> not supported by the device", handle, device, hexlify(feature), FEATURE_NAME[feature])
raise FeatureNotSupported(device, feature)
@ -273,7 +246,7 @@ def get_device_features(handle, device):
if feature:
feature = feature[0:2].upper()
features[index] = feature
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, hexlify(feature), FEATURE_NAME(feature), index)
_l.log(_LOG_LEVEL, "(%d,%d) feature <%s:%s> at index %d", handle, device, hexlify(feature), FEATURE_NAME[feature], index)
return None if all(c == None for c in features) else features
@ -283,6 +256,9 @@ def get_device_firmware(handle, device, features_array=None):
:returns: a list of FirmwareInfo tuples, ordered by firmware layer.
"""
def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=None):
return FirmwareInfo(level, type, name, version, build, extras)
fw_count = request(handle, device, FEATURE.FIRMWARE, features_array=features_array)
if fw_count:
fw_count = ord(fw_count[:1])
@ -294,7 +270,7 @@ def get_device_firmware(handle, device, features_array=None):
if fw_info:
fw_level = ord(fw_info[:1]) & 0x0F
if fw_level == 0 or fw_level == 1:
fw_type = FIRMWARE_TYPES[fw_level]
fw_type = FIRMWARE_TYPE[fw_level]
name, = struct.unpack('!3s', fw_info[1:4])
name = name.decode('ascii')
version = ( chr(0x30 + (ord(fw_info[4:5]) >> 4)) +
@ -309,9 +285,9 @@ def get_device_firmware(handle, device, features_array=None):
else:
fw_info = _makeFirmwareInfo(level=fw_level, type=fw_type, name=name, version=version, build=build)
elif fw_level == 2:
fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPES[2], version=ord(fw_info[1:2]))
fw_info = _makeFirmwareInfo(level=2, type=FIRMWARE_TYPE[2], version=ord(fw_info[1:2]))
else:
fw_info = _makeFirmwareInfo(level=fw_level, type=FIRMWARE_TYPES[-1])
fw_info = _makeFirmwareInfo(level=fw_level, type=FIRMWARE_TYPE[-1])
fw.append(fw_info)
_l.log(_LOG_LEVEL, "(%d:%d) firmware %s", handle, device, fw_info)
@ -362,5 +338,21 @@ def get_device_battery_level(handle, device, features_array=None):
battery = request(handle, device, FEATURE.BATTERY, features_array=features_array)
if battery:
discharge, dischargeNext, status = struct.unpack('!BBB', battery[:3])
_l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSES[status])
return (discharge, dischargeNext, status)
_l.log(_LOG_LEVEL, "(%d:%d) battery %d%% charged, next level %d%% charge, status %d = %s", discharge, dischargeNext, status, BATTERY_STATUSE[status])
return (discharge, dischargeNext, BATTERY_STATUSE[status])
def get_device_keys(handle, device, features_array=None):
count = request(handle, device, FEATURE.REPROGRAMMABLE_KEYS, features_array=features_array)
if count:
keys = []
count = ord(count[:1])
for index in range(0, count):
keyindex = struct.pack('!B', index)
keydata = request(handle, device, FEATURE.REPROGRAMMABLE_KEYS, function=b'\x10', params=keyindex, features_array=features_array)
if keydata:
key, key_task, flags = struct.unpack('!HHB', keydata[:5])
keys.append(ReprogrammableKeyInfo(index, key, KEY_NAME[key], key_task, KEY_NAME[key_task], flags))
return keys

View File

@ -0,0 +1,50 @@
#
# Some common functions and types.
#
class FallbackDict(dict):
def __init__(self, fallback_function, *args, **kwargs):
super(FallbackDict, self).__init__(*args, **kwargs)
self.fallback = fallback_function
def __getitem__(self, key):
try:
return super(FallbackDict, self).__getitem__(key)
except KeyError:
return self.fallback(key)
def list2dict(values_list):
return dict(zip(range(0, len(values_list)), values_list))
from collections import namedtuple
"""Tuple returned by list_devices and find_device_by_name."""
AttachedDeviceInfo = namedtuple('AttachedDeviceInfo', [
'number',
'type',
'name',
'firmware',
'features'])
"""Firmware information."""
FirmwareInfo = namedtuple('FirmwareInfo', [
'level',
'type',
'name',
'version',
'build',
'extras'])
"""Reprogrammable keys informations."""
ReprogrammableKeyInfo = namedtuple('ReprogrammableKeyInfo', [
'index',
'id',
'name',
'task',
'task_name',
'flags'])
del namedtuple

View File

@ -2,7 +2,11 @@
# Constants used by the rest of the API.
#
from binascii import hexlify
from binascii import hexlify as _hexlify
from struct import pack as _pack
from .common import *
"""Possible features available on a Logitech device.
@ -18,55 +22,67 @@ FEATURE = type('FEATURE', (),
BATTERY=b'\x10\x00',
REPROGRAMMABLE_KEYS=b'\x1B\x00',
WIRELESS_STATUS=b'\x1D\x4B',
# declared by the K750 keyboard, no documentation found so far
SOLAR_CHARGE=b'\x43\x01',
# declared by the K750 keyboard, no documentation found so far
# UNKNOWN_1DF3=b'\x1D\xF3',
# UNKNOWN_40A0=b'\x40\xA0',
# UNKNOWN_4100=b'\x41\x00',
# UNKNOWN_4520=b'\x45\x20',
))
def _feature_name(key):
if key is None:
return None
if type(key) == int:
return FEATURE_NAME[_pack('!H', key)]
return 'UNKNOWN_' + _hexlify(key)
"""Feature names indexed by feature id."""
_FEATURE_NAMES = {
FEATURE.ROOT: 'ROOT',
FEATURE.FEATURE_SET: 'FEATURE_SET',
FEATURE.FIRMWARE: 'FIRMWARE',
FEATURE.NAME: 'NAME',
FEATURE.BATTERY: 'BATTERY',
FEATURE.REPROGRAMMABLE_KEYS: 'REPROGRAMMABLE_KEYS',
FEATURE.WIRELESS_STATUS: 'WIRELESS_STATUS',
FEATURE.SOLAR_CHARGE: 'SOLAR_CHARGE',
}
def FEATURE_NAME(feature_code):
if feature_code is None:
return None
if feature_code in _FEATURE_NAMES:
return _FEATURE_NAMES[feature_code]
return 'UNKNOWN_%s' % hexlify(feature_code)
FEATURE_NAME = FallbackDict(_feature_name)
FEATURE_NAME[FEATURE.ROOT] = 'ROOT'
FEATURE_NAME[FEATURE.FEATURE_SET] = 'FEATURE_SET'
FEATURE_NAME[FEATURE.FIRMWARE] = 'FIRMWARE'
FEATURE_NAME[FEATURE.NAME] = 'NAME'
FEATURE_NAME[FEATURE.BATTERY] = 'BATTERY'
FEATURE_NAME[FEATURE.REPROGRAMMABLE_KEYS] = 'REPROGRAMMABLE_KEYS'
FEATURE_NAME[FEATURE.WIRELESS_STATUS] = 'WIRELESS_STATUS'
FEATURE_NAME[FEATURE.SOLAR_CHARGE] = 'SOLAR_CHARGE'
"""Possible types of devices connected to an UR."""
DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse",
"Touchpad", "Trackball", "Presenter", "Receiver")
DEVICE_TYPES = ('Keyboard', 'Remote Control', 'NUMPAD', 'Mouse',
'Touchpad', 'Trackball', 'Presenter', 'Receiver')
_FIRMWARE_TYPES = ('Main (HID)', 'Bootloader', 'Hardware', 'Other')
"""Names of different firmware levels possible, ordered from top to bottom."""
FIRMWARE_TYPES = ("Main (HID)", "Bootloader", "Hardware", "Other")
FIRMWARE_TYPE = FallbackDict(lambda x: 'Unknown', list2dict(_FIRMWARE_TYPES))
_BATTERY_STATUSES = ('Discharging (in use)', 'Recharging', 'Almost full',
'Full', 'Slow recharge', 'Invalid battery', 'Thermal error',
'Charging error')
"""Names for possible battery status values."""
BATTERY_STATUSES = ("Discharging (in use)", "Recharging", "Almost full", "Full",
"Slow recharge", "Invalid battery", "Thermal error",
"Charging error")
BATTERY_STATUS = FallbackDict(lambda x: 'unknown', list2dict(_BATTERY_STATUSES))
_KEY_NAMES = ( 'unknown_0000', 'Volume up', 'Volume down', 'Mute', 'Play/Pause',
'Next', 'Previous', 'Stop', 'Application switcher',
'unknown_0009', 'Calculator', 'unknown_000b', 'unknown_000c',
'unknown_000d', 'Mail')
"""Standard names for reprogrammable keys."""
KEY_NAME = FallbackDict(lambda x: 'unknown_%04x' % x, list2dict(_KEY_NAMES))
"""Possible flags on a reprogrammable key."""
KEY_FLAG = type('REPROGRAMMABLE_KEY_FLAGS', (), dict(
REPROGRAMMABLE=0x10,
FN_SENSITIVE=0x08,
NONSTANDARD=0x04,
IS_FN=0x02,
MSE=0x01,
))
_ERROR_NAMES = ('Ok', 'Unknown', 'Invalid argument', 'Out of range',
'Hardware error', 'Logitech internal', 'Invalid feature index',
'Invalid function', 'Busy', 'Unsupported')
"""Names for error codes."""
_ERROR_NAMES = ("Ok", "Unknown", "Invalid argument", "Out of range",
"Hardware error", "Logitech internal", "Invalid feature index",
"Invalid function", "Busy", "Unsupported")
def ERROR_NAME(error_code):
if error_code < len(_ERROR_NAMES):
return _ERROR_NAMES[error_code]
return 'Unknown Error'
ERROR_NAME = FallbackDict(lambda x: 'Unknown error', list2dict(_ERROR_NAMES))

View File

@ -17,21 +17,21 @@ class NoReceiver(Exception):
class FeatureNotSupported(Exception):
"""Raised when trying to request a feature not supported by the device."""
def __init__(self, device, feature):
super(FeatureNotSupported, self).__init__(device, feature, _FEATURE_NAME(feature))
super(FeatureNotSupported, self).__init__(device, feature, _FEATURE_NAME[feature])
self.device = device
self.feature = feature
self.feature_name = _FEATURE_NAME(feature)
self.feature_name = _FEATURE_NAME[feature]
class FeatureCallError(Exception):
"""Raised if the device replied to a feature call with an error."""
def __init__(self, device, feature, feature_index, feature_function, error_code, data=None):
super(FeatureCallError, self).__init__(device, feature, feature_index, feature_function, error_code, _ERROR_NAME(error_code))
super(FeatureCallError, self).__init__(device, feature, feature_index, feature_function, error_code, _ERROR_NAME[error_code])
self.device = device
self.feature = feature
self.feature_name = _FEATURE_NAME(feature)
self.feature_name = None if feature is None else _FEATURE_NAME[feature]
self.feature_index = feature_index
self.feature_function = feature_function
self.error_code = error_code
self.error_string = _ERROR_NAME(error_code)
self.error_string = _ERROR_NAME[error_code]
self.data = data

View File

@ -78,7 +78,7 @@ class EventsListener(threading.Thread):
The api_function will get the receiver handle as a first agument, all
other args and kwargs will follow.
"""
# _l.log(_LOG_LEVEL, "(%d) request '%s' with %s, %s", self.receiver, api_function.__name__, args, kwargs)
# _l.log(_LOG_LEVEL, "(%d) request '%s.%s' with %s, %s", self.receiver, api_function.__module__, api_function.__name__, args, kwargs)
self.task_processing.acquire()
self.task_done.clear()
self.task = (api_function, args, kwargs)
@ -88,13 +88,13 @@ class EventsListener(threading.Thread):
self.task = self.task_reply = None
self.task_processing.release()
# _l.log(_LOG_LEVEL, "(%d) request '%s' => [%s]", self.receiver, api_function.__name__, hexlify(reply))
# _l.log(_LOG_LEVEL, "(%d) request '%s.%s' => [%s]", self.receiver, api_function.__module__, api_function.__name__, hexlify(reply))
if isinstance(reply, Exception):
raise reply
return reply
def _make_request(self, api_function, args, kwargs):
_l.log(_LOG_LEVEL, "(%d) calling '%s' with %s, %s", self.receiver, api_function.__name__, args, kwargs)
_l.log(_LOG_LEVEL, "(%d) calling '%s.%s' with %s, %s", self.receiver, api_function.__module__, api_function.__name__, args, kwargs)
try:
return api_function.__call__(self.receiver, *args, **kwargs)
except NoReceiver as nr:

View File

@ -5,31 +5,29 @@
import unittest
import struct
from logitech.unifying_receiver import constants
from logitech.unifying_receiver.constants import *
class Test_UR_Constants(unittest.TestCase):
def test_10_feature_names(self):
self.assertIsNone(constants.FEATURE_NAME(None))
for code in range(0x0000, 0x10000):
feature = struct.pack('!H', code)
name = constants.FEATURE_NAME(feature)
name = FEATURE_NAME[feature]
self.assertIsNotNone(name)
self.assertEqual(FEATURE_NAME[code], name)
if name.startswith('UNKNOWN_'):
self.assertEqual(code, struct.unpack('!H', feature)[0])
else:
self.assertTrue(hasattr(constants.FEATURE, name))
self.assertEqual(feature, getattr(constants.FEATURE, name))
self.assertTrue(hasattr(FEATURE, name))
self.assertEqual(feature, getattr(FEATURE, name))
def test_20_error_names(self):
for code in range(0x00, 0x100):
name = constants.ERROR_NAME(code)
for code in range(0, len(ERROR_NAME)):
name = ERROR_NAME[code]
self.assertIsNotNone(name)
if code > 9:
self.assertEqual(name, 'Unknown Error')
else:
self.assertEqual(code, constants._ERROR_NAMES.index(name))
# self.assertEqual(code, ERROR_NAME.index(name))
if __name__ == '__main__':
unittest.main()

View File

@ -3,6 +3,7 @@
#
import unittest
import warnings
from logitech.unifying_receiver import api
from logitech.unifying_receiver.exceptions import *
@ -144,11 +145,12 @@ class Test_UR_API(unittest.TestCase):
if self.features_array is None:
self.fail("no feature set available")
try:
if FEATURE.BATTERY in self.features_array:
battery = api.get_device_battery_level(self.handle, self.device, self.features_array)
self.assertIsNotNone(battery, "failed to read battery level")
except FeatureNotSupported:
self.fail("FEATURE.BATTERY not supported by device " + str(self.device) + ": " + str(self.device_info))
self.assertIsInstance(battery, tuple, "result not a tuple")
else:
warnings.warn("BATTERY feature not supported by device %d" % self.device)
def test_70_list_devices(self):
if self.handle is None:

View File

@ -1,9 +1,9 @@
#!/bin/sh
cd `dirname "$0"`
cd `dirname "$0"`/../lib
export LD_LIBRARY_PATH=$PWD/lib
export PYTHONPATH=$PWD/lib
export LD_LIBRARY_PATH=$PWD
export PYTHONPATH=$PWD
exec python -OO -m cli.hidconsole "$@"

View File

@ -1,8 +1,8 @@
#!/bin/sh
cd `dirname "$0"`
cd `dirname "$0"`/../lib
export LD_LIBRARY_PATH=$PWD/lib
export PYTHONPATH=$PWD/lib
export LD_LIBRARY_PATH=$PWD
export PYTHONPATH=$PWD
exec python -OO -m cli.ur_scanner "$@"