dropped obsolete files

This commit is contained in:
Daniel Pavel 2012-09-25 14:20:10 +03:00
parent 06304eb718
commit fc0a0ca2bb
2 changed files with 0 additions and 697 deletions

View File

@ -1,318 +0,0 @@
"""A few functions to deal with the Logitech Universal Receiver.
Uses the HID api exposed through hidapi.py.
Incomplete. Based on a bit of documentation, trial-and-error, and guesswork.
References:
http://julien.danjou.info/blog/2012/logitech-k750-linux-support
http://6xq.net/git/lars/lshidpp.git/plain/doc
"""
import logging
from . import hidapi
_TIMEOUT = 1000
class NoReceiver(Exception):
"""May be thrown when trying to talk through a previously connected
receiver that is no longer available (either because it was physically
disconnected or some other reason)."""
pass
FEATURE_ROOT = '\x00\x00'
FEATURE_GET_FEATURE_SET = '\x00\x01'
FEATURE_GET_FIRMWARE = '\x00\x03'
FEATURE_GET_NAME = '\x00\x05'
FEATURE_GET_BATTERY = '\x10\x00'
FEATURE_GET_REPROGRAMMABLE_KEYS = '\x1B\x00'
FEATURE_GET_WIRELESS_STATUS = '\x1D\x4B'
FEATURE_UNKNOWN_1 = '\x1D\xF3'
FEATURE_UNKNOWN_2 = '\x40\xA0'
FEATURE_UNKNOWN_3 = '\x41\x00'
FEATURE_GET_SOLAR_CHARGE = '\x43\x01'
FEATURE_UNKNOWN_4 = '\x45\x20'
DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse",
"Touchpad", "Trackball", "Presenter", "Receiver")
_DEVICE_FEATURES = {}
def _write(receiver, device, data):
# just in case
# hidapi.read(receiver, 128, 0)
data = '\x10' + chr(device) + data
logging.debug("w[%s]", data.encode("hex"))
return hidapi.write(receiver, data)
def _read(receiver, device, timeout=_TIMEOUT):
data = hidapi.read(receiver, 128, timeout)
if data is None:
print "r(None)"
return None
if not data:
# print "r[ ]"
return ""
# print "r[", data.encode("hex"), "]",
# if len(data) < 7:
# print "short", len(data),
# if ord(data[0]) == 0x20:
# # no idea what it does, not in any specs
# return _read(receiver, device)
if ord(data[1]) == 0:
# print "no device",
return _read(receiver, device)
if ord(data[1]) != device:
# print "wrong device",
return _read(receiver, device)
# print ""
return data
def _get_feature_index(receiver, device, feature_id):
if device not in _DEVICE_FEATURES:
_DEVICE_FEATURES[device] = [0] * 0x10
pass
elif feature_id in _DEVICE_FEATURES[device]:
return _DEVICE_FEATURES[device].index(feature_id)
if not _write(receiver, device, FEATURE_ROOT + feature_id + '\x00'):
# print "write failed, closing receiver"
close(receiver)
raise NoReceiver()
while True:
reply = _read(receiver, device)
if not reply:
break
if reply[2:4] != FEATURE_ROOT:
# ignore
continue
# only return active and supported features
if ord(reply[4]) and ord(reply[5]) & 0xA0 == 0:
index = ord(reply[4])
_DEVICE_FEATURES[device][index] = feature_id
return index
# huh?
return 0
def _req(receiver, device, feature_id, function='\x00',
param1='\x00', param2='\x00', param3='\x00', reply_function=None):
feature_index = _get_feature_index(receiver, device, feature_id)
if not feature_index:
return None
feature_index = chr(feature_index)
if not _write(receiver, device,
feature_index + function + param1 + param2 + param3):
# print "write failed, closing receiver"
close(receiver)
raise NoReceiver()
def _read_reply(receiver, device, attempts=2):
reply = _read(receiver, device)
if not reply:
if attempts > 0:
return _read_reply(receiver, device, attempts - 1)
return None
if reply[0] == '\x10' and reply[2] == '\x8F':
# invalid device
return None
if reply[0] == '\x11' and reply[2] == feature_index:
if reply[3] == reply_function if reply_function else function:
return reply
if reply[0] == '\x11':
return _read_reply(receiver, device, attempts - 1)
return _read_reply(receiver, device)
def _get_feature_set(receiver, device):
features = [0] * 0x10
reply = _req(receiver, device, FEATURE_GET_FEATURE_SET)
if reply:
for index in range(1, 1 + ord(reply[4])):
reply = _req(receiver, device, FEATURE_GET_FEATURE_SET, '\x10', chr(index))
if reply:
features[index] = reply[4:6].upper()
# print "feature", reply[4:6].encode('hex'), "index", index
return features
_PING_DEVICE = '\x10\x00\x00\x10\x00\x00\xAA'
def open():
"""Gets the HID device handle for the Unifying Receiver.
It is assumed a single receiver is connected to the machine. If more than
one are present, the first one found will be returned.
:returns: an opaque device handle if a receiver is found, or None.
"""
# USB ids for (Logitech, Unifying Receiver)
for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2):
# print "checking", rawdevice,
receiver = hidapi.open_path(rawdevice.path)
if not receiver:
# could be a permissions problem
# in any case, unreachable
# print "failed to open"
continue
# ping on a device id we know to be invalid
hidapi.write(receiver, _PING_DEVICE)
# if this is the right hidraw device, we'll receive a 'bad subdevice'
# otherwise, the read should produce nothing
reply = hidapi.read(receiver, 32, 200)
if reply:
# print "r[", reply.encode("hex"), "]",
if reply == '\x01\x00\x00\x00\x00\x00\x00\x00':
# no idea what this is
# print "nope"
pass
elif reply[:4] == "\x10\x00\x8F\x00":
# print "found", receiver
return receiver
# print "unknown"
else:
# print "no reply"
pass
hidapi.close(receiver)
def close(receiver):
"""Closes a HID device handle obtained with open()."""
if receiver:
try:
hidapi.close(receiver)
# print "closed", receiver
return True
except:
pass
return False
def ping(receiver, device):
# print "ping", device,
if not _write(receiver, device, _PING_DEVICE[2:]):
# print "write failed",
return False
reply = _read(receiver, device)
if not reply:
# print "no data",
return False
# 10018f00100900
if ord(reply[0]) == 0x10 and ord(reply[2]) == 0x8F:
# print "invalid",
return False
# 110100100200aa00000000000000000000000000
if ord(reply[0]) == 0x11 and reply[2:4] == "\x00\x10" and reply[6] == "\xAA":
return True
return False
def get_name(receiver, device):
reply = _req(receiver, device, FEATURE_GET_NAME)
if reply:
charcount = ord(reply[4])
name = ''
index = 0
while len(name) < charcount:
reply = _req(receiver, device, FEATURE_GET_NAME, '\x10', chr(index))
if reply:
name += reply[4:4 + charcount - index]
index = len(name)
else:
break
return name
def get_type(receiver, device):
reply = _req(receiver, device, FEATURE_GET_NAME, '\x20')
if reply:
return DEVICE_TYPES[ord(reply[4])]
def get_firmware_version(receiver, device, firmware_type=0):
reply = _req(receiver, device,
FEATURE_GET_FIRMWARE, '\x10', chr(firmware_type))
if reply:
return '%s %s.%s' % (
reply[5:8], reply[8:10].encode('hex'), reply[10:12].encode('hex'))
def get_battery_level(receiver, device):
reply = _req(receiver, device, FEATURE_GET_BATTERY)
if reply:
return (ord(reply[4]), ord(reply[5]), ord(reply[6]))
def get_reprogrammable_keys(receiver, device):
count = _req(receiver, device, FEATURE_GET_REPROGRAMMABLE_KEYS)
if count:
keys = []
for index in range(ord(count[4])):
key = _req(receiver, device,
FEATURE_GET_REPROGRAMMABLE_KEYS, '\x10', chr(index))
keys.append(key[4:6], keys[6:8], ord(key[8]))
return keys
def get_solar_charge(receiver, device):
reply = _req(receiver, device,
FEATURE_GET_SOLAR_CHARGE, '\x03', '\x78', '\x01', reply_function='\x10')
if reply:
charge = ord(reply[4])
lux = ord(reply[5]) << 8 | ord(reply[6])
# lux = int(round(((255 * ord(reply[5])) + ord(reply[6])) / 538.0, 2) * 100)
return (charge, lux)
def find_device(receiver, match_device_type=None, match_name=None):
"""Gets the device number for the first device matching.
The device type and name are case-insensitive.
"""
# Apparently a receiver supports up to 6 devices.
for device in range(1, 7):
if ping(receiver, device):
if device not in _DEVICE_FEATURES:
_DEVICE_FEATURES[device] = _get_feature_set(receiver, device)
# print get_reprogrammable_keys(receiver, device)
# d_firmware = get_firmware_version(receiver, device)
# print "device", device, "[", d_name, "/", d_type, "]"
# print "firmware", d_firmware, "features", _DEVICE_FEATURES[device]
if match_device_type:
d_type = get_type(receiver, device)
if d_type is None or match_device_type.lower() != d_type.lower():
continue
if match_name:
d_name = get_name(receiver, device)
if d_name is None or match_name.lower() != d_name.lower():
continue
return device

