Sublist3r/turbolist3r.py

1195 lines
44 KiB
Python

#!/usr/bin/env python
# coding: utf-8
# Turbolist3r
# By Carl Pearson - github.com/fleetcaptain
# Based on Sublist3r code created by Ahmed Aboul-Ela - twitter.com/aboul3la
#
# Changes to Turbolist3r from Sublist3r:
# - check subdomain for text "From http://PTRarchive.com: " and remove it (otherwise it ends up in the output and can impede automated analysis with other tools)
# - added functionality to query found subdomains, record answer, and catagorize as A or CNAME record. Speeds up subdomain takeover analysis as CNAME records and the services they point to are collected and displayed
#
# TODO - merge Sublist3r dns requests with dnslib to avoid duplication of dns libraries
#
# modules in standard library
import re
import sys
import os
import argparse
import time
import hashlib
import random
import multiprocessing
import threading
import socket
import time
from collections import Counter
# external modules
# from subbrute import subbrute
import dns.resolver
import requests
# import dnslib, which provides better features compared to dns.resolver for finding subdomains
# for example, a return status of NXDOMAIN causes an exception with dns.resolver, but dnslib allows
# us to easily capture the reply, which could indicate the precsence of subdomain takeover
import dnslib
# Python 2.x and 3.x compatiablity
if sys.version > '3':
import urllib.parse as urlparse
import urllib.parse as urllib
else:
import urlparse
import urllib
# In case you cannot install some of the required development packages
# there's also an option to disable the SSL warning:
try:
import requests.packages.urllib3
requests.packages.urllib3.disable_warnings()
except:
pass
# Check if we are running this on windows platform
is_windows = sys.platform.startswith('win')
global debug
# Console Colors
if is_windows:
# Windows deserve coloring too :D
G = '\033[92m' # green
Y = '\033[93m' # yellow
B = '\033[94m' # blue
R = '\033[91m' # red
W = '\033[0m' # white
try:
import win_unicode_console, colorama
win_unicode_console.enable()
colorama.init()
# Now the unicode will work ^_^
except:
print("[!] Error: Coloring libraries not installed ,no coloring will be used [Check the readme]")
G = Y = B = R = W = G = Y = B = R = W = ''
else:
G = '\033[92m' # green
Y = '\033[93m' # yellow
B = '\033[94m' # blue
R = '\033[91m' # red
W = '\033[0m' # white
def banner():
print("""%s
_____ _ _ _ _ _____
|_ _|_ _ _ __| |__ ____ | (_)___| |_|___ / _ __
| | | | | | `__| _ \/ \| | / __| __| |_ \| '__|
| | | \_| | | | |_) | () || | \__ \ |_ ___) | |
|_| \____|_| |_.__/\____/|_|_|___/\__|____/|_|%s%s
# Based on Sublist3r by Ahmed Aboul-Ela - @aboul3la
# Forked by Carl Pearson - github.com/fleetcaptain
""" % (R, W, Y))
def parser_error(errmsg):
banner()
print("Usage: python " + sys.argv[0] + " [Options] use -h for help")
print(R + "Error: " + errmsg + W)
sys.exit()
def parse_args():
# parse the arguments
parser = argparse.ArgumentParser(epilog='\tExample: \r\npython ' + sys.argv[0] + " -d google.com")
parser.error = parser_error
parser._optionals.title = "OPTIONS"
parser.add_argument('-d', '--domain', help="Domain name to enumerate it's subdomains", required=True)
parser.add_argument('-b', '--bruteforce', help='Enable the subbrute bruteforce module', nargs='?', default=False)
parser.add_argument('-p', '--ports', help='Scan the found subdomains against specified tcp ports')
parser.add_argument('-v', '--verbose', help='Enable Verbosity and display results in realtime', nargs='?',
default=False)
parser.add_argument('-t', '--threads', help='Number of threads to use for subbrute bruteforce', type=int,
default=30)
parser.add_argument('-e', '--engines', help='Specify a comma-separated list of search engines')
parser.add_argument('-o', '--output', help='Save just domain names to specified text file')
parser.add_argument('-a', '--analyze', default=False, help='Do reverse DNS analysis and output results', action="store_true")
parser.add_argument('--saverdns', help='Save reverse DNS analysis to specified file')
parser.add_argument('--inputfile', help='Read domains from specified file (perhaps from other tool) and use instead of searching engines. Use with -a to analyze domains')
parser.add_argument('--debug', default=False, help='Enable technical debug output', action="store_true")
return parser.parse_args()
def write_file(filename, subdomains):
# saving subdomains results to output file
print("%s[-] Saving results to file: %s%s%s%s" % (Y, W, R, filename, W))
with open(str(filename), 'wt') as f:
for subdomain in subdomains:
f.write(subdomain + "\r\n")
def subdomain_sorting_key(hostname):
"""Sorting key for subdomains
This sorting key orders subdomains from the top-level domain at the right
reading left, then moving '^' and 'www' to the top of their group. For
example, the following list is sorted correctly:
[
'example.com',
'www.example.com',
'a.example.com',
'www.a.example.com',
'b.a.example.com',
'b.example.com',
'example.net',
'www.example.net',
'a.example.net',
]
"""
parts = hostname.split('.')[::-1]
if parts[-1] == 'www':
return parts[:-1], 1
return parts, 0
class enumratorBase(object):
def __init__(self, base_url, engine_name, domain, subdomains=None, silent=False, verbose=True):
subdomains = subdomains or []
self.domain = urlparse.urlparse(domain).netloc
self.session = requests.Session()
self.subdomains = []
self.timeout = 25
self.base_url = base_url
self.engine_name = engine_name
self.silent = silent
self.verbose = verbose
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.8',
'Accept-Encoding': 'gzip',
}
self.print_banner()
def print_(self, text):
if not self.silent:
print(text)
return
def print_banner(self):
""" subclass can override this if they want a fancy banner :)"""
self.print_(G + "[-] Searching now in %s.." % (self.engine_name) + W)
return
def send_req(self, query, page_no=1):
url = self.base_url.format(query=query, page_no=page_no)
try:
resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
except Exception:
resp = None
return self.get_response(resp)
def get_response(self, response):
if response is None:
return 0
return response.text if hasattr(response, "text") else response.content
def check_max_subdomains(self, count):
if self.MAX_DOMAINS == 0:
return False
return count >= self.MAX_DOMAINS
def check_max_pages(self, num):
if self.MAX_PAGES == 0:
return False
return num >= self.MAX_PAGES
# override
def extract_domains(self, resp):
""" chlid class should override this function """
return
# override
def check_response_errors(self, resp):
""" chlid class should override this function
The function should return True if there are no errors and False otherwise
"""
return True
def should_sleep(self):
"""Some enumrators require sleeping to avoid bot detections like Google enumerator"""
return
def generate_query(self):
""" chlid class should override this function """
return
def get_page(self, num):
""" chlid class that user different pagnation counter should override this function """
return num + 10
def enumerate(self, altquery=False):
flag = True
page_no = 0
prev_links = []
retries = 0
while flag:
query = self.generate_query()
count = query.count(self.domain) # finding the number of subdomains found so far
# if they we reached the maximum number of subdomains in search query
# then we should go over the pages
if self.check_max_subdomains(count):
page_no = self.get_page(page_no)
if self.check_max_pages(page_no): # maximum pages for Google to avoid getting blocked
return self.subdomains
resp = self.send_req(query, page_no)
# check if there is any error occured
if not self.check_response_errors(resp):
return self.subdomains
links = self.extract_domains(resp)
# if the previous page hyperlinks was the similar to the current one, then maybe we have reached the last page
if links == prev_links:
retries += 1
page_no = self.get_page(page_no)
# make another retry maybe it isn't the last page
if retries >= 3:
return self.subdomains
prev_links = links
self.should_sleep()
return self.subdomains
class enumratorBaseThreaded(multiprocessing.Process, enumratorBase):
def __init__(self, base_url, engine_name, domain, subdomains=None, q=None, lock=threading.Lock(), silent=False,
verbose=True):
subdomains = subdomains or []
enumratorBase.__init__(self, base_url, engine_name, domain, subdomains, silent=silent, verbose=verbose)
multiprocessing.Process.__init__(self)
self.lock = lock
self.q = q
return
def run(self):
domain_list = self.enumerate()
for domain in domain_list:
self.q.append(domain)
class GoogleEnum(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = "https://google.com/search?q={query}&btnG=Search&hl=en-US&biw=&bih=&gbv=1&start={page_no}&filter=0"
self.engine_name = "Google"
self.MAX_DOMAINS = 11
self.MAX_PAGES = 200
super(GoogleEnum, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
self.q = q
return
def extract_domains(self, resp):
link_regx = re.compile('<cite.*?>(.*?)<\/cite>')
try:
links_list = link_regx.findall(resp)
for link in links_list:
link = re.sub('<span.*>', '', link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
return links_list
def check_response_errors(self, resp):
if (type(resp) is str or type(resp) is unicode) and 'Our systems have detected unusual traffic' in resp:
self.print_(R + "[!] Error: Google probably now is blocking our requests" + W)
self.print_(R + "[~] Finished now the Google Enumeration ..." + W)
return False
return True
def should_sleep(self):
time.sleep(5)
return
def generate_query(self):
if self.subdomains:
fmt = 'site:{domain} -www.{domain} -{found}'
found = ' -'.join(self.subdomains[:self.MAX_DOMAINS - 2])
query = fmt.format(domain=self.domain, found=found)
else:
query = "site:{domain} -www.{domain}".format(domain=self.domain)
return query
class YahooEnum(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = "https://search.yahoo.com/search?p={query}&b={page_no}"
self.engine_name = "Yahoo"
self.MAX_DOMAINS = 10
self.MAX_PAGES = 0
super(YahooEnum, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
self.q = q
return
def extract_domains(self, resp):
link_regx2 = re.compile('<span class=" fz-15px fw-m fc-12th wr-bw.*?">(.*?)</span>')
link_regx = re.compile('<span class="txt"><span class=" cite fw-xl fz-15px">(.*?)</span>')
links_list = []
try:
links = link_regx.findall(resp)
links2 = link_regx2.findall(resp)
links_list = links + links2
for link in links_list:
link = re.sub("<(\/)?b>", "", link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
if not subdomain.endswith(self.domain):
continue
if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
return links_list
def should_sleep(self):
return
def get_page(self, num):
return num + 10
def generate_query(self):
if self.subdomains:
fmt = 'site:{domain} -domain:www.{domain} -domain:{found}'
found = ' -domain:'.join(self.subdomains[:77])
query = fmt.format(domain=self.domain, found=found)
else:
query = "site:{domain}".format(domain=self.domain)
return query
class AskEnum(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'http://www.ask.com/web?q={query}&page={page_no}&qid=8D6EE6BF52E0C04527E51F64F22C4534&o=0&l=dir&qsrc=998&qo=pagination'
self.engine_name = "Ask"
self.MAX_DOMAINS = 11
self.MAX_PAGES = 0
enumratorBaseThreaded.__init__(self, base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
self.q = q
return
def extract_domains(self, resp):
link_regx = re.compile('<td>(.*?)</td>', re.IGNORECASE)
try:
links_list = link_regx.findall(resp)
for link in links_list:
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
if subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
return links_list
def get_page(self, num):
return num + 1
def generate_query(self):
if self.subdomains:
fmt = 'site:{domain} -www.{domain} -{found}'
found = ' -'.join(self.subdomains[:self.MAX_DOMAINS])
query = fmt.format(domain=self.domain, found=found)
else:
query = "site:{domain} -www.{domain}".format(domain=self.domain)
return query
class BingEnum(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'https://www.bing.com/search?q={query}&go=Submit&first={page_no}'
self.engine_name = "Bing"
self.MAX_DOMAINS = 30
self.MAX_PAGES = 0
enumratorBaseThreaded.__init__(self, base_url, self.engine_name, domain, subdomains, q=q, silent=silent)
self.q = q
self.verbose = verbose
return
def extract_domains(self, resp):
link_regx = re.compile('<li class="b_algo"><h2><a href="(.*?)"')
link_regx2 = re.compile('<div class="b_title"><h2><a href="(.*?)"')
try:
links = link_regx.findall(resp)
links2 = link_regx2.findall(resp)
links_list = links + links2
for link in links_list:
link = re.sub('<(\/)?strong>|<span.*?>|<|>', '', link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
if subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
return links_list
def generate_query(self):
if self.subdomains:
fmt = 'domain:{domain} -www.{domain} -{found}'
found = ' -'.join(self.subdomains[:self.MAX_DOMAINS])
query = fmt.format(domain=self.domain, found=found)
else:
query = "domain:{domain} -www.{domain}".format(domain=self.domain)
return query
class BaiduEnum(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'https://www.baidu.com/s?pn={page_no}&wd={query}&oq={query}'
self.engine_name = "Baidu"
self.MAX_DOMAINS = 2
self.MAX_PAGES = 760
enumratorBaseThreaded.__init__(self, base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
self.querydomain = self.domain
self.q = q
return
def extract_domains(self, resp):
found_newdomain = False
subdomain_list = []
link_regx = re.compile('<a.*?class="c-showurl".*?>(.*?)</a>')
try:
links = link_regx.findall(resp)
for link in links:
link = re.sub('<.*?>|>|<|&nbsp;', '', link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
if subdomain.endswith(self.domain):
subdomain_list.append(subdomain)
if subdomain not in self.subdomains and subdomain != self.domain:
found_newdomain = True
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
if not found_newdomain and subdomain_list:
self.querydomain = self.findsubs(subdomain_list)
return links
def findsubs(self, subdomains):
count = Counter(subdomains)
subdomain1 = max(count, key=count.get)
count.pop(subdomain1, "None")
subdomain2 = max(count, key=count.get) if count else ''
return (subdomain1, subdomain2)
def check_response_errors(self, resp):
return True
def should_sleep(self):
time.sleep(random.randint(2, 5))
return
def generate_query(self):
if self.subdomains and self.querydomain != self.domain:
found = ' -site:'.join(self.querydomain)
query = "site:{domain} -site:www.{domain} -site:{found} ".format(domain=self.domain, found=found)
else:
query = "site:{domain} -site:www.{domain}".format(domain=self.domain)
return query
class NetcraftEnum(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
self.base_url = 'https://searchdns.netcraft.com/?restriction=site+ends+with&host={domain}'
self.engine_name = "Netcraft"
self.lock = threading.Lock()
super(NetcraftEnum, self).__init__(self.base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
self.q = q
return
def req(self, url, cookies=None):
cookies = cookies or {}
try:
resp = self.session.get(url, headers=self.headers, timeout=self.timeout, cookies=cookies)
except Exception as e:
self.print_(e)
resp = None
return resp
def get_next(self, resp):
link_regx = re.compile('<A href="(.*?)"><b>Next page</b></a>')
link = link_regx.findall(resp)
link = re.sub('host=.*?%s' % self.domain, 'host=%s' % self.domain, link[0])
url = 'https://searchdns.netcraft.com' + link
return url
def create_cookies(self, cookie):
cookies = dict()
cookies_list = cookie[0:cookie.find(';')].split("=")
cookies[cookies_list[0]] = cookies_list[1]
cookies['netcraft_js_verification_response'] = hashlib.sha1(urllib.unquote(cookies_list[1])).hexdigest()
return cookies
def get_cookies(self, headers):
if 'set-cookie' in headers:
cookies = self.create_cookies(headers['set-cookie'])
else:
cookies = {}
return cookies
def enumerate(self):
start_url = self.base_url.format(domain='example.com')
resp = self.req(start_url)
cookies = self.get_cookies(resp.headers)
url = self.base_url.format(domain=self.domain)
while True:
resp = self.get_response(self.req(url, cookies))
self.extract_domains(resp)
if 'Next page' not in resp:
return self.subdomains
break
url = self.get_next(resp)
def extract_domains(self, resp):
link_regx = re.compile('<a href="http://toolbar.netcraft.com/site_report\?url=(.*)">')
try:
links_list = link_regx.findall(resp)
for link in links_list:
subdomain = urlparse.urlparse(link).netloc
if not subdomain.endswith(self.domain):
continue
if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
return links_list
class DNSdumpster(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'https://dnsdumpster.com/'
self.live_subdomains = []
self.engine_name = "DNSdumpster"
self.threads = 70
self.lock = threading.BoundedSemaphore(value=self.threads)
self.q = q
super(DNSdumpster, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
return
def check_host(self, host):
is_valid = False
Resolver = dns.resolver.Resolver()
Resolver.nameservers = ['8.8.8.8', '8.8.4.4', '1.1.1.1', '1.0.0.1']
self.lock.acquire()
try:
ip = Resolver.query(host, 'A')[0].to_text()
if ip:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, host))
is_valid = True
self.live_subdomains.append(host)
except:
pass
self.lock.release()
return is_valid
def req(self, req_method, url, params=None):
params = params or {}
headers = dict(self.headers)
headers['Referer'] = 'https://dnsdumpster.com'
try:
if req_method == 'GET':
resp = self.session.get(url, headers=headers, timeout=self.timeout)
else:
resp = self.session.post(url, data=params, headers=headers, timeout=self.timeout)
except Exception as e:
self.print_(e)
resp = None
return self.get_response(resp)
def get_csrftoken(self, resp):
csrf_regex = re.compile("<input type='hidden' name='csrfmiddlewaretoken' value='(.*?)' />", re.S)
token = csrf_regex.findall(resp)[0]
return token.strip()
def enumerate(self):
resp = self.req('GET', self.base_url)
token = self.get_csrftoken(resp)
params = {'csrfmiddlewaretoken': token, 'targetip': self.domain}
post_resp = self.req('POST', self.base_url, params)
self.extract_domains(post_resp)
for subdomain in self.subdomains:
t = threading.Thread(target=self.check_host, args=(subdomain,))
t.start()
t.join()
return self.live_subdomains
def extract_domains(self, resp):
tbl_regex = re.compile('<a name="hostanchor"><\/a>Host Records.*?<table.*?>(.*?)</table>', re.S)
link_regex = re.compile('<td class="col-md-4">(.*?)<br>', re.S)
links = []
try:
results_tbl = tbl_regex.findall(resp)[0]
except IndexError:
results_tbl = ''
links_list = link_regex.findall(results_tbl)
links = list(set(links_list))
for link in links:
subdomain = link.strip()
if not subdomain.endswith(self.domain):
continue
if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
self.subdomains.append(subdomain.strip())
return links
class Virustotal(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'https://www.virustotal.com/en/domain/{domain}/information/'
self.engine_name = "Virustotal"
self.lock = threading.Lock()
self.q = q
super(Virustotal, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
return
# the main send_req need to be rewritten
def send_req(self, url):
try:
resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
except Exception as e:
self.print_(e)
resp = None
return self.get_response(resp)
# once the send_req is rewritten we don't need to call this function, the stock one should be ok
def enumerate(self):
url = self.base_url.format(domain=self.domain)
resp = self.send_req(url)
self.extract_domains(resp)
return self.subdomains
def extract_domains(self, resp):
link_regx = re.compile('<div class="enum.*?">.*?<a target="_blank" href=".*?">(.*?)</a>', re.S)
try:
links = link_regx.findall(resp)
for link in links:
subdomain = link.strip()
if not subdomain.endswith(self.domain):
continue
if subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
class ThreatCrowd(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'https://www.threatcrowd.org/searchApi/v2/domain/report/?domain={domain}'
self.engine_name = "ThreatCrowd"
self.lock = threading.Lock()
self.q = q
super(ThreatCrowd, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
return
def req(self, url):
try:
resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
except Exception:
resp = None
return self.get_response(resp)
def enumerate(self):
url = self.base_url.format(domain=self.domain)
resp = self.req(url)
self.extract_domains(resp)
return self.subdomains
def extract_domains(self, resp):
try:
import json
except Exception as e:
self.print_(e)
return
try:
links = json.loads(resp)['subdomains']
for link in links:
subdomain = link.strip()
if not subdomain.endswith(self.domain):
continue
if subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception as e:
pass
class CrtSearch(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'https://crt.sh/?q=%25.{domain}'
self.engine_name = "SSL Certificates"
self.lock = threading.Lock()
self.q = q
super(CrtSearch, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
return
def req(self, url):
try:
resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
except Exception:
resp = None
return self.get_response(resp)
def enumerate(self):
url = self.base_url.format(domain=self.domain)
resp = self.req(url)
if resp:
self.extract_domains(resp)
return self.subdomains
def extract_domains(self, resp):
link_regx = re.compile('<TD>(.*?)</TD>')
try:
links = link_regx.findall(resp)
for link in links:
subdomain = link.strip()
if not subdomain.endswith(self.domain) or '*' in subdomain:
continue
if '@' in subdomain:
subdomain = subdomain[subdomain.find('@') + 1:]
if subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception as e:
pass
class PassiveDNS(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'http://ptrarchive.com/tools/search.htm?label={domain}'
self.engine_name = "PassiveDNS"
self.lock = threading.Lock()
self.q = q
super(PassiveDNS, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent,
verbose=verbose)
return
def get_agent(self, ua=None):
agents_url = 'http://www.webuseragents.com/recent'
try:
resp = session.get(agents_url, headers=self.headers, timeout=self.timeout)
agents_list = self.get_response(resp)
agents_regex = re.compile('<a href="/ua/.*?>(.*)</a>')
agents = agents_regex.findall(agents_list)
ua = random.choice(agents)
except Exception as e:
pass
return ua
def req(self, url):
try:
if self.get_agent():
self.headers['User-Agent'] = self.get_agent()
resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
except Exception as e:
self.print_(e)
resp = None
return self.get_response(resp)
def enumerate(self):
url = self.base_url.format(domain=self.domain)
resp = self.req(url)
self.extract_domains(resp)
return self.subdomains
def extract_domains(self, resp):
link_regx = re.compile('<td>(.*?)</td>')
try:
links = link_regx.findall(resp)
for link in links:
if self.domain not in link:
continue
subdomain = link[:link.find('[')].strip()
if subdomain not in self.subdomains and subdomain != self.domain and subdomain.endswith(self.domain):
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
class portscan():
def __init__(self, subdomains, ports):
self.subdomains = subdomains
self.ports = ports
self.threads = 20
self.lock = threading.BoundedSemaphore(value=self.threads)
def port_scan(self, host, ports):
openports = []
self.lock.acquire()
for port in ports:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2)
result = s.connect_ex((host, int(port)))
if result == 0:
openports.append(port)
s.close()
except Exception:
pass
self.lock.release()
if len(openports) > 0:
print("%s%s%s - %sFound open ports:%s %s%s%s" % (G, host, W, R, W, Y, ', '.join(openports), W))
def run(self):
for subdomain in self.subdomains:
t = threading.Thread(target=self.port_scan, args=(subdomain, self.ports))
t.start()
def main(domain, threads, savefile, ports, silent, verbose, enable_bruteforce, engines):
bruteforce_list = set()
search_list = set()
if is_windows:
subdomains_queue = list()
else:
subdomains_queue = multiprocessing.Manager().list()
# Check Bruteforce Status
if enable_bruteforce or enable_bruteforce is None:
enable_bruteforce = True
# Validate domain
domain_check = re.compile("^(http|https)?[a-zA-Z0-9]+([\-\.]{1}[a-zA-Z0-9]+)*\.[a-zA-Z]{2,}$")
if not domain_check.match(domain):
if not silent:
print(R + "Error: Please enter a valid domain" + W)
return []
if not domain.startswith('http://') or not domain.startswith('https://'):
domain = 'http://' + domain
parsed_domain = urlparse.urlparse(domain)
if not silent:
print(B + "[-] Enumerating subdomains now for %s" % parsed_domain.netloc + W)
if verbose and not silent:
print(Y + "[-] verbosity is enabled, will show the subdomains results in realtime" + W)
supported_engines = {'baidu': BaiduEnum,
'yahoo': YahooEnum,
'google': GoogleEnum,
'bing': BingEnum,
'ask': AskEnum,
'netcraft': NetcraftEnum,
'dnsdumpster': DNSdumpster,
'virustotal': Virustotal,
'threatcrowd': ThreatCrowd,
'ssl': CrtSearch,
'passivedns': PassiveDNS
}
chosenEnums = []
if engines is None:
chosenEnums = [
BaiduEnum, YahooEnum, GoogleEnum, BingEnum, AskEnum,
NetcraftEnum, DNSdumpster, Virustotal, ThreatCrowd,
CrtSearch, PassiveDNS
]
else:
engines = engines.split(',')
for engine in engines:
if engine.lower() in supported_engines:
chosenEnums.append(supported_engines[engine.lower()])
# Start the engines enumeration
enums = [enum(domain, [], q=subdomains_queue, silent=silent, verbose=verbose) for enum in chosenEnums]
for enum in enums:
enum.start()
for enum in enums:
enum.join()
subdomains = set(subdomains_queue)
for subdomain in subdomains:
search_list.add(subdomain)
if enable_bruteforce:
if not silent:
print(G + "[-] Starting bruteforce module now using subbrute.." + W)
record_type = False
path_to_file = os.path.dirname(os.path.realpath(__file__))
subs = os.path.join(path_to_file, 'subbrute', 'names.txt')
resolvers = os.path.join(path_to_file, 'subbrute', 'resolvers.txt')
process_count = threads
output = False
json_output = False
bruteforce_list = subbrute.print_target(parsed_domain.netloc, record_type, subs, resolvers, process_count,
output, json_output, search_list, verbose)
subdomains = search_list.union(bruteforce_list)
if subdomains:
subdomains = sorted(subdomains, key=subdomain_sorting_key)
if savefile:
write_file(savefile, subdomains)
if not silent:
print(Y + "[-] Total Unique Subdomains Found: %s" % len(subdomains) + W)
if ports:
if not silent:
print(G + "[-] Start port scan now for the following ports: %s%s" % (Y, ports) + W)
ports = ports.split(',')
pscan = portscan(subdomains, ports)
pscan.run()
elif not silent:
for subdomain in subdomains:
# Code modified - remove 'From http://PTRarchive.com: ' which shows up in some results
subdomain = subdomain.replace("From http://PTRarchive.com: ", "")
print(G + subdomain + W)
return subdomains
# Method code added
cnames = ['== CNAME records ==']
ahosts = ['== A records ==']
def lookup(guess, name_server):
if (debug):
print('Trying ' + guess + ' at ' + name_server)
use_tcp = False
response = None
failed = False
record_type = ""
record_value = ""
query = dnslib.DNSRecord.question(guess)
try:
response_q = query.send(name_server, 53, use_tcp, timeout=3)
if response_q:
if (debug):
print("response_q: " + response_q)
response = dnslib.DNSRecord.parse(response_q)
except KeyboardInterrupt:
print('User exit')
exit()
except:
# probably socket timed out
print("ERROR - possible socket timeout when trying " + guess + ' at server ' + name_server)
pass
if response:
if debug:
print("Decoded response:\n" + str(response) + "\n")
rcode = dnslib.RCODE[response.header.rcode]
if rcode == 'NOERROR' or rcode == 'NXDOMAIN':
# success, this is a valid subdomain
if debug:
print("response.rr:\n" + str(response.rr) + "\n")
for r in response.rr:
# note that we are looping through each piece of the answer but we only return from this method once... we must pick what we return verrry carefully, see explantion below
if debug:
print("r:\n" + str(r) + "\n")
rtype = None
try:
rtype = str(dnslib.QTYPE[r.rtype])
except:
rtype = str(r.rtype)
# print rtype
if (rtype == 'CNAME'):
# print r.rdata
record_type = 'CNAME'
record_value = str(r.rdata)
'''
*Why the following break keyword is here
So if we submit a query and get back a CNAME, the response contains the CNAME record and also the
CNAME records' record, and so on down to the A or AAAA record with the final IP address for the
query we submitted.
Example:
;; ANSWER SECTION:
support.indeed.com. 7200 IN CNAME indeed.zendesk.com.
indeed.zendesk.com. 900 IN A 52.34.200.91
indeed.zendesk.com. 900 IN A 35.167.245.158
indeed.zendesk.com. 900 IN A 34.216.174.56
That's fine, and we loop through each of these "answers" in the 'for r in response.rr' loop.
PROBLEM: we were overwriting the value of record_type and record_value each time around the loop.
So if the first 'r' was a CNAME record we wouldn't know about it since the subsequent A/AAAA record would update 'record_type
and record_value to reflect the A record not the first CNAME record!
The user would see the subdomain as a plain A record but that wasn't true... depending on if the CNAME was processed first.
Since this is structered as a method call that returns a single record type and value pair, and because
we care more about the CNAME record our query points to than the IP (at least for subdomain takeover recon), we will
halt analysis and return the CNAME data if we encounter a CNAME record. We care that a domain name points to an
AWS bucket, for example, not the particular Amazon IP it ultimately has to talk to.
'''
break
elif (rtype == 'A' or rtype == 'AAAA'):
record_type = 'A'
record_value = str(r.rdata)
else:
print("ERROR - returned stats " + rcode + " when trying " + guess + ' at server ' + name_server)
return record_type, record_value
#####################
# Program entry point
#####################
if __name__ == "__main__":
args = parse_args()
domain = args.domain
threads = args.threads
savefile = args.output
ports = args.ports
enable_bruteforce = args.bruteforce
verbose = args.verbose
engines = args.engines
inputfile = args.inputfile
analyze = args.analyze
analysisfile = args.saverdns
debug = args.debug
if (debug):
print("Debugging output enabled for analysis module")
if verbose or verbose is None:
verbose = True
banner()
if (inputfile != None):
print(B + "[-] Reading subdomains from " + inputfile + W)
f = open(inputfile, 'r')
res = f.readlines()
f.close()
else:
res = main(domain, threads, savefile, ports, silent=False, verbose=verbose, enable_bruteforce=enable_bruteforce,
engines=engines)
# Code added here
if (analyze):
# res is the list of subdomains e.g. www.example.com, mail.example.com, etc
resolvers = ['8.8.8.8', '8.8.4.4', '9.9.9.9', '1.1.1.1', '1.0.0.1']
server = 0
count = 0
total = str(len(res))
print("")
print(B + "[-] Beginning analysis of " + total + " subdomains..." + W)
for subdomain in res:
try:
name = subdomain.replace('\n', '').replace('\r', '')
(rtype, record) = lookup(name, resolvers[server])
# if the query did not return an error, then add result to appropriate array
if rtype != "ERROR":
if rtype == "CNAME":
cnames.append(name + " -->-- " + record)
elif rtype == "A":
ahosts.append(name + " -->-- " + record)
# round robin the resolvers
server = server + 1
server = server % len(resolvers)
# update user on our progress - every 30 hosts
count = count + 1
if (count % 30) == 0:
print(str(count) + '/' + total)
time.sleep(0.2) # This helps the script catch the Ctrl-C cancel without looping up to the next subdomain
except KeyboardInterrupt:
print(R + '\n[-] User exit' + W)
exit()
except:
# Generally unknown error. Keep going
# Known errors: subdomain sample starting with a dot, ex .domain.com
continue
ahosts.sort()
cnames.sort()
# output analysis results to console
for x in range(0, len(ahosts)):
print(G + ahosts[x] + W)
print("\n")
for x in range(0, len(cnames)):
print(G + cnames[x] + W)
if (analysisfile!=None):
# save the analysis to a file. Merge the arrays into one list for easier reading
write_file(analysisfile, ahosts + ["\n"] + cnames)
print(B + "Saved reverse DNS analysis to " + analysisfile + W)