467 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			467 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
| #
 | |
| # Logitech Unifying Receiver API.
 | |
| #
 | |
| 
 | |
| from struct import pack as _pack
 | |
| from struct import unpack as _unpack
 | |
| import errno as _errno
 | |
| 
 | |
| 
 | |
| from . import base as _base
 | |
| from .common import (FirmwareInfo as _FirmwareInfo,
 | |
| 					ReprogrammableKeyInfo as _ReprogrammableKeyInfo)
 | |
| from .constants import (FEATURE, FEATURE_NAME, FEATURE_FLAGS,
 | |
| 						FIRMWARE_KIND, DEVICE_KIND,
 | |
| 						BATTERY_STATUS, KEY_NAME,
 | |
| 						MAX_ATTACHED_DEVICES)
 | |
| from .exceptions import FeatureNotSupported as _FeatureNotSupported
 | |
| 
 | |
| 
 | |
| _hex = _base._hex
 | |
| 
 | |
| from logging import getLogger
 | |
| _log = getLogger('LUR').getChild('api')
 | |
| del getLogger
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| class PairedDevice(object):
 | |
| 	def __init__(self, handle, number):
 | |
| 		self.handle = handle
 | |
| 		self.number = number
 | |
| 
 | |
| 		self._name = None
 | |
| 		self._kind = None
 | |
| 		self._firmware = None
 | |
| 		self.features = [FEATURE.ROOT]
 | |
| 
 | |
| 	@property
 | |
| 	def name(self):
 | |
| 		if self._name is None:
 | |
| 			self._name = get_device_name(self.handle, self.number, self.features)
 | |
| 		return self._name or '?'
 | |
| 
 | |
| 	@property
 | |
| 	def kind(self):
 | |
| 		if self._kind is None:
 | |
| 			self._kind = get_device_kind(self.handle, self.number, self.features)
 | |
| 		return self._kind or '?'
 | |
| 
 | |
| 	@property
 | |
| 	def firmware(self):
 | |
| 		if self._firmware is None:
 | |
| 			self._firmware = get_device_firmware(self.handle, self.number, self.features)
 | |
| 		return self._firmware or ()
 | |
| 
 | |
| 	def ping(self):
 | |
| 		reply = _base.request(self.handle, self.number, b'\x00\x10', b'\x00\x00\xAA')
 | |
| 		return reply is not None and reply[2:3] == b'\xAA'
 | |
| 
 | |
| 	def __str__(self):
 | |
| 		return '<PairedDevice(%X,%d,%s)>' % (self.handle, self.number, self._name or '?')
 | |
| 
 | |
| 	def __hash__(self):
 | |
| 		return self.number
 | |
| 
 | |
| 
 | |
| class Receiver(object):
 | |
| 	name = 'Unifying Receiver'
 | |
| 	max_devices = MAX_ATTACHED_DEVICES
 | |
| 
 | |
| 	def __init__(self, handle, path=None):
 | |
| 		self.handle = handle
 | |
| 		self.path = path
 | |
| 
 | |
| 		self._serial = None
 | |
| 		self._firmware = None
 | |
| 
 | |
| 	def close(self):
 | |
| 		handle, self.handle = self.handle, 0
 | |
| 		return (handle and _base.close(handle))
 | |
| 
 | |
| 	@property
 | |
| 	def serial(self):
 | |
| 		if self._serial is None and self.handle:
 | |
| 			serial = _base.request(self.handle, 0xFF, b'\x83\xB5', b'\x03')
 | |
| 			if serial:
 | |
| 				self._serial = _hex(serial[1:5])
 | |
| 		return self._serial
 | |
| 
 | |
| 	@property
 | |
| 	def firmware(self):
 | |
| 		if self._firmware is None and self.handle:
 | |
| 			firmware = []
 | |
| 
 | |
| 			reply = _base.request(self.handle, 0xFF, b'\x83\xB5', b'\x02')
 | |
| 			if reply and reply[0:1] == b'\x02':
 | |
| 				fw_version = _hex(reply[1:5])
 | |
| 				fw_version = '%s.%s.B%s' % (fw_version[0:2], fw_version[2:4], fw_version[4:8])
 | |
| 				firmware.append(_FirmwareInfo(0, FIRMWARE_KIND[0], '', fw_version, None))
 | |
| 
 | |
| 			reply = _base.request(self.handle, 0xFF, b'\x81\xF1', b'\x04')
 | |
| 			if reply and reply[0:1] == b'\x04':
 | |
| 				bl_version = _hex(reply[1:3])
 | |
