clean-ups in LUR library

This commit is contained in:
Daniel Pavel 2012-10-11 18:38:57 +03:00
parent f295d1d90e
commit c9f06aa5da
12 changed files with 158 additions and 137 deletions

View File

@ -195,7 +195,7 @@ class Watcher(Thread):
updated = True updated = True
self._device_status_changed(devstatus, C.STATUS.UNAVAILABLE) self._device_status_changed(devstatus, C.STATUS.UNAVAILABLE)
elif code == 0x11: elif code == 0x11:
status = devices.process_event(devstatus, self.listener, data) status = devices.process_event(devstatus, data)
updated |= self._device_status_changed(devstatus, status) updated |= self._device_status_changed(devstatus, status)
else: else:
_l.warn("unknown event code %02x", code) _l.warn("unknown event code %02x", code)

View File

@ -1 +0,0 @@
# pass

View File

@ -5,7 +5,7 @@ __license__ = "GPL"
__version__ = "0.3" __version__ = "0.3"
# #
# In case a future pure-Python implementation is feasible. # This package exists in case a future pure-Python implementation is feasible.
# #
from .native import * from .native import *

View File

@ -50,7 +50,7 @@ if __name__ == '__main__':
help='linux device to connect to') help='linux device to connect to')
args = arg_parser.parse_args() args = arg_parser.parse_args()
import hidapi from . import hidapi
print (".. Opening device %s" % args.device) print (".. Opening device %s" % args.device)
handle = hidapi.open_path(args.device.encode('utf-8')) handle = hidapi.open_path(args.device.encode('utf-8'))
if handle: if handle:

View File

@ -17,13 +17,13 @@ def ping(devinfo, listener):
def default_request_status(devinfo, listener): def default_request_status(devinfo, listener):
if _api.C.FEATURE.BATTERY in devinfo.features: if _api.C.FEATURE.BATTERY in devinfo.features:
reply = listener.request(_api.get_device_battery_level, devinfo.number, features_array=devinfo.features) reply = listener.request(_api.get_device_battery_level, devinfo.number, features=devinfo.features)
if reply: if reply:
discharge, dischargeNext, status = reply discharge, dischargeNext, status = reply
return C.STATUS.CONNECTED, {C.PROPS.BATTERY_LEVEL: discharge} return C.STATUS.CONNECTED, {C.PROPS.BATTERY_LEVEL: discharge}
def default_process_event(devinfo, listener, data): def default_process_event(devinfo, data):
feature_index = ord(data[0:1]) feature_index = ord(data[0:1])
feature = devinfo.features[feature_index] feature = devinfo.features[feature_index]
feature_function = ord(data[1:2]) & 0xF0 feature_function = ord(data[1:2]) & 0xF0
@ -53,6 +53,12 @@ _REQUEST_STATUS_FUNCTIONS = {
} }
def request_status(devinfo, listener): def request_status(devinfo, listener):
"""Trigger a status request for a device.
:param devinfo: the device info tuple.
:param listener: the EventsListener that will be used to send the request,
and which will receive the status events from the device.
"""
if listener: if listener:
if devinfo.name in _REQUEST_STATUS_FUNCTIONS: if devinfo.name in _REQUEST_STATUS_FUNCTIONS:
return _REQUEST_STATUS_FUNCTIONS[devinfo.name](devinfo, listener) return _REQUEST_STATUS_FUNCTIONS[devinfo.name](devinfo, listener)
@ -63,11 +69,18 @@ _PROCESS_EVENT_FUNCTIONS = {
k750.NAME: k750.process_event k750.NAME: k750.process_event
} }
def process_event(devinfo, listener, data): def process_event(devinfo, data):
if listener: """Process an event received for a device.
default_result = default_process_event(devinfo, listener, data)
if default_result is not None:
return default_result
if devinfo.name in _PROCESS_EVENT_FUNCTIONS: When using an EventsListener, it is assumed this event was received through
return _PROCESS_EVENT_FUNCTIONS[devinfo.name](devinfo, listener, data) its callback, where you may call LUR APIs directly.
:param devinfo: the device info tuple.
:param data: the event data (event packet sans the first two bytes: reply code and device number)
"""
default_result = default_process_event(devinfo, data)
if default_result is not None:
return default_result
if devinfo.name in _PROCESS_EVENT_FUNCTIONS:
return _PROCESS_EVENT_FUNCTIONS[devinfo.name](devinfo, data)

