278 lines
8.4 KiB
Python
278 lines
8.4 KiB
Python
# -*- python-mode -*-
|
|
# -*- coding: UTF-8 -*-
|
|
|
|
## Copyright (C) 2012-2013 Daniel Pavel
|
|
##
|
|
## This program is free software; you can redistribute it and/or modify
|
|
## it under the terms of the GNU General Public License as published by
|
|
## the Free Software Foundation; either version 2 of the License, or
|
|
## (at your option) any later version.
|
|
##
|
|
## This program is distributed in the hope that it will be useful,
|
|
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
## GNU General Public License for more details.
|
|
##
|
|
## You should have received a copy of the GNU General Public License along
|
|
## with this program; if not, write to the Free Software Foundation, Inc.,
|
|
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
|
|
from __future__ import absolute_import, division, print_function, unicode_literals
|
|
|
|
import os
|
|
import sys
|
|
from select import select as _select
|
|
import time
|
|
from binascii import hexlify, unhexlify
|
|
import hidapi as _hid
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
# Pick the builtin that reads one line of text from standard input:
# `raw_input` on Python 2, `input` on Python 3 (where raw_input is gone).
try:
    read_packet = raw_input
except NameError:
    read_packet = input

# True when file descriptor 0 (standard input) is attached to a terminal.
interactive = os.isatty(0)

# A prompt is only shown when input comes from a terminal.
if interactive:
    prompt = '?? Input: '
else:
    prompt = ''

# Reference moment for the elapsed-time stamps shown next to printed packets.
start_time = time.time()
|
|
|
|
def strhex(d):
    """Return the bytes in `d` as an upper-case hexadecimal text string."""
    return hexlify(d).decode('ascii').upper()


try:
    # Probe for the Python 2-only `unicode` builtin.
    unicode
except NameError:
    # This is certainly Python 3: the `unicode` object does not exist and
    # `str` is the text type.
    def is_string(d):
        """Return True when `d` is a text string (as opposed to raw bytes)."""
        return isinstance(d, str)
else:
    # This is certainly Python 2: only `unicode` objects are treated as text,
    # because there is no easy way to distinguish between b'' and '' there.
    def is_string(d):
        """Return True when `d` is a text string (as opposed to raw bytes)."""
        return isinstance(d, unicode)
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
# Lock used to serialise console output between threads. The imported helper
# name is removed again so that only `print_lock` remains in the module.
from threading import Lock as _Lock
print_lock = _Lock()
del _Lock
|
|
|
|
|
|
def _print(marker, data, scroll=False):
    """Write one line describing `data` to standard output.

    When `data` is a text string (see `is_string`) it is printed verbatim
    after `marker`. Otherwise it is treated as a binary packet and printed
    as the marker, the seconds elapsed since `start_time`, the packet in hex
    (byte 0, byte 1, bytes 2-3, and the remainder) and its `repr`.

    With `scroll` set on an interactive terminal, the text is inserted above
    the current line through ANSI escape codes and the cursor is put back
    where it was; in every other case the text is followed by a newline.
    """
    elapsed = time.time() - start_time
    if is_string(data):
        text = marker + ' ' + data
    else:
        digits = strhex(data)
        fields = (marker, elapsed, digits[0:2], digits[2:4], digits[4:8],
                  digits[8:], repr(data))
        text = '%s (% 8.3f) [%s %s %s %s] %s' % fields

    in_place = interactive and scroll

    # Only one thread at a time may write to the console, otherwise the
    # output gets garbled, especially with ANSI codes.
    with print_lock:
        if in_place:
            # Scroll everything above the current line up by one line and
            # open an empty line there for the new text.
            sys.stdout.write(
                '\033[s'  # save cursor position
                '\033[S'  # scroll up
                '\033[A'  # cursor up
                '\033[L'  # insert 1 line
                '\033[G'  # move cursor to column 1
            )
        sys.stdout.write(text)
        # Either restore the saved cursor position or finish the line.
        sys.stdout.write('\033[u' if in_place else '\n')

        # Flush manually: opening stdin/stdout unbuffered programmatically
        # works much too differently between Python 2 and 3.
        sys.stdout.flush()
