Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 281 additions & 33 deletions ldapconsole.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Date created : 29 Jul 2021

import argparse
import json
from ldap3.protocol.formatters.formatters import format_sid
import ldap3
import os
Expand Down Expand Up @@ -145,6 +146,239 @@ def dict_path_access(d, path):
return d


### Offline ldap2json import

def flatten_ldap2json(tree, rdn_path=None):
"""
Flatten the hierarchical ldap2json format into a {dn: {attr: value, ...}}
dictionary. In an ldap2json file, each nesting level is keyed by an RDN
component (CN=..., OU=..., DC=...), so a child key is distinguished from
an attribute key by containing '='. Leaves (and any node with attribute
keys) contribute one entry to the output, with the DN reconstructed from
the accumulated RDN path in reverse order (the ldap2json writer reverses
the DN components on write, so we reverse them back here).
"""
if rdn_path is None:
rdn_path = []
out = {}
attrs = {}
for key, value in tree.items():
if isinstance(key, str) and "=" in key and isinstance(value, dict):
out.update(flatten_ldap2json(value, rdn_path + [key]))
else:
attrs[key] = value
if attrs and rdn_path:
dn = ",".join(reversed(rdn_path))
out[dn] = attrs
return out


def load_ldap2json_file(path):
"""
Load and flatten an ldap2json .json export. Returns (flat_results, naming_contexts).
A naming context is any DN in the dataset that has no ancestor DN also
present in the dataset.
"""
with open(path, "r", encoding="utf-8") as fh:
tree = json.load(fh)
flat = flatten_ldap2json(tree)
dns = list(flat.keys())
dns_lower = {d: d.lower() for d in dns}
naming_contexts = []
for dn in dns:
dn_l = dns_lower[dn]
has_ancestor = False
for other, other_l in dns_lower.items():
if other_l == dn_l:
continue
if dn_l.endswith("," + other_l):
has_ancestor = True
break
if not has_ancestor:
naming_contexts.append(dn)
return flat, naming_contexts


def parse_ldap_filter(filter_str):
"""
Parse a subset of RFC 4515 LDAP filter syntax and return a callable
matcher(attrs) -> bool. Supported forms:

- Equality: (attr=value)
- Presence: (attr=*)
- Substring: (attr=prefix*), (attr=*suffix), (attr=*infix*)
- Conjunction: (&(...)(...)...)
- Disjunction: (|(...)(...)...)
- Negation: (!(...))

Comparisons are case-insensitive on both attribute names and string
values, which matches Active Directory's default matching rules for
most attributes. Extensible match rules (e.g. LDAP_MATCHING_RULE_BIT_AND)
and approximate match (~=) are not supported.
"""
pos = [0]

def peek():
return filter_str[pos[0]] if pos[0] < len(filter_str) else None

def advance():
ch = filter_str[pos[0]]
pos[0] += 1
return ch

def expect(ch):
if peek() != ch:
raise ValueError("Expected %r at position %d in filter %r" % (ch, pos[0], filter_str))
advance()

def parse_expr():
expect("(")
op = peek()
if op in ("&", "|"):
advance()
children = []
while peek() == "(":
children.append(parse_expr())
expect(")")
if op == "&":
return lambda attrs: all(c(attrs) for c in children)
return lambda attrs: any(c(attrs) for c in children)
if op == "!":
advance()
child = parse_expr()
expect(")")
return lambda attrs: not child(attrs)
# simple (attr op value) — only '=' supported
name_start = pos[0]
while pos[0] < len(filter_str) and filter_str[pos[0]] not in ("=", ")"):
pos[0] += 1
attr = filter_str[name_start:pos[0]].strip()
# strip extensible-match modifiers like ':1.2.840...:'
if ":" in attr:
attr = attr.split(":")[0]
expect("=")
val_start = pos[0]
while pos[0] < len(filter_str) and filter_str[pos[0]] != ")":
pos[0] += 1
value = filter_str[val_start:pos[0]]
expect(")")
return _build_match(attr, value)