| 				bl_version = '%s.%s' % (bl_version[0:2], bl_version[2:4])
 | |
| 				firmware.append(_FirmwareInfo(1, FIRMWARE_KIND[1], '', bl_version, None))
 | |
| 
 | |
| 			self._firmware = tuple(firmware)
 | |
| 
 | |
| 		return self._firmware
 | |
| 
 | |
| 	def __iter__(self):
 | |
| 		if self.handle == 0:
 | |
| 			return
 | |
| 
 | |
| 		for number in range(1, 1 + MAX_ATTACHED_DEVICES):
 | |
| 			dev = get_device(self.handle, number)
 | |
| 			if dev is not None:
 | |
| 				yield dev
 | |
| 
 | |
| 	def __getitem__(self, key):
 | |
| 		if type(key) != int:
 | |
| 			raise TypeError('key must be an integer')
 | |
| 		if self.handle == 0 or key < 0 or key > MAX_ATTACHED_DEVICES:
 | |
| 			raise IndexError(key)
 | |
| 		return get_device(self.handle, key) if key > 0 else None
 | |
| 
 | |
| 	def __delitem__(self, key):
 | |
| 		if type(key) != int:
 | |
| 			raise TypeError('key must be an integer')
 | |
| 		if self.handle == 0 or key < 0 or key > MAX_ATTACHED_DEVICES:
 | |
| 			raise IndexError(key)
 | |
| 		if key > 0:
 | |
| 			_log.debug("unpairing device %d", key)
 | |
| 			reply = _base.request(self.handle, 0xFF, b'\x80\xB2', _pack('!BB', 0x03, key))
 | |
| 			if reply is None or reply[1:2] == b'\x8F':
 | |
| 				raise IndexError(key)
 | |
| 
 | |
| 	def __len__(self):
 | |
| 		if self.handle == 0:
 | |
| 			return 0
 | |
| 		# not really sure about this one...
 | |
| 		count = _base.request(self.handle, 0xFF, b'\x81\x00')
 | |
| 		return 0 if count is None else ord(count[1:2])
 | |
| 
 | |
| 	def __contains__(self, dev):
 | |
| 		if self.handle == 0:
 | |
| 			return False
 | |
| 		if type(dev) == int:
 | |
| 			return (dev < 1 or dev > MAX_ATTACHED_DEVICES) and ping(self.handle, dev)
 | |
| 		return ping(self.handle, dev.number)
 | |
| 
 | |
| 	def __str__(self):
 | |
| 		return '<Receiver(%X,%s)>' % (self.handle, self.path)
 | |
| 
 | |
| 	def __hash__(self):
 | |
| 		return self.handle
 | |
| 
 | |
| 	__bool__ = __nonzero__ = lambda self: self.handle != 0
 | |
| 
 | |
| 	@classmethod
 | |
| 	def open(self):
 | |
| 		"""Opens the first Logitech Unifying Receiver found attached to the machine.
 | |
| 
 | |
| 		:returns: An open file handle for the found receiver, or ``None``.
 | |
| 		"""
 | |
| 		exception = None
 | |
| 
 | |
| 		for rawdevice in _base.list_receiver_devices():
 | |
| 			exception = None
 | |
| 			try:
 | |
| 				handle = _base.try_open(rawdevice.path)
 | |
| 				if handle:
 | |
| 					return Receiver(handle, rawdevice.path)
 | |
| 			except OSError as e:
 | |
| 				_log.exception("open %s", rawdevice.path)
 | |
| 				if e.errno == _errno.EACCES:
 | |
| 					exception = e
 | |
| 
 | |
| 		if exception:
 | |
| 			# only keep the last exception
 | |
| 			raise exception
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| def request(handle, devnumber, feature, function=b'\x00', params=b'', features=None):
 | |
| 	"""Makes a feature call to the device, and returns the reply data.
 | |
| 
 | |
| 	Basically a write() followed by (possibly multiple) reads, until a reply
 | |
| 	matching the called feature is received. In theory the UR will always reply
 | |
| 	to feature call; otherwise this function will wait indefinitely.
 | |
| 
 | |
| 	Incoming data packets not matching the feature and function will be
 | |
| 	delivered to the unhandled hook (if any), and ignored.
 | |
| 
 | |
| 	:param function: the function to call on that feature, may be an byte value
 | |
| 	or a bytes string of length 1.
 | |
| 	:param params: optional bytes string to send as function parameters to the
 | |
| 	feature; may also be an integer if the function only takes a single byte as
 | |
| 	parameter.
 | |
| 
 | |
| 	The optional ``features`` parameter is a cached result of the
 | |
| 	get_device_features function for this device, necessary to find the feature
 | |
| 	index. If the ``features_arrary`` is not provided, one will be obtained by
 | |
| 	manually calling get_device_features before making the request call proper.
 | |
| 
 | |
| 	:raises FeatureNotSupported: if the device does not support the feature.
 | |
| 	"""
 | |
