258 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			258 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
	
| #
 | |
| # Handles incoming events from the receiver/devices, updating the related
 | |
| # status object as appropiate.
 | |
| #
 | |
| 
 | |
| from __future__ import absolute_import, division, print_function, unicode_literals
 | |
| 
 | |
| from logging import getLogger, DEBUG as _DEBUG, INFO as _INFO
 | |
| _log = getLogger('LUR.notifications')
 | |
| del getLogger
 | |
| 
 | |
| from .common import strhex as _strhex, unpack as _unpack
 | |
| from . import hidpp10 as _hidpp10
 | |
| from . import hidpp20 as _hidpp20
 | |
| from .status import KEYS as _K, ALERT as _ALERT
 | |
| 
 | |
| _R = _hidpp10.REGISTERS
 | |
| _F = _hidpp20.FEATURE
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| def process(device, notification):
 | |
| 	assert device
 | |
| 	assert notification
 | |
| 
 | |
| 	assert hasattr(device, 'status')
 | |
| 	status = device.status
 | |
| 	assert status is not None
 | |
| 
 | |
| 	if device.kind is None:
 | |
| 		return _process_receiver_notification(device, status, notification)
 | |
| 
 | |
| 	return _process_device_notification(device, status, notification)
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| def _process_receiver_notification(receiver, status, n):
 | |
| 	# supposedly only 0x4x notifications arrive for the receiver
 | |
| 	assert n.sub_id & 0x40 == 0x40
 | |
| 
 | |
| 	# pairing lock notification
 | |
| 	if n.sub_id == 0x4A:
 | |
| 		status.lock_open = bool(n.address & 0x01)
 | |
| 		reason = 'pairing lock is ' + ('open' if status.lock_open else 'closed')
 | |
| 		if _log.isEnabledFor(_INFO):
 | |
| 			_log.info("%s: %s", receiver, reason)
 | |
| 
 | |
| 		status[_K.ERROR] = None
 | |
| 		if status.lock_open:
 | |
| 			status.new_device = None
 | |
| 
 | |
| 		pair_error = ord(n.data[:1])
 | |
| 		if pair_error:
 | |
| 			status[_K.ERROR] = error_string = _hidpp10.PAIRING_ERRORS[pair_error]
 | |
| 			status.new_device = None
 | |
| 			_log.warn("pairing error %d: %s", pair_error, error_string)
 | |
| 
 | |
| 		status.changed(reason=reason)
 | |
| 		return True
 | |
| 
 | |
| 	_log.warn("%s: unhandled notification %s", receiver, n)
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| def _process_device_notification(device, status, n):
 | |
| 	# incoming packets with SubId >= 0x80 are supposedly replies from
 | |
| 	# HID++ 1.0 requests, should never get here
 | |
| 	assert n.sub_id & 0x80 == 0
 | |
| 
 | |
| 	# 0x40 to 0x7F appear to be HID++ 1.0 notifications
 | |
| 	if n.sub_id >= 0x40:
 | |
| 		return _process_hidpp10_notification(device, status, n)
 | |
| 
 | |
| 	# At this point, we need to know the device's protocol, otherwise it's
 | |
| 	# possible to not know how to handle it.
 | |
| 	assert device.protocol is not None
 | |
| 
 | |
| 	# some custom battery events for HID++ 1.0 devices
 | |
| 	if device.protocol < 2.0:
 | |
| 		return _process_hidpp10_custom_notification(device, status, n)
 | |
| 
 | |
| 	# assuming 0x00 to 0x3F are feature (HID++ 2.0) notifications
 | |
| 	assert device.features
 | |
| 	try:
 | |
| 		feature = device.features[n.sub_id]
 | |
| 	except IndexError:
 | |
| 		_log.warn("%s: notification from invalid feature index %02X: %s", device, n.sub_id, n)
 | |
| 		return False
 | |
| 
 | |
| 	return _process_feature_notification(device, status, n, feature)
 | |
| 
 | |
| 
 | |
| def _process_hidpp10_custom_notification(device, status, n):
 | |
| 	if _log.isEnabledFor(_DEBUG):
 | |
| 		_log.debug("%s (%s) custom notification %s", device, device.protocol, n)
 | |
| 
 | |
| 	if n.sub_id in (_R.battery_status, _R.battery_charge):
 | |
