207 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			207 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
| #!/usr/bin/env python -u
 | |
| 
 | |
| from __future__ import absolute_import, division, print_function, unicode_literals
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| from select import select as _select
 | |
| import time
 | |
| from binascii import hexlify, unhexlify
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| interactive = os.isatty(0)
 | |
| start_time = 0
 | |
| try:  # python3 support
 | |
| 	read_packet = raw_input
 | |
| except:
 | |
| 	read_packet = input
 | |
| prompt = '?? Input: ' if interactive else ''
 | |
| 
 | |
| strhex = lambda d: hexlify(d).decode('ascii').upper()
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| from threading import Lock
 | |
| print_lock = Lock()
 | |
| 
 | |
| def _print(marker, data, scroll=False):
 | |
| 	t = time.time() - start_time
 | |
| 	if type(data) == unicode:
 | |
| 		s = marker + ' ' + data
 | |
| 	else:
 | |
| 		hexs = strhex(data)
 | |
| 		s = '%s (% 8.3f) [%s %s %s %s] %s' % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data))
 | |
| 
 | |
| 	print_lock.acquire()
 | |
| 
 | |
| 	if interactive and scroll:
 | |
| 		# scroll the entire screen above the current line up by 1 line
 | |
| 		sys.stdout.write('\033[s'  # save cursor position
 | |
| 						'\033[S'  # scroll up
 | |
| 						'\033[A'   # cursor up
 | |
| 						'\033[L'    # insert 1 line
 | |
| 						'\033[G')   # move cursor to column 1
 | |
| 	sys.stdout.write(s)
 | |
| 	if interactive and scroll:
 | |
| 		# restore cursor position
 | |
| 		sys.stdout.write('\033[u')
 | |
| 	else:
 | |
| 		sys.stdout.write('\n')
 | |
| 
 | |
| 	print_lock.release()
 | |
| 
 | |
| 
 | |
| def _error(text, scroll=False):
 | |
| 	_print("!!", text, scroll)
 | |
| 
 | |
| 
 | |
| def _continuous_read(handle, timeout=2000):
 | |
| 	while True:
 | |
| 		try:
 | |
| 			reply = hidapi.read(handle, 128, timeout)
 | |
| 		except OSError as e:
 | |
| 			_error("Read failed, aborting: " + str(e), True)
 | |
| 			break
 | |
| 		assert reply is not None
 | |
| 		if reply:
 | |
| 			_print(">>", reply, True)
 | |
| 
 | |
| 
 | |
| def _validate_input(line, hidpp=False):
 | |
| 	try:
 | |
| 		data = unhexlify(line.encode('ascii'))
 | |
| 	except Exception as e:
 | |
| 		_error("Invalid input: " + str(e))
 | |
| 		return None
 | |
| 
 | |
| 	if hidpp:
 | |
| 		if len(data) < 4:
 | |
| 			_error("Invalid HID++ request: need at least 4 bytes")
 | |
| 			return None
 | |
| 		if data[:1] not in b'\x10\x11':
 | |
| 			_error("Invalid HID++ request: first byte must be 0x10 or 0x11")
 | |
| 			return None
 | |
| 		if data[1:2] not in b'\xFF\x01\x02\x03\x04\x05\x06':
 | |
| 			_error("Invalid HID++ request: second byte must be 0xFF or one of 0x01..0x06")
 | |
| 			return None
 | |
| 		if data[:1] == b'\x10':
 | |
| 			if len(data) > 7:
 | |
| 				_error("Invalid HID++ request: maximum length of a 0x10 request is 7 bytes")
 | |
| 				return None
 | |
| 			while len(data) < 7:
 | |
| 				data = (data + b'\x00' * 7)[:7]
 | |
| 		elif data[:1] == b'\x11':
 | |
| 			if len(data) > 20:
 | |
| 				_error("Invalid HID++ request: maximum length of a 0x11 request is 20 bytes")
 | |
| 				return None
 | |
| 			while len(data) < 20:
 | |
| 				data = (data + b'\x00' * 20)[:20]
 | |
| 
 | |
| 	return data
 | |
| 
 | |
| def _open(device, hidpp):
 | |
| 	if hidpp and not device:
 | |
| 		for d in hidapi.enumerate(vendor_id=0x046d):
 | |