| 	feature_index = None
 | |
| 	if feature == FEATURE.ROOT:
 | |
| 		feature_index = b'\x00'
 | |
| 	else:
 | |
| 		feature_index = _get_feature_index(handle, devnumber, feature, features)
 | |
| 		if feature_index is None:
 | |
| 			# i/o read error
 | |
| 			return None
 | |
| 
 | |
| 		feature_index = _pack('!B', feature_index)
 | |
| 
 | |
| 	if type(function) == int:
 | |
| 		function = _pack('!B', function)
 | |
| 	if type(params) == int:
 | |
| 		params = _pack('!B', params)
 | |
| 
 | |
| 	return _base.request(handle, devnumber, feature_index + function, params)
 | |
| 
 | |
| 
 | |
| def ping(handle, devnumber):
 | |
| 	"""
 | |
| 	:returns: True if the device is connected to the UR.
 | |
| 	"""
 | |
| 	reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA')
 | |
| 	return reply is not None and reply[2:3] == b'\xAA'
 | |
| 
 | |
| 
 | |
| def get_device(handle, devnumber, features=None):
 | |
| 	"""Gets the complete info for a device (type, features).
 | |
| 
 | |
| 	:returns: a PairedDevice or ``None``.
 | |
| 	"""
 | |
| 	if ping(handle, devnumber):
 | |
| 		devinfo = PairedDevice(handle, devnumber)
 | |
| 		# _log.debug("found device %s", devinfo)
 | |
| 		return devinfo
 | |
| 
 | |
| 
 | |
| def get_feature_index(handle, devnumber, feature):
 | |
| 	"""Reads the index of a device's feature.
 | |
| 
 | |
| 	:returns: An int, or ``None`` if the feature is not available.
 | |
| 	"""
 | |
| 	# _log.debug("device %d get feature index <%s:%s>", devnumber, _hex(feature), FEATURE_NAME[feature])
 | |
| 	if len(feature) != 2:
 | |
| 		raise ValueError("invalid feature <%s>: it must be a two-byte string" % feature)
 | |
| 
 | |
| 	# FEATURE.ROOT should always be available for any attached devices
 | |
| 	reply = _base.request(handle, devnumber, FEATURE.ROOT, feature)
 | |
| 	if reply:
 | |
| 		feature_index = ord(reply[0:1])
 | |
| 		if feature_index:
 | |
| 			# feature_flags = ord(reply[1:2]) & 0xE0
 | |
| 			# if feature_flags:
 | |
| 			# 	_log.debug("device %d feature <%s:%s> has index %d: %s",
 | |
| 			# 				devnumber, _hex(feature), FEATURE_NAME[feature], feature_index,
 | |
| 			# 				','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k]))
 | |
| 			# else:
 | |
| 			# 	_log.debug("device %d feature <%s:%s> has index %d", devnumber, _hex(feature), FEATURE_NAME[feature], feature_index)
 | |
| 
 | |
| 			# only consider active and supported features?
 | |
| 			# if feature_flags:
 | |
| 			# 	raise E.FeatureNotSupported(devnumber, feature)
 | |
| 
 | |
| 			return feature_index
 | |
| 
 | |
| 		_log.warn("device %d feature <%s:%s> not supported by the device", devnumber, _hex(feature), FEATURE_NAME[feature])
 | |
| 		raise _FeatureNotSupported(devnumber, feature)
 | |
| 
 | |
| 
 | |
| def _get_feature_index(handle, devnumber, feature, features=None):
 | |
| 	if features is None:
 | |
| 		return get_feature_index(handle, devnumber, feature)
 | |
| 
 | |
| 	if feature in features:
 | |
| 		return features.index(feature)
 | |
| 
 | |
| 	index = get_feature_index(handle, devnumber, feature)
 | |
| 	if index is not None:
 | |
| 		if len(features) <= index:
 | |
| 			features += [None] * (index + 1 - len(features))
 | |
| 		features[index] = feature
 | |
| 		return index
 | |
| 
 | |
| 
 | |
| def get_device_features(handle, devnumber):
 | |