|
|
|
|
|
|
def _error(text, scroll=False):
    """Print `text` through `_print`, tagged with the '!!' marker.

    `scroll` is forwarded unchanged to `_print`.
    """
    _print('!!', text, scroll=scroll)
|
|
|
|
|
|
def _continuous_read(handle, timeout=2000):
    """Read packets from `handle` forever and print each one.

    `main` runs this function in a daemon thread. Every non-empty reply is
    printed with the '>>' marker, scrolled in above the input line. The loop
    ends, after reporting the problem through `_error`, when the read raises
    an I/O error or returns None.

    :param handle: the open device handle passed on to `_hid.read`.
    :param timeout: per-read timeout passed on to `_hid.read`
        (presumably milliseconds -- confirm against the hidapi module).
    """
    while True:
        try:
            reply = _hid.read(handle, 128, timeout)
        except (IOError, OSError) as e:
            # IOError is listed explicitly because on Python 2 it is not a
            # subclass of OSError; on Python 3 the two names are the same.
            _error("Read failed, aborting: " + str(e), True)
            break

        if reply is None:
            # Checked explicitly rather than with `assert`: assertions are
            # stripped under `python -O`, which would leave this loop
            # spinning forever on a failed read.
            _error("Read failed, aborting: no data returned", True)
            break

        # An empty reply is skipped silently; only real data is printed.
        if reply:
            _print('>>', reply, True)
|
|
|
|
|
|
def _validate_input(line, hidpp=False):
    """Convert a line of hex digits into the bytes to write to the device.

    :param line: text made of hexadecimal digits (no separators).
    :param hidpp: when true, additionally require the data to look like a
        HID++ request and zero-pad it to the full request length
        (7 bytes when the first byte is 0x10, 20 bytes when it is 0x11).
    :returns: the resulting bytes, or None after reporting the problem
        through `_error` when the input is rejected.
    """
    try:
        payload = unhexlify(line.encode('ascii'))
    except Exception as e:
        _error("Invalid input: " + str(e))
        return None

    # Without HID++ validation the decoded bytes are used as they are.
    if not hidpp:
        return payload

    if len(payload) < 4:
        _error("Invalid HID++ request: need at least 4 bytes")
        return None

    first_byte = payload[:1]
    if first_byte not in b'\x10\x11':
        _error("Invalid HID++ request: first byte must be 0x10 or 0x11")
        return None

    second_byte = payload[1:2]
    if second_byte not in b'\xFF\x01\x02\x03\x04\x05\x06':
        _error(
            "Invalid HID++ request: second byte must be 0xFF or one of 0x01..0x06"
        )
        return None

    if first_byte == b'\x10':
        if len(payload) > 7:
            _error(
                "Invalid HID++ request: maximum length of a 0x10 request is 7 bytes"
            )
            return None
        # Right-pad with zero bytes up to the full 7 bytes.
        payload = payload.ljust(7, b'\x00')
    else:
        # The first byte can only be 0x11 here, as checked above.
        if len(payload) > 20:
            _error(
                "Invalid HID++ request: maximum length of a 0x11 request is 20 bytes"
            )
            return None
        # Right-pad with zero bytes up to the full 20 bytes.
        payload = payload.ljust(20, b'\x00')

    return payload
|
|
|
|
|
|
def _open(args):
    """Open the device selected by the command-line arguments.

    When no device path was given but `args.hidpp` is set, the first
    enumerated device of vendor 0x046d whose driver is
    'logitech-djreceiver' is used. The process exits with a message when no
    device can be determined, when it cannot be opened, or when HID++ was
    requested for a device whose manufacturer is not Logitech.

    As a side effect `args.hidpp` is switched on when it was not requested
    but the opened device reports itself as a Logitech receiver.

    :param args: the parsed arguments (reads `device` and `hidpp`).
    :returns: the open device handle.
    """
    path = args.device
    if args.hidpp and not path:
        # Take the first matching receiver; keep the (empty) path otherwise.
        receiver_paths = (d.path for d in _hid.enumerate(vendor_id=0x046d)
                          if d.driver == 'logitech-djreceiver')
        path = next(receiver_paths, path)
        if not path:
            sys.exit("!! No HID++ receiver found.")
    if not path:
        sys.exit("!! Device path required.")

    print(".. Opening device", path)
    handle = _hid.open_path(path)
    if not handle:
        sys.exit("!! Failed to open %s, aborting." % path)

    details = (handle, _hid.get_manufacturer(handle),
               _hid.get_product(handle), _hid.get_serial(handle))
    print(".. Opened handle %r, vendor %r product %r serial %r." % details)

    if args.hidpp:
        # HID++ validation was asked for explicitly: insist on Logitech.
        if _hid.get_manufacturer(handle) != b'Logitech':
            sys.exit("!! Only Logitech devices support the HID++ protocol.")
        print(".. HID++ validation enabled.")
        return handle

    # Not asked for: turn validation on automatically for Logitech receivers.
    is_logitech = _hid.get_manufacturer(handle) == b'Logitech'
    if is_logitech and b'Receiver' in _hid.get_product(handle):
        args.hidpp = True
        print(".. Logitech receiver detected, HID++ validation enabled.")

    return handle
