From 4edc830d1f7f4b4167207d78306f141246656bad Mon Sep 17 00:00:00 2001
From: Shaheer Yasir <165695923+shaheeryasirofficial@users.noreply.github.com>
Date: Thu, 2 Oct 2025 00:04:08 +0500
Subject: [PATCH] Update and rename live.py to takover.py

---
 live.py    | 155 -----------------------------------
 takover.py | 236 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 236 insertions(+), 155 deletions(-)
 delete mode 100644 live.py
 create mode 100644 takover.py

diff --git a/live.py b/live.py
deleted file mode 100644
index 7afccd2..0000000
--- a/live.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#!/usr/bin/env python3
-# Nmap Integrator for Subdomains - v1.0
-# Integrates Nmap port scanning with subdomain lists (e.g., from Sublist3r).
-# Filters live subdomains (DNS + HTTP check), then scans for open ports.
-# Usage: python nmap_integrator.py -i subdomains.txt -o nmap_results.xml
-# Requires: pip install requests dnspython; nmap installed on system
-
-import argparse
-import sys
-import subprocess
-import xml.etree.ElementTree as ET
-from concurrent.futures import ThreadPoolExecutor, as_completed
-import requests
-import dns.resolver
-import dns.exception
-import tempfile
-import os
-
-def is_dns_live(subdomain):
-    """Check if subdomain resolves to an IP."""
-    try:
-        dns.resolver.resolve(subdomain, 'A')
-        return True
-    except (dns.exception.DNSException, Exception):
-        return False
-
-def is_http_live(subdomain, timeout=5):
-    """Check if subdomain responds to HTTP/HTTPS."""
-    for protocol in ['http', 'https']:
-        url = f"{protocol}://{subdomain}"
-        try:
-            resp = requests.get(url, timeout=timeout, verify=False, allow_redirects=True)
-            if resp.status_code > 0:
-                return True
-        except requests.RequestException:
-            continue
-    return False
-
-def check_live(subdomain, dns_only=False, timeout=5):
-    """Full live check: DNS + optional HTTP."""
-    if not is_dns_live(subdomain):
-        return False
-    if dns_only:
-        return True
-    return is_http_live(subdomain, timeout)
-
-def run_nmap_scan(subdomain, ports='top-1000', output_dir=None, output_format='xml'):
-    """Run Nmap scan on a subdomain and return results."""
-    if output_dir:
-        output_file = os.path.join(output_dir, f"{subdomain}_nmap.{output_format}")
-    else:
-        output_file = f"{subdomain}_nmap.{output_format}"
-
-    cmd = [
-        'nmap', '-sV', '-sC',  # Service version + script scan
-        f'-p{ports}',  # Ports to scan
-        f'--open',  # Only show open ports
-        f'-o{output_format}', output_file,  # Output format
-        subdomain
-    ]
-
-    try:
-        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)  # 5-min timeout per host
-        if result.returncode == 0:
-            print(f"[SCAN] {subdomain}: Scan complete. Output: {output_file}")
-            return output_file
-        else:
-            print(f"[ERROR] {subdomain}: Nmap failed - {result.stderr}")
-            return None
-    except subprocess.TimeoutExpired:
-        print(f"[TIMEOUT] {subdomain}: Scan timed out")
-        return None
-    except FileNotFoundError:
-        print("[ERROR] Nmap not found. Install Nmap and ensure it's in PATH.", file=sys.stderr)
-        sys.exit(1)
-
-def parse_nmap_xml(xml_file):
-    """Parse Nmap XML for summary (open ports)."""
-    try:
-        tree = ET.parse(xml_file)
-        root = tree.getroot()
-        host = root.find('host')
-        if host is None:
-            return []
-        ports = []
-        for port in host.findall('.//port[@state="open"]'):
-            port_id = port.get('portid')
-            service = port.find('service')
-            service_name = service.get('name') if service is not None else 'unknown'
-            ports.append(f"{port_id}/{service_name}")
-        return ports
-    except ET.ParseError:
-        return []
-
-def main():
-    parser = argparse.ArgumentParser(description="Integrate Nmap port scanning with subdomain lists.")
-    parser.add_argument('-i', '--input', required=True, help="Input file with subdomains (one per line)")
-    parser.add_argument('-o', '--output-dir', help="Directory for Nmap output files (default: current dir)")
-    parser.add_argument('-t', '--threads', type=int, default=10, help="Threads for live check (default: 10); Nmap is sequential")
-    parser.add_argument('--dns-only', action='store_true', help="Only check DNS (faster, skip HTTP)")
-    parser.add_argument('--ports', default='top-1000', help="Nmap ports (default: top-1000)")
-    parser.add_argument('--timeout', type=int, default=5, help="HTTP timeout in seconds (default: 5)")
-    parser.add_argument('--summary', action='store_true', help="Print summary of open ports after scanning")
-    args = parser.parse_args()
-
-    # Read subdomains
-    try:
-        with open(args.input, 'r') as f:
-            subdomains = [line.strip() for line in f if line.strip()]
-    except FileNotFoundError:
-        print(f"Error: Input file '{args.input}' not found.", file=sys.stderr)
-        sys.exit(1)
-
-    print(f"[INFO] Filtering {len(subdomains)} subdomains for live hosts...")
-
-    # Filter live subdomains
-    live_subdomains = []
-    with ThreadPoolExecutor(max_workers=args.threads) as executor:
-        futures = {executor.submit(check_live, sub, args.dns_only, args.timeout): sub for sub in subdomains}
-        for future in as_completed(futures):
-            sub = futures[future]
-            try:
-                if future.result():
-                    live_subdomains.append(sub)
-                    print(f"[LIVE] {sub}")
-                else:
-                    print(f"[DEAD] {sub}")
-            except Exception as e:
-                print(f"[ERROR] {sub}: {e}", file=sys.stderr)
-
-    print(f"[INFO] Found {len(live_subdomains)} live subdomains. Starting Nmap scans...")
-
-    # Create output dir if specified
-    if args.output_dir:
-        os.makedirs(args.output_dir, exist_ok=True)
-
-    # Run Nmap sequentially (to avoid overwhelming the network; parallelize if needed)
-    scan_results = {}
-    for subdomain in live_subdomains:
-        output_file = run_nmap_scan(subdomain, args.ports, args.output_dir)
-        if output_file and args.summary:
-            open_ports = parse_nmap_xml(output_file)
-            if open_ports:
-                scan_results[subdomain] = open_ports
-                print(f"[PORTS] {subdomain}: {', '.join(open_ports)}")
-
-    if args.summary and scan_results:
-        print("\n[SUMMARY] Open Ports by Host:")
-        for host, ports in scan_results.items():
-            print(f"{host}: {', '.join(ports)}")
-
-    print(f"[COMPLETE] Scanned {len(live_subdomains)} hosts. Check output files for details.")
-
-if __name__ == "__main__":
-    main()
diff --git a/takover.py b/takover.py
new file mode 100644
index 0000000..569f438
--- /dev/null
+++ b/takover.py
@@ -0,0 +1,236 @@
+#!/usr/bin/env python3
+"""
+Subdomain Takeover Extension for Sublist3r v3.0 - Fixed & improved
+
+Usage examples:
+    python takover.py -i subdomains.txt -o results.txt --delay 0.2 -t 20 -v
+    cat subdomains.txt | python takover.py -o results.txt
+
+Requirements:
+    pip install dnspython requests colorama
+"""
+import argparse
+import sys
+import time
+import requests
+import urllib3
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import dns.resolver
+import dns.exception
+import threading
+
+# Silence TLS warnings (we use verify=False on purpose for dangling domains)
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+# Console Colors (using colorama for cross-platform)
+try:
+    import colorama
+    colorama.init(autoreset=True)
+    G = '\033[92m'  # green
+    Y = '\033[93m'  # yellow
+    B = '\033[94m'  # blue
+    R = '\033[91m'  # red
+    W = '\033[0m'   # white
+except Exception:
+    G = Y = B = R = W = ''
+
+# Lock for thread-safe prints and writes
+LOCK = threading.Lock()
+
+def banner():
+    print(f"""{R}
+    ╔════════════════════════════════════════════════════════════════╗
+    ║         Subdomain Takeover Extension for Sublist3r v3.0         ║
+    ║           Detects dangling CNAMEs & HTTP fingerprints           ║
+    ║         Provides evidence snippets and confidence levels        ║
+    ╚════════════════════════════════════════════════════════════════╝{W}{Y}
+    """)
+
+# --- Fingerprints ---
+FINGERPRINTS = {
+    "GitHub Pages": {
+        "cname_suffix": "github.io",
+        "keywords": ["There isn't a GitHub Pages site here."],
+        "nxdomain_required": False
+    },
+    "Heroku": {
+        "cname_suffix": "herokuapp.com",
+        "keywords": ["No such app"],
+        "nxdomain_required": False
+    },
+    "AWS/S3": {
+        "cname_suffix": "s3.amazonaws.com",
+        "keywords": ["The specified bucket does not exist"],
+        "nxdomain_required": False
+    },
+    "Shopify": {
+        "cname_suffix": "myshopify.com",
+        "keywords": ["Sorry, this shop is currently unavailable"],
+        "nxdomain_required": False
+    },
+    # Example keyword-only fingerprint
+    "Canny": {
+        "cname_suffix": None,
+        "keywords": ["Company Not Found", "There is no such company"],
+        "nxdomain_required": False
+    }
+}
+
+# --- DNS helpers ---
+def resolve_cname(subdomain):
+    try:
+        resolver = dns.resolver.Resolver()
+        answers = resolver.resolve(subdomain, 'CNAME')
+        return [r.target.to_text().rstrip('.') for r in answers]
+    except dns.resolver.NXDOMAIN:
+        return ["NXDOMAIN"]
+    except (dns.resolver.NoAnswer, dns.exception.DNSException):
+        return []
+    return []
+
+def resolve_a(name):
+    try:
+        resolver = dns.resolver.Resolver()
+        answers = resolver.resolve(name, 'A')
+        return [r.address for r in answers]
+    except Exception:
+        return []
+
+def is_dangling_cname(target):
+    if target == "NXDOMAIN":
+        return True
+    ips = resolve_a(target)
+    return len(ips) == 0
+
+# --- HTTP fingerprinting ---
+def check_http_fingerprint(subdomain, keywords, verbose=False):
+    for scheme in ("https", "http"):
+        url = f"{scheme}://{subdomain}/"
+        try:
+            resp = requests.get(url, timeout=6, verify=False, allow_redirects=True)
+        except requests.RequestException as e:
+            if verbose:
+                with LOCK:
+                    print(f"{Y}[!] HTTP error {url}: {e}{W}")
+            continue
+        body = resp.text or ""
+        for kw in keywords:
+            if kw in body:
+                snippet = body[max(0, body.find(kw) - 50):body.find(kw) + 50].replace("\n", " ")  # max() avoids a negative slice start
+                return True, kw, url, resp.status_code, snippet
+    return False, None, None, None, None
+
+# --- Takeover analysis ---
+def check_takeover(subdomain, verbose=False):
+    cnames = resolve_cname(subdomain)
+    a_records = resolve_a(subdomain)
+
+    # NXDOMAIN handling
+    if "NXDOMAIN" in cnames and not a_records:
+        suspects = []
+        for svc, fp in FINGERPRINTS.items():
+            if fp.get("nxdomain_required"):
+                suspects.append(svc)
+        if suspects:
+            return {
+                "vulnerable": True,
+                "service": ", ".join(suspects),
+                "confidence": "low",
+                "evidence": {"dns": "NXDOMAIN"},
+                "note": "NXDOMAIN detected, manual validation needed."
+            }
+
+    # Check CNAME-based providers
+    for cname in cnames:
+        for svc, fp in FINGERPRINTS.items():
+            suffix = fp.get("cname_suffix")
+            if suffix and cname.lower().endswith(suffix.lower()):
+                dangling = is_dangling_cname(cname)
+                matched, kw, url, status, snippet = check_http_fingerprint(subdomain, fp["keywords"], verbose)
+                if dangling and matched:
+                    return {
+                        "vulnerable": True, "service": svc, "confidence": "high",
+                        "evidence": {"cname": cname, "http_url": url, "status": status, "kw": kw, "snippet": snippet},
+                        "note": "Dangling CNAME + HTTP fingerprint match"
+                    }
+                if dangling:
+                    return {
+                        "vulnerable": True, "service": svc, "confidence": "medium",
+                        "evidence": {"cname": cname}, "note": "Dangling CNAME, no HTTP fingerprint"
+                    }
+                if matched:
+                    return {
+                        "vulnerable": True, "service": svc, "confidence": "medium",
+                        "evidence": {"http_url": url, "status": status, "kw": kw, "snippet": snippet},
+                        "note": "HTTP fingerprint matched but CNAME resolves"
+                    }
+
+    # Keyword-only providers
+    for svc, fp in FINGERPRINTS.items():
+        if fp["cname_suffix"] is None:
+            matched, kw, url, status, snippet = check_http_fingerprint(subdomain, fp["keywords"], verbose)
+            if matched:
+                return {
+                    "vulnerable": True, "service": svc, "confidence": "high",
+                    "evidence": {"http_url": url, "status": status, "kw": kw, "snippet": snippet},
+                    "note": "Keyword-only fingerprint matched"
+                }
+
+    return {"vulnerable": False}
+
+# --- Processing ---
+def process_subdomain(sub, verbose=False, output_file=None):
+    with LOCK:
+        print(f"{B}[*] Checking {sub}{W}")
+    result = check_takeover(sub, verbose)
+    ts = time.strftime("%Y-%m-%d %H:%M:%S")
+    if result["vulnerable"]:
+        line = f"[!] {ts} {sub} VULNERABLE ({result['service']}) | Confidence: {result['confidence']} | Evidence: {result['evidence']} | Note: {result['note']}"
+        with LOCK:
+            print(f"{R}{line}{W}")
+            if output_file:
+                with open(output_file, "a") as f:
+                    f.write(line + "\n")
+    else:
+        line = f"[+] {ts} {sub} not vulnerable"
+        with LOCK:
+            print(f"{G}{line}{W}")
+            if output_file:
+                with open(output_file, "a") as f:
+                    f.write(line + "\n")
+
+# --- Main ---
+def main():
+    parser = argparse.ArgumentParser(description="Subdomain Takeover Extension for Sublist3r")
+    parser.add_argument("-i", "--input", help="Input file of subdomains")
+    parser.add_argument("-o", "--output", help="Output file")
+    parser.add_argument("-d", "--domain", help="Domain (demo mode with test/dev/staging)")
+    parser.add_argument("--delay", type=float, default=0.0, help="Delay between results")
+    parser.add_argument("-t", "--threads", type=int, default=10, help="Concurrent threads")
+    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose debug")
+    args = parser.parse_args()
+
+    banner()
+    subs = []
+    if args.domain:
+        subs = [f"test.{args.domain}", f"dev.{args.domain}", f"staging.{args.domain}"]
+    elif args.input:
+        with open(args.input) as f:
+            subs = [x.strip() for x in f if x.strip()]
+    else:
+        subs = [x.strip() for x in sys.stdin if x.strip()]
+
+    if not subs:
+        print(f"{R}[!] No subdomains provided{W}")
+        sys.exit(1)
+
+    with ThreadPoolExecutor(max_workers=args.threads) as exe:
+        futures = [exe.submit(process_subdomain, s, args.verbose, args.output) for s in subs]
+        for f in as_completed(futures):
+            if args.delay > 0:
+                time.sleep(args.delay)
+
+    print(f"{G}[+] Scan complete{W}")
+
+if __name__ == "__main__":
+    main()
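
Illustrative note (not part of the patch above): a minimal sketch of how the FINGERPRINTS table in takover.py could be extended for another provider before running a check. The provider name, CNAME suffix, and keyword below are hypothetical placeholders rather than verified fingerprints; only FINGERPRINTS and check_takeover come from the file added above.

    #!/usr/bin/env python3
    # extend_fingerprints.py -- illustrative sketch, not part of the patch.
    # Assumes takover.py (as created above) is importable from the same directory.
    from takover import FINGERPRINTS, check_takeover

    # Hypothetical provider entry; replace the suffix and keyword with a
    # fingerprint you have verified yourself before trusting the result.
    FINGERPRINTS["Example SaaS"] = {
        "cname_suffix": "pages.example-saas.net",            # placeholder CNAME suffix
        "keywords": ["This project page has been removed"],  # placeholder body text
        "nxdomain_required": False,
    }

    if __name__ == "__main__":
        # Single-host check against the extended table (requires network access).
        print(check_takeover("blog.example.com", verbose=True))

Because cname_suffix is set, check_takeover only consults this entry when the subdomain's CNAME ends with that suffix; an entry with cname_suffix set to None is matched purely on the HTTP response body, as with the existing "Canny" entry.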