From 731291690b0f0450d3317b4a3825226ab699d8e7 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Sat, 18 Apr 2026 09:43:42 +0200 Subject: [PATCH] Add offline ldap2json import (#11) Add --ldap2json FILE flag that loads an ldap2json JSON export, flattens the hierarchical tree back into {dn: {attr: value, ...}}, and serves queries from memory via a new OfflineLDAPSearcher. Includes a small recursive-descent parser for the common subset of RFC 4515 filters (equality, presence, substring wildcards, AND, OR, NOT) so the existing query/presetquery/export commands work offline. --- ldapconsole.py | 314 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 281 insertions(+), 33 deletions(-) diff --git a/ldapconsole.py b/ldapconsole.py index 1f60e09..a4f3c07 100755 --- a/ldapconsole.py +++ b/ldapconsole.py @@ -5,6 +5,7 @@ # Date created : 29 Jul 2021 import argparse +import json from ldap3.protocol.formatters.formatters import format_sid import ldap3 import os @@ -145,6 +146,239 @@ def dict_path_access(d, path): return d +### Offline ldap2json import + +def flatten_ldap2json(tree, rdn_path=None): + """ + Flatten the hierarchical ldap2json format into a {dn: {attr: value, ...}} + dictionary. In an ldap2json file, each nesting level is keyed by an RDN + component (CN=..., OU=..., DC=...), so a child key is distinguished from + an attribute key by containing '='. Leaves (and any node with attribute + keys) contribute one entry to the output, with the DN reconstructed from + the accumulated RDN path in reverse order (the ldap2json writer reverses + the DN components on write, so we reverse them back here). + """ + if rdn_path is None: + rdn_path = [] + out = {} + attrs = {} + for key, value in tree.items(): + if isinstance(key, str) and "=" in key and isinstance(value, dict): + out.update(flatten_ldap2json(value, rdn_path + [key])) + else: + attrs[key] = value + if attrs and rdn_path: + dn = ",".join(reversed(rdn_path)) + out[dn] = attrs + return out + + +def load_ldap2json_file(path): + """ + Load and flatten an ldap2json .json export. Returns (flat_results, naming_contexts). + A naming context is any DN in the dataset that has no ancestor DN also + present in the dataset. + """ + with open(path, "r", encoding="utf-8") as fh: + tree = json.load(fh) + flat = flatten_ldap2json(tree) + dns = list(flat.keys()) + dns_lower = {d: d.lower() for d in dns} + naming_contexts = [] + for dn in dns: + dn_l = dns_lower[dn] + has_ancestor = False + for other, other_l in dns_lower.items(): + if other_l == dn_l: + continue + if dn_l.endswith("," + other_l): + has_ancestor = True + break + if not has_ancestor: + naming_contexts.append(dn) + return flat, naming_contexts + + +def parse_ldap_filter(filter_str): + """ + Parse a subset of RFC 4515 LDAP filter syntax and return a callable + matcher(attrs) -> bool. Supported forms: + + - Equality: (attr=value) + - Presence: (attr=*) + - Substring: (attr=prefix*), (attr=*suffix), (attr=*infix*) + - Conjunction: (&(...)(...)...) + - Disjunction: (|(...)(...)...) + - Negation: (!(...)) + + Comparisons are case-insensitive on both attribute names and string + values, which matches Active Directory's default matching rules for + most attributes. Extensible match rules (e.g. LDAP_MATCHING_RULE_BIT_AND) + and approximate match (~=) are not supported. + """ + pos = [0] + + def peek(): + return filter_str[pos[0]] if pos[0] < len(filter_str) else None + + def advance(): + ch = filter_str[pos[0]] + pos[0] += 1 + return ch + + def expect(ch): + if peek() != ch: + raise ValueError("Expected %r at position %d in filter %r" % (ch, pos[0], filter_str)) + advance() + + def parse_expr(): + expect("(") + op = peek() + if op in ("&", "|"): + advance() + children = [] + while peek() == "(": + children.append(parse_expr()) + expect(")") + if op == "&": + return lambda attrs: all(c(attrs) for c in children) + return lambda attrs: any(c(attrs) for c in children) + if op == "!": + advance() + child = parse_expr() + expect(")") + return lambda attrs: not child(attrs) + # simple (attr op value) — only '=' supported + name_start = pos[0] + while pos[0] < len(filter_str) and filter_str[pos[0]] not in ("=", ")"): + pos[0] += 1 + attr = filter_str[name_start:pos[0]].strip() + # strip extensible-match modifiers like ':1.2.840...:' + if ":" in attr: + attr = attr.split(":")[0] + expect("=") + val_start = pos[0] + while pos[0] < len(filter_str) and filter_str[pos[0]] != ")": + pos[0] += 1 + value = filter_str[val_start:pos[0]] + expect(")") + return _build_match(attr, value) + + def _build_match(attr, value): + attr_l = attr.lower() + + def _attr_values(attrs): + for k in attrs.keys(): + if k.lower() == attr_l: + v = attrs[k] + return v if isinstance(v, list) else [v] + return None + + if value == "*": + def match_presence(attrs): + vals = _attr_values(attrs) + return vals is not None and len(vals) > 0 and any(v not in ("", None) for v in vals) + return match_presence + + starts_wild = value.startswith("*") + ends_wild = value.endswith("*") + inner = value.strip("*").lower() + + def match(attrs): + vals = _attr_values(attrs) + if vals is None: + return False + for item in vals: + s = item if isinstance(item, str) else str(item) + sl = s.lower() + if starts_wild and ends_wild: + if inner in sl: + return True + elif starts_wild: + if sl.endswith(inner): + return True + elif ends_wild: + if sl.startswith(inner): + return True + else: + if sl == inner: + return True + return False + + return match + + matcher = parse_expr() + if pos[0] != len(filter_str): + raise ValueError("Unexpected trailing characters in filter %r" % filter_str) + return matcher + + +class OfflineLDAPSearcher(object): + """ + Drop-in replacement for LDAPSearcher that answers queries from an + in-memory flat dict loaded from an ldap2json file. Supports the same + query(base_dn, query, attributes, ...) signature used by the REPL. + """ + def __init__(self, flat_results, naming_contexts): + self.flat_results = flat_results + self.naming_contexts = naming_contexts + self.ldap_server = None + self.ldap_session = None + + def _iter_in_scope(self, base_dn, search_scope): + base_norm = base_dn.strip().lower() + for dn in self.flat_results.keys(): + dn_norm = dn.lower() + if base_norm == "": + yield dn + continue + if search_scope == ldap3.BASE: + if dn_norm == base_norm: + yield dn + elif search_scope == ldap3.LEVEL: + if dn_norm.endswith("," + base_norm) and dn_norm.count(",") == base_norm.count(",") + 1: + yield dn + else: # SUBTREE (default) + if dn_norm == base_norm or dn_norm.endswith("," + base_norm): + yield dn + + def query(self, base_dn, query, attributes=["*"], page_size=1000, size_limit=0, search_scope=ldap3.SUBTREE): + try: + matcher = parse_ldap_filter(query) + except ValueError as e: + print("Invalid Filter. (%s)" % str(e)) + return {} + + requested = attributes or ["*"] + want_all = "*" in requested + wanted = {a.lower() for a in requested if a != "*"} + + results = {} + count = 0 + for dn in self._iter_in_scope(base_dn, search_scope): + attrs = self.flat_results[dn] + if not matcher(attrs): + continue + if want_all: + projected = dict(attrs) + else: + projected = {k: v for k, v in attrs.items() if k.lower() in wanted} + results[dn] = projected + count += 1 + if size_limit and count >= size_limit: + break + return results + + def query_all_naming_contexts(self, query, attributes=["*"], page_size=1000, size_limit=0, search_scope=ldap3.SUBTREE): + merged = {} + for nc in self.naming_contexts: + merged.update(self.query(nc, query, attributes=attributes, page_size=page_size, size_limit=size_limit, search_scope=search_scope)) + return merged + + def print_colored_result(self, dn, data): + return LDAPSearcher.print_colored_result(self, dn, data) + + ### LDAPConsole # LDAP controls @@ -513,6 +747,8 @@ def parseArgs(): parser.add_argument("-x", "--xlsx", dest="xlsx", default=None, type=str, help="Output results to an XLSX file.") + parser.add_argument("--ldap2json", dest="ldap2json_file", default=None, type=str, metavar="FILE", help="Offline mode: load an ldap2json .json export and run queries against it instead of a live LDAP server.") + authconn = parser.add_argument_group("authentication & connection") authconn.add_argument("--dc-ip", action="store", metavar="ip address", help="IP Address of the domain controller or KDC (Key Distribution Center) for Kerberos. If omitted it will use the domain part (FQDN) specified in the identity parameter") authconn.add_argument("--kdcHost", dest="kdcHost", action="store", metavar="FQDN KDC", help="FQDN of KDC for Kerberos.") @@ -542,43 +778,55 @@ def parseArgs(): if not options.quiet: print("LDAPconsole v%s - by Remi GASCOU (Podalirius)\n" % VERSION) - # Parse hashes - if options.auth_hashes is not None: - if ":" not in options.auth_hashes: - options.auth_hashes = ":" + options.auth_hashes - auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(options.auth_hashes) - - # Use AES Authentication key if available - if options.auth_key is not None: - options.use_kerberos = True - if options.use_kerberos is True and options.kdcHost is None: - print("[!] Specify KDC's Hostname of FQDN using the argument --kdcHost") - exit() + # Parse hashes and validate auth options only when talking to a live server + if options.ldap2json_file is None: + if options.auth_hashes is not None: + if ":" not in options.auth_hashes: + options.auth_hashes = ":" + options.auth_hashes + auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(options.auth_hashes) + + # Use AES Authentication key if available + if options.auth_key is not None: + options.use_kerberos = True + if options.use_kerberos is True and options.kdcHost is None: + print("[!] Specify KDC's Hostname of FQDN using the argument --kdcHost") + exit() + else: + auth_lm_hash, auth_nt_hash = None, None # Try to authenticate with specified credentials try: - if not options.quiet: - if options.auth_domain is not None: - print("[>] Try domain authentication as \"%s\\%s\" on %s ... " % (options.auth_domain, options.auth_username, options.dc_ip)) - else: - print("[>] Try local authentication as \"%s\" on %s ... " % (options.auth_username, options.dc_ip)) - ldap_server, ldap_session = init_ldap_session( - auth_domain=options.auth_domain, - auth_dc_ip=options.dc_ip, - auth_username=options.auth_username, - auth_password=options.auth_password, - auth_lm_hash=auth_lm_hash, - auth_nt_hash=auth_nt_hash, - auth_key=options.auth_key, - use_kerberos=options.use_kerberos, - kdcHost=options.kdcHost, - use_ldaps=options.use_ldaps - ) - if not options.quiet: - print("[+] Authentication successful!\n") + if options.ldap2json_file is not None: + if not options.quiet: + print("[>] Loading ldap2json file \"%s\" ..." % options.ldap2json_file) + flat_results, naming_contexts = load_ldap2json_file(options.ldap2json_file) + if not options.quiet: + print("[+] Loaded %d entries, %d naming context(s).\n" % (len(flat_results), len(naming_contexts))) + ls = OfflineLDAPSearcher(flat_results=flat_results, naming_contexts=naming_contexts) + search_base = naming_contexts[0] if naming_contexts else "" + else: + if not options.quiet: + if options.auth_domain is not None: + print("[>] Try domain authentication as \"%s\\%s\" on %s ... " % (options.auth_domain, options.auth_username, options.dc_ip)) + else: + print("[>] Try local authentication as \"%s\" on %s ... " % (options.auth_username, options.dc_ip)) + ldap_server, ldap_session = init_ldap_session( + auth_domain=options.auth_domain, + auth_dc_ip=options.dc_ip, + auth_username=options.auth_username, + auth_password=options.auth_password, + auth_lm_hash=auth_lm_hash, + auth_nt_hash=auth_nt_hash, + auth_key=options.auth_key, + use_kerberos=options.use_kerberos, + kdcHost=options.kdcHost, + use_ldaps=options.use_ldaps + ) + if not options.quiet: + print("[+] Authentication successful!\n") - search_base = ldap_server.info.other["defaultNamingContext"][0] - ls = LDAPSearcher(ldap_server=ldap_server, ldap_session=ldap_session) + search_base = ldap_server.info.other["defaultNamingContext"][0] + ls = LDAPSearcher(ldap_server=ldap_server, ldap_session=ldap_session) search_scope = ldap3.SUBTREE