# NOTE(review): this section was recovered from a whitespace-mangled diff paste in
# which HTML-like spans (regex patterns, markup literals, and a few whole method
# bodies) were stripped.  Every pattern or method marked "reconstructed" below was
# restored from upstream Sublist3r and should be confirmed against the live service.


class NetcraftEnum(EnumeratorBaseThreaded):
    """Enumerate subdomains by walking searchdns.netcraft.com result pages."""

    def req(self, url, cookies=None):
        """GET a Netcraft results page, carrying the JS-verification cookies."""
        cookies = cookies or {}
        try:
            resp = self.session.get(url, headers=self.headers, timeout=self.timeout, cookies=cookies)
        except Exception as e:
            self.print_(e)
            resp = None
        return resp

    def should_sleep(self):
        """Pause briefly between page fetches to avoid rate limiting."""
        time.sleep(random.randint(1, 2))

    def get_next(self, resp):
        """Return the absolute URL of the 'Next Page' link found in *resp*."""
        # Reconstructed pattern: the anchor markup was stripped from the paste.
        # NOTE(review): confirm against the current searchdns.netcraft.com markup.
        link_regx = re.compile(r'<a.*?href="(.*?)">Next Page')
        link = link_regx.findall(resp)
        return 'http://searchdns.netcraft.com' + link[0]

    def create_cookies(self, cookie):
        """Answer Netcraft's JS challenge: sha1 of the unquoted cookie value."""
        cookies = dict()
        cookies_list = cookie[0:cookie.find(';')].split("=")
        cookies[cookies_list[0]] = cookies_list[1]
        # Python 3: urllib.unquote moved to urllib.parse.unquote.
        cookies['netcraft_js_verification_response'] = hashlib.sha1(
            unquote(cookies_list[1]).encode('utf-8')).hexdigest()
        return cookies

    def get_cookies(self, headers):
        """Build challenge cookies if the response set any, else an empty dict."""
        if 'set-cookie' in headers:
            return self.create_cookies(headers['set-cookie'])
        return {}

    def enumerate(self):
        """Prime the cookie jar with a throwaway query, then walk result pages."""
        start_url = self.base_url.format(domain='example.com')
        resp = self.req(start_url)
        # Fixed: the original dereferenced resp.headers unconditionally and
        # raised AttributeError when the priming request failed (resp is None).
        cookies = self.get_cookies(resp.headers) if resp is not None else {}
        url = self.base_url.format(domain=self.domain)
        while True:
            resp = self.get_response(self.req(url, cookies))
            if not resp:
                break
            self.extract_domains(resp)
            if 'Next Page' not in resp:
                break  # last results page reached
            url = self.get_next(resp)
            self.should_sleep()
        return self.subdomains

    def extract_domains(self, resp):
        """Collect subdomains of self.domain from one results page."""
        links_list = []
        # Reconstructed pattern: the anchor markup was stripped from the paste.
        link_regx = re.compile(r'<a class="results-table__host" href="(.*?)"')
        try:
            links_list = link_regx.findall(resp)
            for link in links_list:
                subdomain = urlparse(link).netloc
                if not subdomain.endswith(self.domain):
                    continue
                if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
                    if self.verbose:
                        self.print_(f"{R}{self.engine_name}: {W}{subdomain}")
                    self.subdomains.append(subdomain.strip())
        except Exception:
            pass  # best-effort scrape: a malformed page yields nothing
        return links_list