| 			if d.driver == 'logitech-djreceiver':
 | |
| 				device = d.path
 | |
| 				break
 | |
| 		if not device:
 | |
| 			sys.exit("!! No HID++ receiver found.")
 | |
| 	if not device:
 | |
| 		sys.exit("!! Device path required.")
 | |
| 
 | |
| 	print (".. Opening device", device)
 | |
| 	handle = hidapi.open_path(device)
 | |
| 	if not handle:
 | |
| 		sys.exit("!! Failed to open %s, aborting." % device)
 | |
| 
 | |
| 	print (".. Opened handle %s, vendor %s product %s serial %s." % (
 | |
| 					repr(handle),
 | |
| 					repr(hidapi.get_manufacturer(handle)),
 | |
| 					repr(hidapi.get_product(handle)),
 | |
| 					repr(hidapi.get_serial(handle))))
 | |
| 	if args.hidpp:
 | |
| 		if hidapi.get_manufacturer(handle) != 'Logitech':
 | |
| 			sys.exit("!! Only Logitech devices support the HID++ protocol.")
 | |
| 		print (".. HID++ validation enabled.")
 | |
| 
 | |
| 	return handle
 | |
| 
 | |
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| if __name__ == '__main__':
 | |
| 	import argparse
 | |
| 	arg_parser = argparse.ArgumentParser()
 | |
| 	arg_parser.add_argument('--history', help='history file (default ~/.hidconsole-history)')
 | |
| 	arg_parser.add_argument('--hidpp', action='store_true', help='ensure input data is a valid HID++ request')
 | |
| 	arg_parser.add_argument('device', nargs='?', help='linux device to connect to (/dev/hidrawX); '
 | |
| 							'may be omitted if --hidpp is given, in which case it looks for the first Logitech receiver')
 | |
| 	args = arg_parser.parse_args()
 | |
| 
 | |
| 	import hidapi
 | |
| 	handle = _open(args.device, args.hidpp)
 | |
| 
 | |
| 	if interactive:
 | |
| 		print (".. Press ^C/^D to exit, or type hex bytes to write to the device.")
 | |
| 
 | |
| 		import readline
 | |
| 		if args.history is None:
 | |
| 			import os.path
 | |
| 			args.history = os.path.join(os.path.expanduser("~"), ".hidconsole-history")
 | |
| 		try:
 | |
| 			readline.read_history_file(args.history)
 | |
| 		except:
 | |
| 			# file may not exist yet
 | |
| 			pass
 | |
| 
 | |
| 	start_time = time.time()
 | |
| 
 | |
| 	try:
 | |
| 		from threading import Thread
 | |
| 		t = Thread(target=_continuous_read, args=(handle,))
 | |
| 		t.daemon = True
 | |
| 		t.start()
 | |
| 
 | |
| 		if interactive:
 | |
| 			# move the cursor at the bottom of the screen
 | |
| 			sys.stdout.write('\033[300B')  # move cusor at most 300 lines down, don't scroll
 | |
| 
 | |
| 		while t.is_alive():
 | |
| 			line = read_packet(prompt)
 | |
| 			line = line.strip().replace(' ', '')
 | |
| 			if not line:
 | |
| 				continue
 | |
| 
 | |
| 			data = _validate_input(line, args.hidpp)
 | |
| 			if data is None:
 | |
| 				continue
 | |
| 
 | |
| 			_print("<<", data)
 | |
| 			hidapi.write(handle, data)
 | |
| 			# wait for some kind of reply
 | |
| 			if args.hidpp and not interactive:
 | |
| 				if data[1:2] == b'\xFF':
 | |
| 					# the receiver will reply very fast, in a few milliseconds
 | |
| 					time.sleep(0.010)
 | |
| 				else:
 | |
| 					# the devices might reply quite slow
 | |
| 					rlist, wlist, xlist = _select([handle], [], [], 1)
 | |
| 					time.sleep(1)
 | |
| 	except EOFError:
 | |
| 		if interactive:
 | |
| 			print ("")
 | |
| 	except Exception as e:
 | |
| 		print ('%s: %s' % (type(e).__name__, e))
 | |
| 
 | |
| 	print (".. Closing handle", repr(handle))
 | |
| 	hidapi.close(handle)
 | |
| 	if interactive:
 | |
| 		readline.write_history_file(args.history)
 |