Refactor: Modernize code with AI assistance

Key changes include:
- Updated User-Agent string.
- Added interactive API key prompt for VirusTotal.
- Improved error handling for DNSdumpster and VirusTotal.
- General code cleanup and Python 3 compatibility improvements.
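
The VirusTotal changes (interactive key prompt, guarded pagination, explicit error messages) follow roughly the pattern sketched below. This is a minimal standalone illustration, not the committed code: the helper names (prompt_for_vt_key, fetch_vt_subdomains), the bare print() calls, and the direct use of requests outside Sublist3r's Virustotal class are assumptions made for the sketch.

import requests  # Sublist3r already depends on requests

VT_URL = "https://www.virustotal.com/api/v3/domains/{domain}/subdomains"

def prompt_for_vt_key():
    # Ask once for a VirusTotal API key; an empty answer means "skip this engine".
    return input("[?] Please enter your Virustotal API key (or press Enter to skip): ").strip()

def fetch_vt_subdomains(domain, api_key, timeout=25):
    # Page through the v3 subdomains endpoint, stopping on any request, decoding, or API error.
    if not api_key:
        print("[~] Skipping Virustotal enumeration (no API key provided).")
        return []
    subdomains, url = [], VT_URL.format(domain=domain)
    while url:
        try:
            resp = requests.get(url, headers={"x-apikey": api_key}, timeout=timeout)
            data = resp.json()
        except (requests.RequestException, ValueError):  # ValueError covers JSON decode failures
            print("[!] Error: Virustotal returned a failed or non-JSON response.")
            break
        if "error" in data:
            print("[!] Error from Virustotal API:", data["error"].get("message", "Unknown Error"))
            break
        for item in data.get("data", []):
            if item.get("type") == "domain" and item.get("id", "").endswith(domain):
                subdomains.append(item["id"])
        url = data.get("links", {}).get("next", "")  # empty string ends the pagination loop
    return subdomains

# Example: fetch_vt_subdomains("example.com", prompt_for_vt_key())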
FLOURISH 2025-06-19 16:42:04 +01:00
parent 729d649ec5
commit a32920859c
1 changed file with 106 additions and 66 deletions


@ -2,6 +2,7 @@
# coding: utf-8
# Sublist3r v1.0
# By Ahmed Aboul-Ela - twitter.com/aboul3la
# Refactored with AI By CYBWithFlourish - github.com/CYBWithFlourish
# modules in standard library
import re
@ -72,14 +73,16 @@ def no_color():
def banner():
# <<< FIXED: Escaped backslashes to remove SyntaxWarning
print("""%s
____ _ _ _ _ _____
/ ___| _ _| |__ | (_)___| |_|___ / _ __
\___ \| | | | '_ \| | / __| __| |_ \| '__|
___) | |_| | |_) | | \__ \ |_ ___) | |
|____/ \__,_|_.__/|_|_|___/\__|____/|_|%s%s
\\___ \\| | | | '_ \\| | / __| __| |_ \\| '__|
___) | |_| | |_) | | \\__ \\ |_ ___) | |
|____/ \\__,_|_.__/|_|_|___/\\__|____/|_|%s%s
# Coded By Ahmed Aboul-Ela - @aboul3la
# Refactored By CYBWithFlourish - @CYBWithFlourish
""" % (R, W, Y))
@ -152,7 +155,7 @@ class enumratorBase(object):
self.silent = silent
self.verbose = verbose
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36', # <<< IMPROVED: Modern User-Agent
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.8',
'Accept-Encoding': 'gzip',
@ -283,7 +286,7 @@ class GoogleEnum(enumratorBaseThreaded):
def extract_domains(self, resp):
links_list = list()
link_regx = re.compile('<cite.*?>(.*?)<\/cite>')
link_regx = re.compile(r'<cite.*?>(.*?)<\/cite>')
try:
links_list = link_regx.findall(resp)
for link in links_list:
@ -300,14 +303,14 @@ class GoogleEnum(enumratorBaseThreaded):
return links_list
def check_response_errors(self, resp):
if (type(resp) is str or type(resp) is unicode) and 'Our systems have detected unusual traffic' in resp:
if (type(resp) is str or type(resp) is bytes) and 'Our systems have detected unusual traffic' in str(resp):
self.print_(R + "[!] Error: Google probably now is blocking our requests" + W)
self.print_(R + "[~] Finished now the Google Enumeration ..." + W)
return False
return True
def should_sleep(self):
time.sleep(5)
time.sleep(random.randint(5, 10))
return
def generate_query(self):
@ -320,6 +323,7 @@ class GoogleEnum(enumratorBaseThreaded):
return query
# ... (The other enumerator classes like Yahoo, Ask, Bing, etc. remain unchanged) ...
class YahooEnum(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
@ -332,15 +336,15 @@ class YahooEnum(enumratorBaseThreaded):
return
def extract_domains(self, resp):
link_regx2 = re.compile('<span class=" fz-.*? fw-m fc-12th wr-bw.*?">(.*?)</span>')
link_regx = re.compile('<span class="txt"><span class=" cite fw-xl fz-15px">(.*?)</span>')
link_regx2 = re.compile(r'<span class=" fz-.*? fw-m fc-12th wr-bw.*?">(.*?)</span>')
link_regx = re.compile(r'<span class="txt"><span class=" cite fw-xl fz-15px">(.*?)</span>')
links_list = []
try:
links = link_regx.findall(resp)
links2 = link_regx2.findall(resp)
links_list = links + links2
for link in links_list:
link = re.sub("<(\/)?b>", "", link)
link = re.sub(r"<(\/)?b>", "", link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
@ -352,7 +356,6 @@ class YahooEnum(enumratorBaseThreaded):
self.subdomains.append(subdomain.strip())
except Exception:
pass
return links_list
def should_sleep(self):
@ -378,13 +381,13 @@ class AskEnum(enumratorBaseThreaded):
self.engine_name = "Ask"
self.MAX_DOMAINS = 11
self.MAX_PAGES = 0
enumratorBaseThreaded.__init__(self, base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
super(AskEnum, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
self.q = q
return
def extract_domains(self, resp):
links_list = list()
link_regx = re.compile('<p class="web-result-url">(.*?)</p>')
link_regx = re.compile(r'<p class="web-result-url">(.*?)</p>')
try:
links_list = link_regx.findall(resp)
for link in links_list:
@ -397,7 +400,6 @@ class AskEnum(enumratorBaseThreaded):
self.subdomains.append(subdomain.strip())
except Exception:
pass
return links_list
def get_page(self, num):
@ -410,10 +412,8 @@ class AskEnum(enumratorBaseThreaded):
query = fmt.format(domain=self.domain, found=found)
else:
query = "site:{domain} -www.{domain}".format(domain=self.domain)
return query
class BingEnum(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
@ -421,22 +421,21 @@ class BingEnum(enumratorBaseThreaded):
self.engine_name = "Bing"
self.MAX_DOMAINS = 30
self.MAX_PAGES = 0
enumratorBaseThreaded.__init__(self, base_url, self.engine_name, domain, subdomains, q=q, silent=silent)
super(BingEnum, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent)
self.q = q
self.verbose = verbose
return
def extract_domains(self, resp):
links_list = list()
link_regx = re.compile('<li class="b_algo"><h2><a href="(.*?)"')
link_regx2 = re.compile('<div class="b_title"><h2><a href="(.*?)"')
link_regx = re.compile(r'<li class="b_algo"><h2><a href="(.*?)"')
link_regx2 = re.compile(r'<div class="b_title"><h2><a href="(.*?)"')
try:
links = link_regx.findall(resp)
links2 = link_regx2.findall(resp)
links_list = links + links2
for link in links_list:
link = re.sub('<(\/)?strong>|<span.*?>|<|>', '', link)
link = re.sub(r'<(\/)?strong>|<span.*?>|<|>', '', link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
@ -446,7 +445,6 @@ class BingEnum(enumratorBaseThreaded):
self.subdomains.append(subdomain.strip())
except Exception:
pass
return links_list
def generate_query(self):
@ -466,7 +464,7 @@ class BaiduEnum(enumratorBaseThreaded):
self.engine_name = "Baidu"
self.MAX_DOMAINS = 2
self.MAX_PAGES = 760
enumratorBaseThreaded.__init__(self, base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
super(BaiduEnum, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
self.querydomain = self.domain
self.q = q
return
@ -475,11 +473,11 @@ class BaiduEnum(enumratorBaseThreaded):
links = list()
found_newdomain = False
subdomain_list = []
link_regx = re.compile('<a.*?class="c-showurl".*?>(.*?)</a>')
link_regx = re.compile(r'<a.*?class="c-showurl".*?>(.*?)</a>')
try:
links = link_regx.findall(resp)
for link in links:
link = re.sub('<.*?>|>|<|&nbsp;', '', link)
link = re.sub('<.*?>|>|<| ', '', link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
@ -542,7 +540,7 @@ class NetcraftEnum(enumratorBaseThreaded):
return
def get_next(self, resp):
link_regx = re.compile('<a.*?href="(.*?)">Next Page')
link_regx = re.compile(r'<a.*?href="(.*?)">Next Page')
link = link_regx.findall(resp)
url = 'http://searchdns.netcraft.com' + link[0]
return url
@ -551,7 +549,6 @@ class NetcraftEnum(enumratorBaseThreaded):
cookies = dict()
cookies_list = cookie[0:cookie.find(';')].split("=")
cookies[cookies_list[0]] = cookies_list[1]
# hashlib.sha1 requires utf-8 encoded str
cookies['netcraft_js_verification_response'] = hashlib.sha1(urllib.unquote(cookies_list[1]).encode('utf-8')).hexdigest()
return cookies
@ -578,7 +575,7 @@ class NetcraftEnum(enumratorBaseThreaded):
def extract_domains(self, resp):
links_list = list()
link_regx = re.compile('<a class="results-table__host" href="(.*?)"')
link_regx = re.compile(r'<a class="results-table__host" href="(.*?)"')
try:
links_list = link_regx.findall(resp)
for link in links_list:
@ -637,26 +634,35 @@ class DNSdumpster(enumratorBaseThreaded):
return self.get_response(resp)
def get_csrftoken(self, resp):
csrf_regex = re.compile('<input type="hidden" name="csrfmiddlewaretoken" value="(.*?)">', re.S)
token = csrf_regex.findall(resp)[0]
return token.strip()
csrf_regex = re.compile(r'<input type="hidden" name="csrftoken" value="(.*?)">', re.S)
try:
token = csrf_regex.findall(resp)[0]
return token.strip()
except IndexError:
self.print_(R + "[!] Error: Could not find CSRF token for DNSdumpster." + W)
return ""
def enumerate(self):
self.lock = threading.BoundedSemaphore(value=70)
resp = self.req('GET', self.base_url)
token = self.get_csrftoken(resp)
params = {'csrfmiddlewaretoken': token, 'targetip': self.domain}
if not token:
return self.subdomains
params = {'csrftoken': token, 'targetip': self.domain}
post_resp = self.req('POST', self.base_url, params)
self.extract_domains(post_resp)
threads = []
for subdomain in self.subdomains:
t = threading.Thread(target=self.check_host, args=(subdomain,))
threads.append(t)
t.start()
for t in threads:
t.join()
return self.live_subdomains
def extract_domains(self, resp):
tbl_regex = re.compile('<a name="hostanchor"><\/a>Host Records.*?<table.*?>(.*?)</table>', re.S)
link_regex = re.compile('<td class="col-md-4">(.*?)<br>', re.S)
tbl_regex = re.compile(r'<a name="hostanchor"></a>Host Records.*?<table.*?>(.*?)</table>', re.S)
link_regex = re.compile(r'<td class="col-md-4">(.*?)<br>', re.S)
links = []
try:
results_tbl = tbl_regex.findall(resp)[0]
@ -676,52 +682,77 @@ class DNSdumpster(enumratorBaseThreaded):
class Virustotal(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'https://www.virustotal.com/ui/domains/{domain}/subdomains'
base_url = 'https://www.virustotal.com/api/v3/domains/{domain}/subdomains'
self.engine_name = "Virustotal"
self.q = q
super(Virustotal, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
# <<< CUSTOMIZATION: Prompt for API key >>>
# The API key is only prompted for once, when the Virustotal module is initialized.
prompt_message = Y + "[?] Please enter your Virustotal API key (or press Enter to skip): " + W
self.api_key = input(prompt_message).strip()
if self.api_key:
self.headers['x-apikey'] = self.api_key
self.url = self.base_url.format(domain=self.domain)
return
# the main send_req needs to be rewritten
def send_req(self, url):
try:
resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
except Exception as e:
self.print_(e)
resp = None
return self.get_response(resp)
# once the send_req is rewritten we don't need to call this function, the stock one should be ok
def enumerate(self):
while self.url != '':
# <<< CUSTOMIZATION: Check if an API key was provided before running >>>
if not self.api_key:
self.print_(Y + "[~] Skipping Virustotal enumeration (no API key provided)." + W)
return self.subdomains
while self.url:
resp = self.send_req(self.url)
resp = json.loads(resp)
if 'error' in resp:
self.print_(R + "[!] Error: Virustotal probably now is blocking our requests" + W)
if not resp:
break
if 'links' in resp and 'next' in resp['links']:
self.url = resp['links']['next']
try:
resp_json = json.loads(resp)
except json.JSONDecodeError:
self.print_(R + "[!] Error: Virustotal returned a non-JSON response. It may be blocking requests." + W)
break
if 'error' in resp_json:
error_code = resp_json['error'].get('code')
if error_code == 'WrongCredentialsError':
self.print_(R + "[!] Error: Invalid Virustotal API key." + W)
else:
self.print_(R + "[!] Error: Virustotal API returned an error: " + resp_json['error'].get('message', 'Unknown Error') + W)
break
self.extract_domains(resp_json)
if 'links' in resp_json and 'next' in resp_json['links']:
self.url = resp_json['links']['next']
else:
self.url = ''
self.extract_domains(resp)
self.url = '' # No more pages
return self.subdomains
def extract_domains(self, resp):
# resp is already parsed as JSON
try:
for i in resp['data']:
if i['type'] == 'domain':
subdomain = i['id']
for i in resp.get('data', []):
if i.get('type') == 'domain':
subdomain = i.get('id')
if not subdomain.endswith(self.domain):
continue
if subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception:
pass
except Exception as e:
self.print_(R + f"[!] Error parsing Virustotal data: {e}" + W)
class ThreatCrowd(enumratorBaseThreaded):
@ -749,17 +780,22 @@ class ThreatCrowd(enumratorBaseThreaded):
def extract_domains(self, resp):
try:
links = json.loads(resp)['subdomains']
for link in links:
subdomain = link.strip()
if not subdomain.endswith(self.domain):
continue
if subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except Exception as e:
data = json.loads(resp)
# ThreatCrowd API can return 404/empty response
if 'subdomains' in data:
for link in data['subdomains']:
subdomain = link.strip()
if not subdomain.endswith(self.domain):
continue
if subdomain not in self.subdomains and subdomain != self.domain:
if self.verbose:
self.print_("%s%s: %s%s" % (R, self.engine_name, W, subdomain))
self.subdomains.append(subdomain.strip())
except (json.JSONDecodeError, TypeError):
# Gracefully handle cases where resp is not valid JSON
pass
except Exception as e:
self.print_(R + f"[!] Error parsing ThreatCrowd data: {e}" + W)
class CrtSearch(enumratorBaseThreaded):
@ -787,7 +823,7 @@ class CrtSearch(enumratorBaseThreaded):
return self.subdomains
def extract_domains(self, resp):
link_regx = re.compile('<TD>(.*?)</TD>')
link_regx = re.compile(r'<TD>(.*?)</TD>')
try:
links = link_regx.findall(resp)
for link in links:
@ -872,13 +908,17 @@ class portscan():
pass
self.lock.release()
if len(openports) > 0:
print("%s%s%s - %sFound open ports:%s %s%s%s" % (G, host, W, R, W, Y, ', '.join(openports), W))
print("%s%s%s - %sFound open ports:%s %s%s%s" % (G, host, W, R, W, Y, ', '.join(map(str,openports)), W))
def run(self):
self.lock = threading.BoundedSemaphore(value=20)
threads = []
for subdomain in self.subdomains:
t = threading.Thread(target=self.port_scan, args=(subdomain, self.ports))
threads.append(t)
t.start()
for t in threads:
t.join()
def main(domain, threads, savefile, ports, silent, verbose, enable_bruteforce, engines):
@ -895,13 +935,13 @@ def main(domain, threads, savefile, ports, silent, verbose, enable_bruteforce, e
enable_bruteforce = True
# Validate domain
domain_check = re.compile("^(http|https)?[a-zA-Z0-9]+([\-\.]{1}[a-zA-Z0-9]+)*\.[a-zA-Z]{2,}$")
domain_check = re.compile(r"^(http|https)?[a-zA-Z0-9]+([\-\.]{1}[a-zA-Z0-9]+)*\.[a-zA-Z]{2,}$")
if not domain_check.match(domain):
if not silent:
print(R + "Error: Please enter a valid domain" + W)
return []
if not domain.startswith('http://') or not domain.startswith('https://'):
if not domain.startswith('http://') and not domain.startswith('https://'):
domain = 'http://' + domain
parsed_domain = urlparse.urlparse(domain)