class DNSdumpster(EnumeratorBaseThreaded):
    """Enumerate subdomains via dnsdumpster.com and keep only hosts that resolve."""

    # NOTE(review): __init__, check_host, req and get_csrftoken were swallowed by
    # the paste stripping; reconstructed from upstream Sublist3r — confirm.
    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
        subdomains = subdomains or []
        base_url = 'https://dnsdumpster.com/'
        self.live_subdomains = []     # subset of self.subdomains that resolve
        self.engine_name = "DNSdumpster"
        self.q = q
        self.lock = None              # BoundedSemaphore, created in enumerate()
        super(DNSdumpster, self).__init__(base_url, self.engine_name, domain,
                                          subdomains, q=q, silent=silent, verbose=verbose)

    def check_host(self, host):
        """Return True iff *host* has an A record (queried via Google DNS)."""
        is_valid = False
        resolver = dns.resolver.Resolver()
        resolver.nameservers = ['8.8.8.8', '8.8.4.4']
        self.lock.acquire()           # bounds concurrent DNS lookups
        try:
            ip = resolver.query(host, 'A')[0].to_text()
            if ip:
                if self.verbose:
                    self.print_(f"{R}{self.engine_name}: {W}{host}")
                is_valid = True
                self.live_subdomains.append(host)
        except Exception:
            pass  # best-effort: unresolvable hosts are simply skipped
        finally:
            self.lock.release()
        return is_valid

    def req(self, req_method, url, params=None):
        """Issue a GET or POST to dnsdumpster with the required Referer header."""
        params = params or {}
        headers = dict(self.headers)
        headers['Referer'] = 'https://dnsdumpster.com'
        try:
            if req_method == 'GET':
                resp = self.session.get(url, headers=headers, timeout=self.timeout)
            else:
                resp = self.session.post(url, data=params, headers=headers, timeout=self.timeout)
        except Exception as e:
            self.print_(e)
            resp = None
        return self.get_response(resp)

    def get_csrftoken(self, resp):
        """Extract Django's csrfmiddlewaretoken from the landing page."""
        # Reconstructed pattern: the <input> markup was stripped from the paste.
        csrf_regex = re.compile(r'<input type="hidden" name="csrfmiddlewaretoken" value="(.*?)">', re.S)
        token = csrf_regex.findall(resp)[0]
        return token.strip()

    def enumerate(self):
        """Fetch the results table, then resolve every candidate in parallel."""
        self.lock = threading.BoundedSemaphore(value=70)
        resp = self.req('GET', self.base_url)
        token = self.get_csrftoken(resp)
        params = {'csrfmiddlewaretoken': token, 'targetip': self.domain}
        post_resp = self.req('POST', self.base_url, params)
        self.extract_domains(post_resp)
        # Fixed: the original start()/join() pair inside the loop ran the
        # resolver threads one at a time; start all, then join all.
        threads = [threading.Thread(target=self.check_host, args=(subdomain,))
                   for subdomain in self.subdomains]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        return self.live_subdomains

    def extract_domains(self, resp):
        """Pull host names out of the 'Host Records' results table."""
        # Reconstructed patterns: table/cell markup was stripped from the paste.
        tbl_regex = re.compile(r'<a name="hostanchor"></a>Host Records.*?<table.*?>(.*?)</table>', re.S)
        link_regex = re.compile(r'<td class="col-md-4">(.*?)<br>', re.S)
        try:
            results_tbl = tbl_regex.findall(resp)[0]
        except IndexError:
            results_tbl = ''  # no Host Records section in the response
        links = list(set(link_regex.findall(results_tbl)))
        for link in links:
            subdomain = link.strip()
            if not subdomain.endswith(self.domain):
                continue
            if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
                self.subdomains.append(subdomain.strip())
        return links


# Virustotal (updated to v3)
class Virustotal(EnumeratorBaseThreaded):
    """Enumerate subdomains via the VirusTotal v3 relationships API."""

    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
        subdomains = subdomains or []
        self.engine_name = "VirusTotal"
        self.q = q
        super(Virustotal, self).__init__('', self.engine_name, domain, subdomains,
                                         q=q, silent=silent, verbose=verbose)
        self.url = f"https://www.virustotal.com/api/v3/domains/{self.domain}/relationships/subdomains"
        api_key = os.getenv('VT_API_KEY')
        if api_key:
            self.headers['x-apikey'] = api_key
        else:
            self.print_(Y + "[!] No VT_API_KEY set, using public API (rate limited)" + W)

    def send_req(self, url):
        """GET one API page; returns the response body text or None-equivalent."""
        try:
            resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
            resp.raise_for_status()
        except Exception as e:
            self.print_(f"{R}[!] VT Error: {e}{W}")
            resp = None
        return self.get_response(resp)

    def enumerate(self):
        """Follow the paginated subdomains relationship until exhausted."""
        while self.url:
            resp = self.send_req(self.url)
            if not resp:
                break
            try:
                data = json.loads(resp)
            except json.JSONDecodeError:
                break
            if 'error' in data:
                self.print_(R + f"[!] VT Error: {data['error']['message']}" + W)
                break
            for item in data.get('data', []):
                subdomain = item['id']
                if subdomain.endswith(self.domain) and subdomain not in self.subdomains and subdomain != self.domain:
                    if self.verbose:
                        self.print_(f"{R}{self.engine_name}: {W}{subdomain}")
                    self.subdomains.append(subdomain)
            self.url = data.get('links', {}).get('next')
            # Fixed: only throttle when another page will actually be fetched;
            # the original slept 15 s even after the final page.
            if self.url:
                time.sleep(15)  # public API rate limit
        return self.subdomains

    def extract_domains(self, resp):
        pass  # parsing happens inline in enumerate()


# ThreatCrowd (from original, note: ThreatCrowd is deprecated, but keeping for compatibility)
class ThreatCrowd(EnumeratorBaseThreaded):
    """Enumerate subdomains via the (deprecated) ThreatCrowd JSON API."""

    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
        subdomains = subdomains or []
        base_url = 'https://www.threatcrowd.org/searchApi/v2/domain/report/?domain={domain}'
        self.engine_name = "ThreatCrowd"
        self.q = q
        super(ThreatCrowd, self).__init__(base_url, self.engine_name, domain, subdomains,
                                          q=q, silent=silent, verbose=verbose)

    def req(self, url):
        """GET the report URL; network errors yield an empty response."""
        try:
            resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
        except Exception:
            resp = None
        return self.get_response(resp)

    def enumerate(self):
        url = self.base_url.format(domain=self.domain)
        resp = self.req(url)
        self.extract_domains(resp)
        return self.subdomains

    def extract_domains(self, resp):
        """Read the 'subdomains' list from the JSON report, best-effort."""
        try:
            links = json.loads(resp)['subdomains']
            for link in links:
                subdomain = link.strip()
                if not subdomain.endswith(self.domain):
                    continue
                if subdomain not in self.subdomains and subdomain != self.domain:
                    if self.verbose:
                        self.print_(f"{R}{self.engine_name}: {W}{subdomain}")
                    self.subdomains.append(subdomain.strip())
        except Exception:
            pass  # deliberate best-effort: bad/empty JSON contributes nothing


# CrtSearch (from original)
class CrtSearch(EnumeratorBaseThreaded):
    """Enumerate subdomains from certificate-transparency logs via crt.sh."""

    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
        subdomains = subdomains or []
        base_url = 'https://crt.sh/?q=%25.{domain}'
        self.engine_name = "SSL Certificates"
        self.q = q
        super(CrtSearch, self).__init__(base_url, self.engine_name, domain, subdomains,
                                        q=q, silent=silent, verbose=verbose)

    def req(self, url):
        try:
            resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
        except Exception:
            resp = None
        return self.get_response(resp)

    def enumerate(self):
        url = self.base_url.format(domain=self.domain)
        resp = self.req(url)
        if resp:
            self.extract_domains(resp)
        return self.subdomains

    def extract_domains(self, resp):
        """Scrape certificate name cells; a cell may hold several names."""
        # Reconstructed pattern: the table-cell markup was stripped from the paste.
        link_regx = re.compile(r'<TD>(.*?)</TD>')
        try:
            links = link_regx.findall(resp)
            for link in links:
                link = link.strip()
                # Multiple SANs in one cell are separated by <BR> (reconstructed).
                subdomains = link.split('<BR>') if '<BR>' in link else [link]
                for subdomain in subdomains:
                    if not subdomain.endswith(self.domain) or '*' in subdomain:
                        continue  # skip other domains and wildcard entries
                    if '@' in subdomain:
                        # e-mail-style entries: keep only the host part
                        subdomain = subdomain[subdomain.find('@') + 1:]
                    if subdomain not in self.subdomains and subdomain != self.domain:
                        if self.verbose:
                            self.print_(f"{R}{self.engine_name}: {W}{subdomain}")
                        self.subdomains.append(subdomain.strip())
        except Exception:
            pass  # best-effort scrape


# PassiveDNS (from original)
class PassiveDNS(EnumeratorBaseThreaded):
    """Enumerate subdomains via the api.sublist3r.com passive-DNS endpoint."""

    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
        subdomains = subdomains or []
        base_url = 'https://api.sublist3r.com/search.php?domain={domain}'
        self.engine_name = "PassiveDNS"
        self.q = q
        super(PassiveDNS, self).__init__(base_url, self.engine_name, domain, subdomains,
                                         q=q, silent=silent, verbose=verbose)

    def req(self, url):
        try:
            resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
        except Exception:
            resp = None
        return self.get_response(resp)

    def enumerate(self):
        url = self.base_url.format(domain=self.domain)
        resp = self.req(url)
        if not resp:
            return self.subdomains
        self.extract_domains(resp)
        return self.subdomains

    def extract_domains(self, resp):
        """The endpoint returns a bare JSON array of subdomain strings."""
        try:
            subdomains = json.loads(resp)
            for subdomain in subdomains:
                if subdomain not in self.subdomains and subdomain != self.domain:
                    if self.verbose:
                        self.print_(f"{R}{self.engine_name}: {W}{subdomain}")
                    self.subdomains.append(subdomain.strip())
        except Exception:
            pass  # deliberate best-effort


