-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscanner.py
More file actions
127 lines (108 loc) · 4.11 KB
/
scanner.py
File metadata and controls
127 lines (108 loc) · 4.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
import asyncio
import json
import os
import sys
import ssl
# Fallback limit on simultaneous connection attempts, used when the user does
# not supply a numeric concurrency value.
DEFAULT_CONCURRENCY = 2000 # Tune for your system
# Directory (relative to the current working directory) holding the per-target
# JSON cache files.
CACHE_DIR = "cache"
# Create the cache directory when the module is loaded; exist_ok=True makes
# this a no-op if the directory is already there.
os.makedirs(CACHE_DIR, exist_ok=True)
def get_cache_file(target):
    """Return the path of the JSON cache file that belongs to *target*.

    Every ':' and '/' in the target is replaced with '_' so the result is a
    single file name inside ``CACHE_DIR``.
    """
    substitutions = str.maketrans({":": "_", "/": "_"})
    file_name = target.translate(substitutions) + ".json"
    return os.path.join(CACHE_DIR, file_name)
async def _scan_port_inner(target, port):
    """Open one TCP connection to ``target:port`` and report it as open.

    Returns:
        The tuple ``(port, True)`` once the connection has been established.

    Raises:
        OSError: if the connection attempt itself fails (refused, unreachable,
            ...). Errors raised while closing an already-established
            connection are deliberately ignored.
    """
    _reader, writer = await asyncio.open_connection(target, port)
    # From here on the port is known to be open; teardown problems must not
    # turn that result into a failure for the caller.
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        # e.g. the peer reset the connection while we were closing it.
        pass
    return port, True
async def scan_port(semaphore, target, port, timeout=2):
    """Probe a single port while holding *semaphore*.

    Returns ``(port, True)`` when the probe succeeds within *timeout* seconds,
    and ``(port, False)`` when it times out or fails with an OS-level error.
    """
    async with semaphore:
        probe = _scan_port_inner(target, port)
        try:
            outcome = await asyncio.wait_for(probe, timeout)
        except (asyncio.TimeoutError, OSError):
            # ConnectionRefusedError is an OSError subclass, so it lands here too.
            outcome = (port, False)
        return outcome
def load_cache(target):
    """Load previously stored scan results for *target*.

    Returns a pair of sets ``(open_ports, scanned_ports)``. Both sets are empty
    when no cache file exists, or when the file cannot be read or parsed (in
    which case a warning is printed).
    """
    path = get_cache_file(target)
    if not os.path.exists(path):
        return set(), set()

    try:
        with open(path, "r") as handle:
            cached = json.load(handle)
            known_open = set(cached.get("open_ports", []))
            already_scanned = set(cached.get("scanned_ports", []))
    except Exception:
        print("Warning: Could not read cache file, starting fresh.")
        return set(), set()
    return known_open, already_scanned
def save_cache(target, open_ports, scanned_ports):
    """Persist the scan state for *target* as JSON.

    The data is written to a temporary sibling file first and then moved over
    the real cache file, so an interruption in the middle of a write cannot
    leave a truncated cache behind.

    Args:
        target: Host the results belong to; determines the cache file name.
        open_ports: Iterable of ports found open.
        scanned_ports: Iterable of all ports scanned so far.
    """
    cache_file = get_cache_file(target)
    data = {
        "open_ports": list(open_ports),
        "scanned_ports": list(scanned_ports),
    }
    tmp_file = f"{cache_file}.tmp"
    with open(tmp_file, "w") as f:
        json.dump(data, f)
    # os.replace swaps the file in a single step, overwriting any old cache.
    os.replace(tmp_file, cache_file)
async def check_https(target, timeout=5):
    """Report whether a verified TLS connection to ``target:443`` succeeds.

    Args:
        target: Host name or IP address to connect to.
        timeout: Seconds allowed for the connection and TLS handshake, and
            separately for closing the connection afterwards.

    Returns:
        True if the connection and TLS handshake (using the default SSL
        context) complete in time, False on any failure or timeout.
    """
    ssl_ctx = ssl.create_default_context()
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(target, 443, ssl=ssl_ctx), timeout
        )
    except Exception:
        # Best-effort probe: any connect/handshake failure means "not available".
        return False

    # The handshake succeeded; problems while shutting the connection down
    # must not change the answer.
    writer.close()
    try:
        await asyncio.wait_for(writer.wait_closed(), timeout)
    except Exception:
        pass
    return True
