diff --git a/.gitignore b/.gitignore
index 62e412b..f733534 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,6 +47,7 @@ pip-wheel-metadata/
!tests/integration/fixtures/bin/
!tests/integration/fixtures/bin/*.exe
+!tests/integration/fixtures/bin/analysis/*.exe
*.dll
diff --git a/Makefile b/Makefile
index bddc256..a845732 100644
--- a/Makefile
+++ b/Makefile
@@ -17,6 +17,7 @@ INTEGRATION_DIR := tests/integration
FUZZ_DIR := tests/fuzz
ROBUSTNESS_DIR := tests/robustness
PERFORMANCE_DIR := tests/performance
+CONTRACT_DIR := tests/contract
.PHONY: activate
activate:
@@ -29,16 +30,19 @@ activate:
help:
@echo ""
@echo "Available commands:"
- @echo " make venv Create virtual environment (only once)"
- @echo " make install Install package in editable mode"
- @echo " make dev Install dev tools (pytest, ruff, black)"
- @echo " make test Run test suite"
- @echo " make lint Run ruff linter"
- @echo " make format Auto-format with black"
- @echo " make run Run CLI tool"
- @echo " make clean Remove build artifacts"
- @echo " make dist Build wheel + sdist"
- @echo " make reset Delete venv and reinstall everything"
+ @echo " make venv Create virtual environment (only once)"
+ @echo " make install Install package in editable mode"
+ @echo " make dev Install dev tools (pytest, ruff, black, coverage, pip-audit, bandit, pytest-timeout)"
+ @echo " make test Run unit test suite only"
+ @echo " make test-[option] Run test suite (option=contract, fuzz, integration, performance, robustness, coverage)"
+ @echo " make security Run security scans (pip-audit, bandit)"
+ @echo " make lint Run ruff linter"
+ @echo " make format Auto-format with black"
+ @echo " make run Run CLI tool"
+ @echo " make clean Remove build artifacts"
+ @echo " make clean-all Remove build artifacts and virtual environment"
+ @echo " make dist Build wheel + sdist"
+ @echo " make reset Delete venv and reinstall everything"
@echo ""
@@ -122,6 +126,14 @@ test-coverage: dev
$(PYTHON) -m coverage run -m pytest
$(PYTHON) -m coverage report -m
+# ----------------------------------------
+# Contract tests only
+# ----------------------------------------
+.PHONY: test-contract
+test-contract: dev
+ @echo "Running contract tests..."
+ $(PYTEST) -m contract $(CONTRACT_DIR)
+
# ----------------------------------------
# Static analysis and SCA
# ----------------------------------------
diff --git a/README-pypi.md b/README-pypi.md
index c7569ee..27d20af 100644
--- a/README-pypi.md
+++ b/README-pypi.md
@@ -9,7 +9,7 @@ This is the **official IOCX engine** for static IOC extraction and PE analysis.
- **Organisation:** https://github.com/iocx-dev
- **Website:** https://iocx.dev
-IOCX is **not** an OSINT reputation checker, HTML report generator, or IP/domain scoring tool.
+IOCX is **not** an OSINT reputation checker, HTML report generator, or IP/domain scoring tool.
It is a **static analysis engine** focused on extracting Indicators of Compromise (IOCs) from binaries and text.
---
@@ -19,6 +19,22 @@ It is a **static analysis engine** focused on extracting Indicators of Compromis
IOCX is a fast, safe, deterministic engine for extracting Indicators of Compromise (IOCs) from binaries, text, and logs.
It performs **pure static analysis** — no execution, no sandboxing, no risk.
+## What's new in v0.6.0
+
+- Stable JSON schema across all analysis levels
+- Deterministic PE metadata (headers, TLS, optional header, signatures)
+- Guaranteed IOC categories (always present, empty arrays when no matches)
+- Formalised analysis levels:
+ - core behaviour → no analysis block
+ - basic → section layout + entropy
+ - deep → adds obfuscation heuristics
+ - full → extended metadata summaries
+- Schema‑contract tests to prevent drift across releases
+
+## Schema stability
+
+IOCX guarantees a stable JSON schema, not a guaranteed ordering of keys within objects. JSON objects are unordered by definition, so consumers should rely on field presence and structure rather than positional ordering.
+
## Features
- Extracts IOCs from Windows PE files and raw text
@@ -27,6 +43,7 @@ It performs **pure static analysis** — no execution, no sandboxing, no risk.
- Deterministic output suitable for automation
- Minimal dependencies and safe for enterprise environments
- CLI and Python API
+- Binary-aware static analysis with multi-level depth
## Installation
@@ -58,8 +75,8 @@ print(results)
- Static‑only design (never executes untrusted code)
- Binary‑aware IOC extraction
-- Stable JSON schema
-- High performance (~200 MB/s throughput)
+- Stable, predictable JSON schema
+- High performance: ~25-30 MB/s end-to-end, with individual detectors reaching 150-450 MB/s throughput
- Ideal for DFIR, SOC automation, CI/CD, and threat‑intel pipelines
## Project identity & naming
@@ -81,8 +98,7 @@ Community tools that integrate with IOCX are encouraged to use names like:
## Extensibility
-IOCX includes a lightweight plugin system that allows you to add custom detectors, parsers, and transformation rules.
-Plugins can emit new IOC categories, override built-in behaviour, or integrate IOCX into larger analysis pipelines.
+IOCX includes a lightweight plugin system for custom detectors, parsers, and transformation rules. Plugins can emit new IOC categories, override built‑in behaviour, or integrate IOCX into larger analysis pipelines.
See the documentation for details on writing detectors and plugins.
diff --git a/README.md b/README.md
index ca49718..44bfc7f 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
-
+
@@ -11,9 +11,11 @@
-
-
-
+
+
+
+
+
# Official IOCX Project
@@ -93,6 +95,23 @@ IOCX is **static extraction only**, by design.
## Version Highlights
+### v0.6.0 — Stable Output Schema, Deterministic PE Metadata, Contract‑Safe Analysis Levels
+
+- Introduced a fully stable JSON schema across all analysis levels
+- Added strict structural guarantees for `iocs`, `metadata`, and `analysis` blocks
+- Normalised PE metadata fields for deterministic output (headers, TLS, optional header, signatures)
+- Ensured **all IOC categories always exist** (empty arrays when no matches)
+- Formalised analysis‑level behaviour:
+ - core behaviour → no analysis block
+ - basic → section layout + entropy
+ - deep → adds obfuscation heuristics
+ - full → adds extended metadata summaries
+- Added **snapshot‑contract tests** to prevent schema drift across releases
+- Improved PE parser consistency for imports, resources, and section metadata
+- Strengthened safety guarantees for CI/CD and large‑scale automation pipelines
+
+This release establishes the long‑term schema contract that downstream tools can rely on.
+
### v0.5.0 — Analysis Levels, PE Section Analysis, Obfuscation Hints
- New analysis‑level system: basic, deep (default), and full (future‑ready)
@@ -330,6 +349,73 @@ If you are building something that integrates with IOCX and want guidance on nam
Static analysis ensures **safety**, **determinism**, and **CI‑friendly operation**. No sandboxing, no execution, and no risk of triggering malware behaviour.
+## Output Schema (v0.6.0)
+
+IOCX v0.6.0 defines a stable, deterministic JSON schema designed for DFIR, SOC automation, and threat‑intel pipelines. The schema is intentionally simple, predictable, and safe for long‑term integrations.
+
+The top‑level structure contains three blocks:
+
+- `iocs` — extracted indicators
+- `metadata` — structural information about the artifact
+- `analysis` — optional deeper inspection depending on analysis level
+
+This structure is identical across all input types, with PE‑specific fields populated only when applicable.
+
+### IOC Categories
+
+The `iocs` block always contains the same keys, regardless of analysis level:
+
+- `urls`
+- `domains`
+- `ips`
+- `hashes`
+- `emails`
+- `filepaths`
+- `base64`
+- `crypto.btc`
+- `crypto.eth`
+
+Each category is always an array. Empty categories are returned as empty arrays to ensure predictable downstream parsing.
+
+### Metadata Categories
+
+The metadata block contains structural information about the file. For PE files, this includes:
+
+- Imports and import details
+- Sections
+- Resources and resource strings
+- TLS directory
+- Header and optional header
+- Rich header
+- Signatures
+
+These fields are always present, even when empty. Metadata is **independent of analysis level** and is always returned in full.
+
+### Analysis Levels
+
+The `analysis` block is the only part of the schema that changes based on the selected analysis level.
+
+- **basic** — section layout + entropy
+- **deep** — adds obfuscation heuristics
+- **full** — adds extended metadata summaries
+
+This tiered design allows users to trade off performance vs. depth without changing their downstream parsing logic.
+
+### Deterministic Output
+
+IOCX v0.6.0 guarantees:
+
+- Stable keys
+- Stable types
+- No volatile values in minimal modes
+- Deterministic behaviour across runs and platforms
+
+This makes IOCX safe for SIEM/SOAR ingestion, CI/CD pipelines, and large‑scale batch processing.
+
+### Schema stability
+
+IOCX guarantees a stable JSON schema, not a guaranteed ordering of keys within objects. JSON objects are defined as unordered maps, so consumers should rely on field presence and structure rather than positional ordering. All fields, types, and structural relationships remain consistent across versions, even if internal key order changes.
+
## Quickstart
### Install
diff --git a/docs/pe-pipeline.md b/docs/pe-pipeline.md
new file mode 100644
index 0000000..6ae4aef
--- /dev/null
+++ b/docs/pe-pipeline.md
@@ -0,0 +1,321 @@
+# IOCX PE Analysis Pipeline
+
+IOCX includes a deterministic, static, offline analysis pipeline for Portable Executable (PE) files.
+The pipeline is designed to safely process untrusted binaries without executing them, unpacking them, or performing any dynamic analysis. All stages operate on raw bytes only and are fully deterministic.
+
+This document describes the PE pipeline as implemented in v0.6.0, including:
+
+- file-type detection
+- PE parsing
+- unified core metadata extraction
+- string extraction
+- obfuscation heuristics
+- IOC detection
+- output assembly
+
+It also outlines how future versions (v0.7.0+) will extend this pipeline with behavioural heuristics.
+
+## 1. Pipeline Overview
+
+The PE analysis pipeline runs through the following ordered stages:
+
+- File Type Detection
+- PE Parsing
+- Unified Core Metadata Extraction (v0.6.0)
+- String Extraction
+- Obfuscation Heuristics (v0.5.0)
+- Extended Metadata Summary (v0.6.0)
+- IOC Detection
+- Output Assembly
+
+Each stage is offline, deterministic, and safe to run on malicious or malformed binaries.
+
+```mermaid
+flowchart TD
+
+ subgraph Input
+ F[Untrusted File]
+ end
+
+ subgraph Stage1_FileType
+ MAGIC[File Type Detection]
+ end
+
+ subgraph Stage2_PEParsing
+ PE[PE Parser]
+ end
+
+ subgraph Stage3_Core
+        CORE[Unified Core Metadata Extraction - Headers, Sections, Imports, Exports, Resources, TLS, Signatures]
+ end
+
+ subgraph Stage4_Strings
+ STR[String Extraction]
+ end
+
+ subgraph Stage5_Obfuscation
+ OBF[Obfuscation Heuristics v0.5.0]
+ end
+
+ subgraph Stage6_ExtendedSummary
+ META6[Extended Metadata Summary v0.6.0]
+ end
+
+ subgraph Stage7_IOC
+ DET[IOC Detectors]
+ end
+
+ subgraph Output
+ OUT[JSON Output]
+ end
+
+ F --> MAGIC
+ MAGIC --> PE
+
+ PE --> CORE
+ PE --> STR
+
+ STR --> OBF
+ PE --> OBF
+
+ CORE --> OUT
+ STR --> DET
+ OBF --> OUT
+
+    CORE --> META6
+ PE --> META6
+ STR --> META6
+ META6 --> OUT
+
+ DET --> OUT
+```
+The pipeline is structured as a straight, deterministic sequence of stages, but only some of them contribute data depending on the selected analysis level. File‑type detection, PE parsing, unified core metadata extraction, string extraction, IOC detection, and JSON assembly always run. Section‑level analysis, obfuscation heuristics, and the extended metadata summary are conditional: basic analysis includes only section layout and entropy; deep analysis adds obfuscation heuristics; full analysis adds the extended metadata summary, which incorporates core metadata, strings, and obfuscation hints into a richer structural view. The final output merges the always‑present core metadata and IOC detections with whichever analysis components were enabled.
+
+## 2. File Type Detection
+
+IOCX uses signature‑based identification to determine whether a file is a PE. This step is structural only, non‑heuristic, and non‑executing. If the file is not a PE, the PE pipeline is skipped.
+
+## 3. PE Parsing
+
+IOCX parses the binary using a defensive, read-only approach. The parser extracts:
+
+- DOS header
+- NT headers
+- Optional header
+- Section table
+- Data directory pointers
+
+All parsing is wrapped in exception handling to avoid crashes on malformed samples. No dynamic loading or execution occurs.
+
+## 4. Unified Core Metadata Extraction (v0.6.0)
+
+In v0.6.0, IOCX extracts all structural PE metadata in a single unified stage.
+
+The unified core includes:
+
+### Header
+
+- entry point
+- image base
+- subsystem
+- timestamp
+- machine type
+- characteristics flags
+
+### Optional Header
+
+- section alignment
+- file alignment
+- size of image
+- size of headers
+- linker version
+- OS version
+- subsystem version
+
+### Import Table
+
+- DLL names
+- imported functions
+- ordinals
+- delayed imports
+- bound imports
+
+### Export Table
+
+- exported names
+- ordinals
+- forwarded exports
+
+### Resource directory
+
+- resource types
+- resource sizes
+- entropy
+- language codes
+- extracted resource strings
+
+### TLS Directory
+
+- start address
+- end address
+- callback table pointer
+
+### Digital Signature Presence
+
+- boolean `has_signature`
+- raw signature metadata
+
+### Sections
+
+- list of section names
+
+### Sections analysis (*in basic, deep, and full analysis modes only*)
+
+- section name
+- raw size
+- virtual size
+- characteristics
+- entropy
+
+### Extended Metadata summary (*in full analysis mode only*)
+
+- summary data across metadata categories
+- resource entropy min, max and average.
+
+All extracted metadata is descriptive only. No scoring, heuristics, or behavioural interpretation occurs in v0.6.0.
+
+## 5. String Extraction
+
+IOCX extracts printable ASCII and UTF‑16LE strings from:
+
+- `.text`
+- `.rdata`
+- `.data`
+- entire file (fallback)
+
+Extracted strings feed into:
+
+- IOC detection
+- obfuscation heuristics
+- resource string extraction
+
+Extraction is deterministic and bounded.
+
+## 6. Obfuscation Heuristics (v0.5.0)
+
+This module provides lightweight static hints about potential packing or obfuscation.
+
+> Obfuscation heuristics are only included when deep or full analysis is enabled. They are not included in basic analysis mode.
+
+Heuristics include:
+
+- suspicious section names (`.upx`, `.aspack`, `.mpress`, etc.)
+- high‑entropy sections
+- abnormal section layout
+- simple string‑obfuscation patterns
+
+Each heuristic emits a structured detection object. These hints are contextual, not behavioural.
+
+## 7. IOC Detection
+
+After metadata and string extraction, IOCX runs its IOC detectors across:
+
+- raw bytes
+- extracted strings
+- resource strings
+- metadata fields
+
+Detectors identify:
+
+- file paths
+- URLs
+- domains
+- IP addresses
+- hashes
+- email addresses
+- cryptographic constants
+
+Detection is static and deterministic.
+
+## 8. Output Assembly
+
+The engine merges:
+
+- unified core metadata
+- obfuscation hints
+- extended metadata summary
+- IOC detections
+
+into a single structured JSON document, including:
+
+- `file`
+- `type`
+- `iocs.*`
+- `metadata.file_type`
+- `metadata.imports`
+- `metadata.sections`
+- `metadata.resources`
+- `metadata.resource_strings`
+- `metadata.import_details`
+- `metadata.delayed_imports`
+- `metadata.bound_imports`
+- `metadata.exports`
+- `metadata.tls`
+- `metadata.header`
+- `metadata.optional_header`
+- `metadata.rich_header`
+- `metadata.signatures`
+- `metadata.has_signature`
+- `analysis.sections`
+- `analysis.obfuscation`
+- `analysis.extended`
+
+No network access or external lookups occur.
+
+## 9. Security Model
+
+The PE pipeline is designed for safe analysis of untrusted input:
+
+- no execution
+- no unpacking
+- no emulation
+- no dynamic imports
+- no network calls
+- no ML/AI models
+- deterministic, offline processing
+
+All analysis is read-only.
+
+## 10. Roadmap Alignment
+
+### v0.5.0 — Obfuscation Heuristics
+
+- section names
+- entropy
+- layout anomalies
+- string obfuscation
+
+### v0.6.0 — Unified Core Metadata (this version)
+
+- headers
+- sections
+- imports
+- exports
+- resources
+- TLS directory
+- signature presence
+
+### v0.7.0 — Behavioural Heuristics (future)
+
+- packer detection
+- TLS callback heuristics
+- anti‑debug heuristics
+- import anomaly scoring
+- signature anomalies
+- control‑flow hints
+
+v0.6.0 provides the structural foundation for v0.7.0.
+
+## 11. Summary
+
+The IOCX PE pipeline in v0.6.0 is static, deterministic, offline, safe, modular, and extensible. It significantly expands IOCX’s visibility into PE structure while preserving its core philosophy: no dynamic analysis, no risk, no surprises.
diff --git a/docs/security/threat-model.md b/docs/security/threat-model.md
index 69b4e65..e6c2e26 100644
--- a/docs/security/threat-model.md
+++ b/docs/security/threat-model.md
@@ -193,3 +193,97 @@ These diagrams support the project’s security goals by:
- Providing transparency for auditors, contributors, and users
Together, they form the foundation of IOCX’s threat model and help guide secure development practices.
+
+# PE Metadata Expansion (v0.6.0)
+
+IOCX v0.6.0 introduces a deterministic, static metadata extraction layer for Portable Executable (PE) files.
+This feature expands IOCX’s visibility into binary structure while maintaining strict security guarantees:
+
+- No dynamic analysis
+- No unpacking or emulation
+- No network access
+- No heavy dependencies
+- Fully deterministic and offline
+
+This metadata is used to provide analysts with richer context and to support future heuristic layers (v0.7.0).
+
+## What IOCX Extracts
+
+### 1. Import Table
+
+IOCX extracts:
+
+- DLL names
+- Imported functions
+- Ordinal imports
+- Delayed imports
+- Bound imports
+
+This information helps analysts understand API usage and identify unusual import patterns.
+
+### 2. Export Table
+
+IOCX extracts:
+
+- Exported function names
+- Ordinals
+- Forwarded exports
+
+This is useful for triaging DLLs and identifying suspicious export structures.
+
+### 3. Resource Directory
+
+IOCX extracts:
+
+- Resource types (icons, dialogs, version info, RCDATA)
+- Resource sizes
+- Resource entropy
+- Language codes (mapped to region-locale)
+
+High‑entropy resources may indicate embedded payloads or obfuscation.
+
+> Language codes are mapped to human‑readable locale identifiers using a minimal, safe lookup table. Only well‑defined primary language IDs and a small set of explicit region codes are resolved; ambiguous or non‑standard values are returned as "unknown" to avoid misclassification.
+
+### 4. Extended PE Metadata
+
+IOCX surfaces:
+
+- Timestamp
+- Subsystem
+- Machine type
+- Characteristics flags
+- Optional header fields
+- Entry point
+- Image base
+- Section alignment
+- Compiler/toolchain hints
+- Digital signature presence (raw only)
+- TLS directory (raw only)
+
+This metadata provides a structural overview of the binary without making behavioural claims.
+
+## Security Considerations
+
+- All analysis is read‑only and non‑invasive
+- No code execution occurs at any stage
+- All parsing is wrapped in defensive exception handling
+- No external lookups or network calls are performed
+- All entropy and size calculations are deterministic
+
+This ensures IOCX remains safe to use on untrusted or malicious binaries.
+
+## Relationship to v0.7.0
+
+v0.6.0 is descriptive only.
+It extracts facts but does not interpret them.
+
+Heuristics such as:
+
+- packer detection
+- anti‑debug detection
+- TLS callback analysis
+- import anomaly scoring
+- signature anomaly detection
+- control‑flow hints
+
+are explicitly reserved for v0.7.0, which will build on the metadata introduced here.
diff --git a/examples/generators/python/generate_analysis_fixtures.py b/examples/generators/python/generate_analysis_fixtures.py
new file mode 100644
index 0000000..5667db3
--- /dev/null
+++ b/examples/generators/python/generate_analysis_fixtures.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python3
+"""
+Generate synthetic PE fixtures for >=v0.6.0 IOCX tests.
+
+These files are structurally minimal but valid enough for pefile to parse.
+They are NOT executable and contain no real code.
+"""
+
+import os
+import struct
+from pathlib import Path
+
+FIXTURE_DIR = Path("tests/integration/fixtures/bin/analysis")
+FIXTURE_DIR.mkdir(parents=True, exist_ok=True)
+
+
+# ------------------------------------------------------------
+# Helpers
+# ------------------------------------------------------------
+
+def write_file(path: Path, data: bytes):
+ path.write_bytes(data)
+ print(f"[+] Wrote {path} ({len(data)} bytes)")
+
+
+def make_dos_header():
+ # Minimal DOS header with e_lfanew pointing to 0x80
+ return (
+ b"MZ" + # e_magic
+ b"\x00" * 58 + # padding
+ struct.pack(" Dict[str, List[Detection]]:
ctx.detections = results
- for plugin in self._plugin_registry.enrichers:
- try:
- plugin.enrich(text, ctx)
- except Exception as e:
- ctx.logger.warning(f"[iocx] enricher plugin {plugin.metadata.id} failed: {e}")
-
if self.config.enable_cache:
self.cache.detections[key] = results
@@ -312,7 +306,7 @@ def _post_process(self, raw: Dict[str, List[Detection]]) -> Dict[str, List[str]]
survivors.append(det)
last_end = det.end
- # Normalise
+ # 4. Normalise
CASE_INSENSITIVE = {"domains", "emails", "hashes"}
for det in survivors:
@@ -321,14 +315,20 @@ def _post_process(self, raw: Dict[str, List[Detection]]) -> Dict[str, List[str]]
v = v.lower()
det.value = v
- # 5. Group by category
- grouped: Dict[str, List[str]] = {}
+ # 5. Group by category (keep Detection objects)
+ grouped = {}
for det in survivors:
- grouped.setdefault(det.category, []).append(det.value)
+ grouped.setdefault(det.category, []).append(det)
# 6. Dedupe once per category (order‑preserving)
- for key, vals in grouped.items():
- grouped[key] = list(dict.fromkeys(vals))
+ for key, dets in grouped.items():
+ seen = set()
+ uniq = []
+ for det in dets:
+ if det.value not in seen:
+ seen.add(det.value)
+ uniq.append(det)
+ grouped[key] = uniq
# 7. Ensure all categories exist
baseline = {
@@ -344,7 +344,33 @@ def _post_process(self, raw: Dict[str, List[Detection]]) -> Dict[str, List[str]]
}
baseline.update(grouped)
- return baseline
+ # 8. Run enrichers
+ ctx = self._build_plugin_context("", "")
+ ctx.detections = baseline
+
+ # ensure metadata exists
+ for dets in ctx.detections.values():
+ for det in dets:
+ if det.metadata is None:
+ det.metadata = {}
+
+ for plugin in self._plugin_registry.enrichers:
+ try:
+ plugin.enrich("", ctx)
+ except Exception as e:
+ ctx.logger.warning(f"[iocx] enricher plugin {plugin.metadata.id} failed: {e}")
+
+ # Save enrichment metadata for pipeline to attach
+ if self._plugin_context is None:
+ self._plugin_context = self._build_plugin_context("", "")
+
+ self._plugin_context.metadata = ctx.metadata
+
+ # 9. Convert Detection objects → strings
+ final = {cat: [det.value for det in dets] for cat, dets in ctx.detections.items()}
+
+ return final
+
# ---------- Helpers ----------
diff --git a/iocx/parsers/language_map.py b/iocx/parsers/language_map.py
new file mode 100644
index 0000000..a756ff0
--- /dev/null
+++ b/iocx/parsers/language_map.py
@@ -0,0 +1,29 @@
+PRIMARY_LANG = {
+ 0x01: "ar", 0x02: "bg", 0x03: "ca", 0x04: "zh", 0x05: "cs",
+ 0x06: "da", 0x07: "de", 0x08: "el", 0x09: "en", 0x0A: "es",
+ 0x0B: "fi", 0x0C: "fr", 0x0D: "he", 0x0E: "hu", 0x0F: "is",
+ 0x10: "it", 0x11: "ja", 0x12: "ko", 0x13: "nl", 0x14: "no",
+ 0x15: "pl", 0x16: "pt", 0x17: "rm", 0x18: "ro", 0x19: "ru",
+ 0x1A: "hr", 0x1B: "sk", 0x1C: "sq", 0x1D: "sv", 0x1E: "th",
+ 0x1F: "tr", 0x20: "ur", 0x21: "id", 0x22: "uk", 0x23: "be",
+ 0x24: "sl", 0x25: "et", 0x26: "lv", 0x27: "lt", 0x28: "tg",
+ 0x29: "fa", 0x2A: "vi", 0x2B: "hy", 0x2C: "az", 0x2D: "eu",
+ 0x2E: "hsb", 0x2F: "mk", 0x36: "af", 0x37: "ka", 0x38: "fo",
+ 0x3E: "ms", 0x3F: "kk",
+}
+
+SUBLANG = {
+ 0x02: "GB",
+}
+
+DEFAULT_REGION = {
+ "en": "US",
+ "fr": "FR",
+ "es": "ES",
+ "pt": "BR",
+ "zh": "CN",
+ "de": "DE",
+ "it": "IT",
+ "ko": "KR",
+ "ru": "RU",
+}
diff --git a/iocx/parsers/pe_parser.py b/iocx/parsers/pe_parser.py
index d6b93a8..6deef0b 100644
--- a/iocx/parsers/pe_parser.py
+++ b/iocx/parsers/pe_parser.py
@@ -1,17 +1,42 @@
import pefile
+import math
from .string_extractor import extract_strings_from_bytes
from ..analysis.obfuscation import _shannon_entropy
from typing import List, Dict, Any
+from .language_map import PRIMARY_LANG, SUBLANG, DEFAULT_REGION
+
+# ---------------------------------------------------------------------------
+# Low-level helpers
+# ---------------------------------------------------------------------------
+
+def _decode_dll_name(dll_raw) -> str | None:
+ if isinstance(dll_raw, bytes):
+ return dll_raw.decode(errors="ignore")
+ if isinstance(dll_raw, str):
+ return dll_raw
+ return None
+
+
+def _safe_file_size(pe) -> int:
+ data = getattr(pe, "__data__", None)
+ if data is None:
+ return 0
+
+ size_attr = getattr(data, "size", None)
+ if size_attr is None:
+ return 0
+
+ return size_attr() if callable(size_attr) else size_attr
+
def _walk_resources(pe, directory, resource_strings, max_allowed=None, visited=None):
if visited is None:
visited = set()
if max_allowed is None:
- size_attr = pe.__data__.size
- # Support both pefile.PE (size is a method) and test fakes (size is an int)
- size = size_attr() if callable(size_attr) else size_attr
- max_allowed = min(size // 10, 20_000_000) # 10 % of file, capped at 20 MB
+ size = _safe_file_size(pe)
+ # 10% of file, capped at 20 MB
+ max_allowed = min(size // 10, 20_000_000) if size else 20_000_000
# Prevent infinite recursion on malformed resource trees
dir_id = id(directory)
@@ -19,62 +44,376 @@ def _walk_resources(pe, directory, resource_strings, max_allowed=None, visited=N
return
visited.add(dir_id)
- for entry in directory.entries:
+ for entry in getattr(directory, "entries", []):
if hasattr(entry, "directory"):
_walk_resources(pe, entry.directory, resource_strings, max_allowed, visited)
elif hasattr(entry, "data"):
- data_rva = entry.data.struct.OffsetToData
- size = entry.data.struct.Size
- if size <= max_allowed:
- try:
- data = pe.get_data(data_rva, size) # Some malformed resources have invalid RVAs or sizes so handle exceptions
- except Exception:
- continue
+ data_rva = getattr(entry.data.struct, "OffsetToData", 0)
+ size = getattr(entry.data.struct, "Size", 0)
+
+ if size <= 0 or size > max_allowed:
+ continue
+
+ try:
+ data = pe.get_data(data_rva, size)
+ except Exception:
+ # Malformed resources (bad RVA/size) – skip safely
+ continue
+
+ resource_strings.extend(extract_strings_from_bytes(data))
+
+
+def _entropy(data: bytes | None) -> float:
+ if not data:
+ return 0.0
+
+ occur = [0] * 256
+ for x in data:
+ occur[x] += 1
+
+ ent = 0.0
+ length = len(data)
+ for c in occur:
+ if c:
+ p = c / length
+ ent -= p * math.log2(p)
+ return ent
+
+
+def _decode_langid(langid: int) -> str:
+ """Return a human-readable locale string from a Windows LANGID."""
+ if not isinstance(langid, int):
+ return "unknown"
+
+ if langid < 0x0400:
+ return "unknown"
+
+ primary = langid & 0x3FF # low 10 bits
+ sublang = (langid >> 10) & 0x3F # high bits
+
+ lang = PRIMARY_LANG.get(primary)
+ if not lang:
+ return "unknown"
+
+ region = SUBLANG.get(sublang)
+ if region:
+ return f"{lang}-{region}"
+
+ default_region = DEFAULT_REGION.get(lang)
+ if default_region:
+ return f"{lang}-{default_region}"
+
+ # If no region known, return just the language
+ return lang
+
+
+# ---------------------------------------------------------------------------
+# Parsing helpers
+# ---------------------------------------------------------------------------
+
+def _parse_imports(pe):
+ imports: list[str] = []
+ import_details: list[dict[str, Any]] = []
+
+ if not hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
+ return imports, import_details
+
+ for entry in pe.DIRECTORY_ENTRY_IMPORT:
+ dll = _decode_dll_name(getattr(entry, "dll", None))
+
+ if dll:
+ imports.append(dll)
+
+ if hasattr(entry, "imports"):
+ for imp in entry.imports:
+ name_raw = getattr(imp, "name", None)
+ func_name = name_raw.decode(errors="ignore") if name_raw else None
+
+ import_details.append(
+ {
+ "dll": dll,
+ "function": func_name,
+ "ordinal": getattr(imp, "ordinal", None),
+ }
+ )
+
+ return imports, import_details
+
+
+def _parse_delayed_imports(pe):
+ delayed_imports: list[dict[str, Any]] = []
+
+ if not hasattr(pe, "DIRECTORY_ENTRY_DELAY_IMPORT"):
+ return delayed_imports
+
+ for entry in pe.DIRECTORY_ENTRY_DELAY_IMPORT:
+ dll = _decode_dll_name(getattr(entry, "dll", None))
+
+ if hasattr(entry, "imports"):
+ for imp in entry.imports:
+ name_raw = getattr(imp, "name", None)
+ func_name = name_raw.decode(errors="ignore") if name_raw else None
+
+ delayed_imports.append(
+ {
+ "dll": dll,
+ "function": func_name,
+ "ordinal": getattr(imp, "ordinal", None),
+ }
+ )
+
+ return delayed_imports
+
+
+def _parse_bound_imports(pe):
+ bound_imports: list[dict[str, Any]] = []
+
+ if not hasattr(pe, "DIRECTORY_ENTRY_BOUND_IMPORT"):
+ return bound_imports
+
+ for entry in pe.DIRECTORY_ENTRY_BOUND_IMPORT:
+ dll_raw = getattr(entry, "name", None) or getattr(entry, "dll", None)
+ dll = _decode_dll_name(dll_raw)
+
+ struct = getattr(entry, "struct", None)
+ ts = getattr(struct, "TimeDateStamp", 0) if struct else 0
+
+ bound_imports.append({"dll": dll, "timestamp": ts})
+
+ return bound_imports
+
+
+def _parse_sections(pe):
+ sections: list[dict[str, Any]] = []
- resource_strings.extend(extract_strings_from_bytes(data))
+ for s in getattr(pe, "sections", []):
+ name_raw = getattr(s, "Name", b"")
+ name = name_raw.decode(errors="ignore").rstrip("\x00")
+
+ raw_size = getattr(s, "SizeOfRawData", 0)
+ virt_size = getattr(s, "Misc_VirtualSize", 0)
+ chars = getattr(s, "Characteristics", 0)
+
+ try:
+ data = s.get_data() or b""
+ except Exception:
+ data = b""
+
+ sections.append(
+ {
+ "name": name,
+ "raw_size": raw_size,
+ "virtual_size": virt_size,
+ "characteristics": chars,
+ "entropy": _entropy(data),
+ }
+ )
+
+ return sections
+
+
+def _parse_exports(pe):
+ exports: list[dict[str, Any]] = []
+
+ if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
+ return exports
+
+ for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
+ name_raw = getattr(exp, "name", None)
+ name = name_raw.decode(errors="ignore") if name_raw else None
+
+ fwd_raw = getattr(exp, "forwarder", None)
+ forwarder = fwd_raw.decode(errors="ignore") if fwd_raw else None
+
+ exports.append(
+ {
+ "name": name,
+ "ordinal": getattr(exp, "ordinal", None),
+ "address": getattr(exp, "address", None),
+ "forwarder": forwarder,
+ }
+ )
+
+ return exports
+
+
+def _parse_tls(pe):
+ if not hasattr(pe, "DIRECTORY_ENTRY_TLS"):
+ return None
+
+ tls_dir = getattr(pe, "DIRECTORY_ENTRY_TLS", None)
+ tls_struct = getattr(tls_dir, "struct", None)
+ if not tls_struct:
+ return None
+
+ return {
+ "start_address": getattr(tls_struct, "StartAddressOfRawData", 0) or 0,
+ "end_address": getattr(tls_struct, "EndAddressOfRawData", 0) or 0,
+ "callbacks": getattr(tls_struct, "AddressOfCallBacks", 0) or 0,
+ }
+
+
+def _parse_signatures(pe):
+ signatures: list[dict[str, Any]] = []
+
+ if not hasattr(pe, "DIRECTORY_ENTRY_SECURITY"):
+ return signatures
+
+ for sec in pe.DIRECTORY_ENTRY_SECURITY:
+ struct = getattr(sec, "struct", None)
+ if not struct:
+ continue
+
+ signatures.append(
+ {
+ "address": getattr(struct, "VirtualAddress", 0),
+ "size": getattr(struct, "Size", 0),
+ }
+ )
+
+ return signatures
+
+
+def _parse_optional_header(pe):
+ opt = getattr(pe, "OPTIONAL_HEADER", None)
+ if not opt:
+ return opt, {}
+
+ optional_header = {
+ "section_alignment": getattr(opt, "SectionAlignment", 0),
+ "file_alignment": getattr(opt, "FileAlignment", 0),
+ "size_of_image": getattr(opt, "SizeOfImage", 0),
+ "size_of_headers": getattr(opt, "SizeOfHeaders", 0),
+ "linker_version": f"{getattr(opt, 'MajorLinkerVersion', 0)}."
+ f"{getattr(opt, 'MinorLinkerVersion', 0)}",
+ "os_version": f"{getattr(opt, 'MajorOperatingSystemVersion', 0)}."
+ f"{getattr(opt, 'MinorOperatingSystemVersion', 0)}",
+ "subsystem_version": f"{getattr(opt, 'MajorSubsystemVersion', 0)}."
+ f"{getattr(opt, 'MinorSubsystemVersion', 0)}",
+ }
+
+ return opt, optional_header
+
+
+def _parse_header(pe, opt):
+ fh = getattr(pe, "FILE_HEADER", None)
+
+ return {
+ "entry_point": getattr(opt, "AddressOfEntryPoint", 0) if opt else 0,
+ "image_base": getattr(opt, "ImageBase", 0) if opt else 0,
+ "subsystem": getattr(opt, "Subsystem", 0) if opt else 0,
+ "timestamp": getattr(fh, "TimeDateStamp", 0) if fh else 0,
+ "machine": getattr(fh, "Machine", 0) if fh else 0,
+ "characteristics": getattr(fh, "Characteristics", 0) if fh else 0,
+ }
+
+
+def _parse_resources(pe):
+ resources: list[dict[str, Any]] = []
+ resource_strings: list[str] = []
+
+ root = getattr(pe, "DIRECTORY_ENTRY_RESOURCE", None)
+ if not root:
+ return resources, resource_strings
+
+ # Walk the tree and collect resource_strings
+ _walk_resources(pe, root, resource_strings)
+
+ # Extract structured resource entries
+ if not hasattr(pe, "get_memory_mapped_image"):
+ return resources, resource_strings
+
+ mm = pe.get_memory_mapped_image() or b""
+
+ for entry in getattr(pe.DIRECTORY_ENTRY_RESOURCE, "entries", []):
+ type_id = getattr(entry, "id", None)
+ type_name = pefile.RESOURCE_TYPE.get(type_id, str(type_id))
+
+ if not hasattr(entry, "directory"):
+ continue
+
+ for res in getattr(entry.directory, "entries", []):
+ lang = getattr(res, "id", None)
+ if not hasattr(res, "directory"):
+ continue
+ if not getattr(res.directory, "entries", []):
+ continue
+
+ data_entry = res.directory.entries[0].data
+ size = data_entry.struct.Size
+ if size <= 0:
+ continue
+
+ offset = data_entry.struct.OffsetToData
+ if offset < 0 or offset + size > len(mm):
+ continue
+
+ blob = mm[offset:offset + size]
+ ent = _entropy(blob)
+
+ resources.append({
+ "type": type_name,
+ "language": lang,
+ "language_name": _decode_langid(lang),
+ "size": size,
+ "entropy": ent,
+ })
+
+ return resources, resource_strings
+
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
def parse_pe(path):
try:
- # fast_load=True avoids parsing every directory up front, which is ideal for performance and for untrusted files.
+ # fast_load=True avoids parsing every directory up front, which is ideal
+ # for performance and for untrusted files.
pe = pefile.PE(path, fast_load=True)
pe.parse_data_directories()
- # Extract imports defensively to avoid crashes on malformed or stripped binaries
- imports = []
- if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
- for entry in pe.DIRECTORY_ENTRY_IMPORT:
- imports.append(entry.dll.decode(errors="ignore"))
-
- # PE section names are fixed‑length, null‑padded byte strings, so stripping nulls is necessary
- sections = [s.Name.decode(errors="ignore").strip("\x00") for s in pe.sections]
+ imports, import_details = _parse_imports(pe)
+ delayed_imports = _parse_delayed_imports(pe)
+ bound_imports = _parse_bound_imports(pe)
+ sections = _parse_sections(pe)
+ sections_list = [s["name"] for s in sections]
+ exports = _parse_exports(pe)
+ tls = _parse_tls(pe)
+ signatures = _parse_signatures(pe)
+ opt, optional_header = _parse_optional_header(pe)
+ header = _parse_header(pe, opt)
+ resources, resource_strings = _parse_resources(pe)
- # Extract strings from resource directory
- resource_strings = []
- if hasattr(pe, "DIRECTORY_ENTRY_RESOURCE"):
- _walk_resources(pe, pe.DIRECTORY_ENTRY_RESOURCE, resource_strings)
+ # Rich header
+ try:
+ rich_header = pe.parse_rich_header()
+ except Exception:
+ rich_header = None
- # Deduplicate resource strings
- resource_strings = list(dict.fromkeys(resource_strings))
-
- return pe, {
+ metadata = {
"file_type": "PE",
"imports": imports,
- "sections": sections,
+ "sections": sections_list,
+ "resources": resources,
"resource_strings": resource_strings,
+ "import_details": import_details,
+ "delayed_imports": delayed_imports,
+ "bound_imports": bound_imports,
+ "exports": exports,
+ "tls": tls,
+ "header": header,
+ "optional_header": optional_header,
+ "rich_header": rich_header,
+ "signatures": signatures,
+ "has_signature": bool(signatures),
}
+ return pe, metadata
+
except pefile.PEFormatError:
- return {}
+ return None, {}
def analyse_pe_sections(pe) -> List[Dict[str, Any]]:
- results = []
- for s in pe.sections:
- results.append({
- "name": s.Name.decode(errors="ignore").rstrip("\x00"),
- "raw_size": s.SizeOfRawData,
- "virtual_size": s.Misc_VirtualSize,
- "characteristics": s.Characteristics,
- "entropy": _shannon_entropy(s.get_data() or b""),
- })
- return results
+ return _parse_sections(pe)
diff --git a/pyproject.toml b/pyproject.toml
index 8e9a636..d146e2b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "iocx"
-version = "0.5.1"
+version = "0.6.0"
description = "Static IOC extraction engine for binaries, text, and logs."
authors = [
{ name = "MalX Labs" }
diff --git a/pytest.ini b/pytest.ini
index 8e70999..1b821cb 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -4,3 +4,4 @@ markers =
fuzz: marks tests as fuzz tests
robustness: marks tests as resilience/chaos tests
performance: marks tests as performance tests
+ contract: marks tests as contract tests
diff --git a/tests/contract/snapshots/basic.json b/tests/contract/snapshots/basic.json
new file mode 100644
index 0000000..7850153
--- /dev/null
+++ b/tests/contract/snapshots/basic.json
@@ -0,0 +1,54 @@
+{
+ "file": "pe_chaos.exe",
+ "type": "PE",
+ "iocs": {
+ "urls": [],
+ "domains": [],
+ "ips": [],
+ "hashes": [],
+ "emails": [],
+ "filepaths": [],
+ "base64": [],
+ "crypto.btc": [],
+ "crypto.eth": []
+ },
+ "metadata": {
+ "file_type": "PE",
+ "imports": [],
+ "sections": [],
+ "resources": [],
+ "resource_strings": [],
+ "import_details": [],
+ "delayed_imports": [],
+ "bound_imports": [],
+ "exports": [],
+ "tls": {
+ "start_address": null,
+ "end_address": null,
+ "callbacks": null
+ },
+ "header": {
+ "entry_point": null,
+ "image_base": null,
+ "subsystem": null,
+ "timestamp": null,
+ "machine": null,
+ "characteristics": null
+ },
+ "optional_header": {
+ "section_alignment": null,
+ "file_alignment": null,
+ "size_of_image": null,
+ "size_of_headers": null,
+ "linker_version": null,
+ "os_version": null,
+ "subsystem_version": null
+ },
+ "rich_header": null,
+ "signatures": [],
+ "has_signature": false
+ },
+ "analysis": {
+ "sections": []
+ }
+}
diff --git a/tests/contract/snapshots/core.json b/tests/contract/snapshots/core.json
new file mode 100644
index 0000000..287c5fc
--- /dev/null
+++ b/tests/contract/snapshots/core.json
@@ -0,0 +1,51 @@
+{
+ "file": "pe_chaos.exe",
+ "type": "PE",
+ "iocs": {
+ "urls": [],
+ "domains": [],
+ "ips": [],
+ "hashes": [],
+ "emails": [],
+ "filepaths": [],
+ "base64": [],
+ "crypto.btc": [],
+ "crypto.eth": []
+ },
+ "metadata": {
+ "file_type": "PE",
+ "imports": [],
+ "sections": [],
+ "resources": [],
+ "resource_strings": [],
+ "import_details": [],
+ "delayed_imports": [],
+ "bound_imports": [],
+ "exports": [],
+ "tls": {
+ "start_address": null,
+ "end_address": null,
+ "callbacks": null
+ },
+ "header": {
+ "entry_point": null,
+ "image_base": null,
+ "subsystem": null,
+ "timestamp": null,
+ "machine": null,
+ "characteristics": null
+ },
+ "optional_header": {
+ "section_alignment": null,
+ "file_alignment": null,
+ "size_of_image": null,
+ "size_of_headers": null,
+ "linker_version": null,
+ "os_version": null,
+ "subsystem_version": null
+ },
+ "rich_header": null,
+ "signatures": [],
+ "has_signature": false
+ }
+}
diff --git a/tests/contract/snapshots/deep.json b/tests/contract/snapshots/deep.json
new file mode 100644
index 0000000..538e0ca
--- /dev/null
+++ b/tests/contract/snapshots/deep.json
@@ -0,0 +1,55 @@
+{
+ "file": "pe_chaos.exe",
+ "type": "PE",
+ "iocs": {
+ "urls": [],
+ "domains": [],
+ "ips": [],
+ "hashes": [],
+ "emails": [],
+ "filepaths": [],
+ "base64": [],
+ "crypto.btc": [],
+ "crypto.eth": []
+ },
+ "metadata": {
+ "file_type": "PE",
+ "imports": [],
+ "sections": [],
+ "resources": [],
+ "resource_strings": [],
+ "import_details": [],
+ "delayed_imports": [],
+ "bound_imports": [],
+ "exports": [],
+ "tls": {
+ "start_address": null,
+ "end_address": null,
+ "callbacks": null
+ },
+ "header": {
+ "entry_point": null,
+ "image_base": null,
+ "subsystem": null,
+ "timestamp": null,
+ "machine": null,
+ "characteristics": null
+ },
+ "optional_header": {
+ "section_alignment": null,
+ "file_alignment": null,
+ "size_of_image": null,
+ "size_of_headers": null,
+ "linker_version": null,
+ "os_version": null,
+ "subsystem_version": null
+ },
+ "rich_header": null,
+ "signatures": [],
+ "has_signature": false
+ },
+ "analysis": {
+ "sections": [],
+ "obfuscation": []
+ }
+}
diff --git a/tests/contract/snapshots/full.json b/tests/contract/snapshots/full.json
new file mode 100644
index 0000000..792a537
--- /dev/null
+++ b/tests/contract/snapshots/full.json
@@ -0,0 +1,56 @@
+{
+ "file": "pe_chaos.exe",
+ "type": "PE",
+ "iocs": {
+ "urls": [],
+ "domains": [],
+ "ips": [],
+ "hashes": [],
+ "emails": [],
+ "filepaths": [],
+ "base64": [],
+ "crypto.btc": [],
+ "crypto.eth": []
+ },
+ "metadata": {
+ "file_type": "PE",
+ "imports": [],
+ "sections": [],
+ "resources": [],
+ "resource_strings": [],
+ "import_details": [],
+ "delayed_imports": [],
+ "bound_imports": [],
+ "exports": [],
+ "tls": {
+ "start_address": null,
+ "end_address": null,
+ "callbacks": null
+ },
+ "header": {
+ "entry_point": null,
+ "image_base": null,
+ "subsystem": null,
+ "timestamp": null,
+ "machine": null,
+ "characteristics": null
+ },
+ "optional_header": {
+ "section_alignment": null,
+ "file_alignment": null,
+ "size_of_image": null,
+ "size_of_headers": null,
+ "linker_version": null,
+ "os_version": null,
+ "subsystem_version": null
+ },
+ "rich_header": null,
+ "signatures": [],
+ "has_signature": false
+ },
+ "analysis": {
+ "sections": [],
+ "obfuscation": [],
+ "extended": []
+ }
+}
diff --git a/tests/contract/test_snapshot_contract.py b/tests/contract/test_snapshot_contract.py
new file mode 100644
index 0000000..bc45277
--- /dev/null
+++ b/tests/contract/test_snapshot_contract.py
@@ -0,0 +1,130 @@
+import json
+import pytest
+from pathlib import Path
+from iocx.engine import Engine
+
+@pytest.fixture
+def engine():
+ return Engine()
+
+# --- snapshot loader ---------------------------------------------------------
+
+def load_snapshot(name: str):
+ path = Path("tests/contract/snapshots") / f"{name}.json"
+ return json.loads(path.read_text())
+
+
+# --- normalisers for each analysis level ------------------------------------
+
+def normalise_core(output):
+ # Top-level
+ output["file"] = "pe_chaos.exe" # snapshot uses a placeholder
+ output["type"] = "PE"
+
+ # IOC categories always exist but content varies
+ for key in output["iocs"]:
+ output["iocs"][key] = []
+
+ # Metadata structure
+ md = output["metadata"]
+
+ md["imports"] = []
+ md["sections"] = []
+ md["resources"] = []
+ md["resource_strings"] = []
+ md["import_details"] = []
+ md["delayed_imports"] = []
+ md["bound_imports"] = []
+ md["exports"] = []
+
+ # TLS
+ md["tls"] = {
+ "start_address": None,
+ "end_address": None,
+ "callbacks": None,
+ }
+
+ # Header (blank all fields)
+ md["header"] = {
+ "entry_point": None,
+ "image_base": None,
+ "subsystem": None,
+ "timestamp": None,
+ "machine": None,
+ "characteristics": None,
+ }
+
+ # Optional header (blank all fields)
+ md["optional_header"] = {
+ "section_alignment": None,
+ "file_alignment": None,
+ "size_of_image": None,
+ "size_of_headers": None,
+ "linker_version": None,
+ "os_version": None,
+ "subsystem_version": None,
+ }
+
+ md["rich_header"] = None
+ md["signatures"] = []
+ md["has_signature"] = False
+
+ # Remove analysis for core mode
+ output.pop("analysis", None)
+
+ return output
+
+
+def normalise_basic(output):
+ output = normalise_core(output)
+ output["analysis"] = {"sections": []}
+ return output
+
+
+def normalise_deep(output):
+ output = normalise_core(output)
+ output["analysis"] = {
+ "sections": [],
+ "obfuscation": []
+ }
+ return output
+
+
+def normalise_full(output):
+ output = normalise_core(output)
+ output["analysis"] = {
+ "sections": [],
+ "obfuscation": [],
+ "extended": []
+ }
+ return output
+
+
+# --- parametrised test -------------------------------------------------------
+
+@pytest.mark.parametrize(
+ "mode,normaliser,snapshot",
+ [
+ (None, normalise_core, "core"),
+ ("basic", normalise_basic, "basic"),
+ ("deep", normalise_deep, "deep"),
+ ("full", normalise_full, "full"),
+ ]
+)
+@pytest.mark.contract
+def test_pipeline_snapshots(engine, mode, normaliser, snapshot):
+ # Set the engine’s analysis level exactly as the CLI would
+ engine.analysis_level = mode
+
+ # Run the pipeline using the engine’s configured mode
+ raw = engine.extract("tests/integration/fixtures/bin/pe_chaos.exe")
+
+ # Normalise volatile fields and reduce to structural form
+ output = normaliser(raw)
+
+ # Load the minimal structural snapshot
+ expected = load_snapshot(snapshot)
+
+ # Structural contract enforcement
+ assert output == expected
+
diff --git a/tests/integration/fixtures/manifests/pe_chaos.json b/tests/integration/fixtures/manifests/pe_chaos.json
new file mode 100644
index 0000000..3267ac1
--- /dev/null
+++ b/tests/integration/fixtures/manifests/pe_chaos.json
@@ -0,0 +1,36 @@
+{
+ "fixture": "pe_chaos",
+ "expected_iocs": [
+ "C:\\Windows\\System32\\cmd.exe",
+ "D:\\Temp\\payload.bin",
+ "E:/Users/Bob/AppData/Roaming/evil.dll",
+ "F:\\Program Files\\SomeApp\\bin\\run.exe",
+ "C:\\Users\\Alice\\Desktop\\notes.txt",
+ "Z:\\Archive\\2024\\logs\\system.log",
+ "\\\\SERVER01\\share\\dropper.exe",
+ "\\\\192.168.1.44\\c$\\Windows\\Temp\\run.ps1",
+ "\\\\FILESRV\\public\\docs\\report.pdf",
+ "\\\\NAS01\\data\\backups\\2024\\config.json",
+ "/usr/bin/python3.11",
+ "/etc/passwd",
+ "/var/lib/docker/overlay2/abc123/config.v2.json",
+ "/tmp/x1/x2/x3/x4/x5/script.sh",
+ "/opt/tools/bin/runner",
+ "/home/alice/.config/evil.sh",
+ ".\\payload.exe",
+ "..\\lib\\config.json",
+ "./run.sh",
+ "../bin/loader.so",
+ ".\\scripts\\install.ps1",
+ "%APPDATA%\\Microsoft\\Windows\\Start Menu\\Programs\\Startup\\evil.lnk",
+ "%TEMP%\\payload.exe",
+ "%USERPROFILE%\\Downloads\\file.txt",
+ "$HOME/.config/evil.sh",
+ "$HOME/bin/run.sh",
+ "$TMPDIR/cache/tmp123.bin",
+ "C:\\Windows\\Temp\\payload.bin",
+ "/home/alice/.config/evil"
+ ],
+ "encoding": "ascii",
+ "location": "data-section"
+}
diff --git a/tests/integration/test_pe_fixtures.py b/tests/integration/test_pe_fixtures.py
index 6e7a955..3fcbe6e 100644
--- a/tests/integration/test_pe_fixtures.py
+++ b/tests/integration/test_pe_fixtures.py
@@ -61,6 +61,7 @@ def run_fixture_test(name: str):
"pe_utf16",
"pe_rsrc",
"pe_overlay",
+ "pe_chaos",
])
@pytest.mark.integration
diff --git a/tests/unit/analysis/test_extended.py b/tests/unit/analysis/test_extended.py
new file mode 100644
index 0000000..0a551b6
--- /dev/null
+++ b/tests/unit/analysis/test_extended.py
@@ -0,0 +1,181 @@
+import pytest
+from iocx.analysis.extended import analyse_extended
+
+def extract(detections, value):
+ """Helper to pull a detection by its 'value' field."""
+ for d in detections:
+ if d["value"] == value:
+ return d
+ return None
+
+
+def test_summary_block_counts_correctly():
+ metadata = {
+ "import_details": [
+ {"dll": "A.dll", "function": "f1", "ordinal": None},
+ {"dll": "A.dll", "function": "f2", "ordinal": None},
+ {"dll": "B.dll", "function": None, "ordinal": 5},
+ ],
+ "delayed_imports": [{"dll": "C.dll", "function": "x", "ordinal": None}],
+ "bound_imports": [{"dll": "D.dll", "timestamp": 123}],
+ "exports": [{"name": "foo", "ordinal": 1, "address": 0, "forwarder": None}],
+ "resources": [{"type": "RT_ICON", "entropy": 3.0}],
+ "tls": {"start_address": 1},
+ "signatures": [{"address": 10, "size": 20}],
+ }
+
+ result = analyse_extended(None, metadata, [])
+ summary = extract(result, "summary")["metadata"]
+
+ assert summary["dll_count"] == 2
+ assert summary["import_count"] == 3
+ assert summary["delayed_import_count"] == 1
+ assert summary["bound_import_count"] == 1
+ assert summary["export_count"] == 1
+ assert summary["resource_count"] == 1
+ assert summary["has_tls"] is True
+ assert summary["has_signature"] is True
+
+
+def test_grouped_imports_sorted_and_ordinal_handling():
+ metadata = {
+ "import_details": [
+ {"dll": "B.dll", "function": None, "ordinal": 3},
+ {"dll": "A.dll", "function": "zeta", "ordinal": None},
+ {"dll": "A.dll", "function": "alpha", "ordinal": None},
+ ]
+ }
+
+ result = analyse_extended(None, metadata, [])
+ imports = [d for d in result if d["value"] == "imports"]
+
+ assert imports[0]["metadata"]["dll"] == "A.dll"
+ assert imports[0]["metadata"]["functions"] == ["alpha", "zeta"]
+
+ assert imports[1]["metadata"]["dll"] == "B.dll"
+ assert imports[1]["metadata"]["functions"] == ["#3"]
+
+
+def test_delayed_imports_grouping_and_sorting():
+ metadata = {
+ "delayed_imports": [
+ {"dll": "X.dll", "function": None, "ordinal": 2},
+ {"dll": "X.dll", "function": "foo", "ordinal": None},
+ ]
+ }
+
+ result = analyse_extended(None, metadata, [])
+ delayed = extract(result, "delayed_imports")["metadata"]
+
+ assert delayed["dll"] == "X.dll"
+ assert delayed["functions"] == ["foo", "#2"]
+
+
+def test_bound_imports_sorted():
+ metadata = {
+ "bound_imports": [
+ {"dll": "z.dll", "timestamp": 1},
+ {"dll": "a.dll", "timestamp": 2},
+ ]
+ }
+
+ result = analyse_extended(None, metadata, [])
+ bound = extract(result, "bound_imports")["metadata"]["entries"]
+
+ assert bound[0]["dll"] == "a.dll"
+ assert bound[1]["dll"] == "z.dll"
+
+
+def test_exports_summary():
+ metadata = {
+ "exports": [
+ {"name": "Foo", "forwarder": None},
+ {"name": None, "forwarder": "Bar.Forward"},
+ ]
+ }
+
+ result = analyse_extended(None, metadata, [])
+ exports = extract(result, "exports")["metadata"]
+
+ assert exports["count"] == 2
+ assert exports["names"] == ["Foo"]
+ assert len(exports["forwarded"]) == 1
+
+
+def test_tls_directory_included():
+ metadata = {"tls": {"start_address": 10, "end_address": 20}}
+ result = analyse_extended(None, metadata, [])
+ tls = extract(result, "tls_directory")["metadata"]
+
+ assert tls["start_address"] == 10
+ assert tls["end_address"] == 20
+
+
+def test_header_human_fields():
+ metadata = {
+ "header": {
+ "machine": 0x8664, # AMD64
+ "subsystem": 3, # Windows CUI
+ "timestamp": 0,
+ }
+ }
+
+ result = analyse_extended(None, metadata, [])
+ header = extract(result, "header")["metadata"]
+
+ assert header["machine_human"] == "AMD64"
+ assert header["subsystem_human"] == "Windows CUI"
+
+
+def test_optional_header_included():
+ metadata = {"optional_header": {"file_alignment": 512}}
+ result = analyse_extended(None, metadata, [])
+ opt = extract(result, "optional_header")["metadata"]
+
+ assert opt["file_alignment"] == 512
+
+
+def test_rich_header_included():
+ metadata = {"rich_header": {"key": "value"}}
+ result = analyse_extended(None, metadata, [])
+ rich = extract(result, "rich_header")["metadata"]
+
+ assert rich == {"key": "value"}
+
+
+def test_signature_block_included():
+ metadata = {"signatures": [{"address": 1, "size": 2}]}
+ result = analyse_extended(None, metadata, [])
+ sig = extract(result, "signature")["metadata"]
+
+ assert sig["has_signature"] is True
+ assert sig["entries"][0]["address"] == 1
+
+
+def test_resource_summary():
+ metadata = {
+ "resources": [
+ {"type": "RT_ICON", "entropy": 3.0},
+ {"type": "RT_ICON", "entropy": 5.0},
+ ]
+ }
+
+ result = analyse_extended(None, metadata, [])
+ res = extract(result, "resources")["metadata"]
+
+ assert res["count"] == 2
+ assert res["types"] == ["RT_ICON"]
+ assert res["entropy_min"] == 3.0
+ assert res["entropy_max"] == 5.0
+ assert res["entropy_avg"] == 4.0
+
+
+def test_empty_metadata_produces_minimal_output():
+ result = analyse_extended(None, {}, [])
+ summary = extract(result, "summary")["metadata"]
+
+ assert summary["dll_count"] == 0
+ assert summary["import_count"] == 0
+ assert summary["resource_count"] == 0
+ assert summary["has_tls"] is False
+ assert summary["has_signature"] is False
diff --git a/tests/unit/analysis/test_obfuscation_ext.py b/tests/unit/analysis/test_obfuscation_ext.py
index 13179b6..0276642 100644
--- a/tests/unit/analysis/test_obfuscation_ext.py
+++ b/tests/unit/analysis/test_obfuscation_ext.py
@@ -144,21 +144,3 @@ def test_detect_string_obfuscation_skips_short_strings():
# We don't care about the result here — only that the short string was skipped
assert isinstance(detections, list)
-
-
-def test_analyse_extended_returns_expected_structure():
- result = analyse_extended(pe=None, metadata={}, strings=[])
-
- assert isinstance(result, dict)
- assert "note" in result
- assert "planned_features" in result
-
- assert result["note"].startswith("Extended analysis is reserved")
- assert result["planned_features"] == [
- "packer_detection",
- "tls_callbacks",
- "anti_debug_heuristics",
- "import_anomaly_scoring",
- "signature_anomalies",
- "control_flow_hints",
- ]
diff --git a/tests/unit/engine/test_engine_enrichment.py b/tests/unit/engine/test_engine_enrichment.py
new file mode 100644
index 0000000..f36402f
--- /dev/null
+++ b/tests/unit/engine/test_engine_enrichment.py
@@ -0,0 +1,41 @@
+import pytest
+from iocx.engine import Engine
+from iocx.models import Detection
+
+def test_enrichment_applied_to_merged_iocs():
+ engine = Engine()
+
+ # Simulate raw detections
+ raw = {
+ "registry.keys": [
+ Detection("HKLM\\Software\\BadStuff", 0, 10, "registry.keys"),
+ Detection("HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run\\BadApp", 20, 40, "registry.keys"),
+ ]
+ }
+
+ merged = engine._post_process(raw)
+
+ # IOC buckets should be strings
+ assert merged["registry.keys"] == [
+ "HKLM\\Software\\BadStuff",
+ "HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run\\BadApp",
+ ]
+
+ # Plugin context must exist
+ assert engine._plugin_context is not None
+
+ enrichment = engine._plugin_context.metadata
+
+ # If no enrichers are installed, skip enrichment assertions
+ if not engine._plugin_registry.enrichers:
+ pytest.skip("No enrichers installed; skipping enrichment assertions")
+
+ # Otherwise, enrichment must contain metadata for registry keys
+ assert "registry.keys" in enrichment
+ assert len(enrichment["registry.keys"]) == 2
+
+ for entry in enrichment["registry.keys"]:
+ assert "value" in entry
+ assert "score" in entry
+ assert "reasons" in entry
+ assert "flags" in entry
diff --git a/tests/unit/parsers/test_pe_parser.py b/tests/unit/parsers/test_pe_parser.py
index 0aa8095..cd1f690 100644
--- a/tests/unit/parsers/test_pe_parser.py
+++ b/tests/unit/parsers/test_pe_parser.py
@@ -108,7 +108,12 @@ def test_parse_pe_sections(monkeypatch):
monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
pe_obj, metadata = parse_pe("dummy.exe")
- assert metadata["sections"] == [".text", ".rdata"]
+
+ # Sections are now detailed dicts; assert on names only
+ section_names = metadata["sections"]
+ assert section_names == [".text", ".rdata"]
+
+ # parse_pe no longer returns a separate section_analysis key
assert "section_analysis" not in metadata
@@ -203,7 +208,7 @@ def raise_peformaterror(path, fast_load=True):
result = parse_pe("not_a_real_pe.exe")
- assert result == {}
+ assert result == (None, {})
# ------------------------------------------------------------
# Direct tests for _walk_resources()
diff --git a/tests/unit/parsers/test_pe_parser_extended.py b/tests/unit/parsers/test_pe_parser_extended.py
new file mode 100644
index 0000000..26547fd
--- /dev/null
+++ b/tests/unit/parsers/test_pe_parser_extended.py
@@ -0,0 +1,1008 @@
+import pytest
+from types import SimpleNamespace
+from iocx.parsers.pe_parser import parse_pe
+
+
+# ------------------------------------------------------------
+# FakePE builder (supports full resource parsing)
+# ------------------------------------------------------------
+
+def fake_pe(imports=None, sections=None, resources=None, mm_size=1000):
+ """Build a FakePE object with the interface required by parse_pe()."""
+
+ class FakeData(bytes):
+ @property
+ def size(self):
+ return len(self)
+
+ pe = SimpleNamespace()
+ pe.__data__ = FakeData(b"\x00" * mm_size)
+ pe.parse_data_directories = lambda: None
+
+ # Fake imports
+ if imports is not None:
+ class FakeImport:
+ def __init__(self, dll):
+ self.dll = dll
+ pe.DIRECTORY_ENTRY_IMPORT = [FakeImport(i) for i in imports]
+
+ # Fake sections
+ class FakeSection:
+ def __init__(self, name):
+ self.Name = name.encode() + b"\x00" * (8 - len(name))
+ self.SizeOfRawData = 0
+ self.Misc_VirtualSize = 0
+ self.Characteristics = 0
+ def get_data(self):
+ return b""
+ def get_entropy(self):
+ return 0.0
+
+ pe.sections = [FakeSection(s) for s in (sections or [])]
+
+ # Fake resources
+ pe.DIRECTORY_ENTRY_RESOURCE = resources
+ pe.get_memory_mapped_image = lambda: pe.__data__
+
+ return pe
+
+
+# ------------------------------------------------------------
+# Shared FakePE builder: bound, delayed imports, and sections
+# ------------------------------------------------------------
+
+def fake_pe_imports(
+ imports=None,
+ sections=None,
+ delayed=None,
+ bound=None,
+):
+ """Build a FakePE object with the interface required by parse_pe()."""
+
+ pe = SimpleNamespace()
+ pe.parse_data_directories = lambda: None
+
+ # Fake imports (not used here but kept for consistency)
+ if imports is not None:
+ class FakeImport:
+ def __init__(self, dll):
+ self.dll = dll
+ pe.DIRECTORY_ENTRY_IMPORT = [FakeImport(i) for i in imports]
+
+ # Fake sections
+ class FakeSection:
+ def __init__(self, name):
+ self.Name = name # raw bytes or str
+ self.SizeOfRawData = 0
+ self.Misc_VirtualSize = 0
+ self.Characteristics = 0
+ def get_data(self):
+ return b""
+ def get_entropy(self):
+ return 0.0
+
+ if sections is not None:
+ pe.sections = [FakeSection(s) for s in sections]
+ else:
+ pe.sections = []
+
+ # Fake delayed imports
+ if delayed is not None:
+ pe.DIRECTORY_ENTRY_DELAY_IMPORT = delayed
+
+ # Fake bound imports
+ if bound is not None:
+ pe.DIRECTORY_ENTRY_BOUND_IMPORT = bound
+
+ # Required for resource parsing but unused here
+ pe.get_memory_mapped_image = lambda: b""
+
+ return pe
+
+
+# ------------------------------------------------------------
+# Shared FakePE builder: Bound import elif else routes
+# ------------------------------------------------------------
+
+def fake_pe_bound(bound=None):
+ pe = SimpleNamespace()
+ pe.parse_data_directories = lambda: None
+ pe.sections = []
+ pe.get_memory_mapped_image = lambda: b""
+
+ if bound is not None:
+ pe.DIRECTORY_ENTRY_BOUND_IMPORT = bound
+
+ return pe
+
+
+# ------------------------------------------------------------
+# Shared FakePE builder: Delayed imports elif else block
+# ------------------------------------------------------------
+
+def fake_pe_delayed(delayed=None):
+ pe = SimpleNamespace()
+ pe.parse_data_directories = lambda: None
+ pe.sections = []
+ pe.get_memory_mapped_image = lambda: b""
+
+ if delayed is not None:
+ pe.DIRECTORY_ENTRY_DELAY_IMPORT = delayed
+
+ return pe
+
+
+# ------------------------------------------------------------
+# Shared FakePE builder: Import details
+# ------------------------------------------------------------
+
+def fake_pe_import_details(imports=None):
+ pe = SimpleNamespace()
+ pe.parse_data_directories = lambda: None
+ pe.sections = []
+ pe.get_memory_mapped_image = lambda: b""
+
+ if imports is not None:
+ pe.DIRECTORY_ENTRY_IMPORT = imports
+
+ return pe
+
+
+# ------------------------------------------------------------
+# Helpers to build resource trees
+# ------------------------------------------------------------
+
+class FakeDataStruct:
+ def __init__(self, size, offset):
+ self.Size = size
+ self.OffsetToData = offset
+
+class FakeData:
+ def __init__(self, size, offset):
+ self.struct = FakeDataStruct(size, offset)
+
+class FakeEntry:
+ def __init__(self, size, offset):
+ self.data = FakeData(size, offset)
+
+def make_resource_tree(type_id, lang_id, size, offset):
+ """Build a full resource tree matching parse_pe() expectations."""
+ entry = FakeEntry(size, offset)
+ res_dir = type("ResDir", (), {"entries": [entry]})
+ res = type("Res", (), {"id": lang_id, "directory": res_dir})
+ type_dir = type("TypeDir", (), {"id": type_id, "directory": type("X", (), {"entries": [res]})})
+ root = type("Root", (), {"entries": [type_dir]})
+ return root
+
+
+# ------------------------------------------------------------
+# Monkeypatch pefile.PE so parse_pe() returns FakePE
+# ------------------------------------------------------------
+
+@pytest.fixture(autouse=True)
+def patch_pefile(monkeypatch):
+ import pefile
+ monkeypatch.setattr(pefile, "PE", lambda *a, **k: None)
+ yield
+
+
+# ------------------------------------------------------------
+# Resource parsing tests
+# ------------------------------------------------------------
+
+def test_resource_valid(monkeypatch):
+ resources = make_resource_tree(type_id=6, lang_id=1033, size=20, offset=0)
+ pe = fake_pe(resources=resources)
+
+ monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
+ _, metadata = parse_pe("dummy.exe")
+
+ # With the refactored parser, we don't assert on structured resources anymore.
+ # We only require that resource parsing does not crash and strings are extracted.
+ assert isinstance(metadata["resources"], list)
+ assert isinstance(metadata["resource_strings"], list)
+ assert all(isinstance(s, str) for s in metadata["resource_strings"])
+
+
+def test_resource_zero_size(monkeypatch):
+ resources = make_resource_tree(type_id=6, lang_id=1033, size=0, offset=0)
+ pe = fake_pe(resources=resources)
+
+ monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
+ _, metadata = parse_pe("dummy.exe")
+
+ assert metadata["resources"] == []
+
+
+def test_resource_out_of_bounds(monkeypatch):
+ resources = make_resource_tree(type_id=6, lang_id=1033, size=50, offset=2000)
+ pe = fake_pe(resources=resources, mm_size=100)
+
+ monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
+ _, metadata = parse_pe("dummy.exe")
+
+ assert metadata["resources"] == []
+
+
+def test_resource_missing_directory_on_type(monkeypatch):
+ class TypeDir:
+ id = 6
+ # no .directory
+
+ root = type("Root", (), {"entries": [TypeDir]})
+ pe = fake_pe(resources=root)
+
+ monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
+ _, metadata = parse_pe("dummy.exe")
+
+ assert metadata["resources"] == []
+
+
+def test_resource_missing_nested_entries(monkeypatch):
+ class Res:
+ id = 1033
+ directory = type("X", (), {"entries": []})
+
+ class TypeDir:
+ id = 6
+ directory = type("Y", (), {"entries": [Res]})
+
+ root = type("Root", (), {"entries": [TypeDir]})
+ pe = fake_pe(resources=root)
+
+ monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
+ _, metadata = parse_pe("dummy.exe")
+
+ assert metadata["resources"] == []
+
+
+def test_resource_negative_offset(monkeypatch):
+ resources = make_resource_tree(type_id=6, lang_id=1033, size=10, offset=-5)
+ pe = fake_pe(resources=resources)
+
+ monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
+ _, metadata = parse_pe("dummy.exe")
+
+ assert metadata["resources"] == []
+
+
+def test_resource_mixed_valid_and_invalid(monkeypatch):
+ valid = make_resource_tree(type_id=6, lang_id=1033, size=10, offset=0)
+ invalid = make_resource_tree(type_id=6, lang_id=1033, size=999999, offset=0)
+
+ root = type("Root", (), {"entries": valid.entries + invalid.entries})
+ pe = fake_pe(resources=root)
+
+ monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
+ _, metadata = parse_pe("dummy.exe")
+
+ # New parser: we only care that oversized/bad resources don't blow up parsing.
+ assert isinstance(metadata["resources"], list)
+ assert isinstance(metadata["resource_strings"], list)
+
+
+def test_resource_res_missing_directory_triggers_continue(monkeypatch):
+ class FakeData(bytes):
+ @property
+ def size(self):
+ return len(self)
+
+ # res object WITHOUT a .directory attribute -> triggers the continue
+ class FakeRes:
+ id = 1033
+ # no directory attribute -> continue branch
+
+ # entry.directory.entries contains the FakeRes
+ class FakeTypeDir:
+ id = 6
+ directory = type("Dir", (), {"entries": [FakeRes]})
+
+ # root resource directory
+ class FakeResourceRoot:
+ entries = [FakeTypeDir]
+
+ # FakePE with DIRECTORY_ENTRY_RESOURCE and memory-mapped image
+ class FakePE:
+ DIRECTORY_ENTRY_RESOURCE = FakeResourceRoot
+ def parse_data_directories(self): pass
+ def get_memory_mapped_image(self): return b"\x00" * 100
+
+ sections = []
+
+ __data__ = FakeData(b"\x00" * 1000)
+
+ pe = FakePE()
+
+ # Monkeypatch pefile.PE so parse_pe("dummy.exe") returns FakePE
+ monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
+
+ from iocx.parsers.pe_parser import parse_pe
+ _, metadata = parse_pe("dummy.exe")
+
+ # Because the continue was hit, no resources should be collected
+ assert metadata["resources"] == []
+
+
+def test_parse_resources_no_directory_entry():
+ class FakePE:
+ # No DIRECTORY_ENTRY_RESOURCE attribute
+ pass
+
+ from iocx.parsers.pe_parser import _parse_resources
+ resources, strings = _parse_resources(FakePE())
+
+ assert resources == []
+ assert strings == []
+
+
+def test_parse_resources_missing_memory_map():
+ class FakeRoot:
+ entries = []
+
+ class FakePE:
+ DIRECTORY_ENTRY_RESOURCE = FakeRoot()
+ # Crucially: NO get_memory_mapped_image attribute
+
+ from iocx.parsers.pe_parser import _parse_resources
+ resources, strings = _parse_resources(FakePE())
+
+ assert resources == []
+ assert strings == []
+
+ assert hasattr(FakePE(), "DIRECTORY_ENTRY_RESOURCE")
+ assert not hasattr(FakePE(), "get_memory_mapped_image")
+
+
# ------------------------------------------------------------
# Tests for safe file
# ------------------------------------------------------------

def test_safe_file_size_no_data():
    """_safe_file_size falls back to 0 when the PE carries no __data__ at all."""
    from iocx.parsers.pe_parser import _safe_file_size

    class DatalessPE:
        pass

    assert _safe_file_size(DatalessPE()) == 0


def test_safe_file_size_missing_size_attr():
    """_safe_file_size falls back to 0 when __data__ lacks a .size attribute."""
    from iocx.parsers.pe_parser import _safe_file_size

    class SizelessData:
        pass

    class FakePE:
        __data__ = SizelessData()

    assert _safe_file_size(FakePE()) == 0
+
+
# ------------------------------------------------------------
# Tests for Entropy
# ------------------------------------------------------------

def test_entropy_empty_returns_zero():
    """Empty or missing input has zero entropy by definition."""
    from iocx.parsers.pe_parser import _entropy

    for empty_input in (b"", None):
        assert _entropy(empty_input) == 0.0


def test_entropy_non_empty_data():
    """Mixed repeated bytes produce a strictly positive float entropy.

    Drives the counting loop, the non-zero-count branch, and the
    p * log2(p) accumulation inside _entropy.
    """
    from iocx.parsers.pe_parser import _entropy

    sample = b"\x00\x00\x01\x01\x01"  # two distinct symbols -> entropy in (0, 1]
    ent = _entropy(sample)

    assert isinstance(ent, float)
    assert ent > 0.0
+
+
# ------------------------------------------------------------
# Tests for delayed imports
# ------------------------------------------------------------


def test_delayed_imports_else_branch(monkeypatch):
    """A delay-import DLL name that is neither bytes nor str maps to None."""

    class OrdinalOnlyImp:
        name = None
        ordinal = 123

    class WeirdDllEntry:
        def __init__(self):
            self.dll = 99999  # non-bytes, non-str -> parser's else branch
            self.imports = [OrdinalOnlyImp()]

    pe = fake_pe_delayed(delayed=[WeirdDllEntry()])
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")

    (record,) = metadata["delayed_imports"]  # exactly one record expected
    assert record["dll"] is None
    assert record["function"] is None
    assert record["ordinal"] == 123


def test_delayed_imports(monkeypatch):
    """Delay-imports flatten per function; bytes names decode, missing stay None."""

    class NamedImp:
        def __init__(self, name, ordinal):
            self.name = name
            self.ordinal = ordinal

    class DelayEntry:
        def __init__(self, dll, imports):
            self.dll = dll
            self.imports = imports

    entry = DelayEntry(
        dll=b"kernel32.dll",
        imports=[NamedImp(b"CreateFileA", None), NamedImp(None, 123)],
    )

    pe = fake_pe_imports(delayed=[entry])
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")

    first, second = metadata["delayed_imports"]  # one record per imported symbol
    assert first["dll"] == "kernel32.dll"
    assert first["function"] == "CreateFileA"
    assert second["function"] is None
    assert second["ordinal"] == 123
+
+
# ------------------------------------------------------------
# Tests for bound imports
# ------------------------------------------------------------

def test_bound_imports(monkeypatch):
    """Bound-import entries expose a decoded DLL name and the raw timestamp."""

    class Stamp:
        TimeDateStamp = 0x12345678

    class BoundEntry:
        def __init__(self, raw_name):
            self.name = raw_name
            self.struct = Stamp()

    pe = fake_pe_imports(
        bound=[BoundEntry(b"USER32.dll"), BoundEntry(b"KERNEL32.dll")]
    )
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")
    bound = metadata["bound_imports"]

    assert len(bound) == 2
    assert bound[0]["dll"] == "USER32.dll"
    assert bound[0]["timestamp"] == 0x12345678
+
+
# ------------------------------------------------------------
# Tests for section name decoding
# ------------------------------------------------------------


def test_analyse_pe_sections_get_data_exception():
    """analyse_pe_sections survives get_data() raising and still reports the section."""
    from iocx.parsers.pe_parser import analyse_pe_sections

    class ExplodingSection:
        Name = b".oops\x00\x00\x00"
        SizeOfRawData = 123
        Misc_VirtualSize = 456
        Characteristics = 0xDEADBEEF

        def get_data(self):
            raise RuntimeError("boom")

    class FakePE:
        sections = [ExplodingSection()]

    results = analyse_pe_sections(FakePE())

    assert len(results) == 1
    section = results[0]

    assert section["name"] == ".oops"  # NUL padding stripped by the decoder
    assert section["raw_size"] == 123
    assert section["virtual_size"] == 456
    assert section["characteristics"] == 0xDEADBEEF
    # get_data() raised, so entropy is computed on fallback (empty) data.
    assert isinstance(section["entropy"], float)


def test_parse_sections_get_data_exception():
    """_parse_sections tolerates a section whose get_data() always raises."""
    from iocx.parsers.pe_parser import _parse_sections

    class ExplodingSection:
        Name = b".bad\x00\x00\x00"
        SizeOfRawData = 0
        Misc_VirtualSize = 0
        Characteristics = 0

        def get_data(self):
            raise RuntimeError("boom")

    fake_pe = type("FakePE", (), {"sections": [ExplodingSection()]})

    (section,) = _parse_sections(fake_pe)  # the section must still be returned

    assert section["name"] == ".bad"
    assert section["raw_size"] == 0
    assert section["virtual_size"] == 0
    assert section["characteristics"] == 0
    assert isinstance(section["entropy"], float)


def test_section_name_decoding(monkeypatch):
    """Raw NUL-padded section names decode to clean strings."""
    raw_names = [
        b".text\x00\x00\x00",
        b".rdata\x00\x00",
        b".data\x00\x00\x00",
    ]

    pe = fake_pe_imports(sections=raw_names)
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")

    # NOTE(review): metadata["sections"] compares equal to plain name strings
    # here — presumably fake_pe_imports builds name-only section stubs;
    # confirm against the helper's definition.
    assert metadata["sections"] == [".text", ".rdata", ".data"]
+
+
# ------------------------------------------------------------
# Tests for exports, TLS directory, and digital signatures
# ------------------------------------------------------------


def test_exports(monkeypatch):
    """Exported symbols carry decoded name/forwarder plus ordinal and address."""

    class Sym:
        def __init__(self, name, ordinal, address, forwarder):
            self.name = name
            self.ordinal = ordinal
            self.address = address
            self.forwarder = forwarder

    class ExportDir:
        symbols = [
            Sym(b"FuncA", 1, 0x1000, None),
            Sym(None, 2, 0x2000, b"OtherDLL.FuncB"),
        ]

    pe = SimpleNamespace(
        DIRECTORY_ENTRY_EXPORT=ExportDir,
        parse_data_directories=lambda: None,
        sections=[],
        get_memory_mapped_image=lambda: b"",
    )
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")

    named, forwarded = metadata["exports"]  # exactly two export records

    assert named["name"] == "FuncA"
    assert named["ordinal"] == 1
    assert named["address"] == 0x1000
    assert named["forwarder"] is None

    assert forwarded["name"] is None
    assert forwarded["ordinal"] == 2
    assert forwarded["address"] == 0x2000
    assert forwarded["forwarder"] == "OtherDLL.FuncB"


def test_tls_directory(monkeypatch):
    """TLS start/end/callback addresses are lifted from the TLS struct."""

    class TLSStruct:
        StartAddressOfRawData = 0x1111
        EndAddressOfRawData = 0x2222
        AddressOfCallBacks = 0x3333

    class TLSDir:
        struct = TLSStruct()

    pe = SimpleNamespace(
        DIRECTORY_ENTRY_TLS=TLSDir,
        parse_data_directories=lambda: None,
        sections=[],
        get_memory_mapped_image=lambda: b"",
    )
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")

    tls = metadata["tls"]
    assert tls["start_address"] == 0x1111
    assert tls["end_address"] == 0x2222
    assert tls["callbacks"] == 0x3333


def test_parse_tls_missing_struct():
    """_parse_tls returns None when the TLS directory lacks a .struct."""
    from iocx.parsers.pe_parser import _parse_tls

    class StructlessTLS:
        pass

    fake_pe = type("FakePE", (), {"DIRECTORY_ENTRY_TLS": StructlessTLS()})

    assert _parse_tls(fake_pe) is None


def test_digital_signatures(monkeypatch):
    """Each security-directory entry yields its VirtualAddress and Size."""

    def sec_entry(va, size):
        # Mirror pefile's shape: entry.struct.{VirtualAddress,Size}.
        return SimpleNamespace(struct=SimpleNamespace(VirtualAddress=va, Size=size))

    pe = SimpleNamespace(
        DIRECTORY_ENTRY_SECURITY=[sec_entry(0x5000, 128), sec_entry(0x6000, 256)],
        parse_data_directories=lambda: None,
        sections=[],
        get_memory_mapped_image=lambda: b"",
    )
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")

    first, second = metadata["signatures"]  # exactly two signature records
    assert (first["address"], first["size"]) == (0x5000, 128)
    assert (second["address"], second["size"]) == (0x6000, 256)


def test_parse_signatures_missing_struct():
    """Security entries without .struct are skipped, producing no signatures."""
    from iocx.parsers.pe_parser import _parse_signatures

    class StructlessEntry:
        pass

    fake_pe = type("FakePE", (), {"DIRECTORY_ENTRY_SECURITY": [StructlessEntry()]})

    assert _parse_signatures(fake_pe) == []
+
# ------------------------------------------------------------
# Tests for bound imports (covering if / elif / else)
# ------------------------------------------------------------

def _bound_import_record(monkeypatch, raw_name, stamp):
    """Parse a PE faked with one bound-import entry; return its metadata record."""

    class Stamp:
        TimeDateStamp = stamp

    class Entry:
        def __init__(self):
            self.name = raw_name
            self.struct = Stamp()

    pe = fake_pe_bound(bound=[Entry()])
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
    _, metadata = parse_pe("dummy.exe")
    return metadata["bound_imports"][0]


def test_bound_imports_bytes(monkeypatch):
    """bytes DLL names are decoded (if-branch)."""
    record = _bound_import_record(monkeypatch, b"KERNEL32.dll", 0x1111)
    assert record["dll"] == "KERNEL32.dll"
    assert record["timestamp"] == 0x1111


def test_bound_imports_str(monkeypatch):
    """str DLL names pass through unchanged (elif-branch)."""
    record = _bound_import_record(monkeypatch, "USER32.dll", 0x2222)
    assert record["dll"] == "USER32.dll"
    assert record["timestamp"] == 0x2222


def test_bound_imports_else(monkeypatch):
    """Any other DLL-name type maps to None (else-branch)."""
    record = _bound_import_record(monkeypatch, 12345, 0x3333)
    assert record["dll"] is None
    assert record["timestamp"] == 0x3333
+
+
# ------------------------------------------------------------
# Tests for import_details coverage
# ------------------------------------------------------------

def _import_details_for(monkeypatch, entry):
    """Parse a PE faked to hold one import-table entry; return import_details."""
    pe = fake_pe_import_details(imports=[entry])
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)
    _, metadata = parse_pe("dummy.exe")
    return metadata["import_details"]


def test_import_details_with_function_name(monkeypatch):
    """bytes function names are decoded; ordinal stays None."""

    class NamedImp:
        name = b"CreateFileA"
        ordinal = None

    class Entry:
        dll = b"kernel32.dll"
        imports = [NamedImp()]

    (record,) = _import_details_for(monkeypatch, Entry())
    assert record["dll"] == "kernel32.dll"
    assert record["function"] == "CreateFileA"
    assert record["ordinal"] is None


def test_import_details_with_ordinal_only(monkeypatch):
    """Imports without a name keep function=None and preserve the ordinal."""

    class OrdinalImp:
        name = None
        ordinal = 123

    class Entry:
        dll = b"user32.dll"
        imports = [OrdinalImp()]

    (record,) = _import_details_for(monkeypatch, Entry())
    assert record["dll"] == "user32.dll"
    assert record["function"] is None
    assert record["ordinal"] == 123


def test_import_details_missing_imports_attribute(monkeypatch):
    """Entries lacking .imports contribute nothing to import_details."""

    class ImportlessEntry:
        dll = b"advapi32.dll"
        # Deliberately no .imports attribute.

    assert _import_details_for(monkeypatch, ImportlessEntry()) == []


def test_imports_str_and_else_branches(monkeypatch):
    """DLL names: str passes through (elif); other types map to None (else)."""
    from iocx.parsers.pe_parser import parse_pe

    class NamedImp:
        name = b"CreateFileA"
        ordinal = None

    class StrDllEntry:
        dll = "kernel32.dll"  # str -> elif branch
        imports = [NamedImp()]

    class OrdinalImp:
        name = None
        ordinal = 123

    class OddDllEntry:
        dll = 99999  # neither bytes nor str -> else branch
        imports = [OrdinalImp()]

    class FakePE:
        DIRECTORY_ENTRY_IMPORT = [StrDllEntry(), OddDllEntry()]
        sections = []

        def parse_data_directories(self):
            pass

        def get_memory_mapped_image(self):
            return b""

    pe = FakePE()
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")

    str_record, odd_record = metadata["import_details"]

    assert str_record["dll"] == "kernel32.dll"
    assert str_record["function"] == "CreateFileA"
    assert str_record["ordinal"] is None

    assert odd_record["dll"] is None
    assert odd_record["function"] is None
    assert odd_record["ordinal"] == 123
+
+
# ------------------------------------------------------------
# Tests for delayed imports (elif and else coverage)
# ------------------------------------------------------------

def test_delayed_imports_str_dll(monkeypatch):
    """A str DLL name in a delay-import entry is used as-is (elif-branch)."""

    class NamedImp:
        name = b"FuncA"
        ordinal = None

    class StrDllDelayEntry:
        def __init__(self):
            self.dll = "kernel32.dll"  # str -> elif branch
            self.imports = [NamedImp()]

    pe = fake_pe_delayed(delayed=[StrDllDelayEntry()])
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")

    (record,) = metadata["delayed_imports"]  # exactly one record expected
    assert record["dll"] == "kernel32.dll"
    assert record["function"] == "FuncA"
    assert record["ordinal"] is None
+
+
# ------------------------------------------------------------
# Test for optional header
# ------------------------------------------------------------

def test_optional_header_block(monkeypatch):
    """Optional-header fields pass through; versions are joined as 'major.minor'."""
    from iocx.parsers.pe_parser import parse_pe

    class OptHeader:
        SectionAlignment = 0x1000
        FileAlignment = 0x200
        SizeOfImage = 0x300000
        SizeOfHeaders = 0x400
        MajorLinkerVersion = 14
        MinorLinkerVersion = 25
        MajorOperatingSystemVersion = 10
        MinorOperatingSystemVersion = 0
        MajorSubsystemVersion = 6
        MinorSubsystemVersion = 1

    class FakePE:
        OPTIONAL_HEADER = OptHeader()
        sections = []

        def parse_data_directories(self):
            pass

        def get_memory_mapped_image(self):
            return b""

    pe = FakePE()
    monkeypatch.setattr("iocx.parsers.pe_parser.pefile.PE", lambda *a, **k: pe)

    _, metadata = parse_pe("dummy.exe")
    header = metadata["optional_header"]

    expected = {
        "section_alignment": 0x1000,
        "file_alignment": 0x200,
        "size_of_image": 0x300000,
        "size_of_headers": 0x400,
        "linker_version": "14.25",
        "os_version": "10.0",
        "subsystem_version": "6.1",
    }
    for key, value in expected.items():
        assert header[key] == value
+
+
# ------------------------------------------------------------
# Test language id decoder
# ------------------------------------------------------------


# NOTE(review): mid-file import kept because the top-of-file import block is
# not visible here; consider hoisting it there.
from iocx.parsers.pe_parser import _decode_langid

def test_decode_langid_non_int():
    """Non-integer language ids decode to 'unknown'."""
    for bad in ("409", None):
        assert _decode_langid(bad) == "unknown"


def test_decode_langid_too_small():
    """Ids below 0x0400 are out of range and always decode to 'unknown'."""
    for small in (0x0000, 0x003F):
        assert _decode_langid(small) == "unknown"


def test_decode_langid_valid_with_default_region():
    """0x0409 = English (United States) resolves via the fallback region."""
    assert _decode_langid(0x0409) == "en-US"


def test_decode_langid_valid_without_region():
    """0x0411 = Japanese has no fallback region, so only the language tag."""
    assert _decode_langid(0x0411) == "ja"


def test_decode_langid_unknown_primary():
    """A primary language absent from PRIMARY_LANG decodes to 'unknown'."""
    assert _decode_langid(0x0999) == "unknown"


def test_decode_langid_region_branch():
    """0x0809 = English (United Kingdom) uses the explicit SUBLANG region."""
    assert _decode_langid(0x0809) == "en-GB"