# BufferOverRunEnum (new in v2, kept)
class BufferOverRunEnum(EnumeratorBaseThreaded):
    """Enumerate subdomains via dns.bufferover.run forward-DNS dumps."""

    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
        subdomains = subdomains or []
        self.engine_name = "BufferOverRun"
        self.q = q
        super(BufferOverRunEnum, self).__init__('', self.engine_name, domain, subdomains,
                                                q=q, silent=silent, verbose=verbose)
        self.url = f"https://dns.bufferover.run/dns?q=.{self.domain}"

    def send_req(self, url):
        try:
            resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
            resp.raise_for_status()
        except Exception:
            resp = None
        return self.get_response(resp)

    def enumerate(self):
        """Parse 'ip,host' entries from the FDNS_A / FDNS_AAAA lists."""
        resp = self.send_req(self.url)
        if not resp:
            return self.subdomains
        try:
            data = json.loads(resp)
            all_dns = data.get('FDNS_A', []) + data.get('FDNS_AAAA', [])
            for dns_entry in all_dns:
                parts = [p.strip() for p in dns_entry.split(',')]
                if len(parts) > 1:
                    subdomain = parts[1]  # entry format: "<ip>,<hostname>"
                    if subdomain.endswith(self.domain) and subdomain not in self.subdomains and subdomain != self.domain:
                        if self.verbose:
                            self.print_(f"{R}{self.engine_name}: {W}{subdomain}")
                        self.subdomains.append(subdomain)
        except Exception as e:
            self.print_(f"{R}[!] BufferOverRun Error: {e}{W}")
        return self.subdomains

    def extract_domains(self, resp):
        pass  # parsing happens inline in enumerate()


# New for v3.0: CertSpotter
class CertSpotterEnum(EnumeratorBaseThreaded):
    """Enumerate subdomains via the CertSpotter certificate API."""

    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
        subdomains = subdomains or []
        self.engine_name = "CertSpotter"
        self.q = q
        super(CertSpotterEnum, self).__init__('', self.engine_name, domain, subdomains,
                                              q=q, silent=silent, verbose=verbose)
        self.url = f"https://certspotter.com/api/v0/certs?domain={self.domain}&expand=dns_names"

    def send_req(self, url):
        try:
            resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
            resp.raise_for_status()
        except Exception:
            resp = None
        return self.get_response(resp)

    def enumerate(self):
        """Collect every in-scope DNS name from every returned certificate."""
        resp = self.send_req(self.url)
        if not resp:
            return self.subdomains
        try:
            data = json.loads(resp)
            for cert in data:
                for dns_name in cert.get('dns_names', []):
                    if dns_name.endswith(self.domain) and dns_name not in self.subdomains and dns_name != self.domain:
                        if self.verbose:
                            self.print_(f"{R}{self.engine_name}: {W}{dns_name}")
                        self.subdomains.append(dns_name)
        except Exception as e:
            self.print_(f"{R}[!] CertSpotter Error: {e}{W}")
        return self.subdomains

    def extract_domains(self, resp):
        pass  # parsing happens inline in enumerate()


class PortScan:
    """TCP-connect scan a list of hosts, bounded to 50 concurrent hosts."""

    def __init__(self, subdomains, ports):
        self.subdomains = subdomains
        self.ports = ports          # list of port strings, e.g. ['80', '443']
        self.lock = None            # BoundedSemaphore, created in run()

    def port_scan(self, host, ports):
        """Try a TCP connect to each port on *host*; print any that accept."""
        openports = []
        self.lock.acquire()
        try:
            for port in ports:
                s = None
                try:
                    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    s.settimeout(2)
                    if s.connect_ex((host, int(port))) == 0:
                        openports.append(port)
                except Exception:
                    pass  # unreachable host / bad port value: skip silently
                finally:
                    # Fixed: the original leaked the socket when an exception
                    # fired before s.close().
                    if s is not None:
                        s.close()
        finally:
            self.lock.release()
        if openports:
            print(f"{G}{host}{W} - {R}Found open ports:{W} {Y}{', '.join(openports)}{W}")

    def run(self):
        """Scan every subdomain on its own thread and wait for all to finish."""
        self.lock = threading.BoundedSemaphore(value=50)
        threads = []
        for subdomain in self.subdomains:
            t = threading.Thread(target=self.port_scan, args=(subdomain, self.ports))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()