def _build_match(attr, value):
attr_l = attr.lower()

def _attr_values(attrs):
for k in attrs.keys():
if k.lower() == attr_l:
v = attrs[k]
return v if isinstance(v, list) else [v]
return None

if value == "*":
def match_presence(attrs):
vals = _attr_values(attrs)
return vals is not None and len(vals) > 0 and any(v not in ("", None) for v in vals)
return match_presence

starts_wild = value.startswith("*")
ends_wild = value.endswith("*")
inner = value.strip("*").lower()

def match(attrs):
vals = _attr_values(attrs)
if vals is None:
return False
for item in vals:
s = item if isinstance(item, str) else str(item)
sl = s.lower()
if starts_wild and ends_wild:
if inner in sl:
return True
elif starts_wild:
if sl.endswith(inner):
return True
elif ends_wild:
if sl.startswith(inner):
return True
else:
if sl == inner:
return True
return False

return match

matcher = parse_expr()
if pos[0] != len(filter_str):
raise ValueError("Unexpected trailing characters in filter %r" % filter_str)
return matcher


class OfflineLDAPSearcher(object):
"""
Drop-in replacement for LDAPSearcher that answers queries from an
in-memory flat dict loaded from an ldap2json file. Supports the same
query(base_dn, query, attributes, ...) signature used by the REPL.
"""
def __init__(self, flat_results, naming_contexts):
self.flat_results = flat_results
self.naming_contexts = naming_contexts
self.ldap_server = None
self.ldap_session = None

def _iter_in_scope(self, base_dn, search_scope):
base_norm = base_dn.strip().lower()
for dn in self.flat_results.keys():
dn_norm = dn.lower()
if base_norm == "":
yield dn
continue
if search_scope == ldap3.BASE:
if dn_norm == base_norm:
yield dn
elif search_scope == ldap3.LEVEL:
if dn_norm.endswith("," + base_norm) and dn_norm.count(",") == base_norm.count(",") + 1:
yield dn
else: # SUBTREE (default)
if dn_norm == base_norm or dn_norm.endswith("," + base_norm):
yield dn

def query(self, base_dn, query, attributes=["*"], page_size=1000, size_limit=0, search_scope=ldap3.SUBTREE):
try:
matcher = parse_ldap_filter(query)
except ValueError as e:
print("Invalid Filter. (%s)" % str(e))
return {}

requested = attributes or ["*"]
want_all = "*" in requested
wanted = {a.lower() for a in requested if a != "*"}

results = {}
count = 0
for dn in self._iter_in_scope(base_dn, search_scope):
attrs = self.flat_results[dn]
if not matcher(attrs):
continue
if want_all:
projected = dict(attrs)
else:
projected = {k: v for k, v in attrs.items() if k.lower() in wanted}
results[dn] = projected
count += 1
if size_limit and count >= size_limit:
break
return results

def query_all_naming_contexts(self, query, attributes=["*"], page_size=1000, size_limit=0, search_scope=ldap3.SUBTREE):
merged = {}
for nc in self.naming_contexts:
merged.update(self.query(nc, query, attributes=attributes, page_size=page_size, size_limit=size_limit, search_scope=search_scope))
return merged

def print_colored_result(self, dn, data):
return LDAPSearcher.print_colored_result(self, dn, data)


### LDAPConsole

# LDAP controls
Expand Down Expand Up @@ -513,6 +747,8 @@ def parseArgs():

parser.add_argument("-x", "--xlsx", dest="xlsx", default=None, type=str, help="Output results to an XLSX file.")

parser.add_argument("--ldap2json", dest="ldap2json_file", default=None, type=str, metavar="FILE", help="Offline mode: load an ldap2json .json export and run queries against it instead of a live LDAP server.")

