Fix Python 3 compatibility issues and code cleanup

itsmemohamednaseem-rgb 2025-10-10 09:33:51 +05:30
parent 729d649ec5
commit 554ace7eee
2 changed files with 168 additions and 42 deletions

sublist3r.py

@@ -2,6 +2,7 @@
# coding: utf-8
# Sublist3r v1.0
# By Ahmed Aboul-Ela - twitter.com/aboul3la
# Optimized by Mohamed Naseem
# modules in standard library
import re
@@ -75,9 +76,9 @@ def banner():
print("""%s
____ _ _ _ _ _____
/ ___| _ _| |__ | (_)___| |_|___ / _ __
\___ \| | | | '_ \| | / __| __| |_ \| '__|
___) | |_| | |_) | | \__ \ |_ ___) | |
|____/ \__,_|_.__/|_|_|___/\__|____/|_|%s%s
\\\___ \\| | | | '_ \\| | / __| __| |_ \\| '__|
___) | |_| | |_) | | \\__ \\ |_ ___) | |
|____/ \\__,_|_.__/|_|_|___/\\__|____/|_|%s%s
# Coded By Ahmed Aboul-Ela - @aboul3la
""" % (R, W, Y))
@@ -97,9 +98,10 @@ def parse_args():
parser._optionals.title = "OPTIONS"
parser.add_argument('-d', '--domain', help="Domain name to enumerate it's subdomains", required=True)
parser.add_argument('-b', '--bruteforce', help='Enable the subbrute bruteforce module', nargs='?', default=False)
parser.add_argument('-f', '--fast', help='Enable fast scan mode (uses faster engines only)', action='store_true')
parser.add_argument('-t', '--threads', help='Number of threads to use for subbrute bruteforce (default: 30)', default=30, type=int)
parser.add_argument('-p', '--ports', help='Scan the found subdomains against specified tcp ports')
parser.add_argument('-v', '--verbose', help='Enable Verbosity and display results in realtime', nargs='?', default=False)
parser.add_argument('-t', '--threads', help='Number of threads to use for subbrute bruteforce', type=int, default=30)
parser.add_argument('-e', '--engines', help='Specify a comma-separated list of search engines')
parser.add_argument('-o', '--output', help='Save the results to text file')
parser.add_argument('-n', '--no-color', help='Output without color', default=False, action='store_true')
@@ -283,7 +285,7 @@ class GoogleEnum(enumratorBaseThreaded):
def extract_domains(self, resp):
links_list = list()
link_regx = re.compile('<cite.*?>(.*?)<\/cite>')
link_regx = re.compile('<cite.*?>(.*?)</cite>')
try:
links_list = link_regx.findall(resp)
for link in links_list:
@@ -300,7 +302,7 @@ class GoogleEnum(enumratorBaseThreaded):
return links_list
def check_response_errors(self, resp):
if (type(resp) is str or type(resp) is unicode) and 'Our systems have detected unusual traffic' in resp:
if isinstance(resp, str) and 'Our systems have detected unusual traffic' in resp:
self.print_(R + "[!] Error: Google probably now is blocking our requests" + W)
self.print_(R + "[~] Finished now the Google Enumeration ..." + W)
return False
@@ -340,7 +342,7 @@ class YahooEnum(enumratorBaseThreaded):
links2 = link_regx2.findall(resp)
links_list = links + links2
for link in links_list:
link = re.sub("<(\/)?b>", "", link)
link = re.sub("<(/)?b>", "", link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
@@ -436,7 +438,7 @@ class BingEnum(enumratorBaseThreaded):
links_list = links + links2
for link in links_list:
link = re.sub('<(\/)?strong>|<span.*?>|<|>', '', link)
link = re.sub('<(/)?strong>|<span.*?>|<|>', '', link)
if not link.startswith('http'):
link = "http://" + link
subdomain = urlparse.urlparse(link).netloc
@@ -637,25 +639,82 @@ class DNSdumpster(enumratorBaseThreaded):
return self.get_response(resp)
def get_csrftoken(self, resp):
csrf_regex = re.compile('<input type="hidden" name="csrfmiddlewaretoken" value="(.*?)">', re.S)
token = csrf_regex.findall(resp)[0]
return token.strip()
try:
# Try the old method first
csrf_regex = re.compile('<input type="hidden" name="csrfmiddlewaretoken" value="(.*?)">', re.S)
token = csrf_regex.findall(resp)
if token:
return token[0].strip()
# Try alternate methods if the old one fails
# Method 1: Look for csrf in cookies
if 'csrftoken' in self.session.cookies:
return self.session.cookies['csrftoken']
# Method 2: Look for csrf in a meta tag
csrf_regex = re.compile('<meta[^>]*name[^>]*csrf[^>]*content=["\']([^"\']*)["\']', re.I)
token = csrf_regex.findall(resp)
if token:
return token[0].strip()
# Method 3: Look for csrf in any input field
csrf_regex = re.compile('name=["\']csrf[^"\']*["\'][^>]*value=["\']([^"\']*)["\']', re.I)
token = csrf_regex.findall(resp)
if token:
return token[0].strip()
return ""
except Exception as e:
if self.verbose:
self.print_("%s\nError getting CSRF token: %s" % (R, str(e)))
return ""
def enumerate(self):
self.lock = threading.BoundedSemaphore(value=70)
resp = self.req('GET', self.base_url)
token = self.get_csrftoken(resp)
params = {'csrfmiddlewaretoken': token, 'targetip': self.domain}
post_resp = self.req('POST', self.base_url, params)
self.extract_domains(post_resp)
for subdomain in self.subdomains:
t = threading.Thread(target=self.check_host, args=(subdomain,))
t.start()
t.join()
return self.live_subdomains
try:
# Get initial page and csrf token
resp = self.req('GET', self.base_url)
if not resp:
return self.live_subdomains
token = self.get_csrftoken(resp)
if not token and self.verbose:
self.print_("%sWarning: No CSRF token found, DNSdumpster might fail%s" % (Y, W))
# Prepare headers and params for POST request
params = {
'targetip': self.domain,
'csrfmiddlewaretoken': token
}
# Make the POST request
post_resp = self.req('POST', self.base_url, params)
if not post_resp:
return self.live_subdomains
# Extract and verify domains
self.extract_domains(post_resp)
# Start all threads up front, then join them, instead of creating and joining one at a time
threads = []
for subdomain in self.subdomains:
t = threading.Thread(target=self.check_host, args=(subdomain,))
threads.append(t)
t.start()
# Join all threads with a timeout
for t in threads:
t.join(timeout=10)
return self.live_subdomains
except Exception as e:
if self.verbose:
self.print_("%sError enumerating DNSdumpster: %s%s" % (R, str(e), W))
return self.live_subdomains
def extract_domains(self, resp):
tbl_regex = re.compile('<a name="hostanchor"><\/a>Host Records.*?<table.*?>(.*?)</table>', re.S)
tbl_regex = re.compile('<a name="hostanchor"></a>Host Records.*?<table.*?>(.*?)</table>', re.S)
link_regex = re.compile('<td class="col-md-4">(.*?)<br>', re.S)
links = []
try:
@@ -676,36 +735,96 @@ class DNSdumpster(enumratorBaseThreaded):
class Virustotal(enumratorBaseThreaded):
def __init__(self, domain, subdomains=None, q=None, silent=False, verbose=True):
subdomains = subdomains or []
base_url = 'https://www.virustotal.com/ui/domains/{domain}/subdomains'
base_url = 'https://www.virustotal.com/api/v3/domains/{domain}/subdomains'
self.engine_name = "Virustotal"
self.q = q
super(Virustotal, self).__init__(base_url, self.engine_name, domain, subdomains, q=q, silent=silent, verbose=verbose)
self.url = self.base_url.format(domain=self.domain)
# Update headers for API v3
self.headers.update({
'Accept': 'application/json',
'x-apikey': os.getenv('VT_API_KEY', ''), # Get API key from environment variable
'x-tool': 'vt-ui-main',
'Accept-Encoding': 'gzip, deflate',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
})
return
# The main send_req needs to be rewritten
def handle_ratelimit(self, resp):
"""Handle rate limiting by implementing exponential backoff"""
if resp.status_code == 429: # Too Many Requests
retry_after = int(resp.headers.get('Retry-After', 60))
if self.verbose:
self.print_("%sRate limited by VirusTotal. Waiting %d seconds...%s" % (Y, retry_after, W))
time.sleep(retry_after)
return True
return False
def send_req(self, url):
try:
resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
except Exception as e:
self.print_(e)
resp = None
max_retries = 3
current_retry = 0
while current_retry < max_retries:
try:
resp = self.session.get(url, headers=self.headers, timeout=self.timeout)
if self.handle_ratelimit(resp):
current_retry += 1
continue
if resp.status_code == 401:
if self.verbose:
self.print_("%sError: Invalid or missing VirusTotal API key. Set VT_API_KEY environment variable.%s" % (R, W))
return None
if resp.status_code == 200:
return resp.text
except Exception as e:
if self.verbose:
self.print_("%sError connecting to VirusTotal: %s%s" % (R, str(e), W))
current_retry += 1
time.sleep(2 ** current_retry) # Exponential backoff
return None
return self.get_response(resp)
# Once send_req is rewritten we don't need to call this function; the stock one should be OK
def enumerate(self):
while self.url != '':
resp = self.send_req(self.url)
resp = json.loads(resp)
if 'error' in resp:
self.print_(R + "[!] Error: Virustotal probably now is blocking our requests" + W)
if not self.headers.get('x-apikey'):
if self.verbose:
self.print_("%sWarning: No VirusTotal API key found. Set VT_API_KEY environment variable.%s" % (Y, W))
return self.subdomains
cursor = ''
while True:
try:
url = self.url
if cursor:
url = f"{self.url}?cursor={cursor}"
resp = self.send_req(url)
if not resp:
break
resp_json = json.loads(resp)
# Extract subdomains from response
if 'data' in resp_json:
for item in resp_json['data']:
subdomain = item.get('id', '')
if subdomain and subdomain not in self.subdomains and subdomain != self.domain:
self.subdomains.append(subdomain.strip())
# Check for more pages
cursor = resp_json.get('meta', {}).get('cursor', '')
if not cursor:
break
except Exception as e:
if self.verbose:
self.print_("%sError processing VirusTotal response: %s%s" % (R, str(e), W))
break
if 'links' in resp and 'next' in resp['links']:
self.url = resp['links']['next']
else:
self.url = ''
self.extract_domains(resp)
return self.subdomains
def extract_domains(self, resp):

test_output.txt (new file, +7 lines)

@ -0,0 +1,7 @@
AS207960 Test Intermediate - example.com
www.example.com
dev.example.com
m.example.com
products.example.com
support.example.com
m.testexample.com
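
For reference, below is a minimal sketch of driving the reworked VirusTotal engine from Python. It assumes the stock sublist3r.main() signature (domain, threads, savefile, ports, silent, verbose, enable_bruteforce, engines) is unchanged by this commit; the VT_API_KEY variable name comes from the diff above, and the key value is only a placeholder.

import os
import sublist3r

# The rewritten Virustotal engine reads its key from the VT_API_KEY
# environment variable (see the headers.update() call in the diff above),
# so set it before enumeration starts. The value here is a placeholder.
os.environ['VT_API_KEY'] = 'YOUR-VIRUSTOTAL-API-KEY'

# Assumes the stock main() signature is unchanged by this commit.
subdomains = sublist3r.main(
    'example.com',           # target domain
    30,                      # threads for the subbrute module
    None,                    # savefile: do not write results to disk
    None,                    # ports: skip the TCP port scan
    silent=False,
    verbose=True,
    enable_bruteforce=False,
    engines='virustotal'     # restrict the run to the VirusTotal engine
)
print('\n'.join(subdomains))

When no key is set, the new enumerate() warns and returns early instead of looping on error responses, matching the behaviour shown in the diff.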