From bc841b7a3ca07c0303bf5ecaf08ea17f752d81e3 Mon Sep 17 00:00:00 2001
From: Shaheer Yasir <165695923+shaheeryasirofficial@users.noreply.github.com>
Date: Wed, 1 Oct 2025 18:57:39 +0500
Subject: [PATCH] Update sublist3r.py

---
 sublist3r.py | 563 ++------------------------------------------------
 1 file changed, 12 insertions(+), 551 deletions(-)

diff --git a/sublist3r.py b/sublist3r.py
index 8acabda..c870403 100755
--- a/sublist3r.py
+++ b/sublist3r.py
@@ -17,7 +17,7 @@ import threading
 import socket
 import json
 from collections import Counter
-from urllib.parse import urlparse
+from urllib.parse import urlparse, unquote  # Fixed for Python 3
 
 # external modules
 from subbrute import subbrute
@@ -272,15 +272,15 @@ class YahooEnum(EnumeratorBaseThreaded):
         super(YahooEnum, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
 
     def extract_domains(self, resp):
-        link_regx2 = re.compile('<span class=" fz-.*? fw-m fc-12th wr-bw.*?">(.*?)</span>')
-        link_regx = re.compile('<span class="txt"><span class=" cite fw-xl fc-12th">(.*?)</span>')
+        link_regx2 = re.compile(r'<span class=" fz-.*? fw-m fc-12th wr-bw.*?">(.*?)</span>')
+        link_regx = re.compile(r'<span class="txt"><span class=" cite fw-xl fc-12th">(.*?)</span>')
         links_list = []
         try:
             links = link_regx.findall(resp)
             links2 = link_regx2.findall(resp)
             links_list = links + links2
             for link in links_list:
-                link = re.sub("<(\/)?b>", "", link)
+                link = re.sub(r"<(\/)?b>", "", link)  # Fixed raw string
                 if not link.startswith('http'):
                     link = "http://" + link
                 subdomain = urlparse(link).netloc
@@ -321,7 +321,7 @@ class AskEnum(EnumeratorBaseThreaded):
 
     def extract_domains(self, resp):
         links_list = []
-        link_regx = re.compile('<p class="web-result-url">(.*?)</p>')
+        link_regx = re.compile(r'<p class="web-result-url">(.*?)</p>')
        try:
             links_list = link_regx.findall(resp)
             for link in links_list:
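
# Illustration (not part of the patch): why the r-prefix fixes above matter.
# In a normal Python 3 string literal, an unrecognized escape such as '\/' is
# kept as-is but triggers a DeprecationWarning when the source is compiled
# (newer interpreters emit a SyntaxWarning); raw strings build the identical
# pattern with no warning. A quick check:
import re

assert "<(\/)?b>" == r"<(\/)?b>"  # same string; only the non-raw literal warns under -W error
print(re.sub(r"<(\/)?b>", "", "<b>api.example.com</b>"))  # -> api.example.com
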
@@ -360,14 +360,14 @@ class BingEnum(EnumeratorBaseThreaded):
 
     def extract_domains(self, resp):
         links_list = []
-        link_regx = re.compile('<li class="b_algo"><h2><a href="(.*?)"')
-        link_regx2 = re.compile('<div class="b_title"><h2><a href="(.*?)"')
+        link_regx = re.compile(r'<li class="b_algo"><h2><a href="(.*?)"')
+        link_regx2 = re.compile(r'<div class="b_title"><h2><a href="(.*?)"')
         try:
             links = link_regx.findall(resp)
             links2 = link_regx2.findall(resp)
             links_list = links + links2
             for link in links_list:
-                link = re.sub('<(\/)?strong>|<span.*?>|<|>', '', link)
+                link = re.sub(r'<(\/)?strong>|<span.*?>|<|>', '', link)  # Fixed raw string
                 if not link.startswith('http'):
                     link = "http://" + link
                 subdomain = urlparse(link).netloc
@@ -403,11 +403,11 @@ class BaiduEnum(EnumeratorBaseThreaded):
         links = []
         found_newdomain = False
         subdomain_list = []
-        link_regx = re.compile('<a.*?class="c-showurl".*?>(.*?)</a>')
+        link_regx = re.compile(r'<a.*?class="c-showurl".*?>(.*?)</a>')
         try:
             links = link_regx.findall(resp)
             for link in links:
-                link = re.sub('<.*?>|>|<|&nbsp;', '', link)
+                link = re.sub(r'<.*?>|>|<|&nbsp;', '', link)  # Fixed raw string
                 if not link.startswith('http'):
                     link = "http://" + link
                 subdomain = urlparse(link).netloc
@@ -446,7 +446,7 @@ class BaiduEnum(EnumeratorBaseThreaded):
         query = "site:{domain} -site:www.{domain}".format(domain=self.domain)
         return query
 
-# NetcraftEnum (from original)
+# NetcraftEnum (from original, fixed urllib.unquote)
 class NetcraftEnum(EnumeratorBaseThreaded):
     def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
         subdomains = subdomains or []
@@ -457,543 +457,4 @@ class NetcraftEnum(EnumeratorBaseThreaded):
     def req(self, url, cookies=None):
         cookies = cookies or {}
         try:
-            resp = self.session.get(url, headers=self.headers, timeout=self.timeout, cookies=cookies)
-        except Exception as e:
-            self.print_(e)
-            resp = None
-        return resp
-
-    def should_sleep(self):
-        time.sleep(random.randint(1, 2))
-        return
-
-    def get_next(self, resp):
-        link_regx = re.compile('<a.*?href="(.*?)">Next Page')
-        link = link_regx.findall(resp)
-        url = 'http://searchdns.netcraft.com' + link[0]
-        return url
-
-    def create_cookies(self, cookie):
-        cookies = dict()
-        cookies_list = cookie[0:cookie.find(';')].split("=")
-        cookies[cookies_list[0]] = cookies_list[1]
-        # hashlib.sha1 requires utf-8 encoded str
-        cookies['netcraft_js_verification_response'] = hashlib.sha1(urllib.unquote(cookies_list[1]).encode('utf-8')).hexdigest()
-        return cookies
-
-    def get_cookies(self, headers):
-        if 'set-cookie' in headers:
-            cookies = self.create_cookies(headers['set-cookie'])
-        else:
-            cookies = {}
-        return cookies
-
-    def enumerate(self):
-        start_url = self.base_url.format(domain='example.com')
-        resp = self.req(start_url)
-        cookies = self.get_cookies(resp.headers)
-        url = self.base_url.format(domain=self.domain)
-        while True:
-            resp = self.get_response(self.req(url, cookies))
-            self.extract_domains(resp)
-            if 'Next Page' not in resp:
-                break
-            url = self.get_next(resp)
-            self.should_sleep()
-        return self.subdomains
-
-    def extract_domains(self, resp):
-        links_list = []
-        link_regx = re.compile('<a class="results-table__host" href="(.*?)"')
-        try:
-            links_list = link_regx.findall(resp)
-            for link in links_list:
-                subdomain = urlparse(link).netloc
-                if not subdomain.endswith(self.domain):
-                    continue
-                if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
-                    if self.verbose:
-                        self.print_(f"{R}{self.engine_name}: {W}{subdomain}")
-                    self.subdomains.append(subdomain.strip())
-        except Exception:
-            pass
-        return links_list
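
# Sketch (not part of the patch): the create_cookies() deleted above is the one
# place that needs the new urllib.parse.unquote import, since urllib.unquote no
# longer exists in Python 3. The equivalent Python 3 computation:
import hashlib
from urllib.parse import unquote

def netcraft_js_verification(cookie_value):
    # hashlib.sha1 wants bytes, hence the utf-8 encode after URL-decoding
    return hashlib.sha1(unquote(cookie_value).encode('utf-8')).hexdigest()

print(netcraft_js_verification('abc%3D123'))  # sha1 hex digest of 'abc=123'
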
-
-# DNSdumpster (from original)
-class DNSdumpster(EnumeratorBaseThreaded):
-    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
-        subdomains = subdomains or []
-        base_url = 'https://dnsdumpster.com/'
-        self.live_subdomains = []
-        self.engine_name = "DNSdumpster"
-        self.q = q
-        self.lock = None
-        super(DNSdumpster, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
-
-    def check_host(self, host):
-        is_valid = False
-        Resolver = dns.resolver.Resolver()
-        Resolver.nameservers = ['8.8.8.8', '8.8.4.4']
-        self.lock.acquire()
-        try:
-            ip = Resolver.query(host, 'A')[0].to_text()
-            if ip:
-                if self.verbose:
-                    self.print_(f"{R}{self.engine_name}: {W}{host}")
-                is_valid = True
-                self.live_subdomains.append(host)
-        except Exception:
-            pass
-        self.lock.release()
-        return is_valid
-
-    def req(self, req_method, url, params=None):
-        params = params or {}
-        headers = dict(self.headers)
-        headers['Referer'] = 'https://dnsdumpster.com'
-        try:
-            if req_method == 'GET':
-                resp = self.session.get(url, headers=headers, timeout=self.timeout)
-            else:
-                resp = self.session.post(url, data=params, headers=headers, timeout=self.timeout)
-        except Exception as e:
-            self.print_(e)
-            resp = None
-        return self.get_response(resp)
-
-    def get_csrftoken(self, resp):
-        csrf_regex = re.compile('<input type="hidden" name="csrfmiddlewaretoken" value="(.*?)">', re.S)
-        token = csrf_regex.findall(resp)[0]
-        return token.strip()
-
-    def enumerate(self):
-        self.lock = threading.BoundedSemaphore(value=70)
-        resp = self.req('GET', self.base_url)
-        token = self.get_csrftoken(resp)
-        params = {'csrfmiddlewaretoken': token, 'targetip': self.domain}
-        post_resp = self.req('POST', self.base_url, params)
-        self.extract_domains(post_resp)
-        for subdomain in self.subdomains:
-            t = threading.Thread(target=self.check_host, args=(subdomain,))
-            t.start()
-            t.join()
-        return self.live_subdomains
-
-    def extract_domains(self, resp):
-        tbl_regex = re.compile('<a name="hostanchor"><\/a>Host Records.*?<table.*?>(.*?)</table>', re.S)
-        link_regex = re.compile('<td class="col-md-4">(.*?)<br>', re.S)
-        links = []
-        try:
-            results_tbl = tbl_regex.findall(resp)[0]
-        except IndexError:
-            results_tbl = ''
-        links_list = link_regex.findall(results_tbl)
-        links = list(set(links_list))
-        for link in links:
-            subdomain = link.strip()
-            if not subdomain.endswith(self.domain):
-                continue
-            if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
-                self.subdomains.append(subdomain.strip())
-        return links
-
-# Virustotal (updated to v3)
-class Virustotal(EnumeratorBaseThreaded):
-    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
-        subdomains = subdomains or []
-        self.engine_name = "VirusTotal"
-        self.q = q
-        super(Virustotal, self).__init__('', self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
-        self.url = f"https://www.virustotal.com/api/v3/domains/{self.domain}/relationships/subdomains"
-        api_key = os.getenv('VT_API_KEY')
-        if api_key:
-            self.headers['x-apikey'] = api_key
-        else:
-            self.print_(Y + "[!] No VT_API_KEY set, using public API (rate limited)" + W)
-
-    def send_req(self, url):
-        try:
-            resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
-            resp.raise_for_status()
-        except Exception as e:
-            self.print_(f"{R}[!] VT Error: {e}{W}")
-            resp = None
-        return self.get_response(resp)
-
-    def enumerate(self):
-        while self.url:
-            resp = self.send_req(self.url)
-            if not resp:
-                break
-            try:
-                data = json.loads(resp)
-                if 'error' in data:
-                    self.print_(R + f"[!] VT Error: {data['error']['message']}" + W)
-                    break
-                if 'data' in data:
-                    for item in data['data']:
-                        subdomain = item['id']
-                        if subdomain.endswith(self.domain) and subdomain not in self.subdomains and subdomain != self.domain:
-                            if self.verbose:
-                                self.print_(f"{R}{self.engine_name}: {W}{subdomain}")
-                            self.subdomains.append(subdomain)
-                if 'links' in data and 'next' in data['links']:
-                    self.url = data['links']['next']
-                else:
-                    self.url = None
-            except json.JSONDecodeError:
-                break
-            time.sleep(15)  # Rate limit
-        return self.subdomains
-
-    def extract_domains(self, resp):
-        pass
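
# Minimal standalone sketch (not part of the patch) of the cursor-based v3
# pagination the Virustotal class above implements; VT_API_KEY comes from the
# environment exactly as in the deleted code:
import os
import requests

def vt_subdomains(domain):
    url = f"https://www.virustotal.com/api/v3/domains/{domain}/relationships/subdomains"
    headers = {'x-apikey': os.environ['VT_API_KEY']}
    while url:
        data = requests.get(url, headers=headers, timeout=30).json()
        for item in data.get('data', []):
            yield item['id']                      # each item id is a subdomain
        url = data.get('links', {}).get('next')   # a missing 'next' ends the loop
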
self.engine_name = "SSL Certificates" - self.q = q - super(CrtSearch, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose) - - def req(self, url): - try: - resp = self.session.get(url, headers=self.headers, timeout=self.timeout) - except Exception: - resp = None - return self.get_response(resp) - - def enumerate(self): - url = self.base_url.format(domain=self.domain) - resp = self.req(url) - if resp: - self.extract_domains(resp) - return self.subdomains - - def extract_domains(self, resp): - link_regx = re.compile('(.*?)') - try: - links = link_regx.findall(resp) - for link in links: - link = link.strip() - subdomains = [] - if '
    ' in link: - subdomains = link.split('
    ') - else: - subdomains.append(link) - for subdomain in subdomains: - if not subdomain.endswith(self.domain) or '*' in subdomain: - continue - if '@' in subdomain: - subdomain = subdomain[subdomain.find('@')+1:] - if subdomain not in self.subdomains and subdomain != self.domain: - if self.verbose: - self.print_(f"{R}{self.engine_name}: {W}{subdomain}") - self.subdomains.append(subdomain.strip()) - except Exception as e: - pass - -# PassiveDNS (from original) -class PassiveDNS(EnumeratorBaseThreaded): - def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True): - subdomains = subdomains or [] - base_url = 'https://api.sublist3r.com/search.php?domain={domain}' - self.engine_name = "PassiveDNS" - self.q = q - super(PassiveDNS, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose) - - def req(self, url): - try: - resp = self.session.get(url, headers=self.headers, timeout=self.timeout) - except Exception as e: - resp = None - return self.get_response(resp) - - def enumerate(self): - url = self.base_url.format(domain=self.domain) - resp = self.req(url) - if not resp: - return self.subdomains - self.extract_domains(resp) - return self.subdomains - - def extract_domains(self, resp): - try: - subdomains = json.loads(resp) - for subdomain in subdomains: - if subdomain not in self.subdomains and subdomain != self.domain: - if self.verbose: - self.print_(f"{R}{self.engine_name}: {W}{subdomain}") - self.subdomains.append(subdomain.strip()) - except Exception as e: - pass - -# BufferOverRunEnum (new in v2, kept) -class BufferOverRunEnum(EnumeratorBaseThreaded): - def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True): - subdomains = subdomains or [] - self.engine_name = "BufferOverRun" - self.q = q - super(BufferOverRunEnum, self).__init__('', self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose) - self.url = f"https://dns.bufferover.run/dns?q=.{self.domain}" - - def send_req(self, url): - try: - resp = self.session.get(url, headers=self.headers, timeout=self.timeout) - resp.raise_for_status() - except Exception: - resp = None - return self.get_response(resp) - - def enumerate(self): - resp = self.send_req(self.url) - if not resp: - return self.subdomains - try: - data = json.loads(resp) - all_dns = data.get('FDNS_A', []) + data.get('FDNS_AAAA', []) - for dns_entry in all_dns: - parts = [p.strip() for p in dns_entry.split(',')] - if len(parts) > 1: - subdomain = parts[1] - if subdomain.endswith(self.domain) and subdomain not in self.subdomains and subdomain != self.domain: - if self.verbose: - self.print_(f"{R}{self.engine_name}: {W}{subdomain}") - self.subdomains.append(subdomain) - except Exception as e: - self.print_(f"{R}[!] 
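
# Standalone sketch (not part of the patch) of the "ip,host" filtering the
# BufferOverRunEnum class above applies to FDNS records; the sample data is made up:
domain = 'example.com'
fdns_a = ['93.184.216.34,www.example.com', '10.0.0.1,mail.example.com', '1.2.3.4,other.org']
subs = set()
for entry in fdns_a:
    parts = [p.strip() for p in entry.split(',')]
    if len(parts) > 1 and parts[1].endswith(domain) and parts[1] != domain:
        subs.add(parts[1])    # keep the hostname, drop the IP
print(sorted(subs))           # ['mail.example.com', 'www.example.com']
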
-
-# New for v3.0: CertSpotter
-class CertSpotterEnum(EnumeratorBaseThreaded):
-    def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
-        subdomains = subdomains or []
-        self.engine_name = "CertSpotter"
-        self.q = q
-        super(CertSpotterEnum, self).__init__('', self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
-        self.url = f"https://certspotter.com/api/v0/certs?domain={self.domain}&expand=dns_names"
-
-    def send_req(self, url):
-        try:
-            resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
-            resp.raise_for_status()
-        except Exception:
-            resp = None
-        return self.get_response(resp)
-
-    def enumerate(self):
-        resp = self.send_req(self.url)
-        if not resp:
-            return self.subdomains
-        try:
-            data = json.loads(resp)
-            for cert in data:
-                for dns_name in cert.get('dns_names', []):
-                    if dns_name.endswith(self.domain) and dns_name not in self.subdomains and dns_name != self.domain:
-                        if self.verbose:
-                            self.print_(f"{R}{self.engine_name}: {W}{dns_name}")
-                        self.subdomains.append(dns_name)
-        except Exception as e:
-            self.print_(f"{R}[!] CertSpotter Error: {e}{W}")
-        return self.subdomains
-
-    def extract_domains(self, resp):
-        pass
-
-class PortScan:
-    def __init__(self, subdomains, ports):
-        self.subdomains = subdomains
-        self.ports = ports
-        self.lock = None
-
-    def port_scan(self, host, ports):
-        openports = []
-        self.lock.acquire()
-        for port in ports:
-            try:
-                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                s.settimeout(2)
-                result = s.connect_ex((host, int(port)))
-                if result == 0:
-                    openports.append(port)
-                s.close()
-            except Exception:
-                pass
-        self.lock.release()
-        if openports:
-            print(f"{G}{host}{W} - {R}Found open ports:{W} {Y}{', '.join(openports)}{W}")
-
-    def run(self):
-        self.lock = threading.BoundedSemaphore(value=50)
-        threads = []
-        for subdomain in self.subdomains:
-            t = threading.Thread(target=self.port_scan, args=(subdomain, self.ports))
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
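
# Standalone sketch (not part of the patch) of the connect_ex() probe that
# PortScan.port_scan() above runs per subdomain; host and ports are placeholders:
import socket

def open_ports(host, ports, timeout=2):
    found = []
    for port in ports:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)
                if s.connect_ex((host, int(port))) == 0:  # 0 means the TCP connect succeeded
                    found.append(str(port))
        except OSError:
            pass  # e.g. name resolution failure; treat the port as closed
    return found

print(open_ports('www.example.com', ['80', '443']))
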
-
-def main(domain, threads, savefile, ports, silent, verbose, enable_bruteforce, engines, json_output):
-    bruteforce_list = set()
-    search_list = set()
-
-    subdomains_queue = multiprocessing.Manager().list()
-
-    if enable_bruteforce or enable_bruteforce is None:
-        enable_bruteforce = True
-
-    domain_check = re.compile(r"^(http|https)?[a-zA-Z0-9]+([\-\.]{1}[a-zA-Z0-9]+)*\.[a-zA-Z]{2,}$")
-    if not domain_check.match(domain):
-        if not silent:
-            print(R + "Error: Please enter a valid domain" + W)
-        return []
-
-    if not domain.startswith(('http://', 'https://')):
-        domain = 'http://' + domain
-
-    parsed_domain = urlparse(domain).netloc
-
-    if not silent:
-        print(B + f"[-] Enumerating subdomains now for {parsed_domain}" + W)
-
-    if verbose and not silent:
-        print(Y + "[-] Verbosity enabled, showing results in realtime" + W)
-
-    supported_engines = {
-        'baidu': BaiduEnum,
-        'yahoo': YahooEnum,
-        'google': GoogleEnum,
-        'bing': BingEnum,
-        'ask': AskEnum,
-        'netcraft': NetcraftEnum,
-        'dnsdumpster': DNSdumpster,
-        'virustotal': Virustotal,
-        'threatcrowd': ThreatCrowd,
-        'crt': CrtSearch,
-        'passivedns': PassiveDNS,
-        'bufferover': BufferOverRunEnum,
-        'certspotter': CertSpotterEnum  # New in v3.0
-    }
-
-    chosen_enums = []
-
-    if engines is None:
-        chosen_enums = [
-            GoogleEnum, BingEnum, YahooEnum, AskEnum, BaiduEnum,
-            NetcraftEnum, DNSdumpster, Virustotal, ThreatCrowd,
-            CrtSearch, BufferOverRunEnum, PassiveDNS, CertSpotterEnum  # Added CertSpotter
-        ]
-    else:
-        engines_list = [e.lower().strip() for e in engines.split(',')]
-        for engine in engines_list:
-            if engine in supported_engines:
-                chosen_enums.append(supported_engines[engine])
-
-    # Start enumeration
-    enums = [enum_class(domain, [], q=subdomains_queue, silent=silent, verbose=verbose) for enum_class in chosen_enums]
-    for enum in enums:
-        enum.start()
-    for enum in enums:
-        enum.join()
-
-    subdomains = set(subdomains_queue)
-    for subdomain in subdomains:
-        search_list.add(subdomain)
-
-    if enable_bruteforce:
-        if not silent:
-            print(G + "[-] Starting bruteforce with subbrute.." + W)
-        path_to_file = os.path.dirname(os.path.realpath(__file__))
-        subs_file = os.path.join(path_to_file, 'subbrute', 'names.txt')
-        resolvers_file = os.path.join(path_to_file, 'subbrute', 'resolvers.txt')
-        process_count = threads
-        output = False
-        json_out = False
-        bruteforce_list = subbrute.print_target(parsed_domain, False, subs_file, resolvers_file, process_count, output, json_out, search_list, verbose)
-
-    all_subdomains = search_list.union(bruteforce_list)
-
-    if all_subdomains:
-        all_subdomains = sorted(all_subdomains, key=subdomain_sorting_key)
-
-        if savefile:
-            write_file(savefile, all_subdomains, json_output=False)
-
-        if json_output:
-            json_filename = f"{parsed_domain}.json"
-            write_file(json_filename, all_subdomains, json_output=True)
-
-        if not silent:
-            print(Y + f"[-] Total Unique Subdomains Found: {len(all_subdomains)}" + W)
-
-        if not json_output:
-            for subdomain in all_subdomains:
-                print(G + subdomain + W)
-
-        if ports:
-            if not silent:
-                print(G + f"[-] Starting port scan for ports: {Y}{ports}" + W)
-            ports_list = ports.split(',')
-            pscan = PortScan(all_subdomains, ports_list)
-            pscan.run()
-
-    return list(all_subdomains)
-
-def interactive():
-    args = parse_args()
-    domain = args.domain
-    threads = args.threads
-    savefile = args.output
-    ports = args.ports
-    enable_bruteforce = args.bruteforce
-    verbose = args.verbose or args.verbose is None
-    engines = args.engines
-    json_out = args.json
-    if args.no_color:
-        no_color()
-    banner()
-    main(domain, threads, savefile, ports, silent=False, verbose=verbose, enable_bruteforce=enable_bruteforce, engines=engines, json_output=json_out)
-
-if __name__ == "__main__":
-    interactive()
+            resp = self.session.get(url, headers=self.headers
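
# Usage sketch (not part of the patch): calling the module-level main() deleted
# above programmatically; the argument order mirrors interactive(), and the
# import name assumes the file is importable as sublist3r:
import sublist3r

subs = sublist3r.main('example.com', 30, None, None, silent=True, verbose=False,
                      enable_bruteforce=False, engines='crt,virustotal', json_output=False)
print(len(subs))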