View File

@ -1,379 +0,0 @@
"""Low-level interface for devices connected through a Logitech Universal
Receiver (UR).
Uses the HID api exposed through hidapi.py.
Incomplete. Based on a bit of documentation, trial-and-error, and guesswork.
Strongly recommended to use these functions from a single thread; calling
multiple functions from different threads has a high chance of mixing the
replies and causing failures.
In the context of this API, 'handle' is the open handle of UR attached to
the machine, and 'device' is the number (1..6 according to the documentation)
of the device attached to the UR.
References:
http://julien.danjou.info/blog/2012/logitech-k750-linux-support
http://6xq.net/git/lars/lshidpp.git/plain/doc/
"""
#
# Logging set-up.
# Add a new logging level for tracing low-level writes and reads.
#
import logging
LOG_LEVEL = 1
def _urll_trace(self, msg, *args):
if self.isEnabledFor(LOG_LEVEL):
args = (None if x is None
else x.encode('hex') if type(x) == str and any(c < '\x20' or c > '\x7E' for c in x)
else x
for x in args)
self.log(LOG_LEVEL, msg, *args)
logging.addLevelName(LOG_LEVEL, 'trace1')
logging.Logger.trace1 = _urll_trace
_log = logging.getLogger('logitech.ur_lowlevel')
_log.setLevel(LOG_LEVEL)
#
#
#
"""Possible features available on a Logitech device.
A particular device might not support all these features, and may support other
unknown features as well.
"""
FEATURE = type('FEATURE', (),
dict(
ROOT=b'\x00\x00',
FEATURE_SET=b'\x00\x01',
FIRMWARE=b'\x00\x03',
NAME=b'\x00\x05',
BATTERY=b'\x10\x00',
REPROGRAMMABLE_KEYS=b'\x1B\x00',
WIRELESS_STATUS=b'\x1D\x4B',
# UNKNOWN_1=b'\x1D\xF3',
# UNKNOWN_2=b'\x40\xA0',
# UNKNOWN_3=b'\x41\x00',
SOLAR_CHARGE=b'\x43\x01',
# UNKNOWN_4=b'\x45\x20',
))
"""Possible types of devices connected to an UR."""
DEVICE_TYPES = ("Keyboard", "Remote Control", "NUMPAD", "Mouse",
"Touchpad", "Trackball", "Presenter", "Receiver")
"""Default timeout on read (in ms)."""
DEFAULT_TIMEOUT = 1000
"""Minimum size of a reply data packet."""
_MIN_REPLY_SIZE = 7
"""Maximum size of a reply data packet."""
_MAX_REPLY_SIZE = 64
class NoReceiver(Exception):
"""May be raised when trying to talk through a previously connected
receiver that is no longer available."""
pass
def _default_event_hook(reply_code, device, data):
_log.trace1("EVENT_HOOK |:%d| code %d status [%s]", device, reply_code, data)
"""A function that will be called on incoming events.
It must be a function with the signature: ``_(int, int, str)``, where the
parameters are: (reply code, device number, data).
"""
event_hook = _default_event_hook
#
# Low-level functions.
#
from . import hidapi
def open():
"""Opens the first Logitech UR found attached to the machine.
:returns: An open file handle for the found receiver, or ``None``.
"""
# USB ids for (Logitech, Unifying Receiver)
# interface 2 if the actual receiver interface
for rawdevice in hidapi.enumerate(0x046d, 0xc52b, 2):
_log.trace1("checking %s", rawdevice)
receiver = hidapi.open_path(rawdevice.path)
if not receiver:
# could be a file permissions issue
# in any case, unreachable
_log.trace1("[%s] open failed", rawdevice.path)
continue
_log.trace1("[%s] receiver handle |%d:|", rawdevice.path, receiver)
# ping on device id 0 (always an error)
hidapi.write(receiver, b'\x10\x00\x00\x10\x00\x00\xAA')
# if this is the right hidraw device, we'll receive a 'bad subdevice'
# otherwise, the read should produce nothing
reply = hidapi.read(receiver, _MAX_REPLY_SIZE, DEFAULT_TIMEOUT)
if reply:
_log.trace1("[%s] |%d:| exploratory ping reply [%s]", rawdevice.path, receiver, reply)
if reply[:4] == b'\x10\x00\x8F\x00':
# 'device 0 unreachable' is the expected reply from a valid receiver handle
_log.trace1("[%s] success: found receiver with handle |%d:|", rawdevice.path, receiver)
return receiver
if reply == b'\x01\x00\x00\x00\x00\x00\x00\x00':
# no idea what this is, but it comes up occasionally
_log.trace1("[%s] |%d:| mistery reply", rawdevice.path, receiver)
else:
_log.trace1("[%s] |%d:| unknown reply", rawdevice.path, receiver)
else:
_log.trace1("[%s] |%d:| no reply", rawdevice.path, receiver)
pass
# ignore
hidapi.close(receiver)
return (None, None)
def close(handle):
"""Closes a HID device handle."""
if handle:
try:
hidapi.close(handle)
_log.trace1("|%d:| closed", handle)
return True
except Exception as e:
_log.debug("|%d:| closing: %s", handle, e)
return False
def write(handle, device, feature_index, function=b'\x00',
param1=b'\x00', param2=b'\x00', param3=b'\x00'):
"""Write a feature call to the receiver.
:param handle: UR handle obtained with open().
:param device: attached device number
:param feature_index: index in the
"""
if type(feature_index) == int:
feature_index = chr(feature_index)
data = b''.join((feature_index, function, param1, param2, param3))
return _write(handle, device, data)
def _write(handle, device, data):
"""Writes some data to a certain device.
:returns: True if the data was successfully written.
"""
wdata = b''.join((b'\x10', chr(device), data, b'\x00' * (5 - len(data))))
_log.trace1("|%d:%d| <= w[%s]", handle, device, wdata)
# return hidapi.write(handle, wdata)
if not hidapi.write(handle, wdata):
_log.trace1("|%d:%d| write failed", handle, device)
raise NoReceiver()
return True
def read(handle, timeout=DEFAULT_TIMEOUT):
"""Read some data from the receiver.
If any data was read in the given timeout, returns a tuple of
(message key, device, message data).
"""
data = hidapi.read(handle, _MAX_REPLY_SIZE, timeout)
if data:
_log.trace1("|%d:*| => r[%s]", handle, data)
if len(data) < _MIN_REPLY_SIZE:
_log.trace1("|%d:*| => r[%s] short read", handle, data)
if len(data) > _MAX_REPLY_SIZE:
_log.trace1("|%d:*| => r[%s] long read", handle, data)
return ord(data[0]), ord(data[1]), data[2:]
else:
_log.trace1("|%d:*| => r[]", handle)
def _publish_event(reply_code, device, data):
if event_hook is not None:
event_hook.__call__(reply_code, device, data)
def request(handle, device, feature, function=b'\x00', data=b'', features_array=None):
if features_array is None:
features_array = get_device_features(handle, device)
if features_array is None:
_log.trace1("|%d:%d| no features array available", handle, device)
return None
if feature not in features_array:
_log.trace1("|%d:%d| feature <%s> not supported", handle, device, feature)
return None
index = chr(features_array.index(feature))
return _request(handle, device, index + function, data)
def _request(handle, device, feature_function, data=b''):
"""Makes a feature call device and waits for a matching reply.
Only call this in the initial set-up of the device.
This function will skip all incoming messages and events not related to the
device we're requesting for, or the feature specified in the initial
request; it will also wait for a matching reply indefinetly.
:param feature_function: a two-byte string of (feature_index, function).
:param data: additional data to send, up to 5 bytes.
:returns:
"""
_log.trace1("|%d:%d| request feature %s data %s", handle, device, feature_function, data)
if _write(handle, device, feature_function + data):
while True:
reply = read(handle)
if not reply:
# keep waiting...
continue
if reply[1] != device:
# this message not for the device we're interested in
_log.trace1("request reply for unexpected device %s", reply)
_publish_event(*reply)
continue
if reply[0] == 0x10 and reply[2][0] == b'\x8F':
# device not present
return None
if reply[0] == 0x11 and reply[2][:2] == feature_function:
# a matching reply
_log.trace1("|%d:%d| matched reply with data [%s]", handle, device, reply[2][2:])
return reply[2][2:]
_log.trace1("unmatched reply %s (%s)", reply[2][:2], feature_function)
_publish_event(*reply)
def ping(handle, device):
"""Pings a device to check if it is attached to the UR.
:returns: True if the device is connected to the UR, False if the device is
not attached, None if no conclusive reply is received.
"""
def _status(reply):
if not reply:
return None
# ping ok
if (reply[0] == 0x11 and reply[1] == device and
reply[2][:2] == b'\x00\x10' and
reply[2][4] == b'\xAA'):
_log.trace1("|%d:%d| ping: ok %s", handle, device, reply[2])
return True
# ping failed
if (reply[0] == 0x10 and reply[1] == device and
reply[2][:2] == b'\x8F\x00'):
_log.trace1("|%d:%d| ping: device not present", handle, device)
return False
# sometimes the first packet is a status packet
if (reply[0] == 0x11 and reply[1] == device and
reply[2][:2] == b'\x09\x00' and
reply[2][7:11] == b'GOOD'):
_log.trace1("|%d:%d| ping: status %s", handle, device, reply[2])
_publish_event(*reply)
return _status(read(handle))
# ugh
_log.trace1("|%d:%d| ping: unknown reply", handle, device, reply)
_publish_event(*reply)
return None
_log.trace1("|%d:%d| pinging", handle, device)
if _write(handle, device, b'\x00\x10\x00\x00\xAA'):
return _status(read(handle, DEFAULT_TIMEOUT * 3))
return None
def get_feature_index(handle, device, feature):
"""Reads the index of a device's feature.
:returns: An int, or None if the feature is not available.
"""
_log.trace1("|%d:%d| get feature index <%s>", handle, device, feature)
feature_index = _request(handle, device, FEATURE.ROOT, feature)
if feature_index:
# only consider active and supported features
if ord(feature_index[0]) and ord(feature_index[1]) & 0xA0 == 0:
_log.trace1("|%d:%d| feature <%s> index %s", handle, device, feature, feature_index[0])
return ord(feature_index[0])
_log.trace1("|%d:%d| feature <%s> not available", handle, device, feature)
def get_device_features(handle, device):
"""Returns an array of feature ids.
Their position in the array is the index to be used when accessing that
feature on the device.
Only call this function in the initial set-up of the device, because
other messages and events not related to querying the feature set
will be ignored.
"""
_log.trace1("|%d:%d| get device features", handle, device)
# get the index of the FEATURE_SET
fs_index = _request(handle, device, FEATURE.ROOT, FEATURE.FEATURE_SET)
if not fs_index:
_log.trace1("|%d:%d| FEATURE_SET not available", handle, device)
return None
fs_index = fs_index[0]
# For debugging purposes, query all the available features on the device,
# even if unknown.
# get the number of active features the device has
features_count = _request(handle, device, fs_index + b'\x00')
if not features_count:
# theoretically this cannot happen, as we've already called FEATURE_SET
_log.trace1("|%d:%d| no features available?!", handle, device)
return None
features_count = ord(features_count[0])
# a device may have a maximum of 15 features
features = [0] * 0x10
_log.trace1("|%d:%d| found %d features", handle, device, features_count)
for index in range(1, 1 + features_count):
# for each index, get the feature residing at that index
feature = _request(handle, device, fs_index + b'\x10', chr(index))
if feature:
features[index] = feature[0:2].upper()
_log.trace1("|%d:%d| feature <%s> at index %d", handle, device, features[index], index)
return None if all(c == 0 for c in features) else features