made hidconsole more user-friendly
This commit is contained in:
parent
30fedf418c
commit
33a9ca060d
|
@ -7,8 +7,10 @@ import sys
|
||||||
from select import select as _select
|
from select import select as _select
|
||||||
import time
|
import time
|
||||||
from binascii import hexlify, unhexlify
|
from binascii import hexlify, unhexlify
|
||||||
strhex = lambda d: hexlify(d).decode('ascii').upper()
|
|
||||||
|
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
interactive = os.isatty(0)
|
interactive = os.isatty(0)
|
||||||
start_time = 0
|
start_time = 0
|
||||||
|
@ -16,15 +18,24 @@ try: # python3 support
|
||||||
read_packet = raw_input
|
read_packet = raw_input
|
||||||
except:
|
except:
|
||||||
read_packet = input
|
read_packet = input
|
||||||
|
prompt = '?? Input: ' if interactive else ''
|
||||||
|
|
||||||
|
strhex = lambda d: hexlify(d).decode('ascii').upper()
|
||||||
|
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
print_lock = Lock()
|
print_lock = Lock()
|
||||||
|
|
||||||
|
|
||||||
def _print(marker, data, scroll=False):
|
def _print(marker, data, scroll=False):
|
||||||
t = time.time() - start_time
|
t = time.time() - start_time
|
||||||
hexs = strhex(data)
|
if type(data) == unicode:
|
||||||
s = '%s (% 8.3f) [%s %s %s %s] %s' % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data))
|
s = marker + ' ' + data
|
||||||
|
else:
|
||||||
|
hexs = strhex(data)
|
||||||
|
s = '%s (% 8.3f) [%s %s %s %s] %s' % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data))
|
||||||
|
|
||||||
print_lock.acquire()
|
print_lock.acquire()
|
||||||
|
|
||||||
|
@ -45,85 +56,151 @@ def _print(marker, data, scroll=False):
|
||||||
print_lock.release()
|
print_lock.release()
|
||||||
|
|
||||||
|
|
||||||
|
def _error(text, scroll=False):
|
||||||
|
_print("!!", text, scroll)
|
||||||
|
|
||||||
|
|
||||||
def _continuous_read(handle, timeout=2000):
|
def _continuous_read(handle, timeout=2000):
|
||||||
while True:
|
while True:
|
||||||
reply = hidapi.read(handle, 128, timeout)
|
try:
|
||||||
if reply is None:
|
reply = hidapi.read(handle, 128, timeout)
|
||||||
print ("!! Read failed, aborting")
|
except OSError as e:
|
||||||
|
_error("Read failed, aborting: " + str(e), True)
|
||||||
break
|
break
|
||||||
elif reply:
|
assert reply is not None
|
||||||
_print('>>', reply, True)
|
if reply:
|
||||||
|
_print(">>", reply, True)
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_input(line, hidpp=False):
|
||||||
|
try:
|
||||||
|
data = unhexlify(line.encode('ascii'))
|
||||||
|
except Exception as e:
|
||||||
|
_error("Invalid input: " + str(e))
|
||||||
|
return None
|
||||||
|
|
||||||
|
if hidpp:
|
||||||
|
if len(data) < 4:
|
||||||
|
_error("Invalid HID++ request: need at least 4 bytes")
|
||||||
|
return None
|
||||||
|
if data[:1] not in b'\x10\x11':
|
||||||
|
_error("Invalid HID++ request: first byte must be 0x10 or 0x11")
|
||||||
|
return None
|
||||||
|
if data[1:2] not in b'\xFF\x01\x02\x03\x04\x05\x06':
|
||||||
|
_error("Invalid HID++ request: second byte must be 0xFF or one of 0x01..0x06")
|
||||||
|
return None
|
||||||
|
if data[:1] == b'\x10':
|
||||||
|
if len(data) > 7:
|
||||||
|
_error("Invalid HID++ request: maximum length of a 0x10 request is 7 bytes")
|
||||||
|
return None
|
||||||
|
while len(data) < 7:
|
||||||
|
data = (data + b'\x00' * 7)[:7]
|
||||||
|
elif data[:1] == b'\x11':
|
||||||
|
if len(data) > 20:
|
||||||
|
_error("Invalid HID++ request: maximum length of a 0x11 request is 20 bytes")
|
||||||
|
return None
|
||||||
|
while len(data) < 20:
|
||||||
|
data = (data + b'\x00' * 20)[:20]
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def _open(device, hidpp):
|
||||||
|
if hidpp and not device:
|
||||||
|
for d in hidapi.enumerate(vendor_id=0x046d):
|
||||||
|
if d.driver == 'logitech-djreceiver':
|
||||||
|
device = d.path
|
||||||
|
break
|
||||||
|
if not device:
|
||||||
|
sys.exit("!! No HID++ receiver found.")
|
||||||
|
if not device:
|
||||||
|
sys.exit("!! Device path required.")
|
||||||
|
|
||||||
|
print (".. Opening device %s" % device)
|
||||||
|
handle = hidapi.open_path(device)
|
||||||
|
if not handle:
|
||||||
|
sys.exit("!! Failed to open %s, aborting." % device)
|
||||||
|
|
||||||
|
print (".. Opened handle %s, vendor %s product %s serial %s." % (
|
||||||
|
repr(handle),
|
||||||
|
repr(hidapi.get_manufacturer(handle)),
|
||||||
|
repr(hidapi.get_product(handle)),
|
||||||
|
repr(hidapi.get_serial(handle))))
|
||||||
|
if args.hidpp:
|
||||||
|
if hidapi.get_manufacturer(handle) != 'Logitech':
|
||||||
|
sys.exit("!! Only Logitech devices support the HID++ protocol.")
|
||||||
|
print (".. HID++ validation enabled.")
|
||||||
|
|
||||||
|
return handle
|
||||||
|
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
import argparse
|
import argparse
|
||||||
arg_parser = argparse.ArgumentParser()
|
arg_parser = argparse.ArgumentParser()
|
||||||
arg_parser.add_argument('--history', help='history file')
|
arg_parser.add_argument('--history', help='history file (default ~/.hidconsole-history)')
|
||||||
arg_parser.add_argument('device', default=None, help='linux device to connect to')
|
arg_parser.add_argument('--hidpp', action='store_true', help='ensure input data is a valid HID++ request')
|
||||||
|
arg_parser.add_argument('device', nargs='?', help='linux device to connect to (/dev/hidrawX); '
|
||||||
|
'may be omitted if --hidpp is given, in which case it looks for the first Logitech receiver')
|
||||||
args = arg_parser.parse_args()
|
args = arg_parser.parse_args()
|
||||||
|
|
||||||
import hidapi
|
import hidapi
|
||||||
print (".. Opening device %s" % args.device)
|
handle = _open(args.device, args.hidpp)
|
||||||
handle = hidapi.open_path(args.device)
|
|
||||||
if handle:
|
|
||||||
print (".. Opened handle %s, vendor %s product %s serial %s" % (
|
|
||||||
repr(handle),
|
|
||||||
repr(hidapi.get_manufacturer(handle)),
|
|
||||||
repr(hidapi.get_product(handle)),
|
|
||||||
repr(hidapi.get_serial(handle))))
|
|
||||||
if interactive:
|
|
||||||
print (".. Press ^C/^D to exit, or type hex bytes to write to the device.")
|
|
||||||
|
|
||||||
import readline
|
if interactive:
|
||||||
if args.history is None:
|
print (".. Press ^C/^D to exit, or type hex bytes to write to the device.")
|
||||||
import os.path
|
|
||||||
args.history = os.path.join(os.path.expanduser("~"), ".hidconsole-history")
|
|
||||||
try:
|
|
||||||
readline.read_history_file(args.history)
|
|
||||||
except:
|
|
||||||
# file may not exist yet
|
|
||||||
pass
|
|
||||||
|
|
||||||
start_time = time.time()
|
|
||||||
|
|
||||||
|
import readline
|
||||||
|
if args.history is None:
|
||||||
|
import os.path
|
||||||
|
args.history = os.path.join(os.path.expanduser("~"), ".hidconsole-history")
|
||||||
try:
|
try:
|
||||||
from threading import Thread
|
readline.read_history_file(args.history)
|
||||||
t = Thread(target=_continuous_read, args=(handle,))
|
except:
|
||||||
t.daemon = True
|
# file may not exist yet
|
||||||
t.start()
|
|
||||||
|
|
||||||
prompt = '?? Input: ' if interactive else ''
|
|
||||||
if interactive:
|
|
||||||
# move the cursor at the bottom of the screen
|
|
||||||
sys.stdout.write('\033[300B') # move cusor at most 300 lines down, don't scroll
|
|
||||||
|
|
||||||
while t.is_alive():
|
|
||||||
line = read_packet(prompt).strip().replace(' ', '')
|
|
||||||
if line:
|
|
||||||
try:
|
|
||||||
data = unhexlify(line.encode('ascii'))
|
|
||||||
except Exception as e:
|
|
||||||
print ("!! Invalid input.")
|
|
||||||
else:
|
|
||||||
_print('<<', data)
|
|
||||||
hidapi.write(handle, data)
|
|
||||||
# wait for some kind of reply
|
|
||||||
if not interactive:
|
|
||||||
if data[1:2] == b'\xFF':
|
|
||||||
# the receiver will reply very fast, in a few milliseconds
|
|
||||||
time.sleep(0.010)
|
|
||||||
else:
|
|
||||||
# the devices might reply quite slow
|
|
||||||
rlist, wlist, xlist = _select([handle], [], [], 1)
|
|
||||||
time.sleep(1)
|
|
||||||
except EOFError:
|
|
||||||
pass
|
pass
|
||||||
except Exception as e:
|
|
||||||
print ('%s: %s' % (type(e).__name__, e))
|
|
||||||
|
|
||||||
print (".. Closing handle %s" % repr(handle))
|
start_time = time.time()
|
||||||
hidapi.close(handle)
|
|
||||||
|
try:
|
||||||
|
from threading import Thread
|
||||||
|
t = Thread(target=_continuous_read, args=(handle,))
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
|
||||||
if interactive:
|
if interactive:
|
||||||
readline.write_history_file(args.history)
|
# move the cursor at the bottom of the screen
|
||||||
else:
|
sys.stdout.write('\033[300B') # move cusor at most 300 lines down, don't scroll
|
||||||
print ("!! Failed to open %s, aborting" % args.device)
|
|
||||||
|
while t.is_alive():
|
||||||
|
line = read_packet(prompt)
|
||||||
|
line = line.strip().replace(' ', '')
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
|
||||||
|
data = _validate_input(line, args.hidpp)
|
||||||
|
if data is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
_print("<<", data)
|
||||||
|
hidapi.write(handle, data)
|
||||||
|
# wait for some kind of reply
|
||||||
|
if args.hidpp and not interactive:
|
||||||
|
if data[1:2] == b'\xFF':
|
||||||
|
# the receiver will reply very fast, in a few milliseconds
|
||||||
|
time.sleep(0.010)
|
||||||
|
else:
|
||||||
|
# the devices might reply quite slow
|
||||||
|
rlist, wlist, xlist = _select([handle], [], [], 1)
|
||||||
|
time.sleep(1)
|
||||||
|
except EOFError:
|
||||||
|
if interactive:
|
||||||
|
print ("")
|
||||||
|
except Exception as e:
|
||||||
|
print ('%s: %s' % (type(e).__name__, e))
|
||||||
|
|
||||||
|
print (".. Closing handle %s" % repr(handle))
|
||||||
|
hidapi.close(handle)
|
||||||
|
if interactive:
|
||||||
|
readline.write_history_file(args.history)
|
||||||
|
|
Loading…
Reference in New Issue