diff --git a/core/relay_analyzer.py b/core/relay_analyzer.py index a29753a..da3c274 100644 --- a/core/relay_analyzer.py +++ b/core/relay_analyzer.py @@ -482,8 +482,14 @@ def _generate_description(self, protocol: str, host: str, result=None) -> str: # Check for ADCS-specific description if protocol in ['http', 'https']: - if result and hasattr(result, 'additional_info') and result.additional_info.get('is_adcs'): - return f"ADCS relay to {protocol.upper()} on {host} - Certificate enrollment abuse (ESC8), potential domain compromise" + if result and hasattr(result, 'additional_info'): + info = result.additional_info + if info.get('is_adcs'): + return (f"ADCS relay to {protocol.upper()} on {host} - " + f"Certificate enrollment abuse (ESC8), potential domain compromise") + if info.get('adcs_unconfirmed'): + return (f"Relay to {protocol.upper()} on {host} - " + f"/certsrv/ matched but ADCS unconfirmed (catchall NTLM site)") descriptions = { 'ldap': f"Relay to LDAP on {host} - Can create computer accounts, modify ACLs (RBCD, DACL abuse)", diff --git a/detectors/ghost_spn.py b/detectors/ghost_spn.py index cad9653..3ed220c 100644 --- a/detectors/ghost_spn.py +++ b/detectors/ghost_spn.py @@ -6,6 +6,7 @@ import re import socket +import uuid from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, List, Set, Tuple, Optional @@ -26,6 +27,11 @@ class GhostSPNDetector: # SPN format: serviceclass/hostname[:port][/instancename] SPN_REGEX = re.compile(r'^[^/]+/([^:/]+)(?::\d+)?(?:/.*)?$', re.IGNORECASE) + # GUID hostnames (DSA GUIDs) resolve via _msdcs CNAMEs, not a relay vector. + # GUID service classes (e.g. DRSUAPI) are Kerberos-only. + GUID_REGEX = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', + re.IGNORECASE) + # SPNs whose service class is always self-referential - skip them SKIP_SERVICE_CLASSES = {'host', 'rpcss', 'wsman', 'exchangeinternalipaddress'} @@ -104,12 +110,15 @@ def detect(self) -> Dict: if not m: continue - # Skip self-referential service classes service_class = spn.split('/')[0].lower() if service_class in self.SKIP_SERVICE_CLASSES: continue + if self.GUID_REGEX.match(service_class): + continue raw_host = m.group(1).lower() + if self.GUID_REGEX.match(raw_host.split('.')[0]): + continue # Promote short hostname to FQDN if '.' not in raw_host and obj_domain: @@ -128,6 +137,14 @@ def detect(self) -> Dict: result['checked'] = len(hostnames) resolution_map = self._resolve_all(hostnames) + # A wildcard *existing* doesn't mean a given name resolved via it — + # real A records win. Probe to find the wildcard's IPs so we can + # tell catches apart from legit records. + wildcard_ips_by_domain: Dict[str, Set[str]] = {} + if has_wildcard_dns: + parents = {fqdn.split('.', 1)[1] for fqdn in hostnames if '.' in fqdn} + wildcard_ips_by_domain = self._probe_wildcard_targets(parents) + # Classify findings for fqdn, entries in hostname_map.items(): ips = resolution_map.get(fqdn) @@ -141,15 +158,16 @@ def detect(self) -> Dict: 'hostname': fqdn, }) elif has_wildcard_dns: - # Resolves but wildcard DNS exists - might be wildcard catch - for account, spn in entries: - result['probably_vulnerable'].append({ - 'account': account, - 'spn': spn, - 'hostname': fqdn, - 'resolved_to': list(ips), - }) - # else: legitimate DNS record, skip + parent = fqdn.split('.', 1)[1] if '.' in fqdn else '' + wc_ips = wildcard_ips_by_domain.get(parent, set()) + if wc_ips and set(ips).issubset(wc_ips): + for account, spn in entries: + result['probably_vulnerable'].append({ + 'account': account, + 'spn': spn, + 'hostname': fqdn, + 'resolved_to': list(ips), + }) return result @@ -321,6 +339,13 @@ def _on_spn(item): # DNS helpers # ────────────────────────────────────────────────────────────── + def _probe_wildcard_targets(self, domains: Set[str]) -> Dict[str, Set[str]]: + """Resolve a bogus name in each domain to discover the wildcard's + target IPs. Hostnames with real A records resolve to different IPs.""" + probes = {d: f"relayking-wc-{uuid.uuid4().hex[:12]}.{d}" for d in domains} + resolved = self._resolve_all(list(probes.values())) + return {d: set(resolved[name]) for d, name in probes.items() if resolved.get(name)} + def _resolve_all(self, hostnames: List[str]) -> Dict[str, Optional[List[str]]]: """ Resolve all hostnames in parallel. diff --git a/protocols/http_detector.py b/protocols/http_detector.py index 725d3f7..33336d2 100644 --- a/protocols/http_detector.py +++ b/protocols/http_detector.py @@ -102,10 +102,15 @@ def detect(self, host: str, port: int = 80, use_ssl: bool = False) -> ProtocolRe result.additional_info['ntlm_enabled'] = True result.additional_info['ntlm_paths'] = ntlm_paths - # Check for ADCS + # Check for ADCS. Validate against catchall NTLM web apps that + # 401 every URL (which would otherwise match /certsrv/ too). if any('/certsrv' in path.lower() for path in ntlm_paths): - result.additional_info['is_adcs'] = True - result.additional_info['adcs_method'] = 'certsrv endpoint' + scheme = 'https' if use_ssl else 'http' + if self._is_catchall_ntlm(host, port, scheme) is False: + result.additional_info['is_adcs'] = True + result.additional_info['adcs_method'] = 'certsrv endpoint (validated)' + else: + result.additional_info['adcs_unconfirmed'] = True # Check for SCCM if any('ccm_system_windowsauth' in path.lower() or 'sms_mp' in path.lower() for path in ntlm_paths): @@ -335,6 +340,15 @@ def _enumerate_ntlm_paths(self, host: str, port: int, use_ssl: bool) -> list: return ntlm_paths + def _is_catchall_ntlm(self, host: str, port: int, scheme: str): + """Returns True if random bogus paths also 401-NTLM (catchall site, + meaning /certsrv/ is not proof of ADCS), False otherwise, None on error.""" + bogus = ['/relayking-validation-xyzzy/', '/relayking-not-real-9f3c/'] + try: + return all(self._check_path_for_ntlm(host, port, scheme, p) for p in bogus) + except Exception: + return None + def _check_path_for_ntlm(self, host: str, port: int, scheme: str, path: str) -> bool: """Check if a specific path requires NTLM authentication""" try: