Shaheer Yasir 2025-10-02 10:16:31 +00:00 committed by GitHub
commit ffb23db2ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 115343 additions and 130105 deletions

README.md

@@ -1,183 +1,198 @@
# Sublist3r ![Python](https://img.shields.io/badge/Python-3.6%2B-blue?logo=python&logoColor=white) [![License: GPL v2](https://img.shields.io/badge/License-GPL%20v2-green.svg)](https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html) [![Stars](https://img.shields.io/github/stars/aboul3la/Sublist3r?style=social)](https://github.com/aboul3la/Sublist3r/stargazers)
> **Sublist3r** is a fast and powerful Python tool designed for OSINT-based subdomain enumeration. It helps penetration testers, bug bounty hunters, and security researchers discover hidden subdomains for targeted domains. Sublist3r leverages multiple search engines (Google, Yahoo, Bing, Baidu, Ask) and passive sources (Netcraft, VirusTotal, ThreatCrowd, DNSdumpster, ReverseDNS, BufferOverRun, CertSpotter) to build comprehensive subdomain lists.
**Enhanced to v3.0 by [Shaheer Yasir](https://github.com/shaheeryasir) (2025):** Full Python 3 support, new passive engines (CertSpotter for Certificate Transparency logs, BufferOverRun for DNS intel), JSON output, improved performance, and VirusTotal API v3 integration.
## 🚀 Features
- **Multi-Engine Enumeration:** Supports 12+ search engines and passive sources for broad coverage.
- **Brute-Force Integration:** Powered by [SubBrute](https://github.com/TheRook/subbrute) (v1.3) with optimized wordlists.
- **Output Flexibility:** Text or JSON export; verbose real-time results.
- **Port Scanning:** Built-in TCP port checks on discovered subdomains.
- **Modular Design:** Easy to import as a Python library.
- **Cross-Platform:** Works on Linux, macOS, and Windows (with colorama for enhanced output).
- **Rate-Limited & Stealthy:** Configurable threads, sleeps, and proxies to avoid detection.
## 📦 Installation
1. **Clone the Repository:**
```
git clone https://github.com/aboul3la/Sublist3r.git
cd Sublist3r
```
2. **Install Dependencies:**
```
pip install -r requirements.txt
```
(Includes `requests>=2.25.0`, `dnspython>=2.0.0`, `colorama>=0.4.4`)
3. **Optional: VirusTotal API Key:**
For unlimited scans, set `export VT_API_KEY=your_key_here`.
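The API-key step above can be handled defensively in a wrapper script. A minimal sketch, assuming the `VT_API_KEY` environment variable named above; `get_vt_api_key` is a hypothetical helper, not part of Sublist3r itself:

```python
import os
from typing import Optional

# Hypothetical helper: read the VirusTotal API key from the environment,
# falling back to anonymous (rate-limited) access when it is unset.
def get_vt_api_key() -> Optional[str]:
    key = os.environ.get("VT_API_KEY")
    if not key:
        print("VT_API_KEY not set; VirusTotal queries will be rate-limited.")
        return None
    return key
```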
> **Note:** Python 3.6+ required (tested up to 3.12). No Python 2 support.
## 🔧 Usage
| Short Form | Long Form | Description |
|------------|-----------------|-------------|
| `-d` | `--domain` | Domain name to enumerate subdomains of |
| `-b` | `--bruteforce` | Enable the SubBrute bruteforce module |
| `-p` | `--ports` | Scan found subdomains against specific TCP ports |
| `-v` | `--verbose` | Enable verbose mode and display results in realtime |
| `-t` | `--threads` | Number of threads for SubBrute bruteforce (default: 30) |
| `-e` | `--engines` | Comma-separated list of search engines |
| `-o` | `--output` | Save results to text file |
| `-j` | `--json` | Save results to JSON file |
| `-n` | `--no-color` | Output without color |
| `-h` | `--help` | Show the help message and exit |
### Examples
* **Basic Enumeration:**
```
python sublist3r.py -d example.com
```
* **With Port Scanning (80, 443):**
```
python sublist3r.py -d example.com -p 80,443
```
* **Verbose Real-Time Results:**
```
python sublist3r.py -v -d example.com
```
* **Enable Bruteforce:**
```
python sublist3r.py -b -d example.com
```
* **Specific Engines (Google, Yahoo, VirusTotal):**
```
python sublist3r.py -e google,yahoo,virustotal -d example.com
```
* **Full Scan with JSON Output:**
```
python sublist3r.py -d example.com -b -v -j -o output.txt
```
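The `-j` flag writes results as JSON lines. A hedged consumer sketch, assuming each line is an object shaped like the one written by the bundled subbrute module (`{"hostname": ..., "record_type": ..., "addresses": [...]}`); verify the shape against your actual output file:

```python
import json

# Parse JSON-lines output: one JSON object per non-empty line.
def load_results(lines):
    results = []
    for line in lines:
        line = line.strip()
        if line:
            results.append(json.loads(line))
    return results

sample = ['{"hostname": "www.example.com", "record_type": "A", "addresses": ["93.184.216.34"]}']
for entry in load_results(sample):
    print(entry["hostname"], entry["addresses"])
```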
## 📚 Using Sublist3r as a Module
Import Sublist3r into your Python scripts for automated workflows.
```python
import sublist3r

# Enumerate subdomains of yahoo.com
subdomains = sublist3r.main(
    domain='yahoo.com',
    no_threads=40,                    # Threads for bruteforce
    savefile='yahoo_subdomains.txt',  # Output file
    ports=None,                       # Ports to scan
    silent=False,                     # Silent mode
    verbose=False,                    # Real-time output
    enable_bruteforce=False,          # Enable bruteforce
    engines=None                      # Specific engines
)
print(f"Found {len(subdomains)} subdomains: {subdomains}")
```

The `main` function returns a set of unique subdomains found by Sublist3r.
**Parameters:**
- `domain`: Target domain.
- `savefile`: Optional output file.
- `ports`: Comma-separated TCP ports.
- `silent`: Suppress noise.
- `verbose`: Real-time display.
- `enable_bruteforce`: Use SubBrute.
- `engines`: Optional comma-separated engines (e.g., 'google,bing').
## 🖼️ Screenshots
![Sublist3r in Action](http://www.secgeek.net/images/Sublist3r.png)
## 🤝 Credits
- **[Ahmed Aboul-Ela](https://twitter.com/aboul3la)**: Original author.
- **[TheRook](https://github.com/TheRook)**: SubBrute bruteforce module.
- **[Bitquark](https://github.com/bitquark)**: SubBrute wordlist based on **dnspop** research.
- **[Shaheer Yasir](https://github.com/shaheeryasir)**: v3.0 enhancements (Python 3, new engines, JSON output, performance).
- **Special Thanks:** [Ibrahim Mosaad](https://twitter.com/ibrahim_mosaad) for foundational contributions.
## 📄 License
Sublist3r is licensed under the [GNU GPL v2](https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html). See [LICENSE](LICENSE) for details.
## 🙌 Contributing
We welcome contributions! Fork the repo, create a feature branch, and submit a PR. For issues or questions, open a ticket on GitHub.
- Report bugs: [Issues](https://github.com/aboul3la/Sublist3r/issues)
- Suggest features: [Discussions](https://github.com/aboul3la/Sublist3r/discussions)
## 📈 Version
**Current version: 3.0** (October 01, 2025)
---
**Star this repo** if Sublist3r helps your recon workflow! Follow [@aboul3la](https://twitter.com/aboul3la) for updates. Happy hunting! 🔍

requirements.txt

@@ -1,3 +1,3 @@
requests>=2.25.0
dnspython>=2.0.0
colorama>=0.4.4 # Optional, for colored output

setup.py

@@ -2,14 +2,18 @@ from setuptools import setup, find_packages
setup(
name='Sublist3r',
version='3.0',
python_requires='>=3.6',
install_requires=[
'dnspython>=2.0.0',
'requests>=2.25.0',
'colorama>=0.4.4' # For cross-platform colored output
],
packages=find_packages() + ['.'],
include_package_data=True,
url='https://github.com/aboul3la/Sublist3r',
license='GPL-2.0',
description='Fast subdomains enumeration tool for penetration testers - Enhanced v3.0 by Shaheer Yasir',
classifiers=[
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
@@ -18,16 +22,20 @@ setup(
'Intended Audience :: Telecommunications Industry',
'License :: OSI Approved :: GNU General Public License v2',
'Operating System :: POSIX :: Linux',
'Operating System :: MacOS',
'Operating System :: Microsoft :: Windows',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Topic :: Security',
],
keywords='subdomain enumeration, dns detection, security, pentest, reconnaissance',
entry_points={
'console_scripts': [
'sublist3r = sublist3r:interactive',

File diff suppressed because it is too large.

subbrute.py

@@ -1,12 +1,13 @@
#!/usr/bin/env python3
#
# SubBrute v1.3
# A (very) fast subdomain enumeration tool.
#
# Maintained by rook
# Contributors: JordanMilne, KxCode, rc0r, memoryprint, ppaulojr
# Enhanced for 2025: Python 3 only, improved Windows support, bug fixes, better output handling
import re
import optparse
import os
@@ -18,176 +19,126 @@ import ctypes
import dns.resolver
import dns.rdatatype
import json
import queue
from queue import Queue, Empty  # Python 3 imports
# The 'multiprocessing' library does not rely upon a Global Interpreter Lock (GIL)
import multiprocessing
# Microsoft compatibility - Use threading as fallback for Windows multiprocessing issues
if sys.platform.startswith('win'):
import threading
multiprocessing.Process = threading.Thread
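The substitution above works because `threading.Thread` exposes the same `target`/`start`/`join` interface as `multiprocessing.Process`. A minimal sketch of the pattern; the demo runs the thread path on every platform, since threads share memory and a plain list can collect results:

```python
import multiprocessing
import sys
import threading

# On Windows, swap in threading.Thread so the same worker code runs unchanged.
if sys.platform.startswith('win'):
    multiprocessing.Process = threading.Thread

def worker(results):
    results.append("done")

results = []
t = threading.Thread(target=worker, args=(results,))
t.start()
t.join()
print(results)  # ['done']
```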
class VerifyNameservers(multiprocessing.Process):
def __init__(self, target, record_type, resolver_q, resolver_list, wildcards):
multiprocessing.Process.__init__(self, target=self.run)
self.daemon = True
signal_init()
self.time_to_die = False
self.resolver_q = resolver_q
self.wildcards = wildcards
#Do we need wildcards for other types of records?
#This needs testing!
self.record_type = "A"
if record_type == "AAAA":
self.record_type = record_type
self.resolver_list = resolver_list
resolver = dns.resolver.Resolver()
#The domain provided by the user.
self.target = target
#1 website in the world, modify the following line when this status changes.
#www.google.cn, I'm looking at you ;)
self.most_popular_website = "www.google.com"
#We shouldn't need the backup_resolver, but we can use it if need be.
#We must have a resolver, and localhost can work in some environments.
self.backup_resolver = resolver.nameservers + ['127.0.0.1', '8.8.8.8', '8.8.4.4']
#Ideally a nameserver should respond in less than 1 sec.
resolver.timeout = 1
resolver.lifetime = 1
try:
#Let's test the latency of our connection.
#Google's DNS server should be an ideal time test.
resolver.nameservers = ['8.8.8.8']
resolver.query(self.most_popular_website, self.record_type)
except:
#Our connection is slower than a junebug in molasses
resolver = dns.resolver.Resolver()
self.resolver = resolver
def end(self):
self.time_to_die = True
#This process cannot block forever, it needs to check if it's time to die.
def add_nameserver(self, nameserver):
keep_trying = True
while not self.time_to_die and keep_trying:
try:
self.resolver_q.put(nameserver, timeout=1)
print(f"[DEBUG] Added nameserver: {nameserver}", file=sys.stderr)
keep_trying = False
except Exception as e:
if isinstance(e, queue.Full):
keep_trying = True
def verify(self, nameserver_list):
added_resolver = False
for server in nameserver_list:
if self.time_to_die:
#We are done here.
break
server = server.strip()
if server:
self.resolver.nameservers = [server]
try:
if self.find_wildcards(self.target):
self.add_nameserver(server)
added_resolver = True
else:
print(f"[DEBUG] Rejected nameserver - wildcard: {server}", file=sys.stderr)
except Exception as e:
#Rejected server :(
print(f"[DEBUG] Rejected nameserver - unreliable: {server} {type(e)}", file=sys.stderr)
return added_resolver
def run(self):
#Every user will get a different set of resolvers, this helps redistribute traffic.
random.shuffle(self.resolver_list)
if not self.verify(self.resolver_list):
#This should never happen, inform the user.
sys.stderr.write('Warning: No nameservers found, trying fallback list.\n')
#Try and fix it for the user:
self.verify(self.backup_resolver)
#End of the resolvers list.
try:
self.resolver_q.put(False, timeout=1)
except:
pass
#Only add the nameserver to the queue if we can detect wildcards.
#Returns False on error.
def find_wildcards(self, host):
#We want to solve the following three problems:
#1) The target might have a wildcard DNS record.
#2) The target may be using geolocation-aware DNS.
#3) The DNS server we are testing may respond to non-existent 'A' records with advertisements.
#I have seen a CloudFlare Enterprise customer with the first two conditions.
try:
wildtest = self.resolver.query(uuid.uuid4().hex + ".com", "A")
if len(wildtest):
print(f"[DEBUG] Spam DNS detected: {host}", file=sys.stderr)
return False
except:
pass
test_counter = 8
looking_for_wildcards = True
while looking_for_wildcards and test_counter >= 0:
looking_for_wildcards = False
test_counter -= 1
try:
testdomain = f"{uuid.uuid4().hex}.{host}"
wildtest = self.resolver.query(testdomain, self.record_type)
#This 'A' record may contain a list of wildcards.
if wildtest:
for w in wildtest:
w = str(w)
if w not in self.wildcards:
#wildcards were detected.
self.wildcards[w] = None
#We found at least one wildcard, look for more.
looking_for_wildcards = True
except Exception as e:
if isinstance(e, (dns.resolver.NXDOMAIN, dns.name.EmptyLabel)):
return True
else:
print(f"[DEBUG] wildcard exception: {self.resolver.nameservers} {type(e)}", file=sys.stderr)
return False
return (test_counter >= 0)
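The wildcard check above can be modeled without DNS at all. A toy sketch with the resolver stubbed out; the `resolve()` callables and the KeyError-as-NXDOMAIN convention are assumptions for this sketch, not the dnspython API:

```python
import uuid

# A wildcard zone answers every random label, so random probes that keep
# resolving mean the collected addresses must be treated as wildcard noise.
def find_wildcards(resolve, host, wildcards, max_tests=8):
    for _ in range(max_tests):
        testdomain = f"{uuid.uuid4().hex}.{host}"
        try:
            answers = resolve(testdomain)
        except KeyError:
            return True                    # random name is NXDOMAIN: no wildcard
        for a in answers:
            wildcards.setdefault(str(a), None)
    return False                           # still resolving after max_tests: reject

def wildcard_zone(name):
    return ["10.0.0.1"]                    # everything resolves

def strict_zone(name):
    raise KeyError(name)                   # nothing random resolves

seen = {}
print(find_wildcards(wildcard_zone, "example.com", seen))  # False
print(find_wildcards(strict_zone, "example.com", {}))      # True
```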
class Lookup(multiprocessing.Process):
def __init__(self, in_q, out_q, resolver_q, domain, wildcards, spider_blacklist):
multiprocessing.Process.__init__(self, target=self.run)
signal_init()
self.required_nameservers = 16
self.in_q = in_q
self.out_q = out_q
self.resolver_q = resolver_q
self.domain = domain
self.wildcards = wildcards
self.spider_blacklist = spider_blacklist
self.resolver = dns.resolver.Resolver()
#Force pydns to use our nameservers
self.resolver.nameservers = []
def get_ns(self):
@@ -195,46 +146,37 @@ class lookup(multiprocessing.Process):
try:
ret = [self.resolver_q.get_nowait()]
if ret[0] is False:
#Queue is empty, inform the rest.
self.resolver_q.put(False)
ret = []
except:
pass
return ret
def get_ns_blocking(self):
ret = [self.resolver_q.get()]
if ret[0] is False:
print("[DEBUG] get_ns_blocking - Resolver list is empty.", file=sys.stderr)
self.resolver_q.put(False)
ret = []
return ret
def check(self, host, record_type="A", retries=0):
print(f"[DEBUG] Checking: {host}", file=sys.stderr)
cname_record = []
if len(self.resolver.nameservers) <= self.required_nameservers:
#This process needs more nameservers, let's see if we have one available
self.resolver.nameservers += self.get_ns()
#Ok we should be good to go.
while True:
try:
#Query the nameserver, this is not simple...
if not record_type or record_type == "A":
resp = self.resolver.query(host)
#Crawl the response
hosts = extract_hosts(str(resp.response), self.domain)
for h in hosts:
if h not in self.spider_blacklist:
self.spider_blacklist[h] = None
print(f"[DEBUG] Found host with spider: {h}", file=sys.stderr)
self.in_q.put((h, record_type, 0))
return resp
if record_type == "CNAME":
#A max 20 lookups
for x in range(20):
try:
resp = self.resolver.query(host, record_type)
@@ -245,135 +187,97 @@ class lookup(multiprocessing.Process):
host = str(resp[0]).rstrip(".")
cname_record.append(host)
else:
return cname_record
else:
#All other records:
return self.resolver.query(host, record_type)
except Exception as e:
if isinstance(e, dns.resolver.NoNameservers):
self.in_q.put((host, record_type, 0))
self.resolver.nameservers += self.get_ns_blocking()
return False
elif isinstance(e, dns.resolver.NXDOMAIN):
return False
elif isinstance(e, dns.resolver.NoAnswer):
if retries >= 1:
print("[DEBUG] NoAnswer retry", file=sys.stderr)
return False
retries += 1
elif isinstance(e, dns.resolver.Timeout):
print(f"[DEBUG] lookup failure: {host} {retries}", file=sys.stderr)
if retries > 3:
return ['Multiple Query Timeout - External address resolution was restricted']
else:
#Maybe another process can take a crack at it.
self.in_q.put((host, record_type, retries + 1))
return False
elif isinstance(e, IndexError):
pass
elif isinstance(e, TypeError):
self.in_q.put((host, record_type, 0))
return False
elif isinstance(e, dns.rdatatype.UnknownRdatatype):
error("DNS record type not supported:", record_type)
else:
print(f"[DEBUG] Problem processing host: {host}", file=sys.stderr)
raise e
def run(self):
#This process needs one resolver before it can start looking.
self.resolver.nameservers += self.get_ns_blocking()
while True:
found_addresses = []
work = self.in_q.get()
#Check if we have hit the end marker
while not work:
#Look for a re-queued lookup
try:
work = self.in_q.get(block=False)
if work:
self.in_q.put(False)
except Empty:
print('[DEBUG] End of work queue', file=sys.stderr)
work = False
break
#Is this the end of all the work that needs to be done?
if not work:
#Perpetuate the end marker for all threads to see
self.in_q.put(False)
#Notify the parent that we have died of natural causes
self.out_q.put(False)
break
else:
if len(work) == 3:
#keep track of how many times this lookup has timed out.
(hostname, record_type, timeout_retries) = work
response = self.check(hostname, record_type, timeout_retries)
else:
(hostname, record_type) = work
response = self.check(hostname, record_type)
sys.stdout.flush()
print(f"[DEBUG] {response}", file=sys.stderr)
reject = False
if response:
for a in response:
a = str(a)
if a in self.wildcards:
print(f"[DEBUG] resolved wildcard: {hostname}", file=sys.stderr)
reject = True
break
else:
found_addresses.append(a)
if not reject:
#This request is filled, send the results back
result = (hostname, record_type, found_addresses)
self.out_q.put(result)
# Extract relevant hosts
host_match = re.compile(r"((?<=[\s])[a-zA-Z0-9_-]+\.(?:[a-zA-Z0-9_-]+\.?)+(?=[\s]))")
def extract_hosts(data, hostname):
#made a global to avoid re-compilation
global host_match
ret = []
hosts = re.findall(host_match, data)
for fh in hosts:
host = fh.rstrip(".")
#Is this host in scope?
if host.endswith(hostname):
ret.append(host)
return ret
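The spider regex above pulls whitespace-delimited hostnames out of raw response text, and the `endswith` filter keeps only names under the target domain. A self-contained demonstration of that behavior:

```python
import re

# Same pattern as subbrute's host_match: a dotted name bounded by whitespace.
host_match = re.compile(r"((?<=[\s])[a-zA-Z0-9_-]+\.(?:[a-zA-Z0-9_-]+\.?)+(?=[\s]))")

def extract_hosts(data, hostname):
    ret = []
    for fh in host_match.findall(data):
        host = fh.rstrip(".")              # the trailing dot marks the DNS root
        if host.endswith(hostname):        # is this host in scope?
            ret.append(host)
    return ret

data = " www.example.com. mail.example.com ns1.other.net "
print(extract_hosts(data, "example.com"))  # ['www.example.com', 'mail.example.com']
```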
# Return a list of unique subdomains, sorted by frequency
domain_match = re.compile(r"([a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]*)+")
def extract_subdomains(file_name):
#Avoid re-compilation
global domain_match
subs = {}
sub_file = open(file_name).read()
@@ -382,50 +286,49 @@ def extract_subdomains(file_name):
for i in f_all:
if i.find(".") >= 0:
p = i.split(".")[0:-1]
#gobble everything that might be a TLD
while p and len(p[-1]) <= 3:
p = p[0:-1]
#remove the domain name
p = p[0:-1]
#do we have a subdomain.domain left?
if len(p) >= 1:
print(f"[DEBUG] {str(p)} : {i}", file=sys.stderr)
for q in p:
if q:
q = q.lower()
if q in subs:
subs[q] += 1
else:
subs[q] = 1
#Free some memory before the sort...
del f_all
#Sort by freq in desc order
subs_sorted = sorted(subs.keys(), key=lambda x: subs[x], reverse=True)
return subs_sorted
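The frequency ranking above is a hand-rolled dict count. The same count-then-sort idiom can be sketched with `collections.Counter` on a small sample of extracted labels:

```python
from collections import Counter

labels = ["www", "mail", "www", "dev", "www", "mail"]
counts = Counter(labels)
# Sort labels by frequency, most common first.
ranked = sorted(counts, key=lambda x: counts[x], reverse=True)
print(ranked)  # ['www', 'mail', 'dev']
```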
def print_target(target, record_type=None, subdomains="names.txt", resolve_list="resolvers.txt", process_count=16, output=False, json_output=False, found_subdomains=[], verbose=False):
subdomains_list = []
results_temp = []
# Fixed: Call run only once
for result in run(target, record_type, subdomains, resolve_list, process_count):
(hostname, record_type, response) = result
if not record_type:
result_str = hostname
else:
result_str = f"{hostname},{','.join(response).strip(',')}"
if result_str not in found_subdomains:
if verbose:
print(result_str)
subdomains_list.append(result_str)
# Handle output files
if output:
output.write(f"{result_str}\n")
output.flush()
if json_output:
json_output.write(json.dumps({"hostname": hostname, "record_type": record_type, "addresses": response}) + "\n")
json_output.flush()
return set(subdomains_list)
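The dedup-and-write flow above can be exercised without any DNS traffic by stubbing `run()` out as a generator. A sketch under that assumption, using in-memory buffers in place of the output files:

```python
import io
import json

def fake_run():
    # Stub for run(): yields (hostname, record_type, addresses) tuples.
    yield ("www.example.com", "A", ["93.184.216.34"])
    yield ("www.example.com", "A", ["93.184.216.34"])   # duplicate result
    yield ("api.example.com", None, [])

def collect(results, output, json_output):
    seen = set()
    for hostname, record_type, response in results:
        result_str = hostname if not record_type else f"{hostname},{','.join(response)}"
        if result_str in seen:
            continue                       # skip duplicates
        seen.add(result_str)
        output.write(result_str + "\n")
        json_output.write(json.dumps({"hostname": hostname,
                                      "record_type": record_type,
                                      "addresses": response}) + "\n")
    return seen

txt, js = io.StringIO(), io.StringIO()
found = collect(fake_run(), txt, js)
print(len(found))  # 2
```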
def run(target, record_type=None, subdomains="names.txt", resolve_list="resolvers.txt", process_count=16):
subdomains = check_open(subdomains)
resolve_list = check_open(resolve_list)
if (len(resolve_list) / 16) < process_count:
sys.stderr.write('Warning: Fewer than 16 resolvers per thread, consider adding more nameservers to resolvers.txt.\n')
if os.name == 'nt':
wildcards = {}
spider_blacklist = {}
@@ -434,70 +337,52 @@ def run(target, record_type = None, subdomains = "names.txt", resolve_list = "re
spider_blacklist = multiprocessing.Manager().dict()
in_q = multiprocessing.Queue()
out_q = multiprocessing.Queue()
#have a buffer of at most two new nameservers that lookup processes can draw from.
resolve_q = multiprocessing.Queue(maxsize=2)
#Make a source of fast nameservers available for other processes.
verify_nameservers_proc = VerifyNameservers(target, record_type, resolve_q, resolve_list, wildcards)
verify_nameservers_proc.start()
in_q.put((target, record_type))
#A list of subdomains is the input
spider_blacklist[target] = None
for s in subdomains:
s = str(s).strip()
if s:
if "," in s:
s = s.split(",")[0]
if not s.endswith(target):
hostname = f"{s}.{target}"
else:
#A user might feed an output list as a subdomain list.
hostname = s
if hostname not in spider_blacklist:
spider_blacklist[hostname] = None
work = (hostname, record_type)
in_q.put(work)
#Terminate the queue
in_q.put(False)
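The wordlist seeding above is deliberately forgiving about input shape. The same rules, sketched as a small standalone function (trim whitespace, take the first CSV column, and only append the target when the entry is not already fully qualified):

```python
def normalize_entry(s, target):
    s = str(s).strip()
    if not s:
        return None
    if "," in s:                       # a CSV row: keep the first column
        s = s.split(",")[0]
    return s if s.endswith(target) else f"{s}.{target}"

entries = ["www", "mail,10.0.0.1", "  ", "api.example.com"]
hosts = [h for h in (normalize_entry(e, "example.com") for e in entries) if h]
print(hosts)  # ['www.example.com', 'mail.example.com', 'api.example.com']
```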
for i in range(process_count):
worker = Lookup(in_q, out_q, resolve_q, target, wildcards, spider_blacklist)
worker.start()
threads_remaining = process_count
while True:
try:
#The output is valid hostnames
result = out_q.get(True, 10)
#we will get an empty exception before this runs.
if not result:
threads_remaining -= 1
else:
#run() is a generator, and yields results from the work queue
yield result
except Exception as e:
if isinstance(e, Empty):
pass
else:
raise e
if threads_remaining <= 0:
break
#We no longer require name servers.
print("[DEBUG] killing nameserver process", file=sys.stderr)
try:
killproc(pid=verify_nameservers_proc.pid)
except:
#On Windows this is a threading.Thread, which has no terminate()
verify_nameservers_proc.end()
print("[DEBUG] End", file=sys.stderr)
#exit handler for signals. So ctrl+c will work.
#With the 'multiprocessing' library each worker is its own process, which side-steps the GIL.
#If the user wants to exit prematurely, each process must be killed.
def killproc(signum=0, frame=0, pid=False):
if not pid:
pid = os.getpid()
if sys.platform.startswith('win'):
@ -506,12 +391,10 @@ def killproc(signum = 0, frame = 0, pid = False):
handle = kernel32.OpenProcess(1, 0, pid)
kernel32.TerminateProcess(handle, 0)
except:
#Oh, Windows.
pass
else:
os.kill(pid, 9)
#Toggle debug output
verbose = False
def trace(*args, **kwargs):
if verbose:
@ -529,7 +412,6 @@ def error(*args, **kwargs):
def check_open(input_file):
ret = []
#If we can't find a resolver from an input file, then we need to improvise.
try:
ret = open(input_file).readlines()
except:
@ -538,98 +420,78 @@ def check_open(input_file):
error("File is empty:", input_file)
return ret
#Every 'multiprocessing' process needs a signal handler.
#All processes need to die, we don't want to leave zombies.
def signal_init():
#Escalate the signal to prevent zombies.
signal.signal(signal.SIGINT, killproc)
try:
signal.signal(signal.SIGTSTP, killproc)
signal.signal(signal.SIGQUIT, killproc)
except:
pass # Windows
if __name__ == "__main__":
if getattr(sys, 'frozen', False):
# cx_freeze windows:
base_path = os.path.dirname(sys.executable)
multiprocessing.freeze_support()
else:
#everything else:
base_path = os.path.dirname(os.path.realpath(__file__))
parser = optparse.OptionParser("usage: %prog [options] target")
parser.add_option("-s", "--subs", dest="subs", default=os.path.join(base_path, "names.txt"),
type="string", help="(optional) list of subdomains, default = 'names.txt'")
parser.add_option("-r", "--resolvers", dest="resolvers", default=os.path.join(base_path, "resolvers.txt"),
type="string", help="(optional) A list of DNS resolvers, default = 'resolvers.txt'")
parser.add_option("-t", "--targets_file", dest="targets", default="",
type="string", help="(optional) A file containing a newline delimited list of domains to brute force.")
parser.add_option("-o", "--output", dest="output", default=False, help="(optional) Output to file (Greppable Format)")
parser.add_option("-j", "--json", dest="json", default=False, help="(optional) Output to file (JSON Format)")
parser.add_option("-a", "-A", action='store_true', dest="ipv4", default=False,
help="(optional) Print all IPv4 addresses for subdomains (default = off).")
parser.add_option("--type", dest="type", default=False,
type="string", help="(optional) Print all responses for an arbitrary DNS record type (CNAME, AAAA, TXT, SOA, MX...)")
parser.add_option("-c", "--process_count", dest="process_count",
default=16, type="int",
help="(optional) Number of lookup threads to run. default = 16")
parser.add_option("-f", "--filter_subs", dest="filter", default="",
type="string", help="(optional) A file containing unorganized domain names which will be filtered into a list of subdomains sorted by frequency.")
parser.add_option("-v", "--verbose", action='store_true', dest="verbose", default=False,
help="(optional) Print debug information.")
(options, args) = parser.parse_args()
verbose = options.verbose
if len(args) < 1 and options.filter == "" and options.targets == "":
parser.error("You must provide a target. Use -h for help.")
if options.filter != "":
#cleanup this file and print it out
for d in extract_subdomains(options.filter):
print(d)
sys.exit()
if options.targets != "":
targets = check_open(options.targets)
else:
targets = args #multiple arguments on the cli: ./subbrute.py google.com gmail.com yahoo.com
output_file = False
if options.output:
try:
output_file = open(options.output, "w")
except:
error("Failed writing to file:", options.output)
json_file = False
if options.json:
try:
json_file = open(options.json, "w")
except:
error("Failed writing to file:", options.json)
record_type = False
if options.ipv4:
record_type = "A"
if options.type:
record_type = str(options.type).upper()
threads = []
for target in targets:
target = target.strip()
if target:
print_target(target, record_type, options.subs, options.resolvers, options.process_count, output_file, json_file, verbose=verbose)
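The CSV-forgiving hostname normalization performed inside run() above can be sketched as a standalone helper (hypothetical name `normalize_hostname`; illustrative only, not part of subbrute):

```python
def normalize_hostname(s, target):
    """Mirror of the input-cleanup step in subbrute's run() (illustrative only)."""
    s = str(s).strip()
    if not s:
        return None
    if "," in s:
        # Be forgiving: the user may have fed a CSV file as input.
        s = s.split(",")[0]
    if s.endswith(target):
        # The user may feed a previous output list back in unchanged.
        return s
    return f"{s}.{target}"

print(normalize_hostname("www,10.0.0.1", "example.com"))  # www.example.com
```

Feeding `"mail.example.com"` back in returns it unchanged, which is why an output file can be reused as an input wordlist.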

(Diff of one file suppressed because it is too large.)

takeover.py (new file, 236 lines):
#!/usr/bin/env python3
"""
Subdomain Takeover Extension for Sublist3r v3.0 - Fixed & improved
Usage examples:
python takeover.py -i subdomains.txt -o results.txt --delay 0.2 -t 20 -v
cat subdomains.txt | python takeover.py -o results.txt
Requirements:
pip install dnspython requests colorama
"""
import argparse
import sys
import time
import requests
import urllib3
from concurrent.futures import ThreadPoolExecutor, as_completed
import dns.resolver
import dns.exception
import threading
# Silence TLS warnings (we use verify=False on purpose for dangling domains)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Console Colors (using colorama for cross-platform)
try:
import colorama
colorama.init(autoreset=True)
G = '\033[92m' # green
Y = '\033[93m' # yellow
B = '\033[94m' # blue
R = '\033[91m' # red
W = '\033[0m' # reset
except Exception:
G = Y = B = R = W = ''
# Lock for thread-safe prints and writes
LOCK = threading.Lock()
def banner():
print(f"""{R}
Subdomain Takeover Extension for Sublist3r v3.0
Detects dangling CNAMEs & HTTP fingerprints
Provides evidence snippets and confidence levels
{W}{Y}
""")
# --- Fingerprints ---
FINGERPRINTS = {
"GitHub Pages": {
"cname_suffix": "github.io",
"keywords": ["There isn't a GitHub Pages site here."],
"nxdomain_required": False
},
"Heroku": {
"cname_suffix": "herokuapp.com",
"keywords": ["No such app"],
"nxdomain_required": False
},
"AWS/S3": {
"cname_suffix": "s3.amazonaws.com",
"keywords": ["The specified bucket does not exist"],
"nxdomain_required": False
},
"Shopify": {
"cname_suffix": "myshopify.com",
"keywords": ["Sorry, this shop is currently unavailable"],
"nxdomain_required": False
},
# Example keyword-only fingerprint
"Canny": {
"cname_suffix": None,
"keywords": ["Company Not Found", "There is no such company"],
"nxdomain_required": False
}
}
# --- DNS helpers ---
def resolve_cname(subdomain):
try:
resolver = dns.resolver.Resolver()
answers = resolver.resolve(subdomain, 'CNAME')
return [r.target.to_text().rstrip('.') for r in answers]
except dns.resolver.NXDOMAIN:
return ["NXDOMAIN"]
except (dns.resolver.NoAnswer, dns.exception.DNSException):
return []
def resolve_a(name):
try:
resolver = dns.resolver.Resolver()
answers = resolver.resolve(name, 'A')
return [r.address for r in answers]
except Exception:
return []
def is_dangling_cname(target):
if target == "NXDOMAIN":
return True
ips = resolve_a(target)
return len(ips) == 0
# --- HTTP fingerprinting ---
def check_http_fingerprint(subdomain, keywords, verbose=False):
for scheme in ("https", "http"):
url = f"{scheme}://{subdomain}/"
try:
resp = requests.get(url, timeout=6, verify=False, allow_redirects=True)
except requests.RequestException as e:
if verbose:
with LOCK:
print(f"{Y}[!] HTTP error {url}: {e}{W}")
continue
body = resp.text or ""
for kw in keywords:
if kw in body:
idx = body.find(kw)
snippet = body[max(0, idx - 50):idx + len(kw) + 50].replace("\n", " ")
return True, kw, url, resp.status_code, snippet
return False, None, None, None, None
# --- Takeover analysis ---
def check_takeover(subdomain, verbose=False):
cnames = resolve_cname(subdomain)
a_records = resolve_a(subdomain)
# NXDOMAIN handling
if "NXDOMAIN" in cnames and not a_records:
suspects = []
for svc, fp in FINGERPRINTS.items():
if fp.get("nxdomain_required"):
suspects.append(svc)
if suspects:
return {
"vulnerable": True,
"service": ", ".join(suspects),
"confidence": "low",
"evidence": {"dns": "NXDOMAIN"},
"note": "NXDOMAIN detected, manual validation needed."
}
# Check CNAME-based providers
for cname in cnames:
for svc, fp in FINGERPRINTS.items():
suffix = fp.get("cname_suffix")
if suffix and cname.lower().endswith(suffix.lower()):
dangling = is_dangling_cname(cname)
matched, kw, url, status, snippet = check_http_fingerprint(subdomain, fp["keywords"], verbose)
if dangling and matched:
return {
"vulnerable": True, "service": svc, "confidence": "high",
"evidence": {"cname": cname, "http_url": url, "status": status, "kw": kw, "snippet": snippet},
"note": "Dangling CNAME + HTTP fingerprint match"
}
if dangling:
return {
"vulnerable": True, "service": svc, "confidence": "medium",
"evidence": {"cname": cname}, "note": "Dangling CNAME, no HTTP fingerprint"
}
if matched:
return {
"vulnerable": True, "service": svc, "confidence": "medium",
"evidence": {"http_url": url, "status": status, "kw": kw, "snippet": snippet},
"note": "HTTP fingerprint matched but CNAME resolves"
}
# Keyword-only providers
for svc, fp in FINGERPRINTS.items():
if fp["cname_suffix"] is None:
matched, kw, url, status, snippet = check_http_fingerprint(subdomain, fp["keywords"], verbose)
if matched:
return {
"vulnerable": True, "service": svc, "confidence": "high",
"evidence": {"http_url": url, "status": status, "kw": kw, "snippet": snippet},
"note": "Keyword-only fingerprint matched"
}
return {"vulnerable": False}
# --- Processing ---
def process_subdomain(sub, verbose=False, output_file=None):
with LOCK:
print(f"{B}[*] Checking {sub}{W}")
result = check_takeover(sub, verbose)
ts = time.strftime("%Y-%m-%d %H:%M:%S")
if result["vulnerable"]:
line = f"[!] {ts} {sub} VULNERABLE ({result['service']}) | Confidence: {result['confidence']} | Evidence: {result['evidence']} | Note: {result['note']}"
with LOCK:
print(f"{R}{line}{W}")
if output_file:
with open(output_file, "a") as f:
f.write(line + "\n")
else:
line = f"[+] {ts} {sub} not vulnerable"
with LOCK:
print(f"{G}{line}{W}")
if output_file:
with open(output_file, "a") as f:
f.write(line + "\n")
# --- Main ---
def main():
parser = argparse.ArgumentParser(description="Subdomain Takeover Extension for Sublist3r")
parser.add_argument("-i", "--input", help="Input file of subdomains")
parser.add_argument("-o", "--output", help="Output file")
parser.add_argument("-d", "--domain", help="Domain (demo mode with test/dev/staging)")
parser.add_argument("--delay", type=float, default=0.0, help="Delay between results")
parser.add_argument("-t", "--threads", type=int, default=10, help="Concurrent threads")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose debug")
args = parser.parse_args()
banner()
subs = []
if args.domain:
subs = [f"test.{args.domain}", f"dev.{args.domain}", f"staging.{args.domain}"]
elif args.input:
with open(args.input) as f:
subs = [x.strip() for x in f if x.strip()]
else:
subs = [x.strip() for x in sys.stdin if x.strip()]
if not subs:
print(f"{R}[!] No subdomains provided{W}")
sys.exit(1)
with ThreadPoolExecutor(max_workers=args.threads) as exe:
futures = [exe.submit(process_subdomain, s, args.verbose, args.output) for s in subs]
for f in as_completed(futures):
if args.delay > 0:
time.sleep(args.delay)
print(f"{G}[+] Scan complete{W}")
if __name__ == "__main__":
main()
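The keyword matching and evidence-snippet extraction inside check_http_fingerprint can be exercised offline as a small pure function (a sketch under assumed names; the real tool fetches the body over HTTP first):

```python
def match_keywords(body, keywords, context=50):
    """Return (matched, keyword, snippet) for the first fingerprint keyword
    found in a response body. Illustrative standalone version."""
    for kw in keywords:
        idx = body.find(kw)
        if idx != -1:
            # Clamp the snippet window so a match near the start of the body
            # does not produce a negative slice index.
            start = max(0, idx - context)
            snippet = body[start:idx + len(kw) + context].replace("\n", " ")
            return True, kw, snippet
    return False, None, None
```

For example, matching a fetched GitHub Pages 404 body against `["There isn't a GitHub Pages site here."]` returns the keyword plus roughly 50 characters of surrounding context as evidence.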