View File

@ -21,7 +21,7 @@ NAME = 'Wireless Solar Keyboard K750'
def _trigger_solar_charge_events(handle, devinfo): def _trigger_solar_charge_events(handle, devinfo):
return _api.request(handle, devinfo.number, return _api.request(handle, devinfo.number,
feature=_api.C.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01', feature=_api.C.FEATURE.SOLAR_CHARGE, function=b'\x03', params=b'\x78\x01',
features_array=devinfo.features) features=devinfo.features)
def _charge_status(data, hasLux=False): def _charge_status(data, hasLux=False):
@ -54,7 +54,7 @@ def request_status(devinfo, listener):
return C.STATUS.UNAVAILABLE return C.STATUS.UNAVAILABLE
def process_event(devinfo, listener, data): def process_event(devinfo, data):
if data[:2] == b'\x09\x00' and data[7:11] == b'GOOD': if data[:2] == b'\x09\x00' and data[7:11] == b'GOOD':
# usually sent after the keyboard is turned on # usually sent after the keyboard is turned on
return _charge_status(data) return _charge_status(data)
@ -65,7 +65,7 @@ def process_event(devinfo, listener, data):
if data[:2] == b'\x09\x20' and data[7:11] == b'GOOD': if data[:2] == b'\x09\x20' and data[7:11] == b'GOOD':
logging.debug("Solar key pressed") logging.debug("Solar key pressed")
if listener and _trigger_solar_charge_events(listener.receiver, devinfo) is None: if _trigger_solar_charge_events(devinfo.handle, devinfo) is None:
return C.STATUS.UNAVAILABLE return C.STATUS.UNAVAILABLE
return _charge_status(data) return _charge_status(data)

View File