authconn = parser.add_argument_group("authentication & connection")
authconn.add_argument("--dc-ip", action="store", metavar="ip address", help="IP Address of the domain controller or KDC (Key Distribution Center) for Kerberos. If omitted it will use the domain part (FQDN) specified in the identity parameter")
authconn.add_argument("--kdcHost", dest="kdcHost", action="store", metavar="FQDN KDC", help="FQDN of KDC for Kerberos.")
Expand Down Expand Up @@ -542,43 +778,55 @@ def parseArgs():
if not options.quiet:
print("LDAPconsole v%s - by Remi GASCOU (Podalirius)\n" % VERSION)

# Parse hashes
if options.auth_hashes is not None:
if ":" not in options.auth_hashes:
options.auth_hashes = ":" + options.auth_hashes
auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(options.auth_hashes)

# Use AES Authentication key if available
if options.auth_key is not None:
options.use_kerberos = True
if options.use_kerberos is True and options.kdcHost is None:
print("[!] Specify KDC's Hostname of FQDN using the argument --kdcHost")
exit()
# Parse hashes and validate auth options only when talking to a live server
if options.ldap2json_file is None:
if options.auth_hashes is not None:
if ":" not in options.auth_hashes:
options.auth_hashes = ":" + options.auth_hashes
auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(options.auth_hashes)

# Use AES Authentication key if available
if options.auth_key is not None:
options.use_kerberos = True
if options.use_kerberos is True and options.kdcHost is None:
print("[!] Specify KDC's Hostname of FQDN using the argument --kdcHost")
exit()
else:
auth_lm_hash, auth_nt_hash = None, None

# Try to authenticate with specified credentials
try:
if not options.quiet:
if options.auth_domain is not None:
print("[>] Try domain authentication as \"%s\\%s\" on %s ... " % (options.auth_domain, options.auth_username, options.dc_ip))
else:
print("[>] Try local authentication as \"%s\" on %s ... " % (options.auth_username, options.dc_ip))
ldap_server, ldap_session = init_ldap_session(
auth_domain=options.auth_domain,
auth_dc_ip=options.dc_ip,
auth_username=options.auth_username,
auth_password=options.auth_password,
auth_lm_hash=auth_lm_hash,
auth_nt_hash=auth_nt_hash,
auth_key=options.auth_key,
use_kerberos=options.use_kerberos,
kdcHost=options.kdcHost,
use_ldaps=options.use_ldaps
)
if not options.quiet:
print("[+] Authentication successful!\n")
if options.ldap2json_file is not None:
if not options.quiet:
print("[>] Loading ldap2json file \"%s\" ..." % options.ldap2json_file)
flat_results, naming_contexts = load_ldap2json_file(options.ldap2json_file)
if not options.quiet:
print("[+] Loaded %d entries, %d naming context(s).\n" % (len(flat_results), len(naming_contexts)))
ls = OfflineLDAPSearcher(flat_results=flat_results, naming_contexts=naming_contexts)
search_base = naming_contexts[0] if naming_contexts else ""
else:
if not options.quiet:
if options.auth_domain is not None:
print("[>] Try domain authentication as \"%s\\%s\" on %s ... " % (options.auth_domain, options.auth_username, options.dc_ip))
else:
print("[>] Try local authentication as \"%s\" on %s ... " % (options.auth_username, options.dc_ip))
ldap_server, ldap_session = init_ldap_session(
auth_domain=options.auth_domain,
auth_dc_ip=options.dc_ip,
auth_username=options.auth_username,
auth_password=options.auth_password,
auth_lm_hash=auth_lm_hash,
auth_nt_hash=auth_nt_hash,
auth_key=options.auth_key,
use_kerberos=options.use_kerberos,
kdcHost=options.kdcHost,
use_ldaps=options.use_ldaps
)
if not options.quiet:
print("[+] Authentication successful!\n")

search_base = ldap_server.info.other["defaultNamingContext"][0]
ls = LDAPSearcher(ldap_server=ldap_server, ldap_session=ldap_session)
search_base = ldap_server.info.other["defaultNamingContext"][0]
ls = LDAPSearcher(ldap_server=ldap_server, ldap_session=ldap_session)

search_scope = ldap3.SUBTREE

Expand Down