import asyncio, inspect, os, re, sys from typing import final from autorecon.config import config from autorecon.io import slugify, error, fail, CommandStreamReader from autorecon.targets import Service class Pattern: def __init__(self, pattern, description=None): self.pattern = pattern self.description = description class Plugin(object): def __init__(self): self.name = None self.slug = None self.description = None self.tags = ['default'] self.priority = 1 self.patterns = [] self.autorecon = None self.disabled = False @final def add_option(self, name, default=None, help=None): self.autorecon.add_argument(self, name, metavar='VALUE', default=default, help=help) @final def add_constant_option(self, name, const, default=None, help=None): self.autorecon.add_argument(self, name, action='store_const', const=const, default=default, help=help) @final def add_true_option(self, name, help=None): self.autorecon.add_argument(self, name, action='store_true', help=help) @final def add_false_option(self, name, help=None): self.autorecon.add_argument(self, name, action='store_false', help=help) @final def add_list_option(self, name, default=None, help=None): self.autorecon.add_argument(self, name, nargs='+', metavar='VALUE', default=default, help=help) @final def add_choice_option(self, name, choices, default=None, help=None): if not isinstance(choices, list): fail('The choices argument for ' + self.name + '\'s ' + name + ' choice option should be a list.') self.autorecon.add_argument(self, name, choices=choices, default=default, help=help) @final def get_option(self, name): # TODO: make sure name is simple. name = self.slug.replace('-', '_') + '.' + slugify(name).replace('-', '_') if name in vars(self.autorecon.args): return vars(self.autorecon.args)[name] else: return None @final def get_global_option(self, name, default=None): name = 'global.' + slugify(name).replace('-', '_') if name in vars(self.autorecon.args): if vars(self.autorecon.args)[name] is None: if default: return default else: return None else: return vars(self.autorecon.args)[name] else: if default: return default return None @final def get_global(self, name, default=None): return self.get_global_option(name, default) @final def add_pattern(self, pattern, description=None): try: compiled = re.compile(pattern) if description: self.patterns.append(Pattern(compiled, description=description)) else: self.patterns.append(Pattern(compiled)) except re.error: fail('Error: The pattern "' + pattern + '" in the plugin "' + self.name + '" is invalid regex.') class PortScan(Plugin): def __init__(self): super().__init__() self.type = None self.specific_ports = False async def run(self, target): raise NotImplementedError class ServiceScan(Plugin): def __init__(self): super().__init__() self.ports = {'tcp':[], 'udp':[]} self.ignore_ports = {'tcp':[], 'udp':[]} self.services = [] self.service_names = [] self.ignore_service_names = [] self.run_once_boolean = False self.require_ssl_boolean = False @final def match_service(self, protocol, port, name, negative_match=False): protocol = protocol.lower() if protocol not in ['tcp', 'udp']: print('Invalid protocol.') sys.exit(1) if not isinstance(port, list): port = [port] port = list(map(int, port)) if not isinstance(name, list): name = [name] valid_regex = True for r in name: try: re.compile(r) except re.error: print('Invalid regex: ' + r) valid_regex = False if not valid_regex: sys.exit(1) service = {'protocol': protocol, 'port': port, 'name': name, 'negative_match': negative_match} self.services.append(service) @final def match_port(self, protocol, port, negative_match=False): protocol = protocol.lower() if protocol not in ['tcp', 'udp']: print('Invalid protocol.') sys.exit(1) else: if not isinstance(port, list): port = [port] port = list(map(int, port)) if negative_match: self.ignore_ports[protocol] = list(set(self.ignore_ports[protocol] + port)) else: self.ports[protocol] = list(set(self.ports[protocol] + port)) @final def match_service_name(self, name, negative_match=False): if not isinstance(name, list): name = [name] valid_regex = True for r in name: try: re.compile(r) except re.error: print('Invalid regex: ' + r) valid_regex = False if valid_regex: if negative_match: self.ignore_service_names = list(set(self.ignore_service_names + name)) else: self.service_names = list(set(self.service_names + name)) else: sys.exit(1) @final def require_ssl(self, boolean): self.require_ssl_boolean = boolean @final def run_once(self, boolean): self.run_once_boolean = boolean @final def match_all_service_names(self, boolean): if boolean: # Add a "match all" service name. self.match_service_name('.*') class Report(Plugin): def __init__(self): super().__init__() class AutoRecon(object): def __init__(self): self.pending_targets = [] self.scanning_targets = [] self.completed_targets = [] self.plugins = {} self.__slug_regex = re.compile('^[a-z0-9\-]+$') self.plugin_types = {'port':[], 'service':[], 'report':[]} self.port_scan_semaphore = None self.service_scan_semaphore = None self.argparse = None self.argparse_group = None self.args = None self.missing_services = [] self.taglist = [] self.tags = [] self.excluded_tags = [] self.patterns = [] self.errors = False self.lock = asyncio.Lock() self.load_slug = None self.load_module = None def add_argument(self, plugin, name, **kwargs): # TODO: make sure name is simple. name = '--' + plugin.slug + '.' + slugify(name) if self.argparse_group is None: self.argparse_group = self.argparse.add_argument_group('plugin arguments', description='These are optional arguments for certain plugins.') self.argparse_group.add_argument(name, **kwargs) def extract_service(self, line, regex): if regex is None: regex = '^(?P\d+)\/(?P(tcp|udp))(.*)open(\s*)(?P[\w\-\/]+)(\s*)(.*)$' match = re.search(regex, line) if match: protocol = match.group('protocol').lower() port = int(match.group('port')) service = match.group('service') secure = True if 'ssl' in service or 'tls' in service else False if service.startswith('ssl/') or service.startswith('tls/'): service = service[4:] return Service(protocol, port, service, secure) else: return None async def extract_services(self, stream, regex): if not isinstance(stream, CommandStreamReader): print('Error: extract_services must be passed an instance of a CommandStreamReader.') sys.exit(1) services = [] while True: line = await stream.readline() if line is not None: service = self.extract_service(line, regex) if service: services.append(service) else: break return services def register(self, plugin, filename): if plugin.disabled: return if plugin.name is None: fail('Error: Plugin with class name "' + plugin.__class__.__name__ + '" in ' + filename + ' does not have a name.') for _, loaded_plugin in self.plugins.items(): if plugin.name == loaded_plugin.name: fail('Error: Duplicate plugin name "' + plugin.name + '" detected in ' + filename + '.', file=sys.stderr) if plugin.slug is None: plugin.slug = slugify(plugin.name) elif not self.__slug_regex.match(plugin.slug): fail('Error: provided slug "' + plugin.slug + '" in ' + filename + ' is not valid (must only contain lowercase letters, numbers, and hyphens).', file=sys.stderr) if plugin.slug in config['protected_classes']: fail('Error: plugin slug "' + plugin.slug + '" in ' + filename + ' is a protected string. Please change.') if plugin.slug not in self.plugins: for _, loaded_plugin in self.plugins.items(): if plugin is loaded_plugin: fail('Error: plugin "' + plugin.name + '" in ' + filename + ' already loaded as "' + loaded_plugin.name + '" (' + str(loaded_plugin) + ')', file=sys.stderr) configure_function_found = False run_coroutine_found = False manual_function_found = False for member_name, member_value in inspect.getmembers(plugin, predicate=inspect.ismethod): if member_name == 'configure': configure_function_found = True elif member_name == 'run' and inspect.iscoroutinefunction(member_value): if len(inspect.getfullargspec(member_value).args) != 2: fail('Error: the "run" coroutine in the plugin "' + plugin.name + '" in ' + filename + ' should have two arguments.', file=sys.stderr) run_coroutine_found = True elif member_name == 'manual': if len(inspect.getfullargspec(member_value).args) != 3: fail('Error: the "manual" function in the plugin "' + plugin.name + '" in ' + filename + ' should have three arguments.', file=sys.stderr) manual_function_found = True if not run_coroutine_found and not manual_function_found: fail('Error: the plugin "' + plugin.name + '" in ' + filename + ' needs either a "manual" function, a "run" coroutine, or both.', file=sys.stderr) if issubclass(plugin.__class__, PortScan): if plugin.type is None: fail('Error: the PortScan plugin "' + plugin.name + '" in ' + filename + ' requires a type (either tcp or udp).') else: plugin.type = plugin.type.lower() if plugin.type not in ['tcp', 'udp']: fail('Error: the PortScan plugin "' + plugin.name + '" in ' + filename + ' has an invalid type (should be tcp or udp).') self.plugin_types["port"].append(plugin) elif issubclass(plugin.__class__, ServiceScan): self.plugin_types["service"].append(plugin) elif issubclass(plugin.__class__, Report): self.plugin_types["report"].append(plugin) else: fail('Plugin "' + plugin.name + '" in ' + filename + ' is neither a PortScan, ServiceScan, nor a Report.', file=sys.stderr) plugin.tags = [tag.lower() for tag in plugin.tags] # Add plugin tags to tag list. [self.taglist.append(t) for t in plugin.tags if t not in self.tags] plugin.autorecon = self if configure_function_found: plugin.configure() self.plugins[plugin.slug] = plugin else: fail('Error: plugin slug "' + plugin.slug + '" in ' + filename + ' is already assigned.', file=sys.stderr) async def execute(self, cmd, target, tag, patterns=None, outfile=None, errfile=None): if patterns: combined_patterns = self.patterns + patterns else: combined_patterns = self.patterns process = await asyncio.create_subprocess_shell( cmd, stdin=open('/dev/null'), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) cout = CommandStreamReader(process.stdout, target, tag, patterns=combined_patterns, outfile=outfile) cerr = CommandStreamReader(process.stderr, target, tag, patterns=combined_patterns, outfile=errfile) asyncio.create_task(cout._read()) asyncio.create_task(cerr._read()) return process, cout, cerr