|
|
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
|
|
def _parse_arguments():
    """Parse the command line and return the resulting namespace.

    Recognised arguments: `--history FILE`, the `--hidpp` flag, and an
    optional positional `device` path.
    """
    import argparse

    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--history',
        help="history file (default ~/.hidconsole-history)",
    )

    parser.add_argument(
        '--hidpp',
        action='store_true',
        help="ensure input data is a valid HID++ request",
    )

    device_help = (
        "linux device to connect to (/dev/hidrawX); "
        "may be omitted if --hidpp is given, in which case it looks for the first Logitech receiver"
    )
    parser.add_argument('device', nargs='?', help=device_help)

    return parser.parse_args()
|
|
|
|
|
|
def main():
    """Run the console: open the device, then write to it what is typed.

    Packets coming from the device are printed by `_continuous_read`, which
    runs in a daemon thread. The main thread reads lines of hex digits from
    standard input, validates them with `_validate_input` and writes the
    resulting bytes to the device, until end-of-input is reached or the
    reader thread has stopped. On an interactive terminal, readline history
    is loaded from and saved to `args.history`
    (default `~/.hidconsole-history`).

    The device handle is always closed on the way out.
    """
    args = _parse_arguments()
    handle = _open(args)

    if interactive:
        print(
            ".. Press ^C/^D to exit, or type hex bytes to write to the device."
        )

        import readline
        if args.history is None:
            args.history = os.path.join(os.path.expanduser('~'),
                                        '.hidconsole-history')
        try:
            readline.read_history_file(args.history)
        except (IOError, OSError):
            # The history file may not exist yet; starting without history
            # is fine. Only I/O failures are ignored, so that unrelated
            # errors and ^C are no longer silently swallowed here.
            pass

    try:
        from threading import Thread
        t = Thread(target=_continuous_read, args=(handle, ))
        # Daemon thread: it must not keep the process alive on exit.
        t.daemon = True
        t.start()

        if interactive:
            # Move the cursor to the bottom of the screen:
            # at most 300 lines down, without scrolling.
            sys.stdout.write('\033[300B')

        # Stop asking for input once the reader thread has given up.
        while t.is_alive():
            line = read_packet(prompt)
            # Spaces between the hex digits are allowed and ignored.
            line = line.strip().replace(' ', '')
            if not line:
                continue

            data = _validate_input(line, args.hidpp)
            if data is None:
                # Rejected; _validate_input already reported the reason.
                continue

            _print('<<', data)
            _hid.write(handle, data)

            # When fed from a script, wait for some kind of reply before
            # sending the next request.
            if args.hidpp and not interactive:
                _select([handle], [], [], 1)
                if data[1:2] == b'\xFF':
                    # the receiver will reply very fast, in a few milliseconds
                    time.sleep(0.010)
                else:
                    # the devices might reply quite slow
                    time.sleep(0.700)

    except EOFError:
        if interactive:
            # Finish the prompt line left open by ^D.
            print("")
        else:
            # Give the reader thread a moment to print the last replies.
            time.sleep(1)

    finally:
        print(".. Closing handle %r" % handle)
        _hid.close(handle)
        if interactive:
            readline.write_history_file(args.history)
|
|
|
|
|
|
# Script entry point: start the console only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()
|