'Refactored by Sourcery'

This commit is contained in:
Sourcery AI 2023-10-02 15:43:23 +00:00
parent 1160474b23
commit 929bb456ec
13 changed files with 241 additions and 207 deletions

View File

@ -41,7 +41,7 @@ class DirBuster(ServiceScan):
return False
async def run(self, service):
dot_extensions = ','.join(['.' + x for x in self.get_option('ext').split(',')])
dot_extensions = ','.join([f'.{x}' for x in self.get_option('ext').split(',')])
for wordlist in self.get_option('wordlist'):
name = os.path.splitext(os.path.basename(wordlist))[0]
if self.get_option('tool') == 'feroxbuster':
@ -63,7 +63,7 @@ class DirBuster(ServiceScan):
await service.execute('dirb {http_scheme}://{addressv6}:{port}/ ' + wordlist + ' -l ' + ('' if self.get_option('recursive') else '-r ') + '-S -X ",' + dot_extensions + '" -f -o "{scandir}/{protocol}_{port}_{http_scheme}_dirb_' + name + '.txt"' + (' ' + self.get_option('extras') if self.get_option('extras') else ''))
def manual(self, service, plugin_was_run):
dot_extensions = ','.join(['.' + x for x in self.get_option('ext').split(',')])
dot_extensions = ','.join([f'.{x}' for x in self.get_option('ext').split(',')])
if self.get_option('tool') == 'feroxbuster':
service.add_manual_command('(feroxbuster) Multi-threaded recursive directory/file enumeration for web servers using various wordlists:', [
'feroxbuster -u {http_scheme}://{addressv6}:{port} -t ' + str(self.get_option('threads')) + ' -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -x "' + self.get_option('ext') + '" -v -k ' + ('' if self.get_option('recursive') else '-n ') + '-e -r -o {scandir}/{protocol}_{port}_{http_scheme}_feroxbuster_dirbuster.txt' + (' ' + self.get_option('extras') if self.get_option('extras') else '')

View File

@ -13,11 +13,7 @@ class AllTCPPortScan(PortScan):
self.tags = ['default', 'default-port-scan', 'long']
async def run(self, target):
if config['proxychains']:
traceroute_os = ''
else:
traceroute_os = ' -A --osscan-guess'
traceroute_os = '' if config['proxychains'] else ' -A --osscan-guess'
if target.ports:
if target.ports['tcp']:
process, stdout, stderr = await target.execute('nmap {nmap_extra} -sV -sC --version-all' + traceroute_os + ' -p ' + target.ports['tcp'] + ' -oN "{scandir}/_full_tcp_nmap.txt" -oX "{scandir}/xml/_full_tcp_nmap.xml" {address}', blocking=False)
@ -28,25 +24,20 @@ class AllTCPPortScan(PortScan):
services = []
while True:
line = await stdout.readline()
if line is not None:
match = re.search('^Discovered open port ([0-9]+)/tcp', line)
if match:
target.info('Discovered open port {bmagenta}tcp/' + match.group(1) + '{rst} on {byellow}' + target.address + '{rst}', verbosity=1)
service = target.extract_service(line)
if service:
# Check if HTTP service appears to be WinRM. If so, override service name as wsman.
if service.name == 'http' and service.port in [5985, 5986]:
wsman = requests.get(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False)
if wsman.status_code == 405:
service.name = 'wsman'
wsman = requests.post(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False)
else:
if wsman.status_code == 401:
service.name = 'wsman'
services.append(service)
else:
if line is None:
break
if match := re.search('^Discovered open port ([0-9]+)/tcp', line):
target.info('Discovered open port {bmagenta}tcp/' + match.group(1) + '{rst} on {byellow}' + target.address + '{rst}', verbosity=1)
if service := target.extract_service(line):
# Check if HTTP service appears to be WinRM. If so, override service name as wsman.
if service.name == 'http' and service.port in [5985, 5986]:
wsman = requests.get(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False)
if wsman.status_code == 405:
service.name = 'wsman'
wsman = requests.post(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False)
elif wsman.status_code == 401:
service.name = 'wsman'
services.append(service)
await process.wait()
return services

View File

@ -31,18 +31,16 @@ class GuessPortScan(PortScan):
services = []
while True:
line = await stdout.readline()
if line is not None:
match = re.match('^Discovered open port ([0-9]+)/tcp', line)
if match:
if match.group(1) in insecure_ports.keys():
await target.add_service(Service('tcp', match.group(1), insecure_ports[match.group(1)]))
elif match.group(1) in secure_ports.keys():
await target.add_service(Service('tcp', match.group(1), secure_ports[match.group(1)], True))
service = target.extract_service(line)
if service is not None:
services.append(service)
else:
if line is None:
break
if match := re.match('^Discovered open port ([0-9]+)/tcp', line):
if match.group(1) in insecure_ports:
await target.add_service(Service('tcp', match.group(1), insecure_ports[match.group(1)]))
elif match.group(1) in secure_ports:
await target.add_service(Service('tcp', match.group(1), secure_ports[match.group(1)], True))
service = target.extract_service(line)
if service is not None:
services.append(service)
await process.wait()
return services

View File

@ -25,15 +25,12 @@ class Top100UDPPortScan(PortScan):
services = []
while True:
line = await stdout.readline()
if line is not None:
match = re.search('^Discovered open port ([0-9]+)/udp', line)
if match:
target.info('Discovered open port {bmagenta}udp/' + match.group(1) + '{rst} on {byellow}' + target.address + '{rst}', verbosity=1)
service = target.extract_service(line)
if service:
services.append(service)
else:
if line is None:
break
if match := re.search('^Discovered open port ([0-9]+)/udp', line):
target.info('Discovered open port {bmagenta}udp/' + match.group(1) + '{rst} on {byellow}' + target.address + '{rst}', verbosity=1)
if service := target.extract_service(line):
services.append(service)
await process.wait()
return services
else:

View File

@ -16,11 +16,7 @@ class QuickTCPPortScan(PortScan):
if target.ports: # Don't run this plugin if there are custom ports.
return []
if config['proxychains']:
traceroute_os = ''
else:
traceroute_os = ' -A --osscan-guess'
traceroute_os = '' if config['proxychains'] else ' -A --osscan-guess'
process, stdout, stderr = await target.execute('nmap {nmap_extra} -sV -sC --version-all' + traceroute_os + ' -oN "{scandir}/_quick_tcp_nmap.txt" -oX "{scandir}/xml/_quick_tcp_nmap.xml" {address}', blocking=False)
services = await target.extract_services(stdout)
@ -31,9 +27,8 @@ class QuickTCPPortScan(PortScan):
if wsman.status_code == 405:
service.name = 'wsman'
wsman = requests.post(('https' if service.secure else 'http') + '://' + target.address + ':' + str(service.port) + '/wsman', verify=False)
else:
if wsman.status_code == 401:
service.name = 'wsman'
elif wsman.status_code == 401:
service.name = 'wsman'
await process.wait()
return services

View File

@ -21,7 +21,10 @@ class CherryTree(Report):
with open(report, 'w') as output:
output.writelines('<?xml version="1.0" encoding="UTF-8"?>\n<cherrytree>\n')
for target in targets:
output.writelines('<node name="' + escape(target.address) + '" is_bold="1" custom_icon_id="1">\n')
output.writelines(
f'<node name="{escape(target.address)}'
+ '" is_bold="1" custom_icon_id="1">\n'
)
files = [os.path.abspath(filename) for filename in glob.iglob(os.path.join(target.scandir, '**/*'), recursive=True) if os.path.isfile(filename) and filename.endswith(('.txt', '.html'))]
@ -31,7 +34,7 @@ class CherryTree(Report):
if len(target.scans['ports'][scan]['commands']) > 0:
output.writelines('<node name="PortScan: ' + escape(target.scans['ports'][scan]['plugin'].name) + '" custom_icon_id="21">\n')
for command in target.scans['ports'][scan]['commands']:
output.writelines('<rich_text>' + escape(command[0]))
output.writelines(f'<rich_text>{escape(command[0])}')
for filename in files:
if filename in command[0] or (command[1] is not None and filename == command[1]) or (command[2] is not None and filename == command[2]):
output.writelines('\n\n' + escape(filename) + ':\n\n')
@ -43,12 +46,15 @@ class CherryTree(Report):
if target.scans['services']:
output.writelines('<node name="Services" custom_icon_id="2">\n')
for service in target.scans['services'].keys():
output.writelines('<node name="Service: ' + escape(service.tag()) + '" custom_icon_id="3">\n')
output.writelines(
f'<node name="Service: {escape(service.tag())}'
+ '" custom_icon_id="3">\n'
)
for plugin in target.scans['services'][service].keys():
if len(target.scans['services'][service][plugin]['commands']) > 0:
output.writelines('<node name="' + escape(target.scans['services'][service][plugin]['plugin'].name) + '" custom_icon_id="21">\n')
for command in target.scans['services'][service][plugin]['commands']:
output.writelines('<rich_text>' + escape(command[0]))
output.writelines(f'<rich_text>{escape(command[0])}')
for filename in files:
if filename in command[0] or (command[1] is not None and filename == command[1]) or (command[2] is not None and filename == command[2]):
output.writelines('\n\n' + escape(filename) + ':\n\n')
@ -63,28 +69,28 @@ class CherryTree(Report):
if os.path.isfile(manual_commands):
output.writelines('<node name="Manual Commands" custom_icon_id="22">\n')
with open(manual_commands, 'r') as file:
output.writelines('<rich_text>' + escape(file.read()) + '</rich_text>\n')
output.writelines(f'<rich_text>{escape(file.read())}' + '</rich_text>\n')
output.writelines('</node>\n')
patterns = os.path.join(target.scandir, '_patterns.log')
if os.path.isfile(patterns):
output.writelines('<node name="Patterns" custom_icon_id="10">\n')
with open(patterns, 'r') as file:
output.writelines('<rich_text>' + escape(file.read()) + '</rich_text>\n')
output.writelines(f'<rich_text>{escape(file.read())}' + '</rich_text>\n')
output.writelines('</node>\n')
commands = os.path.join(target.scandir, '_commands.log')
if os.path.isfile(commands):
output.writelines('<node name="Commands" custom_icon_id="21">\n')
with open(commands, 'r') as file:
output.writelines('<rich_text>' + escape(file.read()) + '</rich_text>\n')
output.writelines(f'<rich_text>{escape(file.read())}' + '</rich_text>\n')
output.writelines('</node>\n')
errors = os.path.join(target.scandir, '_errors.log')
if os.path.isfile(errors):
output.writelines('<node name="Errors" custom_icon_id="57">\n')
with open(errors, 'r') as file:
output.writelines('<rich_text>' + escape(file.read()) + '</rich_text>\n')
output.writelines(f'<rich_text>{escape(file.read())}' + '</rich_text>\n')
output.writelines('</node>\n')
output.writelines('</node>\n')

View File

@ -25,10 +25,19 @@ class SubdomainEnumeration(ServiceScan):
if self.get_global('domain') and self.get_global('domain') not in domains:
domains.append(self.get_global('domain'))
if len(domains) > 0:
if domains:
for wordlist in self.get_option('wordlist'):
name = os.path.splitext(os.path.basename(wordlist))[0]
for domain in domains:
await service.execute('gobuster dns -d ' + domain + ' -r {addressv6} -w ' + wordlist + ' -o "{scandir}/{protocol}_{port}_' + domain + '_subdomains_' + name + '.txt"')
await service.execute(
f'gobuster dns -d {domain}'
+ ' -r {addressv6} -w '
+ wordlist
+ ' -o "{scandir}/{protocol}_{port}_'
+ domain
+ '_subdomains_'
+ name
+ '.txt"'
)
else:
service.info('The target was not a domain, nor was a domain provided as an option. Skipping subdomain enumeration.')

View File

@ -27,11 +27,26 @@ class VirtualHost(ServiceScan):
if self.get_global('domain') and self.get_global('domain') not in hostnames:
hostnames.append(self.get_global('domain'))
if len(hostnames) > 0:
if hostnames:
for wordlist in self.get_option('wordlist'):
name = os.path.splitext(os.path.basename(wordlist))[0]
for hostname in hostnames:
wildcard = requests.get(('https' if service.secure else 'http') + '://' + service.target.address + ':' + str(service.port) + '/', headers={'Host':''.join(random.choice(string.ascii_letters) for i in range(20)) + '.' + hostname}, verify=False)
wildcard = requests.get(
('https' if service.secure else 'http')
+ '://'
+ service.target.address
+ ':'
+ str(service.port)
+ '/',
headers={
'Host': ''.join(
random.choice(string.ascii_letters) for _ in range(20)
)
+ '.'
+ hostname
},
verify=False,
)
size = str(len(wildcard.content))
await service.execute('ffuf -u {http_scheme}://' + hostname + ':{port}/ -t ' + str(self.get_option('threads')) + ' -w ' + wordlist + ' -H "Host: FUZZ.' + hostname + '" -mc all -fs ' + size + ' -r -noninteractive -s | tee "{scandir}/{protocol}_{port}_{http_scheme}_' + hostname + '_vhosts_' + name + '.txt"')

View File

@ -15,7 +15,10 @@ class WinRMDetection(ServiceScan):
async def run(self, service):
filename = fformat('{scandir}/{protocol}_{port}_winrm-detection.txt')
with open(filename, mode='wt', encoding='utf8') as winrm:
winrm.write('WinRM was possibly detected running on ' + service.protocol + ' port ' + str(service.port) + '.\nCheck _manual_commands.txt for manual commands you can run against this service.')
winrm.write(
f'WinRM was possibly detected running on {service.protocol} port {str(service.port)}'
+ '.\nCheck _manual_commands.txt for manual commands you can run against this service.'
)
def manual(self, service, plugin_was_run):
service.add_manual_commands('Bruteforce logins:', [

View File

@ -52,12 +52,12 @@ def cprint(*args, color=Fore.RESET, char='*', sep=' ', end='\n', frame_index=1,
unfmt = ''
if char is not None and not config['accessible']:
unfmt += color + '[' + Style.BRIGHT + char + Style.NORMAL + ']' + Fore.RESET + sep
unfmt += f'{color}[{Style.BRIGHT}{char}{Style.NORMAL}]{Fore.RESET}{sep}'
unfmt += sep.join(args)
fmted = unfmt
for attempt in range(10):
for _ in range(10):
try:
fmted = string.Formatter().vformat(unfmt, args, vals)
break
@ -128,9 +128,7 @@ class CommandStreamReader(object):
for p in self.patterns:
description = ''
# Match and replace entire pattern.
match = p.pattern.search(line)
if match:
if match := p.pattern.search(line):
if p.description:
description = p.description.replace('{match}', line[match.start():match.end()])
@ -139,12 +137,9 @@ class CommandStreamReader(object):
if len(matches) > 0 and isinstance(matches[0], tuple):
matches = list(matches[0])
match_count = 1
for match in matches:
for match_count, match in enumerate(matches, start=1):
if p.description:
description = description.replace('{match' + str(match_count) + '}', match)
match_count += 1
async with self.target.lock:
with open(os.path.join(self.target.scandir, '_patterns.log'), 'a') as file:
info('{bright}[{yellow}' + self.target.address + '{crst}/{bgreen}' + self.tag + '{crst}]{rst} {bmagenta}' + description + '{rst}', verbosity=2)
@ -153,7 +148,7 @@ class CommandStreamReader(object):
info('{bright}[{yellow}' + self.target.address + '{crst}/{bgreen}' + self.tag + '{crst}]{rst} {bmagenta}Matched Pattern: ' + line[match.start():match.end()] + '{rst}', verbosity=2)
async with self.target.lock:
with open(os.path.join(self.target.scandir, '_patterns.log'), 'a') as file:
file.writelines('Matched Pattern: ' + line[match.start():match.end()] + '\n\n')
file.writelines(f'Matched Pattern: {line[match.start():match.end()]}' + '\n\n')
if self.outfile is not None:
with open(self.outfile, 'a') as writer:

View File

@ -22,7 +22,7 @@ VERSION = "2.0.34"
if not os.path.exists(config['config_dir']):
shutil.rmtree(config['config_dir'], ignore_errors=True, onerror=None)
os.makedirs(config['config_dir'], exist_ok=True)
open(os.path.join(config['config_dir'], 'VERSION-' + VERSION), 'a').close()
open(os.path.join(config['config_dir'], f'VERSION-{VERSION}'), 'a').close()
shutil.copy(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config.toml'), os.path.join(config['config_dir'], 'config.toml'))
shutil.copy(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'global.toml'), os.path.join(config['config_dir'], 'global.toml'))
else:
@ -30,14 +30,16 @@ else:
shutil.copy(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config.toml'), os.path.join(config['config_dir'], 'config.toml'))
if not os.path.exists(os.path.join(config['config_dir'], 'global.toml')):
shutil.copy(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'global.toml'), os.path.join(config['config_dir'], 'global.toml'))
if not os.path.exists(os.path.join(config['config_dir'], 'VERSION-' + VERSION)):
if not os.path.exists(
os.path.join(config['config_dir'], f'VERSION-{VERSION}')
):
warn('It looks like the config in ' + config['config_dir'] + ' is outdated. Please remove the ' + config['config_dir'] + ' directory and re-run AutoRecon to rebuild it.')
if not os.path.exists(config['data_dir']):
shutil.rmtree(config['data_dir'], ignore_errors=True, onerror=None)
os.makedirs(config['data_dir'], exist_ok=True)
open(os.path.join(config['data_dir'], 'VERSION-' + VERSION), 'a').close()
open(os.path.join(config['data_dir'], f'VERSION-{VERSION}'), 'a').close()
shutil.copytree(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default-plugins'), os.path.join(config['data_dir'], 'plugins'))
shutil.copytree(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'wordlists'), os.path.join(config['data_dir'], 'wordlists'))
else:
@ -45,7 +47,7 @@ else:
shutil.copytree(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default-plugins'), os.path.join(config['data_dir'], 'plugins'))
if not os.path.exists(os.path.join(config['data_dir'], 'wordlists')):
shutil.copytree(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'wordlists'), os.path.join(config['data_dir'], 'wordlists'))
if not os.path.exists(os.path.join(config['data_dir'], 'VERSION-' + VERSION)):
if not os.path.exists(os.path.join(config['data_dir'], f'VERSION-{VERSION}')):
warn('It looks like the plugins in ' + config['data_dir'] + ' are outdated. Please remove the ' + config['data_dir'] + ' directory and re-run AutoRecon to rebuild them.')
@ -63,34 +65,28 @@ def calculate_elapsed_time(start_time, short=False):
elapsed_time = []
if short:
elapsed_time.append(str(h).zfill(2))
else:
if h == 1:
elapsed_time.append(str(h) + ' hour')
elif h > 1:
elapsed_time.append(str(h) + ' hours')
elif h == 1:
elapsed_time.append(f'{str(h)} hour')
elif h > 1:
elapsed_time.append(f'{str(h)} hours')
if short:
elapsed_time.append(str(m).zfill(2))
else:
if m == 1:
elapsed_time.append(str(m) + ' minute')
elif m > 1:
elapsed_time.append(str(m) + ' minutes')
elif m == 1:
elapsed_time.append(f'{str(m)} minute')
elif m > 1:
elapsed_time.append(f'{str(m)} minutes')
if short:
elapsed_time.append(str(s).zfill(2))
elif s == 1:
elapsed_time.append(f'{str(s)} second')
elif s > 1:
elapsed_time.append(f'{str(s)} seconds')
else:
if s == 1:
elapsed_time.append(str(s) + ' second')
elif s > 1:
elapsed_time.append(str(s) + ' seconds')
else:
elapsed_time.append('less than a second')
elapsed_time.append('less than a second')
if short:
return ':'.join(elapsed_time)
else:
return ', '.join(elapsed_time)
return ':'.join(elapsed_time) if short else ', '.join(elapsed_time)
# sig and frame args are only present so the function
# works with signal.signal() and handles Ctrl-C.
@ -143,8 +139,12 @@ async def start_heartbeat(target, period=60):
if process_dict['process'].returncode is None:
processes.append(str(process_dict['process'].pid))
try:
for child in psutil.Process(process_dict['process'].pid).children(recursive=True):
processes.append(str(child.pid))
processes.extend(
str(child.pid)
for child in psutil.Process(process_dict['process'].pid).children(
recursive=True
)
)
except psutil.NoSuchProcess:
pass
@ -183,9 +183,8 @@ async def keyboard():
else:
config['verbose'] -= 1
info('Verbosity decreased to ' + str(config['verbose']))
else:
if input[0] != 's':
input = input[1:]
elif input[0] != 's':
input = input[1:]
if len(input) > 0 and input[0] == 's':
input = input[1:]
@ -206,8 +205,12 @@ async def keyboard():
if process_dict['process'].returncode is None:
processes.append(str(process_dict['process'].pid))
try:
for child in psutil.Process(process_dict['process'].pid).children(recursive=True):
processes.append(str(child.pid))
processes.extend(
str(child.pid)
for child in psutil.Process(
process_dict['process'].pid
).children(recursive=True)
)
except psutil.NoSuchProcess:
pass
@ -233,35 +236,31 @@ async def keyboard():
async def get_semaphore(autorecon):
semaphore = autorecon.service_scan_semaphore
while True:
# If service scan semaphore is locked, see if we can use port scan semaphore.
if semaphore.locked():
if semaphore != autorecon.port_scan_semaphore: # This will be true unless user sets max_scans == max_port_scans
port_scan_task_count = 0
for target in autorecon.scanning_targets:
for process_list in target.running_tasks.values():
if issubclass(process_list['plugin'].__class__, PortScan):
port_scan_task_count += 1
if not autorecon.pending_targets and (config['max_port_scans'] - port_scan_task_count) >= 1: # If no more targets, and we have room, use port scan semaphore.
if autorecon.port_scan_semaphore.locked():
await asyncio.sleep(1)
continue
semaphore = autorecon.port_scan_semaphore
break
else: # Do some math to see if we can use the port scan semaphore.
if (config['max_port_scans'] - (port_scan_task_count + (len(autorecon.pending_targets) * config['port_scan_plugin_count']))) >= 1:
if autorecon.port_scan_semaphore.locked():
await asyncio.sleep(1)
continue
semaphore = autorecon.port_scan_semaphore
break
else:
await asyncio.sleep(1)
else:
break
else:
if not semaphore.locked():
break
if semaphore == autorecon.port_scan_semaphore:
break
port_scan_task_count = 0
for target in autorecon.scanning_targets:
for process_list in target.running_tasks.values():
if issubclass(process_list['plugin'].__class__, PortScan):
port_scan_task_count += 1
if not autorecon.pending_targets and (config['max_port_scans'] - port_scan_task_count) >= 1: # If no more targets, and we have room, use port scan semaphore.
if autorecon.port_scan_semaphore.locked():
await asyncio.sleep(1)
continue
semaphore = autorecon.port_scan_semaphore
break
else: # Do some math to see if we can use the port scan semaphore.
if (config['max_port_scans'] - (port_scan_task_count + (len(autorecon.pending_targets) * config['port_scan_plugin_count']))) >= 1:
if autorecon.port_scan_semaphore.locked():
await asyncio.sleep(1)
continue
semaphore = autorecon.port_scan_semaphore
break
else:
await asyncio.sleep(1)
return semaphore
async def port_scan(plugin, target):
@ -314,7 +313,11 @@ async def port_scan(plugin, target):
error('Port scan {bblue}' + plugin.name + ' {green}(' + plugin.slug + '){rst} ran a command against {byellow}' + target.address + '{rst} which returned a non-zero exit code (' + str(process_dict['process'].returncode) + '). Check ' + target.scandir + '/_errors.log for more details.', verbosity=2)
async with target.lock:
with open(os.path.join(target.scandir, '_errors.log'), 'a') as file:
file.writelines('[*] Port scan ' + plugin.name + ' (' + plugin.slug + ') ran a command which returned a non-zero exit code (' + str(process_dict['process'].returncode) + ').\n')
file.writelines(
f'[*] Port scan {plugin.name} ({plugin.slug}) ran a command which returned a non-zero exit code ('
+ str(process_dict['process'].returncode)
+ ').\n'
)
file.writelines('[-] Command: ' + process_dict['cmd'] + '\n')
if errors:
file.writelines(['[-] Error Output:\n'] + errors + ['\n'])

View File

@ -45,7 +45,12 @@ class Plugin(object):
@final
def add_choice_option(self, name, choices, default=None, help=None):
if not isinstance(choices, list):
fail('The choices argument for ' + self.name + '\'s ' + name + ' choice option should be a list.')
fail(
f'The choices argument for {self.name}'
+ '\'s '
+ name
+ ' choice option should be a list.'
)
self.autorecon.add_argument(self, name, choices=choices, default=default, help=help)
@final
@ -53,35 +58,23 @@ class Plugin(object):
# TODO: make sure name is simple.
name = self.slug.replace('-', '_') + '.' + slugify(name).replace('-', '_')
if name in vars(self.autorecon.args):
if vars(self.autorecon.args)[name] is None:
if default:
return default
else:
return None
else:
return vars(self.autorecon.args)[name]
if name not in vars(self.autorecon.args):
return default if default else None
if vars(self.autorecon.args)[name] is None:
return default if default else None
else:
if default:
return default
return None
return vars(self.autorecon.args)[name]
@final
def get_global_option(self, name, default=None):
name = 'global.' + slugify(name).replace('-', '_')
if name in vars(self.autorecon.args):
if vars(self.autorecon.args)[name] is None:
if default:
return default
else:
return None
else:
return vars(self.autorecon.args)[name]
if name not in vars(self.autorecon.args):
return default if default else None
if vars(self.autorecon.args)[name] is None:
return default if default else None
else:
if default:
return default
return None
return vars(self.autorecon.args)[name]
@final
def get_global(self, name, default=None):
@ -96,7 +89,9 @@ class Plugin(object):
else:
self.patterns.append(Pattern(compiled))
except re.error:
fail('Error: The pattern "' + pattern + '" in the plugin "' + self.name + '" is invalid regex.')
fail(
f'Error: The pattern "{pattern}" in the plugin "{self.name}" is invalid regex.'
)
@final
def info(self, msg, verbosity=0):
@ -154,7 +149,7 @@ class ServiceScan(Plugin):
try:
re.compile(r)
except re.error:
print('Invalid regex: ' + r)
print(f'Invalid regex: {r}')
valid_regex = False
if not valid_regex:
@ -190,7 +185,7 @@ class ServiceScan(Plugin):
try:
re.compile(r)
except re.error:
print('Invalid regex: ' + r)
print(f'Invalid regex: {r}')
valid_regex = False
if valid_regex:
@ -246,7 +241,7 @@ class AutoRecon(object):
def add_argument(self, plugin, name, **kwargs):
# TODO: make sure name is simple.
name = '--' + plugin.slug + '.' + slugify(name)
name = f'--{plugin.slug}.{slugify(name)}'
if self.argparse_group is None:
self.argparse_group = self.argparse.add_argument_group('plugin arguments', description='These are optional arguments for certain plugins.')
@ -255,19 +250,17 @@ class AutoRecon(object):
def extract_service(self, line, regex):
if regex is None:
regex = '^(?P<port>\d+)\/(?P<protocol>(tcp|udp))(.*)open(\s*)(?P<service>[\w\-\/]+)(\s*)(.*)$'
match = re.search(regex, line)
if match:
protocol = match.group('protocol').lower()
port = int(match.group('port'))
service = match.group('service')
secure = True if 'ssl' in service or 'tls' in service else False
if service.startswith('ssl/') or service.startswith('tls/'):
service = service[4:]
return Service(protocol, port, service, secure)
else:
if not (match := re.search(regex, line)):
return None
protocol = match.group('protocol').lower()
port = int(match.group('port'))
service = match.group('service')
secure = 'ssl' in service or 'tls' in service
if service.startswith('ssl/') or service.startswith('tls/'):
service = service[4:]
return Service(protocol, port, service, secure)
async def extract_services(self, stream, regex):
if not isinstance(stream, CommandStreamReader):
@ -278,8 +271,7 @@ class AutoRecon(object):
while True:
line = await stream.readline()
if line is not None:
service = self.extract_service(line, regex)
if service:
if service := self.extract_service(line, regex):
services.append(service)
else:
break
@ -290,25 +282,38 @@ class AutoRecon(object):
return
if plugin.name is None:
fail('Error: Plugin with class name "' + plugin.__class__.__name__ + '" in ' + filename + ' does not have a name.')
fail(
f'Error: Plugin with class name "{plugin.__class__.__name__}" in {filename} does not have a name.'
)
for _, loaded_plugin in self.plugins.items():
if plugin.name == loaded_plugin.name:
fail('Error: Duplicate plugin name "' + plugin.name + '" detected in ' + filename + '.', file=sys.stderr)
fail(
f'Error: Duplicate plugin name "{plugin.name}" detected in {filename}.',
file=sys.stderr,
)
if plugin.slug is None:
plugin.slug = slugify(plugin.name)
elif not self.__slug_regex.match(plugin.slug):
fail('Error: provided slug "' + plugin.slug + '" in ' + filename + ' is not valid (must only contain lowercase letters, numbers, and hyphens).', file=sys.stderr)
fail(
f'Error: provided slug "{plugin.slug}" in {filename} is not valid (must only contain lowercase letters, numbers, and hyphens).',
file=sys.stderr,
)
if plugin.slug in config['protected_classes']:
fail('Error: plugin slug "' + plugin.slug + '" in ' + filename + ' is a protected string. Please change.')
fail(
f'Error: plugin slug "{plugin.slug}" in {filename} is a protected string. Please change.'
)
if plugin.slug not in self.plugins:
for _, loaded_plugin in self.plugins.items():
if plugin is loaded_plugin:
fail('Error: plugin "' + plugin.name + '" in ' + filename + ' already loaded as "' + loaded_plugin.name + '" (' + str(loaded_plugin) + ')', file=sys.stderr)
fail(
f'Error: plugin "{plugin.name}" in {filename} already loaded as "{loaded_plugin.name}" ({str(loaded_plugin)})',
file=sys.stderr,
)
configure_function_found = False
run_coroutine_found = False
@ -319,30 +324,46 @@ class AutoRecon(object):
configure_function_found = True
elif member_name == 'run' and inspect.iscoroutinefunction(member_value):
if len(inspect.getfullargspec(member_value).args) != 2:
fail('Error: the "run" coroutine in the plugin "' + plugin.name + '" in ' + filename + ' should have two arguments.', file=sys.stderr)
fail(
f'Error: the "run" coroutine in the plugin "{plugin.name}" in {filename} should have two arguments.',
file=sys.stderr,
)
run_coroutine_found = True
elif member_name == 'manual':
if len(inspect.getfullargspec(member_value).args) != 3:
fail('Error: the "manual" function in the plugin "' + plugin.name + '" in ' + filename + ' should have three arguments.', file=sys.stderr)
fail(
f'Error: the "manual" function in the plugin "{plugin.name}" in {filename} should have three arguments.',
file=sys.stderr,
)
manual_function_found = True
if not run_coroutine_found and not manual_function_found:
fail('Error: the plugin "' + plugin.name + '" in ' + filename + ' needs either a "manual" function, a "run" coroutine, or both.', file=sys.stderr)
fail(
f'Error: the plugin "{plugin.name}" in {filename} needs either a "manual" function, a "run" coroutine, or both.',
file=sys.stderr,
)
if issubclass(plugin.__class__, PortScan):
if plugin.type is None:
fail('Error: the PortScan plugin "' + plugin.name + '" in ' + filename + ' requires a type (either tcp or udp).')
fail(
f'Error: the PortScan plugin "{plugin.name}" in {filename} requires a type (either tcp or udp).'
)
else:
plugin.type = plugin.type.lower()
if plugin.type not in ['tcp', 'udp']:
fail('Error: the PortScan plugin "' + plugin.name + '" in ' + filename + ' has an invalid type (should be tcp or udp).')
fail(
f'Error: the PortScan plugin "{plugin.name}" in {filename} has an invalid type (should be tcp or udp).'
)
self.plugin_types["port"].append(plugin)
elif issubclass(plugin.__class__, ServiceScan):
self.plugin_types["service"].append(plugin)
elif issubclass(plugin.__class__, Report):
self.plugin_types["report"].append(plugin)
else:
fail('Plugin "' + plugin.name + '" in ' + filename + ' is neither a PortScan, ServiceScan, nor a Report.', file=sys.stderr)
fail(
f'Plugin "{plugin.name}" in {filename} is neither a PortScan, ServiceScan, nor a Report.',
file=sys.stderr,
)
plugin.tags = [tag.lower() for tag in plugin.tags]
@ -354,14 +375,13 @@ class AutoRecon(object):
plugin.configure()
self.plugins[plugin.slug] = plugin
else:
fail('Error: plugin slug "' + plugin.slug + '" in ' + filename + ' is already assigned.', file=sys.stderr)
fail(
f'Error: plugin slug "{plugin.slug}" in {filename} is already assigned.',
file=sys.stderr,
)
async def execute(self, cmd, target, tag, patterns=None, outfile=None, errfile=None):
if patterns:
combined_patterns = self.patterns + patterns
else:
combined_patterns = self.patterns
combined_patterns = self.patterns + patterns if patterns else self.patterns
process = await asyncio.create_subprocess_shell(
cmd,
stdin=open('/dev/null'),

View File

@ -58,13 +58,13 @@ class Target:
nmap_extra = target.autorecon.args.nmap
if target.autorecon.args.nmap_append:
nmap_extra += ' ' + target.autorecon.args.nmap_append
nmap_extra += f' {target.autorecon.args.nmap_append}'
if target.ipversion == 'IPv6':
nmap_extra += ' -6'
if addressv6 == target.ip:
addressv6 = '[' + addressv6 + ']'
ipaddressv6 = '[' + ipaddressv6 + ']'
addressv6 = f'[{addressv6}]'
ipaddressv6 = f'[{ipaddressv6}]'
plugin = inspect.currentframe().f_back.f_locals['self']
@ -115,11 +115,13 @@ class Service:
@final
def tag(self):
return self.protocol + '/' + str(self.port) + '/' + self.name
return f'{self.protocol}/{str(self.port)}/{self.name}'
@final
def full_tag(self):
return self.protocol + '/' + str(self.port) + '/' + self.name + '/' + ('secure' if self.secure else 'insecure')
return f'{self.protocol}/{str(self.port)}/{self.name}/' + (
'secure' if self.secure else 'insecure'
)
@final
def add_manual_commands(self, description, commands):
@ -174,7 +176,7 @@ class Service:
nmap_extra = target.autorecon.args.nmap
if target.autorecon.args.nmap_append:
nmap_extra += ' ' + target.autorecon.args.nmap_append
nmap_extra += f' {target.autorecon.args.nmap_append}'
if protocol == 'udp':
nmap_extra += ' -sU'
@ -182,15 +184,15 @@ class Service:
if target.ipversion == 'IPv6':
nmap_extra += ' -6'
if addressv6 == target.ip:
addressv6 = '[' + addressv6 + ']'
ipaddressv6 = '[' + ipaddressv6 + ']'
addressv6 = f'[{addressv6}]'
ipaddressv6 = f'[{ipaddressv6}]'
if config['proxychains'] and protocol == 'tcp':
nmap_extra += ' -sT'
plugin = inspect.currentframe().f_back.f_locals['self']
cmd = e(cmd)
tag = self.tag() + '/' + plugin.slug
tag = f'{self.tag()}/{plugin.slug}'
plugin_tag = tag
if plugin.run_once_boolean:
plugin_tag = plugin.slug