| 	"""Returns an array of feature ids.
 | |
| 
 | |
| 	Their position in the array is the index to be used when requesting that
 | |
| 	feature on the device.
 | |
| 	"""
 | |
| 	# _log.debug("device %d get device features", devnumber)
 | |
| 
 | |
| 	# get the index of the FEATURE_SET
 | |
| 	# FEATURE.ROOT should always be available for all devices
 | |
| 	fs_index = _base.request(handle, devnumber, FEATURE.ROOT, FEATURE.FEATURE_SET)
 | |
| 	if fs_index is None:
 | |
| 		_log.warn("device %d FEATURE_SET not available", devnumber)
 | |
| 		return None
 | |
| 	fs_index = fs_index[:1]
 | |
| 
 | |
| 	# For debugging purposes, query all the available features on the device,
 | |
| 	# even if unknown.
 | |
| 
 | |
| 	# get the number of active features the device has
 | |
| 	features_count = _base.request(handle, devnumber, fs_index + b'\x00')
 | |
| 	if not features_count:
 | |
| 		# this can happen if the device disappeard since the fs_index request
 | |
| 		# otherwise we should get at least a count of 1 (the FEATURE_SET we've just used above)
 | |
| 		_log.debug("device %d no features available?!", devnumber)
 | |
| 		return None
 | |
| 
 | |
| 	features_count = ord(features_count[:1])
 | |
| 	# _log.debug("device %d found %d features", devnumber, features_count)
 | |
| 
 | |
| 	features = [None] * 0x20
 | |
| 	for index in range(1, 1 + features_count):
 | |
| 		# for each index, get the feature residing at that index
 | |
| 		feature = _base.request(handle, devnumber, fs_index + b'\x10', _pack('!B', index))
 | |
| 		if feature:
 | |
| 			# feature_flags = ord(feature[2:3]) & 0xE0
 | |
| 			feature = feature[0:2].upper()
 | |
| 			features[index] = feature
 | |
| 
 | |
| 			# if feature_flags:
 | |
| 			# 	_log.debug("device %d feature <%s:%s> at index %d: %s",
 | |
| 			# 				devnumber, _hex(feature), FEATURE_NAME[feature], index,
 | |
| 			# 				','.join([FEATURE_FLAGS[k] for k in FEATURE_FLAGS if feature_flags & k]))
 | |
| 			# else:
 | |
| 			# 	_log.debug("device %d feature <%s:%s> at index %d", devnumber, _hex(feature), FEATURE_NAME[feature], index)
 | |
| 
 | |
| 	features[0] = FEATURE.ROOT
 | |
| 	while features[-1] is None:
 | |
| 		del features[-1]
 | |
| 	return tuple(features)
 | |
| 
 | |
| 
 | |
| def get_device_firmware(handle, devnumber, features=None):
 | |
| 	"""Reads a device's firmware info.
 | |
| 
 | |
| 	:returns: a list of FirmwareInfo tuples, ordered by firmware layer.
 | |
| 	"""
 | |
| 	fw_fi = _get_feature_index(handle, devnumber, FEATURE.FIRMWARE, features)
 | |
| 	if fw_fi is None:
 | |
| 		return None
 | |
| 
 | |
| 	fw_count = _base.request(handle, devnumber, _pack('!BB', fw_fi, 0x00))
 | |
| 	if fw_count:
 | |
| 		fw_count = ord(fw_count[:1])
 | |
| 
 | |
| 		fw = []
 | |
| 		for index in range(0, fw_count):
 | |
| 			fw_info = _base.request(handle, devnumber, _pack('!BB', fw_fi, 0x10), params=index)
 | |
| 			if fw_info:
 | |
| 				level = ord(fw_info[:1]) & 0x0F
 | |
| 				if level == 0 or level == 1:
 | |
| 					kind = FIRMWARE_KIND[level]
 | |
| 					name, = _unpack('!3s', fw_info[1:4])
 | |
| 					name = name.decode('ascii')
 | |
| 					version = _hex(fw_info[4:6])
 | |
| 					version = '%s.%s' % (version[0:2], version[2:4])
 | |
| 					build, = _unpack('!H', fw_info[6:8])
 | |
| 					if build:
 | |
| 						version += ' b%d' % build
 | |
| 					extras = fw_info[9:].rstrip(b'\x00') or None
 | |
| 					fw_info = _FirmwareInfo(level, kind, name, version, extras)
 | |
| 				elif level == 2:
 | |
| 					fw_info = _FirmwareInfo(2, FIRMWARE_KIND[2], '', ord(fw_info[1:2]), None)
 | |
