From 929bb456ecfc8cd30b01a8daaf0a2dac27779b00 Mon Sep 17 00:00:00 2001 From: Sourcery AI <> Date: Mon, 2 Oct 2023 15:43:23 +0000 Subject: [PATCH] 'Refactored by Sourcery' --- autorecon/default-plugins/dirbuster.py | 4 +- .../default-plugins/portscan-all-tcp-ports.py | 39 ++--- .../portscan-guess-tcp-ports.py | 20 ++- .../portscan-top-100-udp-ports.py | 13 +- .../default-plugins/portscan-top-tcp-ports.py | 11 +- .../default-plugins/reporting-cherrytree.py | 22 +-- .../default-plugins/subdomain-enumeration.py | 13 +- .../virtual-host-enumeration.py | 19 ++- autorecon/default-plugins/winrm-detection.py | 5 +- autorecon/io.py | 15 +- autorecon/main.py | 131 ++++++++--------- autorecon/plugins.py | 136 ++++++++++-------- autorecon/targets.py | 20 +-- 13 files changed, 241 insertions(+), 207 deletions(-) diff --git a/autorecon/default-plugins/dirbuster.py b/autorecon/default-plugins/dirbuster.py index bd48ab0..cf3b9e6 100644 --- a/autorecon/default-plugins/dirbuster.py +++ b/autorecon/default-plugins/dirbuster.py @@ -41,7 +41,7 @@ class DirBuster(ServiceScan): return False async def run(self, service): - dot_extensions = ','.join(['.' + x for x in self.get_option('ext').split(',')]) + dot_extensions = ','.join([f'.{x}' for x in self.get_option('ext').split(',')]) for wordlist in self.get_option('wordlist'): name = os.path.splitext(os.path.basename(wordlist))[0] if self.get_option('tool') == 'feroxbuster': @@ -63,7 +63,7 @@ class DirBuster(ServiceScan): await service.execute('dirb {http_scheme}://{addressv6}:{port}/ ' + wordlist + ' -l ' + ('' if self.get_option('recursive') else '-r ') + '-S -X ",' + dot_extensions + '" -f -o "{scandir}/{protocol}_{port}_{http_scheme}_dirb_' + name + '.txt"' + (' ' + self.get_option('extras') if self.get_option('extras') else '')) def manual(self, service, plugin_was_run): - dot_extensions = ','.join(['.' + x for x in self.get_option('ext').split(',')]) + dot_extensions = ','.join([f'.{x}' for x in self.get_option('ext').split(',')]) if self.get_option('tool') == 'feroxbuster': service.add_manual_command('(feroxbuster) Multi-threaded recursive directory/file enumeration for web servers using various wordlists:', [ 'feroxbuster -u {http_scheme}://{addressv6}:{port} -t ' + str(self.get_option('threads')) + ' -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -x "' + self.get_option('ext') + '" -v -k ' + ('' if self.get_option('recursive') else '-n ') + '-e -r -o {scandir}/{protocol}_{port}_{http_scheme}_feroxbuster_dirbuster.txt' + (' ' + self.get_option('extras') if self.get_option('extras') else '') diff --git a/autorecon/default-plugins/portscan-all-tcp-ports.py b/autorecon/default-plugins/portscan-all-tcp-ports.py index c6dc70e..a403cc8 100644 --- a/autorecon/default-plugins/portscan-all-tcp-ports.py +++ b/autorecon/default-plugins/portscan-all-tcp-ports.py @@ -13,11 +13,7 @@ class AllTCPPortScan(PortScan): self.tags = ['default', 'default-port-scan', 'long'] async def run(self, target): - if config['proxychains']: - traceroute_os = '' - else: - traceroute_os = ' -A --osscan-guess' - + traceroute_os = '' if config['proxychains'] else ' -A --osscan-guess' if target.ports: if target.ports['tcp']: process, stdout, stderr = await target.execute('nmap {nmap_extra} -sV -sC --version-all' + traceroute_os + ' -p ' + target.ports['tcp'] + ' -oN "{scandir}/_full_tcp_nmap.txt" -oX "{scandir}/xml/_full_tcp_nmap.xml" {address}', blocking=False) @@ -28,25 +24,20 @@ class AllTCPPortScan(PortScan): services = [] while True: line = await stdout.readline() - if line is not None: - match = re.search('^Discovered open port ([0-9]+)/tcp', line) - if match: - target.info('Discovered open port {bmagenta}tcp/' + match.group(1) + '{rst} on {byellow}' + target.address + '{rst}', verbosity=1) - service = target.extract_service(line) - - if service: - # Check if HTTP service appears to be WinRM. If so, override service name as wsman. - if service.name == 'http' and service.port in [5985, 5986]: - wsman = requests.get(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False) - if wsman.status_code == 405: - service.name = 'wsman' - wsman = requests.post(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False) - else: - if wsman.status_code == 401: - service.name = 'wsman' - - services.append(service) - else: + if line is None: break + if match := re.search('^Discovered open port ([0-9]+)/tcp', line): + target.info('Discovered open port {bmagenta}tcp/' + match.group(1) + '{rst} on {byellow}' + target.address + '{rst}', verbosity=1) + if service := target.extract_service(line): + # Check if HTTP service appears to be WinRM. If so, override service name as wsman. + if service.name == 'http' and service.port in [5985, 5986]: + wsman = requests.get(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False) + if wsman.status_code == 405: + service.name = 'wsman' + wsman = requests.post(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False) + elif wsman.status_code == 401: + service.name = 'wsman' + + services.append(service) await process.wait() return services diff --git a/autorecon/default-plugins/portscan-guess-tcp-ports.py b/autorecon/default-plugins/portscan-guess-tcp-ports.py index 1919fff..ffb504b 100644 --- a/autorecon/default-plugins/portscan-guess-tcp-ports.py +++ b/autorecon/default-plugins/portscan-guess-tcp-ports.py @@ -31,18 +31,16 @@ class GuessPortScan(PortScan): services = [] while True: line = await stdout.readline() - if line is not None: - match = re.match('^Discovered open port ([0-9]+)/tcp', line) - if match: - if match.group(1) in insecure_ports.keys(): - await target.add_service(Service('tcp', match.group(1), insecure_ports[match.group(1)])) - elif match.group(1) in secure_ports.keys(): - await target.add_service(Service('tcp', match.group(1), secure_ports[match.group(1)], True)) - service = target.extract_service(line) - if service is not None: - services.append(service) - else: + if line is None: break + if match := re.match('^Discovered open port ([0-9]+)/tcp', line): + if match.group(1) in insecure_ports: + await target.add_service(Service('tcp', match.group(1), insecure_ports[match.group(1)])) + elif match.group(1) in secure_ports: + await target.add_service(Service('tcp', match.group(1), secure_ports[match.group(1)], True)) + service = target.extract_service(line) + if service is not None: + services.append(service) await process.wait() return services diff --git a/autorecon/default-plugins/portscan-top-100-udp-ports.py b/autorecon/default-plugins/portscan-top-100-udp-ports.py index 7f28de3..85c8241 100644 --- a/autorecon/default-plugins/portscan-top-100-udp-ports.py +++ b/autorecon/default-plugins/portscan-top-100-udp-ports.py @@ -25,15 +25,12 @@ class Top100UDPPortScan(PortScan): services = [] while True: line = await stdout.readline() - if line is not None: - match = re.search('^Discovered open port ([0-9]+)/udp', line) - if match: - target.info('Discovered open port {bmagenta}udp/' + match.group(1) + '{rst} on {byellow}' + target.address + '{rst}', verbosity=1) - service = target.extract_service(line) - if service: - services.append(service) - else: + if line is None: break + if match := re.search('^Discovered open port ([0-9]+)/udp', line): + target.info('Discovered open port {bmagenta}udp/' + match.group(1) + '{rst} on {byellow}' + target.address + '{rst}', verbosity=1) + if service := target.extract_service(line): + services.append(service) await process.wait() return services else: diff --git a/autorecon/default-plugins/portscan-top-tcp-ports.py b/autorecon/default-plugins/portscan-top-tcp-ports.py index d332812..72e3233 100644 --- a/autorecon/default-plugins/portscan-top-tcp-ports.py +++ b/autorecon/default-plugins/portscan-top-tcp-ports.py @@ -16,11 +16,7 @@ class QuickTCPPortScan(PortScan): if target.ports: # Don't run this plugin if there are custom ports. return [] - if config['proxychains']: - traceroute_os = '' - else: - traceroute_os = ' -A --osscan-guess' - + traceroute_os = '' if config['proxychains'] else ' -A --osscan-guess' process, stdout, stderr = await target.execute('nmap {nmap_extra} -sV -sC --version-all' + traceroute_os + ' -oN "{scandir}/_quick_tcp_nmap.txt" -oX "{scandir}/xml/_quick_tcp_nmap.xml" {address}', blocking=False) services = await target.extract_services(stdout) @@ -31,9 +27,8 @@ class QuickTCPPortScan(PortScan): if wsman.status_code == 405: service.name = 'wsman' wsman = requests.post(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False) - else: - if wsman.status_code == 401: - service.name = 'wsman' + elif wsman.status_code == 401: + service.name = 'wsman' await process.wait() return services diff --git a/autorecon/default-plugins/reporting-cherrytree.py b/autorecon/default-plugins/reporting-cherrytree.py index bd1d4d8..8580004 100644 --- a/autorecon/default-plugins/reporting-cherrytree.py +++ b/autorecon/default-plugins/reporting-cherrytree.py @@ -21,7 +21,10 @@ class CherryTree(Report): with open(report, 'w') as output: output.writelines('\n\n') for target in targets: - output.writelines('\n') + output.writelines( + f'\n' + ) files = [os.path.abspath(filename) for filename in glob.iglob(os.path.join(target.scandir, '**/*'), recursive=True) if os.path.isfile(filename) and filename.endswith(('.txt', '.html'))] @@ -31,7 +34,7 @@ class CherryTree(Report): if len(target.scans['ports'][scan]['commands']) > 0: output.writelines('\n') for command in target.scans['ports'][scan]['commands']: - output.writelines('' + escape(command[0])) + output.writelines(f'{escape(command[0])}') for filename in files: if filename in command[0] or (command[1] is not None and filename == command[1]) or (command[2] is not None and filename == command[2]): output.writelines('\n\n' + escape(filename) + ':\n\n') @@ -43,12 +46,15 @@ class CherryTree(Report): if target.scans['services']: output.writelines('\n') for service in target.scans['services'].keys(): - output.writelines('\n') + output.writelines( + f'\n' + ) for plugin in target.scans['services'][service].keys(): if len(target.scans['services'][service][plugin]['commands']) > 0: output.writelines('\n') for command in target.scans['services'][service][plugin]['commands']: - output.writelines('' + escape(command[0])) + output.writelines(f'{escape(command[0])}') for filename in files: if filename in command[0] or (command[1] is not None and filename == command[1]) or (command[2] is not None and filename == command[2]): output.writelines('\n\n' + escape(filename) + ':\n\n') @@ -63,28 +69,28 @@ class CherryTree(Report): if os.path.isfile(manual_commands): output.writelines('\n') with open(manual_commands, 'r') as file: - output.writelines('' + escape(file.read()) + '\n') + output.writelines(f'{escape(file.read())}' + '\n') output.writelines('\n') patterns = os.path.join(target.scandir, '_patterns.log') if os.path.isfile(patterns): output.writelines('\n') with open(patterns, 'r') as file: - output.writelines('' + escape(file.read()) + '\n') + output.writelines(f'{escape(file.read())}' + '\n') output.writelines('\n') commands = os.path.join(target.scandir, '_commands.log') if os.path.isfile(commands): output.writelines('\n') with open(commands, 'r') as file: - output.writelines('' + escape(file.read()) + '\n') + output.writelines(f'{escape(file.read())}' + '\n') output.writelines('\n') errors = os.path.join(target.scandir, '_errors.log') if os.path.isfile(errors): output.writelines('\n') with open(errors, 'r') as file: - output.writelines('' + escape(file.read()) + '\n') + output.writelines(f'{escape(file.read())}' + '\n') output.writelines('\n') output.writelines('\n') diff --git a/autorecon/default-plugins/subdomain-enumeration.py b/autorecon/default-plugins/subdomain-enumeration.py index 185f5a3..3411c76 100644 --- a/autorecon/default-plugins/subdomain-enumeration.py +++ b/autorecon/default-plugins/subdomain-enumeration.py @@ -25,10 +25,19 @@ class SubdomainEnumeration(ServiceScan): if self.get_global('domain') and self.get_global('domain') not in domains: domains.append(self.get_global('domain')) - if len(domains) > 0: + if domains: for wordlist in self.get_option('wordlist'): name = os.path.splitext(os.path.basename(wordlist))[0] for domain in domains: - await service.execute('gobuster dns -d ' + domain + ' -r {addressv6} -w ' + wordlist + ' -o "{scandir}/{protocol}_{port}_' + domain + '_subdomains_' + name + '.txt"') + await service.execute( + f'gobuster dns -d {domain}' + + ' -r {addressv6} -w ' + + wordlist + + ' -o "{scandir}/{protocol}_{port}_' + + domain + + '_subdomains_' + + name + + '.txt"' + ) else: service.info('The target was not a domain, nor was a domain provided as an option. Skipping subdomain enumeration.') diff --git a/autorecon/default-plugins/virtual-host-enumeration.py b/autorecon/default-plugins/virtual-host-enumeration.py index 6689de3..2959f57 100644 --- a/autorecon/default-plugins/virtual-host-enumeration.py +++ b/autorecon/default-plugins/virtual-host-enumeration.py @@ -27,11 +27,26 @@ class VirtualHost(ServiceScan): if self.get_global('domain') and self.get_global('domain') not in hostnames: hostnames.append(self.get_global('domain')) - if len(hostnames) > 0: + if hostnames: for wordlist in self.get_option('wordlist'): name = os.path.splitext(os.path.basename(wordlist))[0] for hostname in hostnames: - wildcard = requests.get(('https' if service.secure else 'http') + '://' + service.target.address + ':' + str(service.port) + '/', headers={'Host':''.join(random.choice(string.ascii_letters) for i in range(20)) + '.' + hostname}, verify=False) + wildcard = requests.get( + ('https' if service.secure else 'http') + + '://' + + service.target.address + + ':' + + str(service.port) + + '/', + headers={ + 'Host': ''.join( + random.choice(string.ascii_letters) for _ in range(20) + ) + + '.' + + hostname + }, + verify=False, + ) size = str(len(wildcard.content)) await service.execute('ffuf -u {http_scheme}://' + hostname + ':{port}/ -t ' + str(self.get_option('threads')) + ' -w ' + wordlist + ' -H "Host: FUZZ.' + hostname + '" -mc all -fs ' + size + ' -r -noninteractive -s | tee "{scandir}/{protocol}_{port}_{http_scheme}_' + hostname + '_vhosts_' + name + '.txt"') diff --git a/autorecon/default-plugins/winrm-detection.py b/autorecon/default-plugins/winrm-detection.py index 8742f47..b898974 100644 --- a/autorecon/default-plugins/winrm-detection.py +++ b/autorecon/default-plugins/winrm-detection.py @@ -15,7 +15,10 @@ class WinRMDetection(ServiceScan): async def run(self, service): filename = fformat('{scandir}/{protocol}_{port}_winrm-detection.txt') with open(filename, mode='wt', encoding='utf8') as winrm: - winrm.write('WinRM was possibly detected running on ' + service.protocol + ' port ' + str(service.port) + '.\nCheck _manual_commands.txt for manual commands you can run against this service.') + winrm.write( + f'WinRM was possibly detected running on {service.protocol} port {str(service.port)}' + + '.\nCheck _manual_commands.txt for manual commands you can run against this service.' + ) def manual(self, service, plugin_was_run): service.add_manual_commands('Bruteforce logins:', [ diff --git a/autorecon/io.py b/autorecon/io.py index 3ccace6..bf5b0d5 100644 --- a/autorecon/io.py +++ b/autorecon/io.py @@ -52,12 +52,12 @@ def cprint(*args, color=Fore.RESET, char='*', sep=' ', end='\n', frame_index=1, unfmt = '' if char is not None and not config['accessible']: - unfmt += color + '[' + Style.BRIGHT + char + Style.NORMAL + ']' + Fore.RESET + sep + unfmt += f'{color}[{Style.BRIGHT}{char}{Style.NORMAL}]{Fore.RESET}{sep}' unfmt += sep.join(args) fmted = unfmt - for attempt in range(10): + for _ in range(10): try: fmted = string.Formatter().vformat(unfmt, args, vals) break @@ -128,9 +128,7 @@ class CommandStreamReader(object): for p in self.patterns: description = '' - # Match and replace entire pattern. - match = p.pattern.search(line) - if match: + if match := p.pattern.search(line): if p.description: description = p.description.replace('{match}', line[match.start():match.end()]) @@ -139,12 +137,9 @@ class CommandStreamReader(object): if len(matches) > 0 and isinstance(matches[0], tuple): matches = list(matches[0]) - match_count = 1 - for match in matches: + for match_count, match in enumerate(matches, start=1): if p.description: description = description.replace('{match' + str(match_count) + '}', match) - match_count += 1 - async with self.target.lock: with open(os.path.join(self.target.scandir, '_patterns.log'), 'a') as file: info('{bright}[{yellow}' + self.target.address + '{crst}/{bgreen}' + self.tag + '{crst}]{rst} {bmagenta}' + description + '{rst}', verbosity=2) @@ -153,7 +148,7 @@ class CommandStreamReader(object): info('{bright}[{yellow}' + self.target.address + '{crst}/{bgreen}' + self.tag + '{crst}]{rst} {bmagenta}Matched Pattern: ' + line[match.start():match.end()] + '{rst}', verbosity=2) async with self.target.lock: with open(os.path.join(self.target.scandir, '_patterns.log'), 'a') as file: - file.writelines('Matched Pattern: ' + line[match.start():match.end()] + '\n\n') + file.writelines(f'Matched Pattern: {line[match.start():match.end()]}' + '\n\n') if self.outfile is not None: with open(self.outfile, 'a') as writer: diff --git a/autorecon/main.py b/autorecon/main.py index f8ff349..3b10bcd 100644 --- a/autorecon/main.py +++ b/autorecon/main.py @@ -22,7 +22,7 @@ VERSION = "2.0.34" if not os.path.exists(config['config_dir']): shutil.rmtree(config['config_dir'], ignore_errors=True, onerror=None) os.makedirs(config['config_dir'], exist_ok=True) - open(os.path.join(config['config_dir'], 'VERSION-' + VERSION), 'a').close() + open(os.path.join(config['config_dir'], f'VERSION-{VERSION}'), 'a').close() shutil.copy(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config.toml'), os.path.join(config['config_dir'], 'config.toml')) shutil.copy(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'global.toml'), os.path.join(config['config_dir'], 'global.toml')) else: @@ -30,14 +30,16 @@ else: shutil.copy(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config.toml'), os.path.join(config['config_dir'], 'config.toml')) if not os.path.exists(os.path.join(config['config_dir'], 'global.toml')): shutil.copy(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'global.toml'), os.path.join(config['config_dir'], 'global.toml')) - if not os.path.exists(os.path.join(config['config_dir'], 'VERSION-' + VERSION)): + if not os.path.exists( + os.path.join(config['config_dir'], f'VERSION-{VERSION}') + ): warn('It looks like the config in ' + config['config_dir'] + ' is outdated. Please remove the ' + config['config_dir'] + ' directory and re-run AutoRecon to rebuild it.') if not os.path.exists(config['data_dir']): shutil.rmtree(config['data_dir'], ignore_errors=True, onerror=None) os.makedirs(config['data_dir'], exist_ok=True) - open(os.path.join(config['data_dir'], 'VERSION-' + VERSION), 'a').close() + open(os.path.join(config['data_dir'], f'VERSION-{VERSION}'), 'a').close() shutil.copytree(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default-plugins'), os.path.join(config['data_dir'], 'plugins')) shutil.copytree(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'wordlists'), os.path.join(config['data_dir'], 'wordlists')) else: @@ -45,7 +47,7 @@ else: shutil.copytree(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default-plugins'), os.path.join(config['data_dir'], 'plugins')) if not os.path.exists(os.path.join(config['data_dir'], 'wordlists')): shutil.copytree(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'wordlists'), os.path.join(config['data_dir'], 'wordlists')) - if not os.path.exists(os.path.join(config['data_dir'], 'VERSION-' + VERSION)): + if not os.path.exists(os.path.join(config['data_dir'], f'VERSION-{VERSION}')): warn('It looks like the plugins in ' + config['data_dir'] + ' are outdated. Please remove the ' + config['data_dir'] + ' directory and re-run AutoRecon to rebuild them.') @@ -63,34 +65,28 @@ def calculate_elapsed_time(start_time, short=False): elapsed_time = [] if short: elapsed_time.append(str(h).zfill(2)) - else: - if h == 1: - elapsed_time.append(str(h) + ' hour') - elif h > 1: - elapsed_time.append(str(h) + ' hours') + elif h == 1: + elapsed_time.append(f'{str(h)} hour') + elif h > 1: + elapsed_time.append(f'{str(h)} hours') if short: elapsed_time.append(str(m).zfill(2)) - else: - if m == 1: - elapsed_time.append(str(m) + ' minute') - elif m > 1: - elapsed_time.append(str(m) + ' minutes') + elif m == 1: + elapsed_time.append(f'{str(m)} minute') + elif m > 1: + elapsed_time.append(f'{str(m)} minutes') if short: elapsed_time.append(str(s).zfill(2)) + elif s == 1: + elapsed_time.append(f'{str(s)} second') + elif s > 1: + elapsed_time.append(f'{str(s)} seconds') else: - if s == 1: - elapsed_time.append(str(s) + ' second') - elif s > 1: - elapsed_time.append(str(s) + ' seconds') - else: - elapsed_time.append('less than a second') + elapsed_time.append('less than a second') - if short: - return ':'.join(elapsed_time) - else: - return ', '.join(elapsed_time) + return ':'.join(elapsed_time) if short else ', '.join(elapsed_time) # sig and frame args are only present so the function # works with signal.signal() and handles Ctrl-C. @@ -143,14 +139,18 @@ async def start_heartbeat(target, period=60): if process_dict['process'].returncode is None: processes.append(str(process_dict['process'].pid)) try: - for child in psutil.Process(process_dict['process'].pid).children(recursive=True): - processes.append(str(child.pid)) + processes.extend( + str(child.pid) + for child in psutil.Process(process_dict['process'].pid).children( + recursive=True + ) + ) except psutil.NoSuchProcess: pass - + if processes: task_str += ' (PID' + ('s' if len(processes) > 1 else '') + ': ' + ', '.join(processes) + ')' - + tasks_list.append(task_str) tasks_list = ': {bblue}' + ', '.join(tasks_list) + '{rst}' @@ -183,9 +183,8 @@ async def keyboard(): else: config['verbose'] -= 1 info('Verbosity decreased to ' + str(config['verbose'])) - else: - if input[0] != 's': - input = input[1:] + elif input[0] != 's': + input = input[1:] if len(input) > 0 and input[0] == 's': input = input[1:] @@ -206,14 +205,18 @@ async def keyboard(): if process_dict['process'].returncode is None: processes.append(str(process_dict['process'].pid)) try: - for child in psutil.Process(process_dict['process'].pid).children(recursive=True): - processes.append(str(child.pid)) + processes.extend( + str(child.pid) + for child in psutil.Process( + process_dict['process'].pid + ).children(recursive=True) + ) except psutil.NoSuchProcess: pass - + if processes: task_str += ' (PID' + ('s' if len(processes) > 1 else '') + ': ' + ', '.join(processes) + ')' - + tasks_list.append(task_str) tasks_list = ':\n ' + '\n '.join(tasks_list) @@ -233,35 +236,31 @@ async def keyboard(): async def get_semaphore(autorecon): semaphore = autorecon.service_scan_semaphore while True: - # If service scan semaphore is locked, see if we can use port scan semaphore. - if semaphore.locked(): - if semaphore != autorecon.port_scan_semaphore: # This will be true unless user sets max_scans == max_port_scans - - port_scan_task_count = 0 - for target in autorecon.scanning_targets: - for process_list in target.running_tasks.values(): - if issubclass(process_list['plugin'].__class__, PortScan): - port_scan_task_count += 1 - - if not autorecon.pending_targets and (config['max_port_scans'] - port_scan_task_count) >= 1: # If no more targets, and we have room, use port scan semaphore. - if autorecon.port_scan_semaphore.locked(): - await asyncio.sleep(1) - continue - semaphore = autorecon.port_scan_semaphore - break - else: # Do some math to see if we can use the port scan semaphore. - if (config['max_port_scans'] - (port_scan_task_count + (len(autorecon.pending_targets) * config['port_scan_plugin_count']))) >= 1: - if autorecon.port_scan_semaphore.locked(): - await asyncio.sleep(1) - continue - semaphore = autorecon.port_scan_semaphore - break - else: - await asyncio.sleep(1) - else: - break - else: + if not semaphore.locked(): break + if semaphore == autorecon.port_scan_semaphore: + break + port_scan_task_count = 0 + for target in autorecon.scanning_targets: + for process_list in target.running_tasks.values(): + if issubclass(process_list['plugin'].__class__, PortScan): + port_scan_task_count += 1 + + if not autorecon.pending_targets and (config['max_port_scans'] - port_scan_task_count) >= 1: # If no more targets, and we have room, use port scan semaphore. + if autorecon.port_scan_semaphore.locked(): + await asyncio.sleep(1) + continue + semaphore = autorecon.port_scan_semaphore + break + else: # Do some math to see if we can use the port scan semaphore. + if (config['max_port_scans'] - (port_scan_task_count + (len(autorecon.pending_targets) * config['port_scan_plugin_count']))) >= 1: + if autorecon.port_scan_semaphore.locked(): + await asyncio.sleep(1) + continue + semaphore = autorecon.port_scan_semaphore + break + else: + await asyncio.sleep(1) return semaphore async def port_scan(plugin, target): @@ -314,7 +313,11 @@ async def port_scan(plugin, target): error('Port scan {bblue}' + plugin.name + ' {green}(' + plugin.slug + '){rst} ran a command against {byellow}' + target.address + '{rst} which returned a non-zero exit code (' + str(process_dict['process'].returncode) + '). Check ' + target.scandir + '/_errors.log for more details.', verbosity=2) async with target.lock: with open(os.path.join(target.scandir, '_errors.log'), 'a') as file: - file.writelines('[*] Port scan ' + plugin.name + ' (' + plugin.slug + ') ran a command which returned a non-zero exit code (' + str(process_dict['process'].returncode) + ').\n') + file.writelines( + f'[*] Port scan {plugin.name} ({plugin.slug}) ran a command which returned a non-zero exit code (' + + str(process_dict['process'].returncode) + + ').\n' + ) file.writelines('[-] Command: ' + process_dict['cmd'] + '\n') if errors: file.writelines(['[-] Error Output:\n'] + errors + ['\n']) diff --git a/autorecon/plugins.py b/autorecon/plugins.py index 222de4f..1f323ea 100644 --- a/autorecon/plugins.py +++ b/autorecon/plugins.py @@ -45,7 +45,12 @@ class Plugin(object): @final def add_choice_option(self, name, choices, default=None, help=None): if not isinstance(choices, list): - fail('The choices argument for ' + self.name + '\'s ' + name + ' choice option should be a list.') + fail( + f'The choices argument for {self.name}' + + '\'s ' + + name + + ' choice option should be a list.' + ) self.autorecon.add_argument(self, name, choices=choices, default=default, help=help) @final @@ -53,35 +58,23 @@ class Plugin(object): # TODO: make sure name is simple. name = self.slug.replace('-', '_') + '.' + slugify(name).replace('-', '_') - if name in vars(self.autorecon.args): - if vars(self.autorecon.args)[name] is None: - if default: - return default - else: - return None - else: - return vars(self.autorecon.args)[name] + if name not in vars(self.autorecon.args): + return default if default else None + if vars(self.autorecon.args)[name] is None: + return default if default else None else: - if default: - return default - return None + return vars(self.autorecon.args)[name] @final def get_global_option(self, name, default=None): name = 'global.' + slugify(name).replace('-', '_') - if name in vars(self.autorecon.args): - if vars(self.autorecon.args)[name] is None: - if default: - return default - else: - return None - else: - return vars(self.autorecon.args)[name] + if name not in vars(self.autorecon.args): + return default if default else None + if vars(self.autorecon.args)[name] is None: + return default if default else None else: - if default: - return default - return None + return vars(self.autorecon.args)[name] @final def get_global(self, name, default=None): @@ -96,7 +89,9 @@ class Plugin(object): else: self.patterns.append(Pattern(compiled)) except re.error: - fail('Error: The pattern "' + pattern + '" in the plugin "' + self.name + '" is invalid regex.') + fail( + f'Error: The pattern "{pattern}" in the plugin "{self.name}" is invalid regex.' + ) @final def info(self, msg, verbosity=0): @@ -154,7 +149,7 @@ class ServiceScan(Plugin): try: re.compile(r) except re.error: - print('Invalid regex: ' + r) + print(f'Invalid regex: {r}') valid_regex = False if not valid_regex: @@ -190,7 +185,7 @@ class ServiceScan(Plugin): try: re.compile(r) except re.error: - print('Invalid regex: ' + r) + print(f'Invalid regex: {r}') valid_regex = False if valid_regex: @@ -246,7 +241,7 @@ class AutoRecon(object): def add_argument(self, plugin, name, **kwargs): # TODO: make sure name is simple. - name = '--' + plugin.slug + '.' + slugify(name) + name = f'--{plugin.slug}.{slugify(name)}' if self.argparse_group is None: self.argparse_group = self.argparse.add_argument_group('plugin arguments', description='These are optional arguments for certain plugins.') @@ -255,19 +250,17 @@ class AutoRecon(object): def extract_service(self, line, regex): if regex is None: regex = '^(?P\d+)\/(?P(tcp|udp))(.*)open(\s*)(?P[\w\-\/]+)(\s*)(.*)$' - match = re.search(regex, line) - if match: - protocol = match.group('protocol').lower() - port = int(match.group('port')) - service = match.group('service') - secure = True if 'ssl' in service or 'tls' in service else False - - if service.startswith('ssl/') or service.startswith('tls/'): - service = service[4:] - - return Service(protocol, port, service, secure) - else: + if not (match := re.search(regex, line)): return None + protocol = match.group('protocol').lower() + port = int(match.group('port')) + service = match.group('service') + secure = 'ssl' in service or 'tls' in service + + if service.startswith('ssl/') or service.startswith('tls/'): + service = service[4:] + + return Service(protocol, port, service, secure) async def extract_services(self, stream, regex): if not isinstance(stream, CommandStreamReader): @@ -278,8 +271,7 @@ class AutoRecon(object): while True: line = await stream.readline() if line is not None: - service = self.extract_service(line, regex) - if service: + if service := self.extract_service(line, regex): services.append(service) else: break @@ -290,25 +282,38 @@ class AutoRecon(object): return if plugin.name is None: - fail('Error: Plugin with class name "' + plugin.__class__.__name__ + '" in ' + filename + ' does not have a name.') + fail( + f'Error: Plugin with class name "{plugin.__class__.__name__}" in {filename} does not have a name.' + ) for _, loaded_plugin in self.plugins.items(): if plugin.name == loaded_plugin.name: - fail('Error: Duplicate plugin name "' + plugin.name + '" detected in ' + filename + '.', file=sys.stderr) + fail( + f'Error: Duplicate plugin name "{plugin.name}" detected in {filename}.', + file=sys.stderr, + ) if plugin.slug is None: plugin.slug = slugify(plugin.name) elif not self.__slug_regex.match(plugin.slug): - fail('Error: provided slug "' + plugin.slug + '" in ' + filename + ' is not valid (must only contain lowercase letters, numbers, and hyphens).', file=sys.stderr) + fail( + f'Error: provided slug "{plugin.slug}" in {filename} is not valid (must only contain lowercase letters, numbers, and hyphens).', + file=sys.stderr, + ) if plugin.slug in config['protected_classes']: - fail('Error: plugin slug "' + plugin.slug + '" in ' + filename + ' is a protected string. Please change.') + fail( + f'Error: plugin slug "{plugin.slug}" in {filename} is a protected string. Please change.' + ) if plugin.slug not in self.plugins: for _, loaded_plugin in self.plugins.items(): if plugin is loaded_plugin: - fail('Error: plugin "' + plugin.name + '" in ' + filename + ' already loaded as "' + loaded_plugin.name + '" (' + str(loaded_plugin) + ')', file=sys.stderr) + fail( + f'Error: plugin "{plugin.name}" in {filename} already loaded as "{loaded_plugin.name}" ({str(loaded_plugin)})', + file=sys.stderr, + ) configure_function_found = False run_coroutine_found = False @@ -319,30 +324,46 @@ class AutoRecon(object): configure_function_found = True elif member_name == 'run' and inspect.iscoroutinefunction(member_value): if len(inspect.getfullargspec(member_value).args) != 2: - fail('Error: the "run" coroutine in the plugin "' + plugin.name + '" in ' + filename + ' should have two arguments.', file=sys.stderr) + fail( + f'Error: the "run" coroutine in the plugin "{plugin.name}" in {filename} should have two arguments.', + file=sys.stderr, + ) run_coroutine_found = True elif member_name == 'manual': if len(inspect.getfullargspec(member_value).args) != 3: - fail('Error: the "manual" function in the plugin "' + plugin.name + '" in ' + filename + ' should have three arguments.', file=sys.stderr) + fail( + f'Error: the "manual" function in the plugin "{plugin.name}" in {filename} should have three arguments.', + file=sys.stderr, + ) manual_function_found = True if not run_coroutine_found and not manual_function_found: - fail('Error: the plugin "' + plugin.name + '" in ' + filename + ' needs either a "manual" function, a "run" coroutine, or both.', file=sys.stderr) + fail( + f'Error: the plugin "{plugin.name}" in {filename} needs either a "manual" function, a "run" coroutine, or both.', + file=sys.stderr, + ) if issubclass(plugin.__class__, PortScan): if plugin.type is None: - fail('Error: the PortScan plugin "' + plugin.name + '" in ' + filename + ' requires a type (either tcp or udp).') + fail( + f'Error: the PortScan plugin "{plugin.name}" in {filename} requires a type (either tcp or udp).' + ) else: plugin.type = plugin.type.lower() if plugin.type not in ['tcp', 'udp']: - fail('Error: the PortScan plugin "' + plugin.name + '" in ' + filename + ' has an invalid type (should be tcp or udp).') + fail( + f'Error: the PortScan plugin "{plugin.name}" in {filename} has an invalid type (should be tcp or udp).' + ) self.plugin_types["port"].append(plugin) elif issubclass(plugin.__class__, ServiceScan): self.plugin_types["service"].append(plugin) elif issubclass(plugin.__class__, Report): self.plugin_types["report"].append(plugin) else: - fail('Plugin "' + plugin.name + '" in ' + filename + ' is neither a PortScan, ServiceScan, nor a Report.', file=sys.stderr) + fail( + f'Plugin "{plugin.name}" in {filename} is neither a PortScan, ServiceScan, nor a Report.', + file=sys.stderr, + ) plugin.tags = [tag.lower() for tag in plugin.tags] @@ -354,14 +375,13 @@ class AutoRecon(object): plugin.configure() self.plugins[plugin.slug] = plugin else: - fail('Error: plugin slug "' + plugin.slug + '" in ' + filename + ' is already assigned.', file=sys.stderr) + fail( + f'Error: plugin slug "{plugin.slug}" in {filename} is already assigned.', + file=sys.stderr, + ) async def execute(self, cmd, target, tag, patterns=None, outfile=None, errfile=None): - if patterns: - combined_patterns = self.patterns + patterns - else: - combined_patterns = self.patterns - + combined_patterns = self.patterns + patterns if patterns else self.patterns process = await asyncio.create_subprocess_shell( cmd, stdin=open('/dev/null'), diff --git a/autorecon/targets.py b/autorecon/targets.py index 19a1b5c..b8419b2 100644 --- a/autorecon/targets.py +++ b/autorecon/targets.py @@ -58,13 +58,13 @@ class Target: nmap_extra = target.autorecon.args.nmap if target.autorecon.args.nmap_append: - nmap_extra += ' ' + target.autorecon.args.nmap_append + nmap_extra += f' {target.autorecon.args.nmap_append}' if target.ipversion == 'IPv6': nmap_extra += ' -6' if addressv6 == target.ip: - addressv6 = '[' + addressv6 + ']' - ipaddressv6 = '[' + ipaddressv6 + ']' + addressv6 = f'[{addressv6}]' + ipaddressv6 = f'[{ipaddressv6}]' plugin = inspect.currentframe().f_back.f_locals['self'] @@ -115,11 +115,13 @@ class Service: @final def tag(self): - return self.protocol + '/' + str(self.port) + '/' + self.name + return f'{self.protocol}/{str(self.port)}/{self.name}' @final def full_tag(self): - return self.protocol + '/' + str(self.port) + '/' + self.name + '/' + ('secure' if self.secure else 'insecure') + return f'{self.protocol}/{str(self.port)}/{self.name}/' + ( + 'secure' if self.secure else 'insecure' + ) @final def add_manual_commands(self, description, commands): @@ -174,7 +176,7 @@ class Service: nmap_extra = target.autorecon.args.nmap if target.autorecon.args.nmap_append: - nmap_extra += ' ' + target.autorecon.args.nmap_append + nmap_extra += f' {target.autorecon.args.nmap_append}' if protocol == 'udp': nmap_extra += ' -sU' @@ -182,15 +184,15 @@ class Service: if target.ipversion == 'IPv6': nmap_extra += ' -6' if addressv6 == target.ip: - addressv6 = '[' + addressv6 + ']' - ipaddressv6 = '[' + ipaddressv6 + ']' + addressv6 = f'[{addressv6}]' + ipaddressv6 = f'[{ipaddressv6}]' if config['proxychains'] and protocol == 'tcp': nmap_extra += ' -sT' plugin = inspect.currentframe().f_back.f_locals['self'] cmd = e(cmd) - tag = self.tag() + '/' + plugin.slug + tag = f'{self.tag()}/{plugin.slug}' plugin_tag = tag if plugin.run_once_boolean: plugin_tag = plugin.slug