| 		# message layout: 10 ix <register> <xx> <yy> <zz> <00>
 | |
| 		assert n.data[-1:] == b'\x00'
 | |
| 		data = chr(n.address).encode() + n.data
 | |
| 		charge, status_text = _hidpp10.parse_battery_status(n.sub_id, data)
 | |
| 		status.set_battery_info(charge, status_text)
 | |
| 		return True
 | |
| 
 | |
| 	if n.sub_id == _R.illumination:
 | |
| 		# message layout: 10 ix 17("address")  <??> <?> <??> <light level 1=off..5=max>
 | |
| 		# TODO anything we can do with this?
 | |
| 		if _log.isEnabledFor(_INFO):
 | |
| 			_log.info("illumination event: %s", n)
 | |
| 		return True
 | |
| 
 | |
| 	_log.warn("%s: unrecognized %s", device, n)
 | |
| 
 | |
| 
 | |
| def _process_hidpp10_notification(device, status, n):
 | |
| 	# unpair notification
 | |
| 	if n.sub_id == 0x40:
 | |
| 		if n.address == 0x02:
 | |
| 			# device un-paired
 | |
| 			status.clear()
 | |
| 			device.wpid = None
 | |
| 			device.status = None
 | |
| 			if device.number in device.receiver:
 | |
| 				del device.receiver[device.number]
 | |
| 			status.changed(active=False, alert=_ALERT.ALL, reason='unpaired')
 | |
| 		else:
 | |
| 			_log.warn("%s: disconnection with unknown type %02X: %s", device, n.address, n)
 | |
| 		return True
 | |
| 
 | |
| 	# wireless link notification
 | |
| 	if n.sub_id == 0x41:
 | |
| 		protocol_name = ('unifying (eQuad DJ)' if n.address == 0x04
 | |
| 					else 'eQuad' if n.address == 0x03
 | |
| 					else None)
 | |
| 		if protocol_name:
 | |
| 			if _log.isEnabledFor(_DEBUG):
 | |
| 				wpid = _strhex(n.data[2:3] + n.data[1:2])
 | |
| 				assert wpid == device.wpid, "%s wpid mismatch, got %s" % (device, wpid)
 | |
| 
 | |
| 			flags = ord(n.data[:1]) & 0xF0
 | |
| 			link_encrypyed = bool(flags & 0x20)
 | |
| 			link_established = not (flags & 0x40)
 | |
| 			if _log.isEnabledFor(_DEBUG):
 | |
| 				sw_present = bool(flags & 0x10)
 | |
| 				has_payload = bool(flags & 0x80)
 | |
| 				_log.debug("%s: %s connection notification: software=%s, encrypted=%s, link=%s, payload=%s",
 | |
| 							device, protocol_name, sw_present, link_encrypyed, link_established, has_payload)
 | |
| 			status[_K.LINK_ENCRYPTED] = link_encrypyed
 | |
| 			status.changed(active=link_established)
 | |
| 		else:
 | |
| 			_log.warn("%s: connection notification with unknown protocol %02X: %s", device.number, n.address, n)
 | |
| 
 | |
| 		return True
 | |
| 
 | |
| 	if n.sub_id == 0x49:
 | |
| 		# raw input event? just ignore it
 | |
| 		# if n.address == 0x01, no idea what it is, but they keep on coming
 | |
| 		# if n.address == 0x03, appears to be an actual input event,
 | |
| 		#     because they only come when input happents
 | |
| 		return True
 | |
| 
 | |
| 	# power notification
 | |
| 	if n.sub_id == 0x4B:
 | |
| 		if n.address == 0x01:
 | |
| 			if _log.isEnabledFor(_DEBUG):
 | |
| 				_log.debug("%s: device powered on", device)
 | |
| 			reason = str(status) or 'powered on'
 | |
| 			status.changed(active=True, alert=_ALERT.NOTIFICATION, reason=reason)
 | |
| 		else:
 | |
| 			_log.warn("%s: unknown %s", device, n)
 | |
| 		return True
 | |
| 
 | |
| 	_log.warn("%s: unrecognized %s", device, n)
 | |
| 
 | |
| 
 | |
| def _process_feature_notification(device, status, n, feature):
 | |
| 	if feature == _F.BATTERY_STATUS:
 | |
| 		if n.address == 0x00:
 | |
| 			discharge = ord(n.data[:1])
 | |