@ -33,7 +33,7 @@ open = _base.open
close = _base.close close = _base.close
def request(handle, devnumber, feature, function=b'\x00', params=b'', features_array=None): def request(handle, devnumber, feature, function=b'\x00', params=b'', features=None):
"""Makes a feature call to the device, and returns the reply data. """Makes a feature call to the device, and returns the reply data.
Basically a write() followed by (possibly multiple) reads, until a reply Basically a write() followed by (possibly multiple) reads, until a reply
@ -43,7 +43,13 @@ def request(handle, devnumber, feature, function=b'\x00', params=b'', features_a
Incoming data packets not matching the feature and function will be Incoming data packets not matching the feature and function will be
delivered to the unhandled hook (if any), and ignored. delivered to the unhandled hook (if any), and ignored.
The optional ``features_array`` parameter is a cached result of the :param function: the function to call on that feature, may be an byte value
or a bytes string of length 1.
:param params: optional bytes string to send as function parameters to the
feature; may also be an integer if the function only takes a single byte as
parameter.
The optional ``features`` parameter is a cached result of the
get_device_features function for this device, necessary to find the feature get_device_features function for this device, necessary to find the feature
index. If the ``features_arrary`` is not provided, one will be obtained by index. If the ``features_arrary`` is not provided, one will be obtained by
manually calling get_device_features before making the request call proper. manually calling get_device_features before making the request call proper.
@ -55,18 +61,23 @@ def request(handle, devnumber, feature, function=b'\x00', params=b'', features_a
if feature == C.FEATURE.ROOT: if feature == C.FEATURE.ROOT:
feature_index = b'\x00' feature_index = b'\x00'
else: else:
if features_array is None: if features is None:
features_array = get_device_features(handle, devnumber) features = get_device_features(handle, devnumber)
if features_array is None: if features is None:
_l.log(_LOG_LEVEL, "(%d) no features array available", devnumber) _l.log(_LOG_LEVEL, "(%d) no features array available", devnumber)
return None return None
if feature in features_array: if feature in features:
feature_index = _pack('!B', features_array.index(feature)) feature_index = _pack('!B', features.index(feature))
if feature_index is None: if feature_index is None:
_l.warn("(%d) feature <%s:%s> not supported", devnumber, _hexlify(feature), C.FEATURE_NAME[feature]) _l.warn("(%d) feature <%s:%s> not supported", devnumber, _hexlify(feature), C.FEATURE_NAME[feature])
raise E.FeatureNotSupported(devnumber, feature) raise E.FeatureNotSupported(devnumber, feature)
if type(function) == int:
function = _pack('!B', function)
if type(params) == int:
params = _pack('!B', params)
return _base.request(handle, devnumber, feature_index + function, params) return _base.request(handle, devnumber, feature_index + function, params)
@ -86,19 +97,19 @@ def get_device_protocol(handle, devnumber):
return 'HID %d.%d' % (ord(reply[0:1]), ord(reply[1:2])) return 'HID %d.%d' % (ord(reply[0:1]), ord(reply[1:2]))
def find_device_by_name(handle, device_name): def find_device_by_name(handle, name):
"""Searches for an attached device by name. """Searches for an attached device by name.
:returns: an AttachedDeviceInfo tuple, or ``None``. :returns: an AttachedDeviceInfo tuple, or ``None``.
""" """
_l.log(_LOG_LEVEL, "searching for device '%s'", device_name) _l.log(_LOG_LEVEL, "searching for device '%s'", name)
for devnumber in range(1, 1 + C.MAX_ATTACHED_DEVICES): for devnumber in range(1, 1 + C.MAX_ATTACHED_DEVICES):
features_array = get_device_features(handle, devnumber) features = get_device_features(handle, devnumber)
if features_array: if features:
d_name = get_device_name(handle, devnumber, features_array) d_name = get_device_name(handle, devnumber, features)
if d_name == device_name: if d_name == name:
return get_device_info(handle, devnumber, device_name=d_name, features_array=features_array) return get_device_info(handle, devnumber, name=d_name, features=features)
def list_devices(handle): def list_devices(handle):
@ -111,27 +122,27 @@ def list_devices(handle):
devices = [] devices = []
for device in range(1, 1 + C.MAX_ATTACHED_DEVICES): for device in range(1, 1 + C.MAX_ATTACHED_DEVICES):
features_array = get_device_features(handle, device) features = get_device_features(handle, device)
if features_array: if features:
devices.append(get_device_info(handle, device, features_array=features_array)) devices.append(get_device_info(handle, device, features=features))
return devices return devices
def get_device_info(handle, devnumber, device_name=None, features_array=None): def get_device_info(handle, devnumber, name=None, features=None):
"""Gets the complete info for a device (type, name, firmwares, and features_array). """Gets the complete info for a device (type, name, firmware versions, features).
:returns: an AttachedDeviceInfo tuple, or ``None``. :returns: an AttachedDeviceInfo tuple, or ``None``.
""" """
if features_array is None: if features is None:
features_array = get_device_features(handle, devnumber) features = get_device_features(handle, devnumber)
if features_array is None: if features is None:
return None return None
d_type = get_device_type(handle, devnumber, features_array) d_type = get_device_type(handle, devnumber, features)
d_name = get_device_name(handle, devnumber, features_array) if device_name is None else device_name d_name = get_device_name(handle, devnumber, features) if name is None else name
d_firmware = get_device_firmware(handle, devnumber, features_array) d_firmware = get_device_firmware(handle, devnumber, features)
devinfo = AttachedDeviceInfo(handle, devnumber, d_type, d_name, d_firmware, features_array) devinfo = AttachedDeviceInfo(handle, devnumber, d_type, d_name, d_firmware, features)
_l.log(_LOG_LEVEL, "(%d) found device %s", devnumber, devinfo) _l.log(_LOG_LEVEL, "(%d) found device %s", devnumber, devinfo)
return devinfo return devinfo
@ -222,7 +233,7 @@ def get_device_features(handle, devnumber):
return features return features
def get_device_firmware(handle, devnumber, features_array=None): def get_device_firmware(handle, devnumber, features=None):
"""Reads a device's firmware info. """Reads a device's firmware info.
:returns: a list of FirmwareInfo tuples, ordered by firmware layer. :returns: a list of FirmwareInfo tuples, ordered by firmware layer.
@ -230,14 +241,13 @@ def get_device_firmware(handle, devnumber, features_array=None):
def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=None): def _makeFirmwareInfo(level, type, name=None, version=None, build=None, extras=None):
return FirmwareInfo(level, type, name, version, build, extras) return FirmwareInfo(level, type, name, version, build, extras)
fw_count = request(handle, devnumber, C.FEATURE.FIRMWARE, features_array=features_array) fw_count = request(handle, devnumber, C.FEATURE.FIRMWARE, features=features)
if fw_count: if fw_count:
fw_count = ord(fw_count[:1]) fw_count = ord(fw_count[:1])
fw = [] fw = []
for index in range(0, fw_count): for index in range(0, fw_count):
index = _pack('!B', index) fw_info = request(handle, devnumber, C.FEATURE.FIRMWARE, function=b'\x10', params=index, features=features)
fw_info = request(handle, devnumber, C.FEATURE.FIRMWARE, function=b'\x10', params=index, features_array=features_array)
if fw_info: if fw_info:
fw_level = ord(fw_info[:1]) & 0x0F fw_level = ord(fw_info[:1]) & 0x0F
if fw_level == 0 or fw_level == 1: if fw_level == 0 or fw_level == 1:
@ -262,34 +272,33 @@ def get_device_firmware(handle, devnumber, features_array=None):
return fw return fw
def get_device_type(handle, devnumber, features_array=None): def get_device_type(handle, devnumber, features=None):
"""Reads a device's type. """Reads a device's type.
:see DEVICE_TYPE: :see DEVICE_TYPE:
:returns: a string describing the device type, or ``None`` if the device is :returns: a string describing the device type, or ``None`` if the device is
not available or does not support the ``NAME`` feature. not available or does not support the ``NAME`` feature.
""" """
d_type = request(handle, devnumber, C.FEATURE.NAME, function=b'\x20', features_array=features_array) d_type = request(handle, devnumber, C.FEATURE.NAME, function=b'\x20', features=features)
if d_type: if d_type:
d_type = ord(d_type[:1]) d_type = ord(d_type[:1])
_l.log(_LOG_LEVEL, "(%d) device type %d = %s", devnumber, d_type, C.DEVICE_TYPE[d_type]) _l.log(_LOG_LEVEL, "(%d) device type %d = %s", devnumber, d_type, C.DEVICE_TYPE[d_type])
return C.DEVICE_TYPE[d_type] return C.DEVICE_TYPE[d_type]
def get_device_name(handle, devnumber, features_array=None): def get_device_name(handle, devnumber, features=None):
"""Reads a device's name. """Reads a device's name.
:returns: a string with the device name, or ``None`` if the device is not :returns: a string with the device name, or ``None`` if the device is not
available or does not support the ``NAME`` feature. available or does not support the ``NAME`` feature.
""" """
name_length = request(handle, devnumber, C.FEATURE.NAME, features_array=features_array) name_length = request(handle, devnumber, C.FEATURE.NAME, features=features)
if name_length: if name_length:
name_length = ord(name_length[:1]) name_length = ord(name_length[:1])
d_name = b'' d_name = b''
while len(d_name) < name_length: while len(d_name) < name_length:
name_index = _pack('!B', len(d_name)) name_fragment = request(handle, devnumber, C.FEATURE.NAME, function=b'\x10', params=len(d_name), features=features)
name_fragment = request(handle, devnumber, C.FEATURE.NAME, function=b'\x10', params=name_index, features_array=features_array)
if name_fragment: if name_fragment:
name_fragment = name_fragment[:name_length - len(d_name)] name_fragment = name_fragment[:name_length - len(d_name)]
d_name += name_fragment d_name += name_fragment
@ -301,12 +310,12 @@ def get_device_name(handle, devnumber, features_array=None):
return d_name return d_name
def get_device_battery_level(handle, devnumber, features_array=None): def get_device_battery_level(handle, devnumber, features=None):
"""Reads a device's battery level. """Reads a device's battery level.
:raises FeatureNotSupported: if the device does not support this feature. :raises FeatureNotSupported: if the device does not support this feature.
""" """
battery = request(handle, devnumber, C.FEATURE.BATTERY, features_array=features_array) battery = request(handle, devnumber, C.FEATURE.BATTERY, features=features)
if battery: if battery:
discharge, dischargeNext, status = _unpack('!BBB', battery[:3]) discharge, dischargeNext, status = _unpack('!BBB', battery[:3])
_l.log(_LOG_LEVEL, "(%d) battery %d%% charged, next level %d%% charge, status %d = %s", _l.log(_LOG_LEVEL, "(%d) battery %d%% charged, next level %d%% charge, status %d = %s",
@ -314,15 +323,14 @@ def get_device_battery_level(handle, devnumber, features_array=None):
return (discharge, dischargeNext, C.BATTERY_STATUS[status]) return (discharge, dischargeNext, C.BATTERY_STATUS[status])
def get_device_keys(handle, devnumber, features_array=None): def get_device_keys(handle, devnumber, features=None):
count = request(handle, devnumber, C.FEATURE.REPROGRAMMABLE_KEYS, features_array=features_array) count = request(handle, devnumber, C.FEATURE.REPROGRAMMABLE_KEYS, features=features)
if count: if count:
keys = [] keys = []
count = ord(count[:1]) count = ord(count[:1])
for index in range(0, count): for index in range(0, count):
keyindex = _pack('!B', index) keydata = request(handle, devnumber, C.FEATURE.REPROGRAMMABLE_KEYS, function=b'\x10', params=index, features=features)
keydata = request(handle, devnumber, C.FEATURE.REPROGRAMMABLE_KEYS, function=b'\x10', params=keyindex, features_array=features_array)
if keydata: if keydata:
key, key_task, flags = _unpack('!HHB', keydata[:5]) key, key_task, flags = _unpack('!HHB', keydata[:5])
keys.append(ReprogrammableKeyInfo(index, key, C.KEY_NAME[key], key_task, C.KEY_NAME[key_task], flags)) keys.append(ReprogrammableKeyInfo(index, key, C.KEY_NAME[key], key_task, C.KEY_NAME[key_task], flags))

View File

@ -216,7 +216,7 @@ def read(handle, timeout=DEFAULT_TIMEOUT):
# _l.log(_LOG_LEVEL, "(-) => r[]", handle) # _l.log(_LOG_LEVEL, "(-) => r[]", handle)
def request(handle, devnumber, feature_index_function, params=b'', features_array=None): def request(handle, devnumber, feature_index_function, params=b'', features=None):
"""Makes a feature call to a device and waits for a matching reply. """Makes a feature call to a device and waits for a matching reply.
This function will skip all incoming messages and events not related to the This function will skip all incoming messages and events not related to the
@ -227,8 +227,8 @@ def request(handle, devnumber, feature_index_function, params=b'', features_arra
:param devnumber: attached device number. :param devnumber: attached device number.
:param feature_index_function: a two-byte string of (feature_index, feature_function). :param feature_index_function: a two-byte string of (feature_index, feature_function).
:param params: parameters for the feature call, 3 to 16 bytes. :param params: parameters for the feature call, 3 to 16 bytes.
:param features_array: optional features array for the device, only used to :param features: optional features array for the device, only used to fill
fill the FeatureCallError exception if one occurs. the FeatureCallError exception if one occurs.
:returns: the reply data packet, or ``None`` if the device is no longer :returns: the reply data packet, or ``None`` if the device is no longer
available. available.
:raisees FeatureCallError: if the feature call replied with an error. :raisees FeatureCallError: if the feature call replied with an error.
@ -272,12 +272,12 @@ def request(handle, devnumber, feature_index_function, params=b'', features_arra
return None return None
if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function: if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function:
# an error returned from the device # the feature call returned with an error
error_code = ord(reply_data[3]) error_code = ord(reply_data[3])
_l.warn("(%d) request feature call error %d = %s: %s", devnumber, error_code, C.ERROR_NAME[error_code], _hexlify(reply_data)) _l.warn("(%d) request feature call error %d = %s: %s", devnumber, error_code, C.ERROR_NAME[error_code], _hexlify(reply_data))
feature_index = ord(feature_index_function[:1]) feature_index = ord(feature_index_function[:1])
feature_function = feature_index_function[1:2] feature_function = feature_index_function[1:2]
feature = None if features_array is None else features_array[feature_index] feature = None if features is None else features[feature_index] if feature_index < len(features) else None
raise E.FeatureCallError(devnumber, feature, feature_index, feature_function, error_code, reply_data) raise E.FeatureCallError(devnumber, feature, feature_index, feature_function, error_code, reply_data)
if reply_code == 0x11 and reply_data[:2] == feature_index_function: if reply_code == 0x11 and reply_data[:2] == feature_index_function:
@ -285,6 +285,11 @@ def request(handle, devnumber, feature_index_function, params=b'', features_arra
# _l.log(_LOG_LEVEL, "(%d) matched reply with feature-index-function [%s]", devnumber, _hexlify(reply_data[2:])) # _l.log(_LOG_LEVEL, "(%d) matched reply with feature-index-function [%s]", devnumber, _hexlify(reply_data[2:]))
return reply_data[2:] return reply_data[2:]
if reply_code == 0x10 and devnumber == 0xFF and reply_data[:2] == feature_index_function:
# direct calls to the receiver (device 0xFF) may also return successfully with reply code 0x10
# _l.log(_LOG_LEVEL, "(%d) matched reply with feature-index-function [%s]", devnumber, _hexlify(reply_data[2:]))
return reply_data[2:]
_l.log(_LOG_LEVEL, "(%d) unmatched reply {%s} (expected {%s})", devnumber, _hexlify(reply_data[:2]), _hexlify(feature_index_function)) _l.log(_LOG_LEVEL, "(%d) unmatched reply {%s} (expected {%s})", devnumber, _hexlify(reply_data[:2]), _hexlify(feature_index_function))
if unhandled_hook: if unhandled_hook:
unhandled_hook(reply_code, reply_devnumber, reply_data) unhandled_hook(reply_code, reply_devnumber, reply_data)

View File

@ -15,7 +15,7 @@ class Test_UR_API(unittest.TestCase):
def setUpClass(cls): def setUpClass(cls):
cls.handle = None cls.handle = None
cls.device = None cls.device = None
cls.features_array = None cls.features = None
cls.device_info = None cls.device_info = None
@classmethod @classmethod
@ -23,25 +23,30 @@ class Test_UR_API(unittest.TestCase):
if cls.handle: if cls.handle:
api.close(cls.handle) api.close(cls.handle)
cls.device = None cls.device = None
cls.features_array = None cls.features = None
cls.device_info = None cls.device_info = None
def _check(self, check_device=True, check_features=False):
if self.handle is None:
self.fail("No receiver found")
if check_device and self.device is None:
self.fail("Found no devices attached.")
if check_device and check_features and self.features is None:
self.fail("no feature set available")
def test_00_open_receiver(self): def test_00_open_receiver(self):
Test_UR_API.handle = api.open() Test_UR_API.handle = api.open()
if self.handle is None: self._check(check_device=False)
self.fail("No receiver found")
def test_05_ping_device_zero(self): def test_05_ping_device_zero(self):
if self.handle is None: self._check(check_device=False)
self.fail("No receiver found")
ok = api.ping(self.handle, 0) ok = api.ping(self.handle, 0)
self.assertIsNotNone(ok, "invalid ping reply") self.assertIsNotNone(ok, "invalid ping reply")
self.assertFalse(ok, "device zero replied") self.assertFalse(ok, "device zero replied")
def test_10_ping_all_devices(self): def test_10_ping_all_devices(self):
if self.handle is None: self._check(check_device=False)
self.fail("No receiver found")
devices = [] devices = []
@ -55,106 +60,71 @@ class Test_UR_API(unittest.TestCase):
Test_UR_API.device = devices[0] Test_UR_API.device = devices[0]
def test_30_get_feature_index(self): def test_30_get_feature_index(self):
if self.handle is None: self._check()
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
fs_index = api.get_feature_index(self.handle, self.device, FEATURE.FEATURE_SET) fs_index = api.get_feature_index(self.handle, self.device, FEATURE.FEATURE_SET)
self.assertIsNotNone(fs_index, "feature FEATURE_SET not available") self.assertIsNotNone(fs_index, "feature FEATURE_SET not available")
self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index)) self.assertGreater(fs_index, 0, "invalid FEATURE_SET index: " + str(fs_index))
def test_31_bad_feature(self): def test_31_bad_feature(self):
if self.handle is None: self._check()
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
reply = api.request(self.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF') reply = api.request(self.handle, self.device, FEATURE.ROOT, params=b'\xFF\xFF')
self.assertIsNotNone(reply, "invalid reply") self.assertIsNotNone(reply, "invalid reply")
self.assertEqual(reply[:5], b'\x00' * 5, "invalid reply") self.assertEqual(reply[:5], b'\x00' * 5, "invalid reply")
def test_40_get_device_features(self): def test_40_get_device_features(self):
if self.handle is None: self._check()
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
features = api.get_device_features(self.handle, self.device) features = api.get_device_features(self.handle, self.device)
self.assertIsNotNone(features, "failed to read features array") self.assertIsNotNone(features, "failed to read features array")
self.assertIn(FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available") self.assertIn(FEATURE.FEATURE_SET, features, "feature FEATURE_SET not available")
# cache this to simplify next tests # cache this to simplify next tests
Test_UR_API.features_array = features Test_UR_API.features = features
def test_50_get_device_firmware(self): def test_50_get_device_firmware(self):
if self.handle is None: self._check(check_features=True)
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
d_firmware = api.get_device_firmware(self.handle, self.device, self.features_array) d_firmware = api.get_device_firmware(self.handle, self.device, self.features)
self.assertIsNotNone(d_firmware, "failed to get device firmware") self.assertIsNotNone(d_firmware, "failed to get device firmware")
self.assertGreater(len(d_firmware), 0, "device reported no firmware") self.assertGreater(len(d_firmware), 0, "device reported no firmware")
for fw in d_firmware: for fw in d_firmware:
self.assertIsInstance(fw, FirmwareInfo) self.assertIsInstance(fw, FirmwareInfo)
def test_52_get_device_type(self): def test_52_get_device_type(self):
if self.handle is None: self._check(check_features=True)
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
d_type = api.get_device_type(self.handle, self.device, self.features_array) d_type = api.get_device_type(self.handle, self.device, self.features)
self.assertIsNotNone(d_type, "failed to get device type") self.assertIsNotNone(d_type, "failed to get device type")
self.assertGreater(len(d_type), 0, "empty device type") self.assertGreater(len(d_type), 0, "empty device type")
def test_55_get_device_name(self): def test_55_get_device_name(self):
if self.handle is None: self._check(check_features=True)
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
d_name = api.get_device_name(self.handle, self.device, self.features_array) d_name = api.get_device_name(self.handle, self.device, self.features)
self.assertIsNotNone(d_name, "failed to read device name") self.assertIsNotNone(d_name, "failed to read device name")
self.assertGreater(len(d_name), 0, "empty device name") self.assertGreater(len(d_name), 0, "empty device name")
def test_59_get_device_info(self): def test_59_get_device_info(self):
if self.handle is None: self._check(check_features=True)
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
device_info = api.get_device_info(self.handle, self.device, features_array=self.features_array) device_info = api.get_device_info(self.handle, self.device, features=self.features)
self.assertIsNotNone(device_info, "failed to read full device info") self.assertIsNotNone(device_info, "failed to read full device info")
self.assertIsInstance(device_info, AttachedDeviceInfo) self.assertIsInstance(device_info, AttachedDeviceInfo)
Test_UR_API.device_info = device_info Test_UR_API.device_info = device_info
def test_60_get_battery_level(self): def test_60_get_battery_level(self):
if self.handle is None: self._check(check_features=True)
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
if self.features_array is None:
self.fail("no feature set available")
if FEATURE.BATTERY in self.features_array: if FEATURE.BATTERY in self.features:
battery = api.get_device_battery_level(self.handle, self.device, self.features_array) battery = api.get_device_battery_level(self.handle, self.device, self.features)
self.assertIsNotNone(battery, "failed to read battery level") self.assertIsNotNone(battery, "failed to read battery level")
self.assertIsInstance(battery, tuple, "result not a tuple") self.assertIsInstance(battery, tuple, "result not a tuple")
else: else:
warnings.warn("BATTERY feature not supported by device %d" % self.device) warnings.warn("BATTERY feature not supported by device %d" % self.device)
def test_70_list_devices(self): def test_70_list_devices(self):
if self.handle is None: self._check(check_device=False)
self.fail("No receiver found")
all_devices = api.list_devices(self.handle) all_devices = api.list_devices(self.handle)
if all_devices: if all_devices:
@ -165,10 +135,7 @@ class Test_UR_API(unittest.TestCase):
self.assertIsNone(self.device) self.assertIsNone(self.device)
def test_70_find_device_by_name(self): def test_70_find_device_by_name(self):
if self.handle is None: self._check()
self.fail("No receiver found")
if self.device is None:
self.fail("Found no devices attached.")
all_devices = api.list_devices(self.handle) all_devices = api.list_devices(self.handle)
for device_info in all_devices: for device_info in all_devices:

View File

@ -6,11 +6,40 @@ logging.basicConfig(level=logging.DEBUG)
from binascii import hexlify from binascii import hexlify
from logitech.unifying_receiver import api from .unifying_receiver import (api, base)
from logitech.unifying_receiver.constants import * from .unifying_receiver.constants import *
def print_receiver(receiver):
print ("Unifying Receiver")
reply = base.request(receiver, 0xff, b'\x83\xB5', b'\x03')
if reply and reply[0:1] == b'\x03':
print (" Serial: %s" % hexlify(reply[1:5]))
reply = base.request(receiver, 0xff, b'\x81\xF1', b'\x01')
if reply and reply[0:1] == b'\x01':
fw_version = hexlify(reply[1:3])
firmware = fw_version[0:2] + '.' + fw_version[2:4]
else:
firmware = '??.??'
reply = base.request(receiver, 0xff, b'\x81\xF1', b'\x02')
if reply and reply[0:1] == b'\x02':
firmware += '.B' + hexlify(reply[1:3])
print (" Firmware version: %s" % firmware)
reply = base.request(receiver, 0xff, b'\x81\xF1', b'\x04')
if reply and reply[0:1] == b'\x04':
bl_version = hexlify(reply[1:3])
print (" Bootloader: %s.%s" % (bl_version[0:2], bl_version[2:4]))
print ("--------")
def scan_devices(receiver): def scan_devices(receiver):
print_receiver(receiver)
devices = api.list_devices(receiver) devices = api.list_devices(receiver)
if not devices: if not devices:
print ("!! No attached devices found.") print ("!! No attached devices found.")
@ -21,7 +50,7 @@ def scan_devices(receiver):
# print " Protocol %s" % devinfo.protocol # print " Protocol %s" % devinfo.protocol
for fw in devinfo.firmware: for fw in devinfo.firmware:
print (" %s firmware: %s version %s build %d" % (fw.type, fw.name, fw.version, fw.build)) print (" %s firmware: %s version %s build %d" % (fw.type, fw.name, fw.version, fw.build))
for index in range(0, len(devinfo.features)): for index in range(0, len(devinfo.features)):
feature = devinfo.features[index] feature = devinfo.features[index]
@ -29,11 +58,11 @@ def scan_devices(receiver):
print (" ~ Feature %s (%s) at index %d" % (FEATURE_NAME[feature], hexlify(feature), index)) print (" ~ Feature %s (%s) at index %d" % (FEATURE_NAME[feature], hexlify(feature), index))
if FEATURE.BATTERY in devinfo.features: if FEATURE.BATTERY in devinfo.features:
discharge, dischargeNext, status = api.get_device_battery_level(receiver, devinfo.number, features_array=devinfo.features) discharge, dischargeNext, status = api.get_device_battery_level(receiver, devinfo.number, features=devinfo.features)
print (" Battery %d charged (next level %d%), status %s" % (discharge, dischargeNext, status)) print (" Battery %d charged (next level %d%), status %s" % (discharge, dischargeNext, status))
if FEATURE.REPROGRAMMABLE_KEYS in devinfo.features: if FEATURE.REPROGRAMMABLE_KEYS in devinfo.features:
keys = api.get_device_keys(receiver, devinfo.number, features_array=devinfo.features) keys = api.get_device_keys(receiver, devinfo.number, features=devinfo.features)
if keys is not None and keys: if keys is not None and keys:
print (" %d reprogrammable keys found" % len(keys)) print (" %d reprogrammable keys found" % len(keys))
for k in keys: for k in keys:

View File

@ -1,10 +1,10 @@
#!/bin/sh #!/bin/sh
cd -P `dirname "$0"`/.. cd -P `dirname "$0"`/.. >/dev/null 2>&1
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PWD/lib/native/`uname -m` export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PWD/lib/native/`uname -m`
export PYTHONPATH=$PWD/lib export PYTHONPATH=$PWD/lib
cd - cd - >/dev/null 2>&1
exec python -OO -u -m cli.hidconsole "$@" exec python -OO -u -m hidapi.hidconsole "$@"

View File

@ -1,10 +1,10 @@
#!/bin/sh #!/bin/sh
cd -P `dirname "$0"`/.. cd -P `dirname "$0"`/.. >/dev/null 2>&1
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PWD/lib/native/`uname -m` export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PWD/lib/native/`uname -m`
export PYTHONPATH=$PWD/lib export PYTHONPATH=$PWD/lib
cd - cd - >/dev/null 2>&1
exec python -OO -m cli.ur_scanner "$@" exec python -OO -m logitech.ur_scanner "$@"