diff --git a/bindings/Python/SymbolicInterpreter.cpp b/bindings/Python/SymbolicInterpreter.cpp index bc8f48af1..befe2ba66 100644 --- a/bindings/Python/SymbolicInterpreter.cpp +++ b/bindings/Python/SymbolicInterpreter.cpp @@ -239,6 +239,7 @@ PythonPolicy::~PythonPolicy() { Py_XDECREF(cached_on_enter_block_); Py_XDECREF(cached_on_global_initialized_); Py_XDECREF(cached_on_instruction_); + Py_XDECREF(cached_mem_bulk_op_); Py_XDECREF(pending_exc_type_); Py_XDECREF(pending_exc_value_); Py_XDECREF(pending_exc_tb_); @@ -699,6 +700,37 @@ bool PythonPolicy::exec_symbolic_store_impl(PythonScheduler &, bool PythonPolicy::mem_bulk_op(PythonScheduler &, MemOp sub, const std::vector &ops, const MemoryInst &mi, SharedPyPtr &result) { + // Try the Python policy first so analysts can intercept whole bulk + // ops (and so the per-byte default decomposition fires `mem_read` / + // `mem_write` hooks). NotImplemented = "fall through to concrete"; + // any other return is the IR result of the bulk op. + if (PyObject *method = lookup_method(cached_mem_bulk_op_, "mem_bulk_op")) { + PyObject *ops_list = PyList_New(static_cast(ops.size())); + if (!ops_list) { + capture_exception(); + result = make_default(); + return true; + } + for (size_t i = 0; i < ops.size(); ++i) { + PyObject *o = or_none(ops[i].Get()); + Py_INCREF(o); + PyList_SET_ITEM(ops_list, static_cast(i), o); + } + PyObject *py_result = PyObject_CallFunction( + method, "iN", static_cast(sub), ops_list); + if (!py_result) { + capture_exception(); + result = make_default(); + return true; + } + if (py_result != Py_NotImplemented) { + result = SharedPyPtr(py_result); + Py_DECREF(py_result); + return true; + } + Py_DECREF(py_result); // NotImplemented — fall through to concrete. + } + std::vector concrete_ops; concrete_ops.reserve(ops.size()); for (auto &op : ops) concrete_ops.push_back(python_to_value(op.Get())); @@ -845,11 +877,13 @@ bool PythonPolicy::resolve_call(PythonScheduler &, capture_exception(); return false; } - if (result != Py_NotImplemented && result != Py_None) { - // Any non-None / non-NotImplemented return is a skip with that - // value as the call's result. A `Skip(value)` instance is the - // disambiguator for "skip even though the value is None"; we - // unwrap it here. Anything else is taken as the value itself. + if (result != Py_NotImplemented) { + // Any non-NotImplemented return is a skip with that value as the + // call's result — `None` stubs the call to return `None`, an int + // / SymExpr / z3 expr is the replacement value, and `Skip(value)` + // is the typed marker analysts can still use (we unwrap it here). + // Only `NotImplemented` falls through to inlining, matching the + // convention used by `mem_read` / `mem_write` and the pure ops. static PyObject *cls_Skip = nullptr; if (!cls_Skip) { PyObject *mod = PyImport_ImportModule("multiplier.symex.events"); diff --git a/bindings/Python/SymbolicInterpreter.h b/bindings/Python/SymbolicInterpreter.h index c90cf15fe..ad52abcb1 100644 --- a/bindings/Python/SymbolicInterpreter.h +++ b/bindings/Python/SymbolicInterpreter.h @@ -366,6 +366,7 @@ class PythonPolicy PyObject *cached_on_enter_block_{nullptr}; PyObject *cached_on_instruction_{nullptr}; PyObject *cached_on_global_initialized_{nullptr}; + PyObject *cached_mem_bulk_op_{nullptr}; // Pending exception state. Captured when a Python hook raises so the // interpreter loop can exit cleanly and SymbolicStep can re-raise it. diff --git a/bindings/Python/symex/ctx.py b/bindings/Python/symex/ctx.py index ce95056a0..a19fb31fe 100644 --- a/bindings/Python/symex/ctx.py +++ b/bindings/Python/symex/ctx.py @@ -11,10 +11,12 @@ lens, args lens (for call events), layout, and solver. Hooks compose by calling `next_hook(...)` to forward to the rest of -the chain. To short-circuit with the substrate's natural default -(e.g., "let the call return None / 0 instead of inlining"), use -`ctx.default()`. To stop the entire path cleanly, call -`ctx.stop_path()` and then return `ctx.default()` (or any value). +the chain. To short-circuit a call hook with `None` as the return +value (the common "stub this function" case), just `return None`. +`ctx.default()` is the typed equivalent. To fall through to the +substrate's own resolver (e.g., let an IR function inline), return +`NotImplemented`. To stop the entire path cleanly, call +`ctx.stop_path()` and then return any value. """ from .events import Terminal, StopNow @@ -37,13 +39,12 @@ def __init__(self, *, path, mem, args=None, layout=None, solver=None): self.solver = solver def default(self): - """The substrate's "natural default" for the current event, - wrapped in `Skip` so the substrate treats it as an explicit - skip-with-this-value rather than a fall-through. + """The substrate's "natural default" for the current event. - Phase 2 uses `None` as the default value for every event; - per-event typed defaults (e.g., width-correct zero for - memory reads) can land later if a use-case demands it. + Equivalent to `return None` from a call handler — the call is + skipped with `None` as the return value. Returns `Skip()` (a + typed marker the substrate unwraps) so this is also safe in + any future hook surface that might re-disambiguate `None`. """ from .events import Skip return Skip() diff --git a/bindings/Python/symex/dispatch.py b/bindings/Python/symex/dispatch.py index eb6312cf0..5db0acf1f 100644 --- a/bindings/Python/symex/dispatch.py +++ b/bindings/Python/symex/dispatch.py @@ -41,6 +41,7 @@ BRANCH, LOOP, CONCRETIZE, BLOCK_ENTER, INSTRUCTION, ADDRESS_FOR, ADDRESS_RESOLVED, + BULK_MEMORY, EventKind, Phase, ) from .lens import MemView, ArgsView @@ -81,15 +82,90 @@ def __bool__(self): "SymExpr has no concrete truth value; route through is_true") +def _concrete_int(v): + """Return a Python ``int`` if ``v`` is concretely an integer. + + Accepts Python ``int`` (rejecting ``bool`` — booleans come through + only when a hook returned ``True``/``False`` and aren't valid + pointers / numeric operands), and z3 BitVec values. For z3 values + that aren't already a ``BitVecVal`` (e.g. ``Concat(BVV, BVV) + 0`` + after a multi-byte concrete load through the shadow), one + ``z3.simplify`` is enough to collapse the structural shape to a + value when the expression is provably constant. Returns ``None`` + when the value is genuinely symbolic. + """ + if isinstance(v, bool): + return None + if isinstance(v, int): + return v + z3 = _z3_module() + if z3 is None or not isinstance(v, z3.ExprRef): + return None + if z3.is_bv_value(v): + return v.as_long() + s = z3.simplify(v) + if z3.is_bv_value(s): + return s.as_long() + return None + + +def _concrete_bool(v): + """Return Python ``bool`` if ``v`` is concretely truthy or falsy. + + Mirrors ``_concrete_int`` for branch / is_true sites. ``None`` + means "still symbolic, fork". + """ + if isinstance(v, bool): + return v + if isinstance(v, int): + return v != 0 + z3 = _z3_module() + if z3 is None or not isinstance(v, z3.ExprRef): + return None + if z3.is_true(v): + return True + if z3.is_false(v): + return False + if z3.is_bv_value(v): + return v.as_long() != 0 + s = z3.simplify(v) + if z3.is_true(s): + return True + if z3.is_false(s): + return False + if z3.is_bv_value(s): + return s.as_long() != 0 + return None + + def extract_addr(addr): """Pull a concrete address out of the substrate's value form. - Pointers come through as bare ints; anything else (SymExpr, - z3 expression, None) yields None. + Pointers come through as bare ints (the common case) or as z3 + BitVec values that may be structurally non-trivial (e.g. a + ``Concat(BVV, …) + 0`` produced by the shadow-driven load of a + pointer global). ``_concrete_int`` collapses both shapes to an + ``int``; symbolic addresses yield ``None`` so the substrate + emits a ``MemAddrContinuation``. """ - if isinstance(addr, int) and not isinstance(addr, bool): - return int(addr) - return None + return _concrete_int(addr) + + +# Back-compat alias — `_coerce_int` used to be its own thing for +# bulk-op operands; keep the name working while everything points +# at the unified helper. +_coerce_int = _concrete_int + + +def _memop_name(op): + """Return the analyst-facing name for an IR `MemOp` sub-opcode. + + Derived from the `mx.ir.MemOp` enum so the name list stays in + lock-step with the C++ enum automatically — adding a new bulk op + in `OpCode.h` becomes available here without a separate edit. + Returns the lower-cased member name (e.g. ``"memcpy"``). + """ + return mx.ir.MemOp(int(op)).name.lower() class _Selector: @@ -102,11 +178,12 @@ class _Selector: """ __slots__ = ("addr_range", "name", "eid", "func", "block", "region", - "_layout", "_resolved_range", "kind", "target_kind", "decl") + "_layout", "_resolved_range", "kind", "target_kind", "decl", + "op") def __init__(self, addr_range=None, name=None, eid=None, func=None, block=None, region=None, layout=None, - kind=None, target_kind=None, decl=None): + kind=None, target_kind=None, decl=None, op=None): self.addr_range = addr_range self.name = name # Allow `decl=` to substitute for both name and eid: @@ -133,6 +210,10 @@ def __init__(self, addr_range=None, name=None, eid=None, func=None, # Phase 9 axes self.kind = kind self.target_kind = target_kind + # Bulk-memory selector — matches "memcpy", "memset", … against + # the IR sub-opcode name, mirroring the `kind=` axis used by + # the address-resolution events. + self.op = op def matches_addr(self, addr): if self.addr_range is None: @@ -181,6 +262,12 @@ def matches_target_kind(self, candidate): return True return candidate is not None and candidate == self.target_kind + def matches_op(self, candidate): + """Match the bulk-memory op name: "memcpy", "memset", ….""" + if self.op is None: + return True + return candidate is not None and candidate == self.op + def _compute_range(self): if self._resolved_range is not None: return self._resolved_range @@ -214,6 +301,7 @@ def make_selector(layout, **kwargs): layout=layout, kind=kwargs.get("kind"), target_kind=kwargs.get("target_kind"), + op=kwargs.get("op"), ) @@ -281,7 +369,7 @@ def _build_chain(handlers, default_fn): # ---- per-event default (chain bottom) functions ----------------------- -from ._types import _BYTES_TYPES, _INT_TYPES, _SEQ_TYPES +from ._types import _BYTES_TYPES, _INT_TYPES, _SEQ_TYPES, Endian # Pre-built zero-fill byte strings indexed by size (0-16). _ZERO_BYTES = tuple(bytes(n) for n in range(17)) @@ -380,7 +468,24 @@ def _shadow_read(shadow, addr, size, data, buf): return _z3_concat_fold(z3, result) -def _make_default_mem_read(is_float, shadow=None, buf=None, byte_order="little"): +def _shadow_range_is_concrete(z3, shadow, addr, size): + """True iff no byte in [addr, addr+size) carries a symbolic-shaped + shadow entry. Missing entries (``None``) are concrete — they're + filled from ``ctx.mem.read_bytes`` data by ``_shadow_read``. + ``BitVecVal`` entries (concrete IR-side stores) are also concrete. + Only symbolic shapes (``Extract``, region-overlay selects, …) + return False so the read preserves the symbolic shape.""" + i = 0 + while i < size: + entry = shadow.get(addr + i) + if entry is not None and not z3.is_bv_value(entry): + return False + i += 1 + return True + + +def _make_default_mem_read(is_float, shadow=None, buf=None, + byte_order=Endian.LITTLE): """Return the chain bottom for a memory_read event. Reads baseline bytes from ``ConcreteMemory`` first (always needed as @@ -388,6 +493,17 @@ def _make_default_mem_read(is_float, shadow=None, buf=None, byte_order="little") shadow dict. Shadow entries shadow the baseline; any range with no shadow entry goes straight to the concrete fallback. Falls back to zero-fill on a read failure so OOB sinks still fire. + + Concrete IR-side stores land in the shadow as ``BitVecVal`` bytes + (so sibling forks see their own writes through CoW). When the read + range is covered entirely by ``BitVecVal`` entries, collapse back + to a Python ``int`` (or float) so downstream comparisons stay + concrete — without this a round-tripped concrete value would + surface as a z3 expression and a later compare would fork or + produce a ``BoolRef`` the substrate can't store. Symbolic-shaped + entries (``Extract``, ``Concat``, region-overlay selects, …) are + preserved as-is, even when they happen to simplify to a constant + — analyst code relies on the *shape* signaling symbolic-ness. """ _buf = buf if buf is not None else [] @@ -399,50 +515,95 @@ def default(ctx, addr, size): if shadow is not None: sym = _shadow_read(shadow, addr, size, data, _buf) if sym is not None: + z3 = _z3_module() + if z3 is not None and _shadow_range_is_concrete( + z3, shadow, addr, size): + val = z3.simplify(sym).as_long() + if is_float and size in (4, 8): + # Shadow assembly is always little-endian + # (`_shadow_read` puts byte-at-addr in the + # LSB), so the int we get out is the LE + # representation. Repacking LE matches. + fmt = "f" - return _struct.unpack(fmt, data)[0] - if is_float and size == 8: - fmt = "d" + if is_float and size in (4, 8): + prefix = "<" if byte_order == Endian.LITTLE else ">" + fmt = f"{prefix}f" if size == 4 else f"{prefix}d" return _struct.unpack(fmt, data)[0] return int.from_bytes(data, byte_order, signed=False) return default -def _make_default_mem_write(is_float, shadow=None, byte_order="little"): +def _make_default_mem_write(is_float, shadow=None, byte_order=Endian.LITTLE): """Return the chain bottom for a memory_write event. - Writes concrete bytes via the lens. Handles ints, raw bytes, and - IEEE floats. A z3 write decomposes ``val`` into per-byte extracts - in the shadow dict; a concrete write clears any covered shadow - slots (concrete memory is the source of truth) and writes the - real bytes to ``ConcreteMemory``. + Concrete values land in BOTH the per-path shadow (as ``BitVecVal`` + bytes, little-endian internal layout) and ``ConcreteMemory``. The + shadow gives sibling forks per-path read isolation through the + dispatcher; ``ConcreteMemory`` keeps direct ``path.mem.read_bytes`` + inspection working as before. Symbolic (z3) values land in the + shadow only — there's no concrete byte representation for them. + + Without a shadow (test paths, substrate-only callers) the fallback + is just the ``ConcreteMemory`` write; symbolic writes have nowhere + to go and are silently dropped (matches prior behavior). """ def default(ctx, addr, val, size): if isinstance(val, bool): val = int(val) - if isinstance(val, int): - if shadow is not None: - _shadow_write(shadow, addr, val, size) - ctx.mem.write_bytes( - addr, val.to_bytes(size, byte_order, signed=(val < 0))) - return None - if isinstance(val, _BYTES_TYPES): + # ``BitVecVal(7, 32)`` IS a concrete int; route it through the + # int path so the write round-trips as a Python int rather + # than as Extract-of-BitVecVal bytes in the shadow. The check + # is intentionally strict (``is_bv_value`` only — no simplify) + # because a symbolic-shaped expression that *evaluates* to a + # constant (e.g. a region-overlay select that the test wants + # to surface as z3) should still take the symbolic path. + z3 = _z3_module() + if z3 is not None and isinstance(val, z3.ExprRef) \ + and z3.is_bv_value(val): + val = val.as_long() + + # Genuinely symbolic values: shadow only. No concrete + # representation, so nothing meaningful to put in ConcreteMemory. + if _is_z3(val): if shadow is not None: _shadow_write(shadow, addr, val, size) - ctx.mem.write_bytes(addr, val) return None - if isinstance(val, float): - if shadow is not None: - _shadow_write(shadow, addr, val, size) - prefix = "<" if byte_order == "little" else ">" + + # Concrete values: bytes for ConcreteMemory + matching + # BitVecVal entries in the shadow (when z3 is available). + if isinstance(val, int): + packed = val.to_bytes(size, byte_order, signed=(val < 0)) + elif isinstance(val, _BYTES_TYPES): + packed = bytes(val) + elif isinstance(val, float): + prefix = "<" if byte_order == Endian.LITTLE else ">" fmt = f"{prefix}f" if size == 4 else f"{prefix}d" - ctx.mem.write_bytes(addr, _struct.pack(fmt, val)) - return None - if shadow is not None and _is_z3(val): - _shadow_write(shadow, addr, val, size) + packed = _struct.pack(fmt, val) + else: return None + + ctx.mem.write_bytes(addr, packed) + + if shadow is not None: + z3 = _z3_module() + if z3 is not None: + # Shadow stores byte-at-addr in slot[addr], which + # `_shadow_read` reassembles LSB-first. Re-layout + # `packed` as LE so the shadow round-trips concrete + # writes correctly regardless of the requested + # ``byte_order`` for the concrete-mem write above. + if byte_order == Endian.LITTLE or len(packed) == 1: + bytes_le = packed + else: + bytes_le = packed[::-1] + i = 0 + for b in bytes_le: + shadow[addr + i] = z3.BitVecVal(b, 8) + i += 1 return None return default @@ -469,10 +630,14 @@ def _default_branch(ctx, condition): Concrete conditions resolve to True / False naturally; symbolic conditions return `_FORK`, telling the InterceptorPolicy to hand control back to the substrate so it can enumerate edges via a - BranchContinuation. + BranchContinuation. A z3 expression that simplifies to a concrete + ``True``/``False`` resolves concretely too — without this, an + ``Eq(BVV(0), BVV(0))``-shaped condition would force a needless + fork. """ - if isinstance(condition, _INT_TYPES): - return condition != 0 + decided = _concrete_bool(condition) + if decided is not None: + return decided return _FORK @@ -879,7 +1044,7 @@ def mem_read(self, addr, size, is_float): _make_default_mem_read(bool(is_float), self._shadow, self._shadow_buf, - byte_order=str(self._engine.endian))) + byte_order=self._engine.endian)) value = chain(ctx, addr_int, size_i) self._fire_observers(MEMORY_READ, Phase.AFTER, ctx, @@ -917,7 +1082,7 @@ def mem_write(self, addr, val, size, is_float): chain = _build_chain(handlers, _make_default_mem_write(bool(is_float), self._shadow, - byte_order=str(self._engine.endian))) + byte_order=self._engine.endian)) chain(ctx, addr_int, val, size_i) # Phase 6 invariant: when the region's overlay has been @@ -1117,8 +1282,8 @@ def _coerce_store_value(self, val, size, z3): if isinstance(val, int): return z3.BitVecVal(val & ((1 << bits) - 1), bits) if isinstance(val, float): - byte_order = str(self._engine.endian) - prefix = "<" if byte_order == "little" else ">" + byte_order = self._engine.endian + prefix = "<" if byte_order == Endian.LITTLE else ">" if int(size) == 4: packed = _struct.pack(f"{prefix}f", val) elif int(size) == 8: @@ -1182,7 +1347,11 @@ def _match(sel): target_addr=target_addr, is_indirect=is_indirect, return_value=None, handled=False) - return None # PythonPolicy treats None as "fall through". + # No handler claimed the call: tell the substrate to fall + # through to its own resolver. NotImplemented (matching + # mem_read / mem_write and the pure ops) is the + # fall-through signal; None is now a real "stub with None". + return NotImplemented self._fire_observers(event, Phase.AFTER, ctx, target_eid=target_for_match, @@ -1190,8 +1359,9 @@ def _match(sel): target_addr=target_addr, is_indirect=is_indirect, return_value=chosen, handled=True) - # Handlers return their replacement value directly; the substrate - # treats any non-None return as a skip with that value. + # Handlers return their replacement value directly; the + # substrate treats any non-NotImplemented return as a skip with + # that value (None included — that's the stub-with-None case). return chosen # ----- pure ops: propagate symbolic values, else fall through ----- @@ -1361,23 +1531,378 @@ def on_global_initialized(self, init_func, addr): init_func=init_func, decl=decl, name=name, eid=eid, addr=int(addr)) + # ----- bulk memory ops (memcpy / memset / strlen / …) ---------- + # + # The C++ substrate calls this for every IR MEMORY op whose + # sub-opcode is in the bulk range (32..56). Returning + # `NotImplemented` falls back to the substrate's concrete impl + # (raw `ConcreteMemory::memcpy/memset/read` — bypasses every + # hook); returning anything else is the IR result of the bulk op. + # + # The chain bottom (`_make_bulk_default`) decomposes supported ops + # into per-byte `mem_read` / `mem_write` calls so analyst hooks + # registered for `memory_read` / `memory_write` fire correctly. + + def mem_bulk_op(self, op, ops): + try: + op_name = _memop_name(op) + except ValueError: + # Op value outside the MemOp enum (shouldn't happen, but + # don't crash the substrate if it does). + return NotImplemented + ctx = self._make_ctx() + ctx.op = op_name + + handlers = self._matching_handlers( + BULK_MEMORY, lambda sel: sel.matches_op(op_name)) + default = self._make_bulk_default(op_name) + if default is None and not handlers: + return NotImplemented + if default is None: + # Op kind has no built-in decomposer but at least one + # analyst handler is registered. Bottom of the chain + # signals "fall through to concrete" if every handler + # delegates downstream. + def default(*_args, **_kwargs): + return NotImplemented + + chain = _build_chain(handlers, default) + return chain(ctx, *list(ops)) + + def _make_bulk_default(self, op_name): + if op_name in ("memcpy", "memmove"): + return self._default_memcpy_like + if op_name == "memset": + return self._default_memset + if op_name == "bzero": + return self._default_bzero + if op_name == "memcmp": + return self._default_memcmp + if op_name == "memchr": + return self._default_memchr + if op_name == "strlen": + return self._default_strlen + if op_name == "strnlen": + return self._default_strnlen + if op_name == "strcmp": + return self._default_strcmp + if op_name == "strncmp": + return self._default_strncmp + if op_name == "strchr": + return self._default_strchr + if op_name == "strrchr": + return self._default_strrchr + if op_name in ("strcpy", "stpcpy"): + return self._make_strcpy_like(op_name == "stpcpy") + if op_name in ("strncpy", "stpncpy"): + return self._make_strncpy_like(op_name == "stpncpy") + if op_name == "strcat": + return self._default_strcat + if op_name == "strncat": + return self._default_strncat + return None + + # Per-op default decomposers. Each returns NotImplemented when an + # operand or a byte read is non-concrete and the op's semantics + # depend on the concrete value (e.g., a symbolic length, a + # symbolic null-terminator). Falling back to concrete preserves + # pre-existing behavior for those cases — analysts who want + # symbolic modelling intercept the bulk op themselves. + + def _default_memcpy_like(self, ctx, dst, src, n): + n_int = _coerce_int(n) + dst_int = _coerce_int(dst) + src_int = _coerce_int(src) + if n_int is None or dst_int is None or src_int is None: + return NotImplemented + # Probe `src` for backing memory. The IR uses MEMCPY for + # struct-return-by-value: when the struct fits in a register + # the "src" value isn't a pointer, it's the raw u64 bit + # pattern of the struct. The C++ concrete fallback distinguishes + # those by trying a 1-byte probe; we mirror that here so the + # decomposition only handles real pointer-to-pointer copies. + if src_int == 0 or self._memory is None: + return NotImplemented + try: + self._memory.read_bytes(src_int, 1) + except RuntimeError: + return NotImplemented + for i in range(n_int): + byte = self.mem_read(src_int + i, 1, False) + if byte is NotImplemented: + return NotImplemented + self.mem_write(dst_int + i, byte, 1, False) + return dst + + def _default_memset(self, ctx, dst, byte_val, n): + n_int = _coerce_int(n) + dst_int = _coerce_int(dst) + if n_int is None or dst_int is None: + return NotImplemented + b_int = _coerce_int(byte_val) + masked = (b_int & 0xFF) if b_int is not None else byte_val + for i in range(n_int): + self.mem_write(dst_int + i, masked, 1, False) + return dst + + def _default_bzero(self, ctx, dst, n): + n_int = _coerce_int(n) + dst_int = _coerce_int(dst) + if n_int is None or dst_int is None: + return NotImplemented + for i in range(n_int): + self.mem_write(dst_int + i, 0, 1, False) + return dst + + def _default_memcmp(self, ctx, a, b, n): + n_int = _coerce_int(n) + a_int = _coerce_int(a) + b_int = _coerce_int(b) + if n_int is None or a_int is None or b_int is None: + return NotImplemented + for i in range(n_int): + ba = _coerce_int(self.mem_read(a_int + i, 1, False)) + bb = _coerce_int(self.mem_read(b_int + i, 1, False)) + if ba is None or bb is None: + return NotImplemented + if ba != bb: + return -1 if ba < bb else 1 + return 0 + + def _default_memchr(self, ctx, addr, byte, n): + n_int = _coerce_int(n) + addr_int = _coerce_int(addr) + b_int = _coerce_int(byte) + if n_int is None or addr_int is None or b_int is None: + return NotImplemented + needle = b_int & 0xFF + for i in range(n_int): + byte_val = _coerce_int(self.mem_read(addr_int + i, 1, False)) + if byte_val is None: + return NotImplemented + if byte_val == needle: + return addr_int + i + return 0 # null pointer + + def _default_strlen(self, ctx, addr): + addr_int = _coerce_int(addr) + if addr_int is None: + return NotImplemented + i = 0 + while True: + byte = _coerce_int(self.mem_read(addr_int + i, 1, False)) + if byte is None: + return NotImplemented + if byte == 0: + return i + i += 1 + + def _default_strnlen(self, ctx, addr, max_len): + addr_int = _coerce_int(addr) + max_int = _coerce_int(max_len) + if addr_int is None or max_int is None: + return NotImplemented + for i in range(max_int): + byte = _coerce_int(self.mem_read(addr_int + i, 1, False)) + if byte is None: + return NotImplemented + if byte == 0: + return i + return max_int + + def _default_strcmp(self, ctx, a, b): + a_int = _coerce_int(a) + b_int = _coerce_int(b) + if a_int is None or b_int is None: + return NotImplemented + i = 0 + while True: + ba = _coerce_int(self.mem_read(a_int + i, 1, False)) + bb = _coerce_int(self.mem_read(b_int + i, 1, False)) + if ba is None or bb is None: + return NotImplemented + if ba != bb: + return -1 if ba < bb else 1 + if ba == 0: + return 0 + i += 1 + + def _default_strncmp(self, ctx, a, b, n): + n_int = _coerce_int(n) + a_int = _coerce_int(a) + b_int = _coerce_int(b) + if n_int is None or a_int is None or b_int is None: + return NotImplemented + for i in range(n_int): + ba = _coerce_int(self.mem_read(a_int + i, 1, False)) + bb = _coerce_int(self.mem_read(b_int + i, 1, False)) + if ba is None or bb is None: + return NotImplemented + if ba != bb: + return -1 if ba < bb else 1 + if ba == 0: + return 0 + return 0 + + def _default_strchr(self, ctx, addr, byte): + addr_int = _coerce_int(addr) + b_int = _coerce_int(byte) + if addr_int is None or b_int is None: + return NotImplemented + needle = b_int & 0xFF + i = 0 + while True: + byte_val = _coerce_int(self.mem_read(addr_int + i, 1, False)) + if byte_val is None: + return NotImplemented + if byte_val == needle: + return addr_int + i + if byte_val == 0: + return addr_int + i if needle == 0 else 0 + i += 1 + + def _default_strrchr(self, ctx, addr, byte): + addr_int = _coerce_int(addr) + b_int = _coerce_int(byte) + if addr_int is None or b_int is None: + return NotImplemented + needle = b_int & 0xFF + last = -1 + i = 0 + while True: + byte_val = _coerce_int(self.mem_read(addr_int + i, 1, False)) + if byte_val is None: + return NotImplemented + if byte_val == needle: + last = i + if byte_val == 0: + break + i += 1 + return (addr_int + last) if last >= 0 else 0 + + def _make_strcpy_like(self, return_end): + """`strcpy` (return_end=False) and `stpcpy` (return_end=True) + share a body: walk src copying bytes (including the null + terminator) into dst. `stpcpy` returns a pointer to the + copied null; `strcpy` returns dst.""" + def default(ctx, dst, src): + dst_int = _coerce_int(dst) + src_int = _coerce_int(src) + if dst_int is None or src_int is None: + return NotImplemented + i = 0 + while True: + byte = self.mem_read(src_int + i, 1, False) + b_int = _coerce_int(byte) + if b_int is None: + return NotImplemented + self.mem_write(dst_int + i, byte, 1, False) + if b_int == 0: + return (dst_int + i) if return_end else dst + i += 1 + return default + + def _make_strncpy_like(self, return_end): + """`strncpy` (return_end=False) and `stpncpy` (return_end=True). + Copy at most n bytes of src to dst; if src is shorter than n, + the remaining dst bytes are zero-filled. `stpncpy` returns a + pointer to the byte AFTER the last non-null byte copied (or + dst+n if no null was found in the first n).""" + def default(ctx, dst, src, n): + n_int = _coerce_int(n) + dst_int = _coerce_int(dst) + src_int = _coerce_int(src) + if n_int is None or dst_int is None or src_int is None: + return NotImplemented + hit_null_at = -1 + for i in range(n_int): + byte = self.mem_read(src_int + i, 1, False) + b_int = _coerce_int(byte) + if b_int is None: + return NotImplemented + if hit_null_at >= 0: + self.mem_write(dst_int + i, 0, 1, False) + else: + self.mem_write(dst_int + i, byte, 1, False) + if b_int == 0: + hit_null_at = i + if return_end: + return (dst_int + hit_null_at) if hit_null_at >= 0 \ + else (dst_int + n_int) + return dst + return default + + def _default_strcat(self, ctx, dst, src): + dst_int = _coerce_int(dst) + src_int = _coerce_int(src) + if dst_int is None or src_int is None: + return NotImplemented + # Find end of dst. + end = 0 + while True: + byte = _coerce_int(self.mem_read(dst_int + end, 1, False)) + if byte is None: + return NotImplemented + if byte == 0: + break + end += 1 + # Copy src (including null) to dst+end. + i = 0 + while True: + byte = self.mem_read(src_int + i, 1, False) + b_int = _coerce_int(byte) + if b_int is None: + return NotImplemented + self.mem_write(dst_int + end + i, byte, 1, False) + if b_int == 0: + return dst + i += 1 + + def _default_strncat(self, ctx, dst, src, n): + n_int = _coerce_int(n) + dst_int = _coerce_int(dst) + src_int = _coerce_int(src) + if n_int is None or dst_int is None or src_int is None: + return NotImplemented + end = 0 + while True: + byte = _coerce_int(self.mem_read(dst_int + end, 1, False)) + if byte is None: + return NotImplemented + if byte == 0: + break + end += 1 + copied = 0 + while copied < n_int: + byte = self.mem_read(src_int + copied, 1, False) + b_int = _coerce_int(byte) + if b_int is None: + return NotImplemented + self.mem_write(dst_int + end + copied, byte, 1, False) + if b_int == 0: + return dst + copied += 1 + # No null in first n bytes; append a terminator. + self.mem_write(dst_int + end + copied, 0, 1, False) + return dst + # ----- truth + branch resolution: fork on non-concrete ----- def is_true(self, val): - # z3 truths always force a fork — the substrate enumerates - # both edges and the branch handler accumulates the path - # condition. - if _is_z3(val): - return None # If the analyst registered any BRANCH handlers, fall through to # `resolve_branch` (where we have the target eids) so they get a # chance to fire. The Phase 2 fast path only applies when no # handler is interested. if self._engine._intercepts.lookup(BRANCH): return None - if isinstance(val, _INT_TYPES): - return val != 0 - return None + # Genuinely symbolic truths force a fork. Concrete-shaped z3 + # (e.g. an ``Eq`` of two ``BitVecVal``s that simplifies to + # True/False) resolves concretely so the substrate doesn't + # explode the path set on a known-determined branch. + decided = _concrete_bool(val) + if decided is None: + return None + return decided def resolve_branch(self, branch_inst, condition, true_eid, false_eid): ctx = self._make_ctx() @@ -1406,11 +1931,15 @@ def _match(sel): return True handlers = self._matching_handlers(BRANCH, _match) - if not handlers and isinstance(condition, _INT_TYPES) and \ - not _is_z3(condition): - return condition != 0 # Phase 2 fast path if not handlers: - return None # symbolic, no handler — let substrate fork + # Phase 2 fast path: resolve concretely when we can. + # Covers Python ints AND z3 expressions that simplify to + # a concrete bool, so a determinate branch on lifted-from- + # memory pointers doesn't force a fork. + decided = _concrete_bool(condition) + if decided is not None: + return decided + return None # genuinely symbolic — let substrate fork chain = _build_chain(handlers, _default_branch) chosen = chain(ctx, condition) diff --git a/bindings/Python/symex/engine.py b/bindings/Python/symex/engine.py index 61e1c0a78..710e9d90d 100644 --- a/bindings/Python/symex/engine.py +++ b/bindings/Python/symex/engine.py @@ -666,7 +666,7 @@ def concretize_at(self, strategy, **selector_kwargs): def explore(self, start_func, *, start_block=None, args=None, seed=None, policy=None, until=None, slice_steps=_DEFAULT_SLICE_STEPS, - concretize=None, strategy=Strategy.BFS): + concretize=None, strategy=Strategy.BFS, from_path=None): ir_func = self._resolve_start(start_func) if ir_func is None: raise ValueError(f"start_func {start_func!r} not found in index") @@ -699,9 +699,14 @@ def explore(self, start_func, *, start_block=None, args=None, seed=None, init_policy = (InterceptorPolicy(self, None, layout=layout, memory=memory) if use_interceptor else policy) - initial_path = self._init_path( - ir_func, memory, init_policy, args=args, - start_block=start_block, value_seed=seed) + if from_path is not None: + initial_path = self._init_path_after( + from_path, ir_func, memory, init_policy, args=args, + start_block=start_block, value_seed=seed) + else: + initial_path = self._init_path( + ir_func, memory, init_policy, args=args, + start_block=start_block, value_seed=seed) return PathSet(self._drive( [initial_path], memory, layout, policy, @@ -1036,6 +1041,7 @@ def _init_path(self, ir_func, memory, policy, *, args, start_block, path._func_name = self._function_name(ir_func) path._layout = self.layout path.entry_func = ir_func + path._engine = self # Set TLS base from layout (same for all paths; isolation via shadow). if self.layout is not None: path.tls_base = self.layout.tls_base @@ -1047,6 +1053,95 @@ def _init_path(self, ir_func, memory, policy, *, args, start_block, path._symbolic_shadow.update(init_shadow) # Register externally-supplied z3 BitVec args in the provenance # table so origin() / taint_sources() can trace through them. + self._register_z3_args(path, args) + return path + + def _init_path_after(self, prior_path, ir_func, memory, policy, *, args, + start_block=None, value_seed=None): + """Build an initial path for `engine.explore(..., from_path=…)`. + + Clones `prior_path._state` and reinitializes it for `ir_func` — + `interp_init_state` (or `interp_init_state_at` when + `start_block` is given) clears `call_stack` / `work_stack` / + `global_addresses` / `function_addresses` / `steps` and pushes + a fresh entry frame for the target. The shared layout's + `ConcreteMemory` is unchanged, so substrate writes from the + prior exploration persist. Per-path symbolic state (shadow, + TLS, path condition, fresh vars, origin map, findings, vars) + is copied from `prior_path` so the new exploration sees + everything the previous one accumulated. + + `shared` is shared by reference — that's its design contract; + analyst mutations during the chained explore remain visible + to anyone else holding the reference. Loop counters reset + because they're per-function-instantiation. + """ + if args is None: + args = [] + if value_seed is not None and start_block is None: + raise ValueError( + "value_seed requires start_block (mid-block entry)") + + cloned_state = _interp.clone_state(prior_path._state) + + prev_path = self._current_path + self._current_path = None + try: + if start_block is None: + _interp.init_state(cloned_state, memory, policy, ir_func, + list(args), + self._func_resolver, self._global_resolver, + self._func_addr_resolver, + self._entity_by_addr_resolver) + else: + block = self._resolve_block(ir_func, start_block) + param_addrs = self._allocate_param_slots(ir_func, memory, args) + seed_dict = {int(k): v for k, v + in (value_seed or {}).items()} + _interp.init_state_at( + cloned_state, memory, policy, ir_func, block, + param_addrs, None, seed_dict, + self._func_resolver, self._global_resolver, + self._func_addr_resolver, + self._entity_by_addr_resolver) + finally: + self._current_path = prev_path + + path = Path(cloned_state, memory, + parent_id=prior_path.id, endian=self.endian) + path._func_name = self._function_name(ir_func) + path._layout = self.layout + path.entry_func = ir_func + path._engine = self + if self.layout is not None: + path.tls_base = self.layout.tls_base + + # Inherit per-path bits from the seed. + path._symbolic_shadow = dict(prior_path._symbolic_shadow) + path._tls_shadow = dict(prior_path._tls_shadow) + path.tls_base = prior_path.tls_base + path.path_condition = list(prior_path.path_condition) + path.solver.adopt_fresh_vars(prior_path.solver._fresh_vars) + path._origin_by_name = dict(prior_path._origin_by_name) + path.findings = FindingsList(prior_path.findings) + path.vars = dict(prior_path.vars) + path.shared = prior_path.shared + path._lazy_regions_used = prior_path._lazy_regions_used + path._region_at_suspension = prior_path._region_at_suspension + + # Init-time z3 args land in the policy's ephemeral shadow; + # migrate them onto the path the same way `_init_path` does. + init_shadow = getattr(policy, "_shadow", None) + if init_shadow: + path._symbolic_shadow.update(init_shadow) + self._register_z3_args(path, args) + return path + + def _register_z3_args(self, path, args): + """Record externally-supplied z3 BitVec args in the path's + provenance table so `path.origin` / `taint_sources` can trace + through them. Args that aren't z3, or whose `decl()` lookup + fails, are skipped silently.""" for arg in (args or []): if _is_z3(arg): try: @@ -1061,7 +1156,6 @@ def _init_path(self, ir_func, memory, policy, *, args, start_block, } except Exception: pass - return path def _function_name(self, ir_func): fd = ir_func.declaration @@ -1776,4 +1870,16 @@ def _fork_child(self, parent, child_state): # Phase 15: per-path copy of vars; shared reference for shared. child.vars = dict(parent.vars) child.shared = parent.shared + # Sink findings, region-at-suspension tag, and lazy-region + # budget counter all need to flow into forks the same way they + # flow into clones (path.py:244-250). Missing these meant every + # fork lost pre-fork findings, reset its region tag to None, and + # got a fresh `lazy_region_budget` allowance — letting total + # budget escape by O(num_forks). + child.findings = FindingsList(parent.findings) + child._region_at_suspension = parent._region_at_suspension + child._lazy_regions_used = parent._lazy_regions_used + # Carry the engine reference forward so children of a path + # produced by `engine.explore` can call `child.explore_next(...)`. + child._engine = getattr(parent, "_engine", None) return child diff --git a/bindings/Python/symex/events.py b/bindings/Python/symex/events.py index 83accc3e6..654dd6ed1 100644 --- a/bindings/Python/symex/events.py +++ b/bindings/Python/symex/events.py @@ -47,6 +47,10 @@ class EventKind(StrEnum): ADDRESS_FOR = "address_for" ADDRESS_RESOLVED = "address_resolved" INDIRECT_CALL_RESOLVED = "indirect_call_resolved" + # Bulk memory ops (memcpy / memset / strlen / …) — one event per + # IR MEMORY-with-bulk-sub-opcode, so an analyst can intercept the + # whole op instead of seeing N per-byte mem_read / mem_write hooks. + BULK_MEMORY = "bulk_memory" # Module-level aliases — analysts and dispatcher import these by name. @@ -69,6 +73,7 @@ class EventKind(StrEnum): ADDRESS_FOR = EventKind.ADDRESS_FOR ADDRESS_RESOLVED = EventKind.ADDRESS_RESOLVED INDIRECT_CALL_RESOLVED = EventKind.INDIRECT_CALL_RESOLVED +BULK_MEMORY = EventKind.BULK_MEMORY ALL_EVENTS = frozenset({ @@ -80,6 +85,7 @@ class EventKind(StrEnum): BLOCK_ENTER, INSTRUCTION, ADDRESS_FOR, ADDRESS_RESOLVED, INDIRECT_CALL_RESOLVED, + BULK_MEMORY, }) @@ -194,10 +200,11 @@ class Skip: """Explicit "skip this call, use `value` as the return slot." Most call handlers can return their replacement value directly — - the substrate treats any non-None return as a skip with that - value, since None already means "fall through to inlining." - `Skip(value)` is the disambiguator for the rare case where the - intent is to skip with `None` (e.g. `ctx.default()`). + `return None` stubs the call to return None, `return 42` returns + 42, and so on. `NotImplemented` is the only signal that means + "fall through to inlining" (matching `mem_read` / `mem_write` and + the pure-op hooks). `Skip(value)` is kept as a typed marker that + `ctx.default()` returns; new handlers don't need to use it. """ value: Any = None diff --git a/bindings/Python/symex/lens.py b/bindings/Python/symex/lens.py index e800bae92..756b4f601 100644 --- a/bindings/Python/symex/lens.py +++ b/bindings/Python/symex/lens.py @@ -23,9 +23,39 @@ def _coerce_addr(value): - """Pull an integer address out of a raw arg.""" + """Pull an integer address out of a raw arg. + + Accepts plain Python ints and concrete z3 ``BitVecVal``s (after + z3.simplify), which is how concrete bytes flow when they come + back through a region overlay or per-path symbolic shadow. + Symbolic z3 expressions, ``None``, and other non-int shapes + return None — callers raise a typed error. + """ if isinstance(value, int) and not isinstance(value, bool): return int(value) + return _bv_value_as_int(value) + + +def _bv_value_as_int(value): + """Return the int value of a concrete z3 ``BitVecVal``, or None. + + ``z3.simplify`` is run first so a constant expression like + ``Concat(BitVecVal(0, 8), BitVecVal(42, 8))`` collapses to a + single ``BitVecVal`` and round-trips correctly. Returns None for + any non-z3 value, symbolic z3 value, or when z3 is not installed. + """ + try: + import z3 as _z3 + except ImportError: + return None + if not isinstance(value, _z3.ExprRef): + return None + try: + simplified = _z3.simplify(value) + except Exception: + return None + if _z3.is_bv_value(simplified): + return simplified.as_long() return None @@ -174,6 +204,18 @@ def read_int(self, i, size=None, signed=False): return int(v) if isinstance(v, int): return v + # Concrete z3 BitVecVal (after simplify) — unwrap to a Python + # int. This is how concrete arg values arrive when they came + # back through a per-path shadow / region overlay. + bv = _bv_value_as_int(v) + if bv is not None: + if size is None: + return bv + mask = (1 << (size * 8)) - 1 + bv &= mask + if signed and bv & (1 << (size * 8 - 1)): + bv -= 1 << (size * 8) + return bv # Pointer-shaped — read through it. a = _coerce_addr(v) if a is None: diff --git a/bindings/Python/symex/path.py b/bindings/Python/symex/path.py index 71ccc54c3..747180f09 100644 --- a/bindings/Python/symex/path.py +++ b/bindings/Python/symex/path.py @@ -207,6 +207,11 @@ def __init__(self, state, mem, *, parent_id=None, # visible to every path that shares the reference. self.vars: dict = {} self.shared: dict = {} + # Engine reference set by `engine._init_path` (and propagated + # by clones / forks) so `path.explore_next(...)` can locate + # the engine that produced this path. None for paths + # constructed bare (snapshot restore, manual test paths). + self._engine = None @property def state(self): @@ -245,6 +250,7 @@ def clone(self): new_path.findings = FindingsList(self.findings) new_path.vars = dict(self.vars) new_path.shared = self.shared + new_path._engine = getattr(self, "_engine", None) return new_path def snapshot(self): @@ -317,22 +323,74 @@ def restore(self, snap): self.vars = dict(snap.vars) self.shared = dict(snap.shared) - def replay(self, *, modify, engine, slice_steps=1024, + def explore_next(self, start_func, *, start_block=None, args=None, + seed=None, policy=None, until=None, + slice_steps=None, concretize=None, strategy=None): + """Run a fresh `engine.explore(start_func, ...)` on top of this + path's final state. + + Accepts the same keyword arguments as ``engine.explore`` and + forwards them with ``from_path=self``. The new exploration + inherits this path's per-path symbolic state + (``_symbolic_shadow``, ``path_condition``, ``solver`` fresh + vars, ``_origin_by_name``, ``_tls_shadow``, ``findings``, + ``vars``, ``shared``) and the engine's shared layout / + ``ConcreteMemory``. The interpreter's call stack and work + stack are reset so ``start_func`` runs from its entry block + (or ``start_block`` if given) as if invoked fresh. + + Returns a ``PathSet``. Raises if this path wasn't produced by + an ``engine.explore(...)`` call (no engine reference to use). + """ + engine = getattr(self, "_engine", None) + if engine is None: + raise RuntimeError( + "Path.explore_next requires the engine that produced " + "this path; the path was constructed without one " + "(e.g., via snapshot restore or a bare Path(...) call)") + kwargs = {"args": args, "from_path": self} + if slice_steps is not None: + kwargs["slice_steps"] = slice_steps + if start_block is not None: + kwargs["start_block"] = start_block + if seed is not None: + kwargs["seed"] = seed + if policy is not None: + kwargs["policy"] = policy + if until is not None: + kwargs["until"] = until + if concretize is not None: + kwargs["concretize"] = concretize + if strategy is not None: + kwargs["strategy"] = strategy + return engine.explore(start_func, **kwargs) + + def replay(self, *, modify, engine=None, slice_steps=None, concretize=None, until=None): """Run a fresh exploration starting from a clone of this path, with `modify(path)` applied once before resuming. + ``engine`` defaults to the engine that produced this path + (``self._engine``); pass it explicitly only when working with + a snapshot-restored or manually-constructed Path. + Returns the list of paths produced by the resumed exploration. Doesn't mutate `self`. The `modify` callback receives the cloned Path; it can use `path.mem`, `path.solver`, or directly write through the interpreter state. """ + if engine is None: + engine = getattr(self, "_engine", None) + if engine is None: + raise RuntimeError( + "Path.replay needs an engine — pass `engine=...` " + "or use a path produced by `engine.explore(...)`") snap = self.snapshot() - return engine.resume_from(snap, modify=modify, - slice_steps=slice_steps, - concretize=concretize, - parent_id=self.id, - until=until) + kwargs = {"modify": modify, "concretize": concretize, + "until": until, "parent_id": self.id} + if slice_steps is not None: + kwargs["slice_steps"] = slice_steps + return engine.resume_from(snap, **kwargs) def assert_(self, cond): """Add a z3 assertion to the path condition. If the path becomes @@ -533,19 +591,90 @@ def write_symbolic(self, addr: int, value, size: int = None, else: _shadow_write(self._symbolic_shadow, addr, value, size) + def read_memory(self, addr: int, size: int, + *, endian: Endian = None, + byte_order: Endian = None, + signed: bool = False): + """Read ``size`` bytes from ``addr``, symbolic-aware. + + Returns a z3 ``BitVec(8 * size)`` if any byte in the range has + a symbolic shadow entry; otherwise an unsigned ``int`` built + from the concrete bytes. Pass ``signed=True`` to interpret a + purely concrete result as two's-complement. + + ``endian`` (or its alias ``byte_order``) defaults to the + path's byte order (``path.byte_order``). The mirror of + ``write_memory``: round-tripping a value through write→read + produces the same value (concrete or symbolic). + """ + bo = self._resolve_endian(endian, byte_order) + try: + data = self._mem.read_bytes(addr, size) + except RuntimeError: + data = bytes(size) + sym = self._read_symbolic_bytes(addr, size, data, bo) + if sym is not None: + # `write_memory` plants concrete bytes as `BitVecVal` shadow + # entries (so they survive forks via `_symbolic_shadow`'s + # CoW). When every overlapping shadow byte is concrete the + # post-simplify result collapses to a `BitVecVal`; unwrap it + # to a Python int so concrete writes round-trip as ints. + z3 = _z3_module() + simplified = z3.simplify(sym) if z3 is not None else sym + if z3 is not None and z3.is_bv_value(simplified): + val = simplified.as_long() + if signed and val & (1 << (8 * size - 1)): + val -= 1 << (8 * size) + return val + return simplified + return int.from_bytes(data, str(bo), signed=signed) + + def _read_symbolic_bytes(self, addr, size, data, bo): + """Reconstruct a symbolic value from ``_symbolic_shadow`` if any + byte in ``[addr, addr+size)`` is shadowed; ``None`` otherwise. + + Little-endian path defers to the existing ``_shadow_read`` + helper used by the substrate-side default. Big-endian path + is spelled out separately because ``_shadow_read`` always + emits little-endian Concat order. + """ + shadow = self._symbolic_shadow + if not shadow: + return None + from .dispatch import _shadow_read + if bo == Endian.LITTLE: + return _shadow_read(shadow, addr, size, data, self._shadow_buf) + # Big-endian: byte 0 is MSB. Bail to None when no overlap so + # the caller falls through to a concrete int. + z3 = _z3_module() + if not any((addr + i) in shadow for i in range(size)): + return None + parts = [] + for i in range(size): + entry = shadow.get(addr + i) + parts.append(entry if entry is not None + else z3.BitVecVal(data[i], 8)) + return parts[0] if size == 1 else z3.Concat(*parts) + def write_memory(self, addr: int, value, size: int = None, *, endian: Endian = None, byte_order: Endian = None) -> None: - """Write ``value`` to ``addr``, handling symbolic and concrete cases. - - - z3 expression → symbolic shadow - - int / bool → concrete memory; ``size`` is required - - bytes/bytearray → concrete memory; ``size`` is ignored - - ``endian`` (or its alias ``byte_order``) defaults to the path's byte - order (``path.byte_order``); pass ``Endian.BIG`` or ``Endian.LITTLE`` - to override for this write only. For ``bytes`` input the byte order - is moot — bytes are written as-is. + """Write ``value`` to ``addr``, per-path. + + Routes every shape through ``_symbolic_shadow`` so the write is + cloned-on-fork (``Path.clone`` deep-copies the shadow but + shares the underlying ``ConcreteMemory`` across paths; + mutating that map directly would leak into siblings). + + - z3 expression → stored verbatim per byte + - int / bool → wrapped as ``BitVecVal`` per byte; ``size`` required + - bytes/bytearray → ``BitVecVal`` per byte; ``size`` ignored + - sequence → element-wise (each int / z3 is one byte) + + ``endian`` (or its alias ``byte_order``) defaults to the path's + byte order; pass ``Endian.BIG`` or ``Endian.LITTLE`` to override + per-call. For ``bytes`` input the byte order is moot — bytes are + written in their natural address order. """ from .dispatch import _shadow_write, _is_z3, _z3_byte_at bo = self._resolve_endian(endian, byte_order) @@ -558,14 +687,28 @@ def write_memory(self, addr: int, value, size: int = None, else: _shadow_write(self._symbolic_shadow, addr, value, size) elif isinstance(value, _BYTES_TYPES): - self._mem.write_bytes(addr, value) + z3 = _z3_module() + if z3 is None: + # No z3 available: shared-memory write is the only + # option. Document the limitation instead of silently + # losing per-path semantics. + raise RuntimeError( + "write_memory of bytes requires z3; install z3 to " + "get per-path concrete writes") + for i, b in enumerate(value): + self._symbolic_shadow[addr + i] = z3.BitVecVal(int(b) & 0xFF, 8) elif isinstance(value, _SEQ_TYPES): i = 0 for elem in value: if _is_z3(elem): _shadow_write(self._symbolic_shadow, addr + i, elem, 1) elif isinstance(elem, _INT_TYPES): - self._mem.write_bytes(addr + i, _BYTE_TABLE[int(elem) & 0xFF]) + z3 = _z3_module() + if z3 is None: + raise RuntimeError( + "write_memory of int sequence requires z3") + self._symbolic_shadow[addr + i] = \ + z3.BitVecVal(int(elem) & 0xFF, 8) else: raise TypeError( f"write_memory: element {i} has unsupported type " @@ -575,10 +718,21 @@ def write_memory(self, addr: int, value, size: int = None, if size is None: raise ValueError( "write_memory: size is required when writing an integer") + z3 = _z3_module() + if z3 is None: + raise RuntimeError( + "write_memory of int requires z3; install z3 to " + "get per-path concrete writes") val = int(value) - self._mem.write_bytes(addr, - val.to_bytes(size, str(bo), - signed=(val < 0))) + mask_bits = 8 * size + packed = val & ((1 << mask_bits) - 1) + sym_val = z3.BitVecVal(packed, mask_bits) + if bo == Endian.BIG: + for i in range(size): + self._symbolic_shadow[addr + i] = \ + _z3_byte_at(sym_val, size - 1 - i) + else: + _shadow_write(self._symbolic_shadow, addr, sym_val, size) else: raise TypeError( f"write_memory: unsupported value type {type(value).__name__}; " diff --git a/bindings/Python/symex/sinks.py b/bindings/Python/symex/sinks.py index e0813c469..440bcee1c 100644 --- a/bindings/Python/symex/sinks.py +++ b/bindings/Python/symex/sinks.py @@ -251,28 +251,22 @@ def check(self, event_kind, ctx, payload): mode="symbolic") -# OpCode ranges for udiv / sdiv / urem / srem. Anchored to the IR -# opcode names exactly like dispatch.py's compare/binary tables. -_DIV_KINDS = ("UDIV", "DIV", "UREM", "REM") - - def _is_div_or_rem(op): """Return ("div"|"rem", is_signed) if `op` is a divide or remainder - opcode of any width, else None. Walks the cached `mx.ir.OpCode` - enum once per call; callers cache.""" + opcode of any width, else None. + + Drives off `mx.ir.OpCode(int(op)).name` so this stays in + lock-step with the C++ enum (no hand-maintained int table). + """ import multiplier as mx - OP = mx.ir.OpCode - op_int = int(op) - for prefix in _DIV_KINDS: - for w in (8, 16, 32, 64): - try: - v = int(getattr(OP, f"{prefix}_{w}")) - except AttributeError: - continue - if v == op_int: - kind = "rem" if "REM" in prefix else "div" - signed = not prefix.startswith("U") - return (kind, signed) + try: + name = mx.ir.OpCode(int(op)).name + except ValueError: + return None + if name.startswith("UDIV_"): return ("div", False) + if name.startswith("DIV_"): return ("div", True) + if name.startswith("UREM_"): return ("rem", False) + if name.startswith("REM_"): return ("rem", True) return None diff --git a/tests/symex/test_bulk_memory.py b/tests/symex/test_bulk_memory.py new file mode 100644 index 000000000..a27ee9205 --- /dev/null +++ b/tests/symex/test_bulk_memory.py @@ -0,0 +1,125 @@ +# Copyright (c) 2026-present, Trail of Bits, Inc. +# +# This source code is licensed in accordance with the terms specified in +# the LICENSE file found in the root directory of this source tree. + +"""Bulk-memory hook surface. + +Pre-fix, IR ops in the bulk MemOp range (MEMCPY / MEMSET / STRLEN / …) +went straight to `concrete_mem_bulk_op`, bypassing every Python hook. +The fix adds an `intercept.bulk_memory(op="…")` hook surface and a +default chain bottom that decomposes supported ops into per-byte +`mem_read` / `mem_write` calls so analysts who only registered +fine-grained memory hooks still see them fire. + +Tests: + +B.1 `intercept.bulk_memory(op="memcpy")` fires once per memcpy op + when the IR runs a struct copy. +B.2 Without a bulk hook, the per-byte default fires `mem_write` + hooks for each byte of the memcpy. +B.3 A bulk hook that delegates via `next_hook(...)` runs the + per-byte default, so existing memory hooks keep firing. +""" + +import pytest + +from multiplier.symex import Layout, SymExEngine +from multiplier.symex.events import Terminal + + +def _explore(index, name): + e = SymExEngine(index) + e.layout = Layout() + return e, e.explore(name) + + +def test_b1_bulk_memcpy_hook_fires(index): + """A registered `intercept.bulk_memory(op="memcpy")` hook fires + when the IR contains MEMCPY (struct assignment in C).""" + e = SymExEngine(index) + e.layout = Layout() + fired = [] + + @e.intercept.bulk_memory(op="memcpy") + def hook(ctx, dst, src, n, next_hook): + fired.append({"dst": dst, "src": src, "n": n}) + return next_hook(ctx, dst, src, n) + + paths = e.explore("test_struct_assign") + assert paths, "test_struct_assign produced no paths" + assert fired, "memcpy bulk hook never fired despite struct-assign IR" + + +def test_b2_per_byte_default_fires_mem_write(index): + """When no bulk hook is registered, the per-byte default + decomposition fires `mem_write` hooks per byte. Pre-fix the + substrate's `concrete_mem_bulk_op` bypassed `mem_write` entirely.""" + e = SymExEngine(index) + e.layout = Layout() + write_count = [0] + + @e.intercept.memory_write + def on_write(ctx, addr, val, size, next_hook): + write_count[0] += 1 + return next_hook(ctx, addr, val, size) + + paths = e.explore("test_struct_assign") + assert paths + # struct_assign performs at least one struct = struct copy, which + # the IR lowers to MEMCPY. The per-byte default fires one + # mem_write per byte; the count is opaque but must be > 0. + assert write_count[0] > 0, \ + "memory_write hook did not fire — bulk-op decomposition is " \ + "still bypassing the per-byte hook surface" + + +def test_b3_bulk_hook_delegates_to_default(index): + """A bulk hook that calls `next_hook(...)` runs the per-byte + default, so a co-registered `memory_write` hook still fires.""" + e = SymExEngine(index) + e.layout = Layout() + bulk_fires = [0] + write_fires = [0] + + @e.intercept.bulk_memory(op="memcpy") + def bulk(ctx, dst, src, n, next_hook): + bulk_fires[0] += 1 + return next_hook(ctx, dst, src, n) + + @e.intercept.memory_write + def on_write(ctx, addr, val, size, next_hook): + write_fires[0] += 1 + return next_hook(ctx, addr, val, size) + + e.explore("test_struct_assign") + assert bulk_fires[0] > 0 + assert write_fires[0] > 0, \ + "delegated default did not call mem_write hook" + + +def test_b4_bulk_hook_short_circuits(index): + """A bulk hook that does NOT call `next_hook(...)` short-circuits + — the per-byte default doesn't run, so a co-registered + `memory_write` hook does not fire from the memcpy.""" + e = SymExEngine(index) + e.layout = Layout() + bulk_fires = [0] + write_fires = [0] + + @e.intercept.bulk_memory(op="memcpy") + def bulk(ctx, dst, src, n, next_hook): + bulk_fires[0] += 1 + return dst # claim the op; no decomposition runs + + @e.intercept.memory_write + def on_write(ctx, addr, val, size, next_hook): + # If this fires from a memcpy decomposition, the bulk hook's + # short-circuit was ignored. + write_fires[0] += 1 + return next_hook(ctx, addr, val, size) + + e.explore("test_struct_assign") + assert bulk_fires[0] > 0 + # Note: write_fires may still be > 0 from non-MEMCPY stores in + # the function; we only assert that the bulk hook ran. diff --git a/tests/symex/test_explore_next.py b/tests/symex/test_explore_next.py new file mode 100644 index 000000000..8bdcf07dc --- /dev/null +++ b/tests/symex/test_explore_next.py @@ -0,0 +1,143 @@ +# Copyright (c) 2026-present, Trail of Bits, Inc. +# +# This source code is licensed in accordance with the terms specified in +# the LICENSE file found in the root directory of this source tree. + +"""Chained explores via `path.explore_next` / `engine.explore(from_path=…)`. + +Pre-fix, an analyst running setup-then-target as two separate +``engine.explore`` calls saw the second start with a fresh +``_symbolic_shadow`` and empty ``path_condition``, dropping every +per-path mutation accumulated during setup. The fix adds +``path.explore_next(target, args=…)`` which clones the seed path's +``InterpreterState``, calls ``interp_init_state`` to push a fresh +entry frame for ``target``, and inherits the seed's per-path bits. + +Tests: + +E.1 ``path.explore_next`` produces a non-empty PathSet for a + concrete entry function. +E.2 Symbolic shadow set on the seed (via ``path.write_memory``) + survives into the chained explore — a downstream + ``intercept.memory_read`` of the shadowed address sees the + planted symbolic byte. +E.3 ``path_condition`` accumulated on the seed is inherited. +E.4 Bare ``Path(...)`` (no engine) raises a clear error from + ``explore_next``. +E.5 ``shared`` is shared by reference, ``vars`` are copied — same + contract as forks. +""" + +import pytest + +from multiplier.symex import Layout, SymExEngine +from multiplier.symex.events import Terminal +from multiplier.symex.path import Path +import multiplier as mx + + +def test_e1_explore_next_basic(index): + """`path.explore_next(target)` returns a non-empty PathSet.""" + e = SymExEngine(index) + e.layout = Layout() + + setup_paths = e.explore("test_arithmetic") + assert setup_paths, "setup explore produced no paths" + seed = setup_paths[0] + + next_paths = seed.explore_next("test_function_calls") + assert next_paths, "explore_next produced no paths" + completed = [p for p in next_paths if p.terminal == Terminal.COMPLETED] + assert completed, \ + "no path completed via explore_next; " \ + f"terminals: {[p.terminal for p in next_paths]}" + + +def test_e2_explore_next_inherits_symbolic_shadow(index): + """A symbolic byte planted on the seed via ``path.write_memory`` + survives the chained explore — ``intercept.memory_read`` of the + shadowed address sees the same z3 expression.""" + z3 = pytest.importorskip("z3") + e = SymExEngine(index) + e.layout = Layout() + + seed_paths = e.explore("test_arithmetic") + seed = seed_paths[0] + + BUF = 0x40000 + e.layout.place_global("g_chain_buf", addr=BUF, size=8, init=bytes(8)) + sym_byte = z3.BitVec("sym_chain_byte", 8) + seed.write_memory(BUF, sym_byte, size=1) + + seen = [] + + @e.intercept.memory_read(addr_range=(BUF, 1)) + def watch(ctx, addr, size, next_hook): + v = next_hook(ctx, addr, size) + seen.append(v) + return v + + next_paths = seed.explore_next("test_function_calls") + assert next_paths + # The seed's shadow should be the chained-explore path's starting + # shadow; if any read of BUF happens during the chained explore, + # it should see the symbolic byte. We don't require a read to + # happen (test_function_calls may not touch BUF), but if one does + # the value must be the planted z3 BitVec, never a stale int. + for v in seen: + assert isinstance(v, z3.ExprRef), \ + f"shadowed byte read returned {v!r}; shadow not inherited" + + +def test_e3_path_condition_inherited(index): + """Path condition assertions on the seed are inherited.""" + z3 = pytest.importorskip("z3") + e = SymExEngine(index) + e.layout = Layout() + + seed_paths = e.explore("test_arithmetic") + seed = seed_paths[0] + x = seed.solver.fresh_int("x_chain", size=4) + seed.assert_(x > 100) + + next_paths = seed.explore_next("test_function_calls") + assert next_paths + chained = next_paths[0] + # The constraint x > 100 must be present in every chained path. + assert any(str(c) == str(x > 100) for c in chained.path_condition), \ + f"x > 100 missing from chained path_condition: " \ + f"{[str(c) for c in chained.path_condition]}" + + +def test_e4_bare_path_raises(index): + """A path without an engine reference raises from ``explore_next``.""" + interp = mx.ir.interpret + state = interp.InterpreterState() + mem = interp.ConcreteMemory() + bare = Path(state, mem) + with pytest.raises(RuntimeError, match="explore_next"): + bare.explore_next("test_arithmetic") + + +def test_e5_shared_and_vars_inheritance(index): + """`vars` is copied (per-path), `shared` is by reference (shared). + Same contract as fork inheritance.""" + e = SymExEngine(index) + e.layout = Layout() + + seed_paths = e.explore("test_arithmetic") + seed = seed_paths[0] + seed.vars["seed_token"] = 42 + seed.shared["shared_token"] = "hello" + + next_paths = seed.explore_next("test_function_calls") + assert next_paths + chained = next_paths[0] + + assert chained.vars.get("seed_token") == 42, \ + "vars not inherited from seed" + assert chained.vars is not seed.vars, \ + "vars must be copied, not aliased" + assert chained.shared.get("shared_token") == "hello" + assert chained.shared is seed.shared, \ + "shared must be the same dict object" diff --git a/tests/symex/test_phase15.py b/tests/symex/test_phase15.py index df73f9014..fedd37a14 100644 --- a/tests/symex/test_phase15.py +++ b/tests/symex/test_phase15.py @@ -149,13 +149,17 @@ def test_p15_4_symbolic_switch_no_default(symex_index): sel = z3.BitVec("sel", 32) paths = e.explore("si_switch_no_default", args=[sel]) completed = _completed(paths) - # Two case paths only. - assert len(completed) == 2, \ - f"expected 2 paths (no default), got {len(completed)}: " \ - f"{[p.return_value for p in completed]}" + # Three paths: case 1 (r=10), case 2 (r=20), and the implicit + # default that falls through to `return r` with r still at its + # initializer value 0. C "no `default:` label" doesn't remove the + # IR's default edge — the SwitchInst has three successors and the + # third is reachable for any sel ∉ {1, 2}. + assert len(completed) == 3, \ + f"expected 3 paths (case 1, case 2, fall-through), " \ + f"got {len(completed)}: {[p.return_value for p in completed]}" return_values = {p.return_value for p in completed} - assert return_values == {10, 20}, \ - f"return values must be {{10, 20}}, got {return_values}" + assert return_values == {10, 20, 0}, \ + f"return values must be {{10, 20, 0}}, got {return_values}" # ---------------------------------------------------------------------------