| 			battery_status = ord(n.data[1:2])
 | |
| 			status.set_battery_info(discharge, _hidpp20.BATTERY_STATUS[battery_status])
 | |
| 		else:
 | |
| 			_log.warn("%s: unknown BATTERY %s", device, n)
 | |
| 		return True
 | |
| 
 | |
| 	# TODO: what are REPROG_CONTROLS_V{2,3}?
 | |
| 	if feature == _F.REPROG_CONTROLS:
 | |
| 		if n.address == 0x00:
 | |
| 			if _log.isEnabledFor(_INFO):
 | |
| 				_log.info("%s: reprogrammable key: %s", device, n)
 | |
| 		else:
 | |
| 			_log.warn("%s: unknown REPROGRAMMABLE KEYS %s", device, n)
 | |
| 		return True
 | |
| 
 | |
| 	if feature == _F.WIRELESS_DEVICE_STATUS:
 | |
| 		if n.address == 0x00:
 | |
| 			if _log.isEnabledFor(_DEBUG):
 | |
| 				_log.debug("wireless status: %s", n)
 | |
| 			if n.data[0:3] == b'\x01\x01\x01':
 | |
| 				status.changed(active=True, alert=_ALERT.NOTIFICATION, reason='powered on')
 | |
| 			else:
 | |
| 				_log.warn("%s: unknown WIRELESS %s", device, n)
 | |
| 		else:
 | |
| 			_log.warn("%s: unknown WIRELESS %s", device, n)
 | |
| 		return True
 | |
| 
 | |
| 	if feature == _F.SOLAR_DASHBOARD:
 | |
| 		if n.data[5:9] == b'GOOD':
 | |
| 			charge, lux, adc = _unpack('!BHH', n.data[:5])
 | |
| 			status[_K.BATTERY_LEVEL] = charge
 | |
| 			# guesstimate the battery voltage, emphasis on 'guess'
 | |
| 			status[_K.BATTERY_STATUS] = '%1.2fV' % (adc * 2.67793237653 / 0x0672)
 | |
| 			if n.address == 0x00:
 | |
| 				status[_K.LIGHT_LEVEL] = None
 | |
| 				status[_K.BATTERY_CHARGING] = None
 | |
| 				status.changed(active=True)
 | |
| 			elif n.address == 0x10:
 | |
| 				status[_K.LIGHT_LEVEL] = lux
 | |
| 				status[_K.BATTERY_CHARGING] = lux > 200
 | |
| 				status.changed(active=True)
 | |
| 			elif n.address == 0x20:
 | |
| 				if _log.isEnabledFor(_DEBUG):
 | |
| 					_log.debug("%s: Light Check button pressed", device)
 | |
| 				status.changed(alert=_ALERT.SHOW_WINDOW)
 | |
| 				# first cancel any reporting
 | |
| 				# device.feature_request(_F.SOLAR_DASHBOARD)
 | |
| 				# trigger a new report chain
 | |
| 				reports_count = 15
 | |
| 				reports_period = 2  # seconds
 | |
| 				device.feature_request(_F.SOLAR_DASHBOARD, 0x00, reports_count, reports_period)
 | |
| 			else:
 | |
| 				_log.warn("%s: unknown SOLAR CHAGE %s", device, n)
 | |
| 		else:
 | |
| 			_log.warn("%s: SOLAR CHARGE not GOOD? %s", device, n)
 | |
| 		return True
 | |
| 
 | |
| 	if feature == _F.TOUCHMOUSE_RAW_POINTS:
 | |
| 		if n.address == 0x00:
 | |
| 			if _log.isEnabledFor(_INFO):
 | |
| 				_log.info("%s: TOUCH MOUSE points %s", device, n)
 | |
| 		elif n.address == 0x10:
 | |
| 			touch = ord(n.data[:1])
 | |
| 			button_down = bool(touch & 0x02)
 | |
| 			mouse_lifted = bool(touch & 0x01)
 | |
| 			if _log.isEnabledFor(_INFO):
 | |
| 				_log.info("%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s", device, button_down, mouse_lifted)
 | |
| 		else:
 | |
| 			_log.warn("%s: unknown TOUCH MOUSE %s", device, n)
 | |
| 		return True
 | |
| 
 | |
| 	_log.warn("%s: unrecognized %s for feature %s (index %02X)", device, n, feature, n.sub_id)
 |