async def scan_ports(target, ports, concurrency, open_ports_cache, scanned_ports_cache):
    """Scan *ports* on *target*, skipping ports already recorded as scanned.

    Args:
        target: Host name or IP address to scan.
        ports: Iterable of port numbers requested by the user.
        concurrency: Maximum number of simultaneous connection attempts.
        open_ports_cache: Ports known to be open from an earlier run.
        scanned_ports_cache: Ports already scanned in an earlier run.

    Returns:
        The set of open ports (cached ones plus newly discovered ones).

    The cache is written every ``save_interval`` results and once more when
    the loop ends or is interrupted, instead of after every single port:
    rewriting the whole file per result blocks the event loop and grows
    quadratically with the number of ports.
    """
    save_interval = 1000

    semaphore = asyncio.Semaphore(concurrency)
    open_ports = set(open_ports_cache)
    scanned_ports = set(scanned_ports_cache)
    ports_to_scan = [p for p in ports if p not in scanned_ports]
    print(f"Scanning {len(ports_to_scan)} port(s) on {target}...")

    tasks = [scan_port(semaphore, target, port) for port in ports_to_scan]
    unsaved = 0
    try:
        for future in asyncio.as_completed(tasks):
            port, is_open = await future
            scanned_ports.add(port)
            if is_open:
                open_ports.add(port)
                print(f"[OPEN] Port {port}")
            unsaved += 1
            if unsaved >= save_interval:
                save_cache(target, open_ports, scanned_ports)
                unsaved = 0
    finally:
        # Also runs on cancellation (e.g. Ctrl-C), so progress is not lost.
        if unsaved:
            save_cache(target, open_ports, scanned_ports)
    return open_ports
def parse_ports_input(raw_input):
    """Turn a user-supplied port specification into a sorted list of ports.

    The specification is a comma-separated list of single ports (``443``) and
    inclusive ranges (``80-100``). Whitespace around items is ignored, empty
    items (such as a trailing comma) are skipped, and a reversed range
    (``100-80``) is treated like its forward form.

    Args:
        raw_input: The text entered by the user. Blank input selects every
            port from 1 to 65535.

    Returns:
        A sorted list of unique port numbers.

    Raises:
        ValueError: if an item is not a number or range, if a port lies
            outside 1-65535, or if non-blank input contains no ports at all.
    """
    if not raw_input.strip():
        return list(range(1, 65536))

    ports = set()
    for part in raw_input.split(','):
        part = part.strip()
        if not part:
            continue
        if '-' in part:
            start_text, _, end_text = part.partition('-')
            try:
                start, end = int(start_text), int(end_text)
            except ValueError:
                raise ValueError(f"invalid port range: {part!r}") from None
            if start > end:
                start, end = end, start
        else:
            try:
                start = end = int(part)
            except ValueError:
                raise ValueError(f"invalid port: {part!r}") from None
        if start < 1 or end > 65535:
            raise ValueError(f"port out of range (1-65535): {part!r}")
        ports.update(range(start, end + 1))

    if not ports:
        raise ValueError("no ports specified")
    return sorted(ports)
async def main():
    """Interactive entry point: prompt for a target, ports and concurrency, then scan.

    Exits with status 1 when no target is entered, the target cannot be
    resolved, or the port specification is invalid. After the scan the open
    ports are printed, and if port 443 is among them an HTTPS check is run.
    """
    target = input("Enter domain or IP to scan: ").strip()
    if not target:
        print("Error: No domain or IP entered.")
        sys.exit(1)

    try:
        await asyncio.get_running_loop().getaddrinfo(target, None)
    except (OSError, ValueError):
        # socket.gaierror is an OSError; malformed host names raise ValueError
        # (including UnicodeError).
        print(f"Error: Cannot resolve domain or IP '{target}'.")
        sys.exit(1)

    port_input = input("Enter ports to scan (e.g. 5432, 80-100, leave empty for full scan): ").strip()
    try:
        ports = parse_ports_input(port_input)
    except ValueError as exc:
        print(f"Error: Invalid port specification '{port_input}': {exc}")
        sys.exit(1)

    concurrency_input = input(f"Enter concurrency (default {DEFAULT_CONCURRENCY}): ").strip()
    concurrency = DEFAULT_CONCURRENCY
    # A value of 0 would create a semaphore nobody can acquire and hang the
    # scan forever, so only strictly positive numbers override the default.
    if concurrency_input.isdecimal() and int(concurrency_input) > 0:
        concurrency = int(concurrency_input)

    open_ports_cache, scanned_ports_cache = load_cache(target)
    open_ports = await scan_ports(target, ports, concurrency, open_ports_cache, scanned_ports_cache)

    print("\nScan complete. Open ports:")
    if open_ports:
        for p in sorted(open_ports):
            print(f" - {p}")
        if 443 in open_ports:
            https_ok = await check_https(target)
            print(f"\n[HTTPS] {'Available' if https_ok else 'Not available'} on port 443")
    else:
        print("No open ports found.")
# Script entry point: run the interactive scanner, and turn Ctrl-C into a
# short message with a zero exit status instead of a traceback.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nScan interrupted by user.")
        raise SystemExit(0)