From fc0a0ca2bb4d54c18e0b5313874486bbc4820c72 Mon Sep 17 00:00:00 2001 From: Daniel Pavel Date: Tue, 25 Sep 2012 14:20:10 +0300 Subject: [PATCH] dropped obsolete files --- logitech/unifying_receiver.py | 318 ---------------------------- logitech/ur_lowlevel.py | 379 ---------------------------------- 2 files changed, 697 deletions(-) delete mode 100644 logitech/unifying_receiver.py delete mode 100644 logitech/ur_lowlevel.py diff --git a/logitech/unifying_receiver.py b/logitech/unifying_receiver.py deleted file mode 100644 index 6e40a41d..00000000 --- a/logitech/unifying_receiver.py +++ /dev/null @@ -1,318 +0,0 @@ -"""A few functions to deal with the Logitech Universal Receiver. - -Uses the HID api exposed through hidapi.py. -Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. - -References: -http://julien.danjou.info/blog/2012/logitech-k750-linux-support -http://6xq.net/git/lars/lshidpp.git/plain/doc -""" - -import logging -from . import hidapi - - -_TIMEOUT = 1000 - - -class NoReceiver(Exception): - """May be thrown when trying to talk through a previously connected - receiver that is no longer available (either because it was physically - disconnected or some other reason).""" - pass - - -FEATURE_ROOT = '\x00\x00' -FEATURE_GET_FEATURE_SET = '\x00\x01' -FEATURE_GET_FIRMWARE = '\x00\x03' -FEATURE_GET_NAME = '\x00\x05' -FEATURE_GET_BATTERY = '\x10\x00' -FEATURE_GET_REPROGRAMMABLE_KEYS = '\x1B\x00' -FEATURE_GET_WIRELESS_STATUS = '\x1D\x4B' -FEATURE_UNKNOWN_1 = '\x1D\xF3' -FEATURE_UNKNOWN_2 = '\x40\xA0' -FEATURE_UNKNOWN_3 = '\x41\x00' -FEATURE_GET_SOLAR_CHARGE = '\x43\x01' -FEATURE_UNKNOWN_4 = '\x45\x20' - - -DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse", - "Touchpad", "Trackball", "Presenter", "Receiver") - -_DEVICE_FEATURES = {} - - -def _write(receiver, device, data): - # just in case - # hidapi.read(receiver, 128, 0) - data = '\x10' + chr(device) + data - logging.debug("w[%s]", data.encode("hex")) - return hidapi.write(receiver, data) - - -def _read(receiver, device, timeout=_TIMEOUT): - data = hidapi.read(receiver, 128, timeout) - if data is None: - print "r(None)" - return None - - if not data: - # print "r[ ]" - return "" - - # print "r[", data.encode("hex"), "]", - # if len(data) < 7: - # print "short", len(data), - - # if ord(data[0]) == 0x20: - # # no idea what it does, not in any specs - # return _read(receiver, device) - - if ord(data[1]) == 0: - # print "no device", - return _read(receiver, device) - - if ord(data[1]) != device: - # print "wrong device", - return _read(receiver, device) - - # print "" - return data - - -def _get_feature_index(receiver, device, feature_id): - if device not in _DEVICE_FEATURES: - _DEVICE_FEATURES[device] = [0] * 0x10 - pass - elif feature_id in _DEVICE_FEATURES[device]: - return _DEVICE_FEATURES[device].index(feature_id) - - if not _write(receiver, device, FEATURE_ROOT + feature_id + '\x00'): - # print "write failed, closing receiver" - close(receiver) - raise NoReceiver() - - while True: - reply = _read(receiver, device) - if not reply: - break - - if reply[2:4] != FEATURE_ROOT: - # ignore - continue - - # only return active and supported features - if ord(reply[4]) and ord(reply[5]) & 0xA0 == 0: - index = ord(reply[4]) - _DEVICE_FEATURES[device][index] = feature_id - return index - - # huh? - return 0 - - -def _req(receiver, device, feature_id, function='\x00', - param1='\x00', param2='\x00', param3='\x00', reply_function=None): - feature_index = _get_feature_index(receiver, device, feature_id) - if not feature_index: - return None - - feature_index = chr(feature_index) - if not _write(receiver, device, - feature_index + function + param1 + param2 + param3): - # print "write failed, closing receiver" - close(receiver) - raise NoReceiver() - - def _read_reply(receiver, device, attempts=2): - reply = _read(receiver, device) - if not reply: - if attempts > 0: - return _read_reply(receiver, device, attempts - 1) - return None - - if reply[0] == '\x10' and reply[2] == '\x8F': - # invalid device - return None - - if reply[0] == '\x11' and reply[2] == feature_index: - if reply[3] == reply_function if reply_function else function: - return reply - - if reply[0] == '\x11': - return _read_reply(receiver, device, attempts - 1) - - return _read_reply(receiver, device) - - -def _get_feature_set(receiver, device): - features = [0] * 0x10 - reply = _req(receiver, device, FEATURE_GET_FEATURE_SET) - if reply: - for index in range(1, 1 + ord(reply[4])): - reply = _req(receiver, device, FEATURE_GET_FEATURE_SET, '\x10', chr(index)) - if reply: - features[index] = reply[4:6].upper() - # print "feature", reply[4:6].encode('hex'), "index", index - - return features - - -_PING_DEVICE = '\x10\x00\x00\x10\x00\x00\xAA' - - -def open(): - """Gets the HID device handle for the Unifying Receiver. - - It is assumed a single receiver is connected to the machine. If more than - one are present, the first one found will be returned. - - :returns: an opaque device handle if a receiver is found, or None. - """ - # USB ids for (Logitech, Unifying Receiver) - for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2): - # print "checking", rawdevice, - receiver = hidapi.open_path(rawdevice.path) - if not receiver: - # could be a permissions problem - # in any case, unreachable - # print "failed to open" - continue - - # ping on a device id we know to be invalid - hidapi.write(receiver, _PING_DEVICE) - - # if this is the right hidraw device, we'll receive a 'bad subdevice' - # otherwise, the read should produce nothing - reply = hidapi.read(receiver, 32, 200) - if reply: - # print "r[", reply.encode("hex"), "]", - if reply == '\x01\x00\x00\x00\x00\x00\x00\x00': - # no idea what this is - # print "nope" - pass - elif reply[:4] == "\x10\x00\x8F\x00": - # print "found", receiver - return receiver - # print "unknown" - else: - # print "no reply" - pass - hidapi.close(receiver) - - -def close(receiver): - """Closes a HID device handle obtained with open().""" - if receiver: - try: - hidapi.close(receiver) - # print "closed", receiver - return True - except: - pass - return False - - -def ping(receiver, device): - # print "ping", device, - if not _write(receiver, device, _PING_DEVICE[2:]): - # print "write failed", - return False - - reply = _read(receiver, device) - if not reply: - # print "no data", - return False - - # 10018f00100900 - if ord(reply[0]) == 0x10 and ord(reply[2]) == 0x8F: - # print "invalid", - return False - - # 110100100200aa00000000000000000000000000 - if ord(reply[0]) == 0x11 and reply[2:4] == "\x00\x10" and reply[6] == "\xAA": - return True - - return False - - -def get_name(receiver, device): - reply = _req(receiver, device, FEATURE_GET_NAME) - if reply: - charcount = ord(reply[4]) - name = '' - index = 0 - while len(name) < charcount: - reply = _req(receiver, device, FEATURE_GET_NAME, '\x10', chr(index)) - if reply: - name += reply[4:4 + charcount - index] - index = len(name) - else: - break - return name - - -def get_type(receiver, device): - reply = _req(receiver, device, FEATURE_GET_NAME, '\x20') - if reply: - return DEVICE_TYPES[ord(reply[4])] - - -def get_firmware_version(receiver, device, firmware_type=0): - reply = _req(receiver, device, - FEATURE_GET_FIRMWARE, '\x10', chr(firmware_type)) - if reply: - return '%s %s.%s' % ( - reply[5:8], reply[8:10].encode('hex'), reply[10:12].encode('hex')) - - -def get_battery_level(receiver, device): - reply = _req(receiver, device, FEATURE_GET_BATTERY) - if reply: - return (ord(reply[4]), ord(reply[5]), ord(reply[6])) - - -def get_reprogrammable_keys(receiver, device): - count = _req(receiver, device, FEATURE_GET_REPROGRAMMABLE_KEYS) - if count: - keys = [] - for index in range(ord(count[4])): - key = _req(receiver, device, - FEATURE_GET_REPROGRAMMABLE_KEYS, '\x10', chr(index)) - keys.append(key[4:6], keys[6:8], ord(key[8])) - return keys - - -def get_solar_charge(receiver, device): - reply = _req(receiver, device, - FEATURE_GET_SOLAR_CHARGE, '\x03', '\x78', '\x01', reply_function='\x10') - if reply: - charge = ord(reply[4]) - lux = ord(reply[5]) << 8 | ord(reply[6]) - # lux = int(round(((255 * ord(reply[5])) + ord(reply[6])) / 538.0, 2) * 100) - return (charge, lux) - - -def find_device(receiver, match_device_type=None, match_name=None): - """Gets the device number for the first device matching. - - The device type and name are case-insensitive. - """ - # Apparently a receiver supports up to 6 devices. - for device in range(1, 7): - if ping(receiver, device): - if device not in _DEVICE_FEATURES: - _DEVICE_FEATURES[device] = _get_feature_set(receiver, device) - # print get_reprogrammable_keys(receiver, device) - # d_firmware = get_firmware_version(receiver, device) - # print "device", device, "[", d_name, "/", d_type, "]" - # print "firmware", d_firmware, "features", _DEVICE_FEATURES[device] - if match_device_type: - d_type = get_type(receiver, device) - if d_type is None or match_device_type.lower() != d_type.lower(): - continue - if match_name: - d_name = get_name(receiver, device) - if d_name is None or match_name.lower() != d_name.lower(): - continue - return device diff --git a/logitech/ur_lowlevel.py b/logitech/ur_lowlevel.py deleted file mode 100644 index aca5491e..00000000 --- a/logitech/ur_lowlevel.py +++ /dev/null @@ -1,379 +0,0 @@ -"""Low-level interface for devices connected through a Logitech Universal -Receiver (UR). - -Uses the HID api exposed through hidapi.py. -Incomplete. Based on a bit of documentation, trial-and-error, and guesswork. - -Strongly recommended to use these functions from a single thread; calling -multiple functions from different threads has a high chance of mixing the -replies and causing failures. - -In the context of this API, 'handle' is the open handle of UR attached to -the machine, and 'device' is the number (1..6 according to the documentation) -of the device attached to the UR. - -References: -http://julien.danjou.info/blog/2012/logitech-k750-linux-support -http://6xq.net/git/lars/lshidpp.git/plain/doc/ -""" - - -# -# Logging set-up. -# Add a new logging level for tracing low-level writes and reads. -# - -import logging - -LOG_LEVEL = 1 - -def _urll_trace(self, msg, *args): - if self.isEnabledFor(LOG_LEVEL): - args = (None if x is None - else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x) - else x - for x in args) - self.log(LOG_LEVEL, msg, *args) - -logging.addLevelName(LOG_LEVEL, 'trace1') -logging.Logger.trace1 = _urll_trace -_log = logging.getLogger('logitech.ur_lowlevel') -_log.setLevel(LOG_LEVEL) - - -# -# -# - - -"""Possible features available on a Logitech device. - -A particular device might not support all these features, and may support other -unknown features as well. -""" -FEATURE = type('FEATURE', (), - dict( - ROOT=b'\x00\x00', - FEATURE_SET=b'\x00\x01', - FIRMWARE=b'\x00\x03', - NAME=b'\x00\x05', - BATTERY=b'\x10\x00', - REPROGRAMMABLE_KEYS=b'\x1B\x00', - WIRELESS_STATUS=b'\x1D\x4B', - # UNKNOWN_1=b'\x1D\xF3', - # UNKNOWN_2=b'\x40\xA0', - # UNKNOWN_3=b'\x41\x00', - SOLAR_CHARGE=b'\x43\x01', - # UNKNOWN_4=b'\x45\x20', - )) - - -"""Possible types of devices connected to an UR.""" -DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse", - "Touchpad", "Trackball", "Presenter", "Receiver") - - -"""Default timeout on read (in ms).""" -DEFAULT_TIMEOUT = 1000 - - -"""Minimum size of a reply data packet.""" -_MIN_REPLY_SIZE = 7 - - -"""Maximum size of a reply data packet.""" -_MAX_REPLY_SIZE = 64 - -class NoReceiver(Exception): - """May be raised when trying to talk through a previously connected - receiver that is no longer available.""" - pass - - -def _default_event_hook(reply_code, device, data): - _log.trace1("EVENT_HOOK |:%d| code %d status [%s]", device, reply_code, data) - - -"""A function that will be called on incoming events. - -It must be a function with the signature: ``_(int, int, str)``, where the -parameters are: (reply code, device number, data). -""" -event_hook = _default_event_hook - - -# -# Low-level functions. -# - - -from . import hidapi - - -def open(): - """Opens the first Logitech UR found attached to the machine. - - :returns: An open file handle for the found receiver, or ``None``. - """ - # USB ids for (Logitech, Unifying Receiver) - # interface 2 if the actual receiver interface - for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2): - - _log.trace1("checking %s", rawdevice) - receiver = hidapi.open_path(rawdevice.path) - if not receiver: - # could be a file permissions issue - # in any case, unreachable - _log.trace1("[%s] open failed", rawdevice.path) - continue - - _log.trace1("[%s] receiver handle |%d:|", rawdevice.path, receiver) - # ping on device id 0 (always an error) - hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA') - - # if this is the right hidraw device, we'll receive a 'bad subdevice' - # otherwise, the read should produce nothing - reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT) - if reply: - _log.trace1("[%s] |%d:| exploratory ping reply [%s]", rawdevice.path, receiver, reply) - - if reply[:4] == b'\x10\x00\x8F\x00': - # 'device 0 unreachable' is the expected reply from a valid receiver handle - _log.trace1("[%s] success: found receiver with handle |%d:|", rawdevice.path, receiver) - return receiver - - if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00': - # no idea what this is, but it comes up occasionally - _log.trace1("[%s] |%d:| mistery reply", rawdevice.path, receiver) - else: - _log.trace1("[%s] |%d:| unknown reply", rawdevice.path, receiver) - else: - _log.trace1("[%s] |%d:| no reply", rawdevice.path, receiver) - pass - - # ignore - hidapi.close(receiver) - - return (None, None) - - -def close(handle): - """Closes a HID device handle.""" - if handle: - try: - hidapi.close(handle) - _log.trace1("|%d:| closed", handle) - return True - except Exception as e: - _log.debug("|%d:| closing: %s", handle, e) - - return False - - -def write(handle, device, feature_index, function=b'\x00', - param1=b'\x00', param2=b'\x00', param3=b'\x00'): - """Write a feature call to the receiver. - - :param handle: UR handle obtained with open(). - :param device: attached device number - :param feature_index: index in the - """ - if type(feature_index) == int: - feature_index = chr(feature_index) - data = b''.join((feature_index, function, param1, param2, param3)) - return _write(handle, device, data) - - -def _write(handle, device, data): - """Writes some data to a certain device. - - :returns: True if the data was successfully written. - """ - wdata = b''.join((b'\x10', chr(device), data, b'\x00' * (5 - len(data)))) - _log.trace1("|%d:%d| <= w[%s]", handle, device, wdata) - # return hidapi.write(handle, wdata) - if not hidapi.write(handle, wdata): - _log.trace1("|%d:%d| write failed", handle, device) - raise NoReceiver() - return True - - -def read(handle, timeout=DEFAULT_TIMEOUT): - """Read some data from the receiver. - - If any data was read in the given timeout, returns a tuple of - (message key, device, message data). - """ - data = hidapi.read(handle, _MAX_REPLY_SIZE, timeout) - if data: - _log.trace1("|%d:*| => r[%s]", handle, data) - if len(data) < _MIN_REPLY_SIZE: - _log.trace1("|%d:*| => r[%s] short read", handle, data) - if len(data) > _MAX_REPLY_SIZE: - _log.trace1("|%d:*| => r[%s] long read", handle, data) - return ord(data[0]), ord(data[1]), data[2:] - else: - _log.trace1("|%d:*| => r[]", handle) - - -def _publish_event(reply_code, device, data): - if event_hook is not None: - event_hook.__call__(reply_code, device, data) - - -def request(handle, device, feature, function=b'\x00', data=b'', features_array=None): - if features_array is None: - features_array = get_device_features(handle, device) - if features_array is None: - _log.trace1("|%d:%d| no features array available", handle, device) - return None - - if feature not in features_array: - _log.trace1("|%d:%d| feature <%s> not supported", handle, device, feature) - return None - - index = chr(features_array.index(feature)) - return _request(handle, device, index + function, data) - - -def _request(handle, device, feature_function, data=b''): - """Makes a feature call device and waits for a matching reply. - - Only call this in the initial set-up of the device. - - This function will skip all incoming messages and events not related to the - device we're requesting for, or the feature specified in the initial - request; it will also wait for a matching reply indefinetly. - - :param feature_function: a two-byte string of (feature_index, function). - :param data: additional data to send, up to 5 bytes. - :returns: - """ - _log.trace1("|%d:%d| request feature %s data %s", handle, device, feature_function, data) - if _write(handle, device, feature_function + data): - while True: - reply = read(handle) - - if not reply: - # keep waiting... - continue - - if reply[1] != device: - # this message not for the device we're interested in - _log.trace1("request reply for unexpected device %s", reply) - _publish_event(*reply) - continue - - if reply[0] == 0x10 and reply[2][0] == b'\x8F': - # device not present - return None - - if reply[0] == 0x11 and reply[2][:2] == feature_function: - # a matching reply - _log.trace1("|%d:%d| matched reply with data [%s]", handle, device, reply[2][2:]) - return reply[2][2:] - - _log.trace1("unmatched reply %s (%s)", reply[2][:2], feature_function) - _publish_event(*reply) - - -def ping(handle, device): - """Pings a device to check if it is attached to the UR. - - :returns: True if the device is connected to the UR, False if the device is - not attached, None if no conclusive reply is received. - """ - def _status(reply): - if not reply: - return None - - # ping ok - if (reply[0] == 0x11 and reply[1] == device and - reply[2][:2] == b'\x00\x10' and - reply[2][4] == b'\xAA'): - _log.trace1("|%d:%d| ping: ok %s", handle, device, reply[2]) - return True - - # ping failed - if (reply[0] == 0x10 and reply[1] == device and - reply[2][:2] == b'\x8F\x00'): - _log.trace1("|%d:%d| ping: device not present", handle, device) - return False - - # sometimes the first packet is a status packet - if (reply[0] == 0x11 and reply[1] == device and - reply[2][:2] == b'\x09\x00' and - reply[2][7:11] == b'GOOD'): - _log.trace1("|%d:%d| ping: status %s", handle, device, reply[2]) - _publish_event(*reply) - return _status(read(handle)) - - # ugh - _log.trace1("|%d:%d| ping: unknown reply", handle, device, reply) - _publish_event(*reply) - return None - - _log.trace1("|%d:%d| pinging", handle, device) - if _write(handle, device, b'\x00\x10\x00\x00\xAA'): - return _status(read(handle, DEFAULT_TIMEOUT * 3)) - return None - - -def get_feature_index(handle, device, feature): - """Reads the index of a device's feature. - - :returns: An int, or None if the feature is not available. - """ - _log.trace1("|%d:%d| get feature index <%s>", handle, device, feature) - feature_index = _request(handle, device, FEATURE.ROOT, feature) - if feature_index: - # only consider active and supported features - if ord(feature_index[0]) and ord(feature_index[1]) & 0xA0 == 0: - _log.trace1("|%d:%d| feature <%s> index %s", handle, device, feature, feature_index[0]) - return ord(feature_index[0]) - - _log.trace1("|%d:%d| feature <%s> not available", handle, device, feature) - - -def get_device_features(handle, device): - """Returns an array of feature ids. - - Their position in the array is the index to be used when accessing that - feature on the device. - - Only call this function in the initial set-up of the device, because - other messages and events not related to querying the feature set - will be ignored. - """ - _log.trace1("|%d:%d| get device features", handle, device) - - # get the index of the FEATURE_SET - fs_index = _request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET) - if not fs_index: - _log.trace1("|%d:%d| FEATURE_SET not available", handle, device) - return None - fs_index = fs_index[0] - - # For debugging purposes, query all the available features on the device, - # even if unknown. - - # get the number of active features the device has - features_count = _request(handle, device, fs_index + b'\x00') - if not features_count: - # theoretically this cannot happen, as we've already called FEATURE_SET - _log.trace1("|%d:%d| no features available?!", handle, device) - return None - features_count = ord(features_count[0]) - - # a device may have a maximum of 15 features - features = [0] * 0x10 - _log.trace1("|%d:%d| found %d features", handle, device, features_count) - - for index in range(1, 1 + features_count): - # for each index, get the feature residing at that index - feature = _request(handle, device, fs_index + b'\x10', chr(index)) - if feature: - features[index] = feature[0:2].upper() - _log.trace1("|%d:%d| feature <%s> at index %d", handle, device, features[index], index) - - return None if all(c == 0 for c in features) else features