| 				else:
 | |
| 					fw_info = _FirmwareInfo(level, FIRMWARE_KIND[-1], '', '', None)
 | |
| 
 | |
| 				fw.append(fw_info)
 | |
| 				# _log.debug("device %d firmware %s", devnumber, fw_info)
 | |
| 		return tuple(fw)
 | |
| 
 | |
| 
 | |
| def get_device_kind(handle, devnumber, features=None):
 | |
| 	"""Reads a device's type.
 | |
| 
 | |
| 	:see DEVICE_KIND:
 | |
| 	:returns: a string describing the device type, or ``None`` if the device is
 | |
| 	not available or does not support the ``NAME`` feature.
 | |
| 	"""
 | |
| 	name_fi = _get_feature_index(handle, devnumber, FEATURE.NAME, features)
 | |
| 	if name_fi is None:
 | |
| 		return None
 | |
| 
 | |
| 	d_kind = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x20))
 | |
| 	if d_kind:
 | |
| 		d_kind = ord(d_kind[:1])
 | |
| 		# _log.debug("device %d type %d = %s", devnumber, d_kind, DEVICE_KIND[d_kind])
 | |
| 		return DEVICE_KIND[d_kind]
 | |
| 
 | |
| 
 | |
| def get_device_name(handle, devnumber, features=None):
 | |
| 	"""Reads a device's name.
 | |
| 
 | |
| 	:returns: a string with the device name, or ``None`` if the device is not
 | |
| 	available or does not support the ``NAME`` feature.
 | |
| 	"""
 | |
| 	name_fi = _get_feature_index(handle, devnumber, FEATURE.NAME, features)
 | |
| 	if name_fi is None:
 | |
| 		return None
 | |
| 
 | |
| 	name_length = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x00))
 | |
| 	if name_length:
 | |
| 		name_length = ord(name_length[:1])
 | |
| 
 | |
| 		d_name = b''
 | |
| 		while len(d_name) < name_length:
 | |
| 			name_fragment = _base.request(handle, devnumber, _pack('!BB', name_fi, 0x10), len(d_name))
 | |
| 			if name_fragment:
 | |
| 				name_fragment = name_fragment[:name_length - len(d_name)]
 | |
| 				d_name += name_fragment
 | |
| 			else:
 | |
| 				break
 | |
| 
 | |
| 		d_name = d_name.decode('ascii')
 | |
| 		# _log.debug("device %d name %s", devnumber, d_name)
 | |
| 		return d_name
 | |
| 
 | |
| 
 | |
| def get_device_battery_level(handle, devnumber, features=None):
 | |
| 	"""Reads a device's battery level.
 | |
| 
 | |
| 	:raises FeatureNotSupported: if the device does not support this feature.
 | |
| 	"""
 | |
| 	bat_fi = _get_feature_index(handle, devnumber, FEATURE.BATTERY, features)
 | |
| 	if bat_fi is not None:
 | |
| 		battery = _base.request(handle, devnumber, _pack('!BB', bat_fi, 0))
 | |
| 		if battery:
 | |
| 			discharge, dischargeNext, status = _unpack('!BBB', battery[:3])
 | |
| 			_log.debug("device %d battery %d%% charged, next level %d%% charge, status %d = %s",
 | |
| 						devnumber, discharge, dischargeNext, status, BATTERY_STATUS[status])
 | |
| 			return (discharge, dischargeNext, BATTERY_STATUS[status])
 | |
| 
 | |
| 
 | |
| def get_device_keys(handle, devnumber, features=None):
 | |
| 	rk_fi = _get_feature_index(handle, devnumber, FEATURE.REPROGRAMMABLE_KEYS, features)
 | |
| 	if rk_fi is None:
 | |
| 		return None
 | |
| 
 | |
| 	count = _base.request(handle, devnumber, _pack('!BB', rk_fi, 0))
 | |
| 	if count:
 | |
| 		keys = []
 | |
| 
 | |
| 		count = ord(count[:1])
 | |
| 		for index in range(0, count):
 | |
| 			keydata = _base.request(handle, devnumber, _pack('!BB', rk_fi, 0x10), index)
 | |
| 			if keydata:
 | |
| 				key, key_task, flags = _unpack('!HHB', keydata[:5])
 | |
| 				rki = _ReprogrammableKeyInfo(index, key, KEY_NAME[key], key_task, KEY_NAME[key_task], flags)
 | |
| 				keys.append(rki)
 | |
| 
 | |
| 		return keys
 |