def main(domain, threads, savefile, ports, silent, verbose, enable_bruteforce, engines, json_output):
    """Run the selected enumeration engines (plus optional subbrute bruteforce
    and port scan) against *domain* and return the sorted list of subdomains.

    Returns [] for an invalid domain.  *engines* is a comma-separated string
    of engine names or None for all engines.
    """
    bruteforce_list = set()
    search_list = set()

    # Shared list so engine subprocesses/threads can all append results.
    subdomains_queue = multiprocessing.Manager().list()

    # None means "not specified" and defaults to bruteforcing enabled.
    if enable_bruteforce or enable_bruteforce is None:
        enable_bruteforce = True

    domain_check = re.compile(r"^(http|https)?[a-zA-Z0-9]+([\-\.]{1}[a-zA-Z0-9]+)*\.[a-zA-Z]{2,}$")
    if not domain_check.match(domain):
        if not silent:
            print(R + "Error: Please enter a valid domain" + W)
        return []

    if not domain.startswith(('http://', 'https://')):
        domain = 'http://' + domain

    parsed_domain = urlparse(domain).netloc

    if not silent:
        print(B + f"[-] Enumerating subdomains now for {parsed_domain}" + W)

    if verbose and not silent:
        print(Y + "[-] Verbosity enabled, showing results in realtime" + W)

    supported_engines = {
        'baidu': BaiduEnum,
        'yahoo': YahooEnum,
        'google': GoogleEnum,
        'bing': BingEnum,
        'ask': AskEnum,
        'netcraft': NetcraftEnum,
        'dnsdumpster': DNSdumpster,
        'virustotal': Virustotal,
        'threatcrowd': ThreatCrowd,
        'crt': CrtSearch,
        'passivedns': PassiveDNS,
        'bufferover': BufferOverRunEnum,
        'certspotter': CertSpotterEnum  # New in v3.0
    }

    chosen_enums = []

    if engines is None:
        chosen_enums = [
            GoogleEnum, BingEnum, YahooEnum, AskEnum, BaiduEnum,
            NetcraftEnum, DNSdumpster, Virustotal, ThreatCrowd,
            CrtSearch, BufferOverRunEnum, PassiveDNS, CertSpotterEnum  # Added CertSpotter
        ]
    else:
        engines_list = [e.lower().strip() for e in engines.split(',')]
        for engine in engines_list:
            if engine in supported_engines:
                chosen_enums.append(supported_engines[engine])

    # Start enumeration: each engine runs on its own thread.
    enums = [enum_class(domain, [], q=subdomains_queue, silent=silent, verbose=verbose)
             for enum_class in chosen_enums]
    for enum in enums:
        enum.start()
    for enum in enums:
        enum.join()

    subdomains = set(subdomains_queue)
    for subdomain in subdomains:
        search_list.add(subdomain)

    if enable_bruteforce:
        if not silent:
            print(G + "[-] Starting bruteforce with subbrute.." + W)
        path_to_file = os.path.dirname(os.path.realpath(__file__))
        subs_file = os.path.join(path_to_file, 'subbrute', 'names.txt')
        resolvers_file = os.path.join(path_to_file, 'subbrute', 'resolvers.txt')
        process_count = threads
        output = False
        json_out = False
        bruteforce_list = subbrute.print_target(parsed_domain, False, subs_file,
                                                resolvers_file, process_count, output,
                                                json_out, search_list, verbose)

    all_subdomains = search_list.union(bruteforce_list)

    if all_subdomains:
        all_subdomains = sorted(all_subdomains, key=subdomain_sorting_key)

    if savefile:
        write_file(savefile, all_subdomains, json_output=False)

    if json_output:
        json_filename = f"{parsed_domain}.json"
        write_file(json_filename, all_subdomains, json_output=True)

    if not silent:
        print(Y + f"[-] Total Unique Subdomains Found: {len(all_subdomains)}" + W)

    if not json_output:
        for subdomain in all_subdomains:
            print(G + subdomain + W)

    if ports:
        if not silent:
            print(G + f"[-] Starting port scan for ports: {Y}{ports}" + W)
        ports_list = ports.split(',')
        pscan = PortScan(all_subdomains, ports_list)
        pscan.run()

    return list(all_subdomains)


def interactive():
    """CLI entry point: parse arguments, print the banner and run main()."""
    args = parse_args()
    domain = args.domain
    threads = args.threads
    savefile = args.output
    ports = args.ports
    enable_bruteforce = args.bruteforce
    # None (flag not given) counts as verbose, matching the original behavior.
    verbose = args.verbose or args.verbose is None
    engines = args.engines
    json_out = args.json
    if args.no_color:
        no_color()
    banner()
    main(domain, threads, savefile, ports, silent=False, verbose=verbose,
         enable_bruteforce=enable_bruteforce, engines=engines, json_output=json_out)


if __name__ == "__main__":
    interactive()