Changed keyboard control code.

Changed keyboard control to some custom code which should work with all systems. Unfortunately occasionally it will not register inputs. Unsure why. Good enough for now.
This commit is contained in:
Tib3rius 2021-08-31 10:51:22 -04:00
parent b77422d7c0
commit 112268db43
2 changed files with 54 additions and 79 deletions

View File

@ -3,15 +3,11 @@ from datetime import datetime
from typing import final from typing import final
import traceback import traceback
import termios, tty import termios, tty
import select
try: try:
import colorama, toml, unidecode import colorama, toml, unidecode
from colorama import Fore, Style from colorama import Fore, Style
if os.getuid() == 0:
import keyboard
elif 'DISPLAY' in os.environ and (os.environ['DISPLAY'] != None or os.environ['DISPLAY'] != ''):
import pynput
except ModuleNotFoundError: except ModuleNotFoundError:
print('One or more required modules was not installed. Please run or re-run: ' + ('sudo ' if os.getuid() == 0 else '') + 'python3 -m pip install -r requirements.txt') print('One or more required modules was not installed. Please run or re-run: ' + ('sudo ' if os.getuid() == 0 else '') + 'python3 -m pip install -r requirements.txt')
sys.exit(1) sys.exit(1)
@ -196,7 +192,7 @@ class Service:
class CommandStreamReader(object): class CommandStreamReader(object):
def __init__(self, stream, target, tag,patterns=None, outfile=None): def __init__(self, stream, target, tag, patterns=None, outfile=None):
self.stream = stream self.stream = stream
self.target = target self.target = target
self.tag = tag self.tag = tag
@ -268,7 +264,6 @@ class Plugin(object):
def __init__(self): def __init__(self):
self.name = None self.name = None
self.slug = None self.slug = None
self.description = None
self.tags = ['default'] self.tags = ['default']
self.priority = 1 self.priority = 1
self.patterns = [] self.patterns = []
@ -577,9 +572,6 @@ class AutoRecon(object):
if plugin is loaded_plugin: if plugin is loaded_plugin:
fail('Error: plugin "' + plugin.name + '" in ' + filename + ' already loaded as "' + loaded_plugin.name + '" (' + str(loaded_plugin) + ')', file=sys.stderr) fail('Error: plugin "' + plugin.name + '" in ' + filename + ' already loaded as "' + loaded_plugin.name + '" (' + str(loaded_plugin) + ')', file=sys.stderr)
if plugin.description is None:
plugin.description = ''
configure_function_found = False configure_function_found = False
run_coroutine_found = False run_coroutine_found = False
manual_function_found = False manual_function_found = False
@ -788,9 +780,8 @@ def cancel_all_tasks(signal, frame):
pass pass
if not autorecon.config['disable_keyboard_control']: if not autorecon.config['disable_keyboard_control']:
if os.getuid() == 0 or ('DISPLAY' in os.environ and (os.environ['DISPLAY'] != None or os.environ['DISPLAY'] != '')): # Restore original terminal settings.
# Restore original terminal settings. termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, terminal_settings)
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, terminal_settings)
def timeout(signal, frame): def timeout(signal, frame):
raise Exception("Function timed out.") raise Exception("Function timed out.")
@ -812,45 +803,55 @@ async def start_heartbeat(target, period=60):
elif count == 1: elif count == 1:
info('{bgreen}' + current_time + '{rst} - There is {byellow}1{rst} scan still running against {byellow}' + target.address + '{rst}' + tasks_list) info('{bgreen}' + current_time + '{rst} - There is {byellow}1{rst} scan still running against {byellow}' + target.address + '{rst}' + tasks_list)
def handle_keyboard(key): async def keyboard():
# We need to do this to support pynput and keyboard over SSH / Docker. input = ''
if os.getuid() != 0: while True:
key_conversion = {pynput.keyboard.Key.up:'up', pynput.keyboard.Key.down:'down', pynput.keyboard.KeyCode.from_char('s'):'s'} if select.select([sys.stdin],[],[],0.1)[0]:
if key in key_conversion.keys(): input += sys.stdin.buffer.read1(-1).decode('utf8')
key = key_conversion[key] while input != '':
if len(input) >= 3:
if input[:3] == '\x1b[A':
input = ''
if autorecon.config['verbose'] == 2:
info('Verbosity is already at the highest level.')
else:
autorecon.config['verbose'] += 1
info('Verbosity increased to ' + str(autorecon.config['verbose']))
elif input[:3] == '\x1b[B':
input = ''
if autorecon.config['verbose'] == 0:
info('Verbosity is already at the lowest level.')
else:
autorecon.config['verbose'] -= 1
info('Verbosity decreased to ' + str(autorecon.config['verbose']))
else:
if input[0] != 's':
input = input[1:]
if key == 'up': if len(input) > 0 and input[0] == 's':
if autorecon.config['verbose'] == 2: input = input[1:]
info('Verbosity is already at the highest level.') for target in autorecon.scanning_targets:
else: count = len(target.running_tasks)
autorecon.config['verbose'] += 1
info('Verbosity increased to ' + str(autorecon.config['verbose']))
elif key == 'down':
if autorecon.config['verbose'] == 0:
info('Verbosity is already at the lowest level.')
else:
autorecon.config['verbose'] -= 1
info('Verbosity decreased to ' + str(autorecon.config['verbose']))
elif key == 's':
for target in autorecon.scanning_targets:
count = len(target.running_tasks)
tasks_list = [] tasks_list = []
if target.autorecon.config['verbose'] >= 1: if target.autorecon.config['verbose'] >= 1:
for key, value in target.running_tasks.items(): for key, value in target.running_tasks.items():
elapsed_time = calculate_elapsed_time(value['start'], short=True) elapsed_time = calculate_elapsed_time(value['start'], short=True)
tasks_list.append('{bblue}' + key + '{rst}' + ' (elapsed: ' + elapsed_time + ')') tasks_list.append('{bblue}' + key + '{rst}' + ' (elapsed: ' + elapsed_time + ')')
tasks_list = ':\n ' + '\n '.join(tasks_list) tasks_list = ':\n ' + '\n '.join(tasks_list)
else: else:
tasks_list = '' tasks_list = ''
current_time = datetime.now().strftime('%H:%M:%S') current_time = datetime.now().strftime('%H:%M:%S')
if count > 1: if count > 1:
info('{bgreen}' + current_time + '{rst} - There are {byellow}' + str(count) + '{rst} scans still running against {byellow}' + target.address + '{rst}' + tasks_list) info('{bgreen}' + current_time + '{rst} - There are {byellow}' + str(count) + '{rst} scans still running against {byellow}' + target.address + '{rst}' + tasks_list)
elif count == 1: elif count == 1:
info('{bgreen}' + current_time + '{rst} - There is {byellow}1{rst} scan still running against {byellow}' + target.address + '{rst}' + tasks_list) info('{bgreen}' + current_time + '{rst} - There is {byellow}1{rst} scan still running against {byellow}' + target.address + '{rst}' + tasks_list)
else:
input = input[1:]
await asyncio.sleep(0.1)
async def port_scan(plugin, target): async def port_scan(plugin, target):
if autorecon.config['ports']: if autorecon.config['ports']:
@ -1078,6 +1079,7 @@ async def scan_target(target):
else: else:
error('No services were defined. Please check your service syntax: [tcp|udp]/<port>/<service-name>/[secure|insecure]') error('No services were defined. Please check your service syntax: [tcp|udp]/<port>/<service-name>/[secure|insecure]')
heartbeat.cancel() heartbeat.cancel()
keyboard_monitor.cancel()
autorecon.errors = True autorecon.errors = True
return return
else: else:
@ -1818,19 +1820,8 @@ async def main():
break break
if not autorecon.config['disable_keyboard_control']: if not autorecon.config['disable_keyboard_control']:
if os.getuid() == 0 or ('DISPLAY' in os.environ and (os.environ['DISPLAY'] != None or os.environ['DISPLAY'] != '')): tty.setcbreak(sys.stdin.fileno())
# This makes it possible to capture keypresses without <enter> and without displaying them. keyboard_monitor = asyncio.create_task(keyboard())
tty.setcbreak(sys.stdin.fileno())
if os.getuid() == 0:
keyboard.add_hotkey('s', lambda:handle_keyboard('s'))
keyboard.add_hotkey('up', lambda:handle_keyboard('up'))
keyboard.add_hotkey('down', lambda:handle_keyboard('down'))
else:
keyboard_monitor = pynput.keyboard.Listener(on_press=handle_keyboard)
keyboard_monitor.start()
else:
warn('No "DISPLAY" environment variable was found. This likely means you are in an SSH session or using Docker. Keyboard control features have been disabled. If you run AutoRecon as root you should not have this problem.')
timed_out = False timed_out = False
while pending: while pending:
@ -1875,20 +1866,7 @@ async def main():
if i >= num_new_targets: if i >= num_new_targets:
break break
if os.getuid() != 0 and ('DISPLAY' in os.environ and (os.environ['DISPLAY'] != None or os.environ['DISPLAY'] != '')): keyboard_monitor.cancel()
# The keyboard_monitor.stop() function sometimes seems to block forever.
# Since it will get killed at the end of the program anyway, if it takes
# more than 1 second to work, we'll time it out.
signal.signal(signal.SIGALRM, timeout)
signal.alarm(1)
try:
keyboard_monitor.stop()
except:
pass
# Cancel the alarm.
signal.alarm(0)
if timed_out: if timed_out:
cancel_all_tasks(None, None) cancel_all_tasks(None, None)
@ -1907,9 +1885,8 @@ async def main():
warn('{byellow}AutoRecon identified the following services, but could not match them to any plugins based on the service name. Please report these to Tib3rius: ' + ', '.join(autorecon.missing_services) + '{rst}') warn('{byellow}AutoRecon identified the following services, but could not match them to any plugins based on the service name. Please report these to Tib3rius: ' + ', '.join(autorecon.missing_services) + '{rst}')
if not autorecon.config['disable_keyboard_control']: if not autorecon.config['disable_keyboard_control']:
if os.getuid() == 0 or ('DISPLAY' in os.environ and (os.environ['DISPLAY'] != None or os.environ['DISPLAY'] != '')): # Restore original terminal settings.
# Restore original terminal settings. termios.tcsetattr(sys.stdin, termios.TCSADRAIN, terminal_settings)
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, terminal_settings)
if __name__ == '__main__': if __name__ == '__main__':
# Capture Ctrl+C and cancel everything. # Capture Ctrl+C and cancel everything.

View File

@ -1,5 +1,3 @@
unidecode unidecode
toml toml
colorama colorama
pynput
keyboard