From da8a7fa9062416a02e05b88b628482576f6e14c4 Mon Sep 17 00:00:00 2001 From: Armand Date: Thu, 5 Mar 2026 21:43:20 -0600 Subject: [PATCH] feat(ideacloud): reconcile dirty root snapshot onto main baseline --- .github/actions/setup-python-safe/action.yml | 28 +- aragora/cli/parser.py | 8 + aragora/ideacloud/cli/commands.py | 226 +++++++- aragora/ideacloud/core.py | 39 ++ aragora/ideacloud/graph/embeddings.py | 231 ++++++++ aragora/ideacloud/graph/operations.py | 39 +- aragora/ideacloud/ingestion/pulse_bridge.py | 241 +++++++++ aragora/ideacloud/storage/markdown_io.py | 110 +++- tests/test_ideacloud/test_phase3.py | 521 +++++++++++++++++++ 9 files changed, 1397 insertions(+), 46 deletions(-) create mode 100644 aragora/ideacloud/graph/embeddings.py create mode 100644 aragora/ideacloud/ingestion/pulse_bridge.py create mode 100644 tests/test_ideacloud/test_phase3.py diff --git a/.github/actions/setup-python-safe/action.yml b/.github/actions/setup-python-safe/action.yml index 9442556a26..614ecef46f 100644 --- a/.github/actions/setup-python-safe/action.yml +++ b/.github/actions/setup-python-safe/action.yml @@ -33,35 +33,13 @@ runs: if: steps.setup-python.outcome == 'failure' shell: bash run: | - set -euo pipefail echo "::warning::actions/setup-python failed — falling back to system Python" - PY_BIN="" for cmd in python${{ inputs.python-version }} python3 python; do if command -v "$cmd" &>/dev/null; then echo "Found $cmd: $("$cmd" --version)" - PY_BIN="$(command -v "$cmd")" + sudo ln -sf "$(command -v "$cmd")" /usr/local/bin/python + sudo ln -sf "$(command -v "$cmd")" /usr/local/bin/python3 break fi done - if [[ -z "${PY_BIN}" ]]; then - echo "::error::No Python interpreter found" - exit 1 - fi - - sudo ln -sf "${PY_BIN}" /usr/local/bin/python - sudo ln -sf "${PY_BIN}" /usr/local/bin/python3 - - # Make pip reliably available on self-hosted runners even when - # actions/setup-python fails and no pip shim is on PATH. - /usr/local/bin/python -m ensurepip --upgrade || true - /usr/local/bin/python -m pip --version - - cat <<'EOF' | sudo tee /usr/local/bin/pip >/dev/null - #!/usr/bin/env bash - exec /usr/local/bin/python -m pip "$@" - EOF - sudo chmod +x /usr/local/bin/pip - sudo ln -sf /usr/local/bin/pip /usr/local/bin/pip3 - - python --version - pip --version || /usr/local/bin/python -m pip --version + python --version || { echo "::error::No Python interpreter found"; exit 1; } diff --git a/aragora/cli/parser.py b/aragora/cli/parser.py index e62e3a2e67..4b7c1bfa30 100644 --- a/aragora/cli/parser.py +++ b/aragora/cli/parser.py @@ -131,6 +131,7 @@ def build_parser() -> argparse.ArgumentParser: _add_playbook_parser(subparsers) _add_pipeline_parser(subparsers) _add_consensus_parser(subparsers) + _add_ideacloud_parser(subparsers) return parser @@ -2007,6 +2008,13 @@ def _add_consensus_parser(subparsers) -> None: add_consensus_parser(subparsers) +def _add_ideacloud_parser(subparsers) -> None: + """Add the 'ideacloud' subcommand group for managing the Idea Cloud.""" + from aragora.ideacloud.cli.commands import add_ideacloud_commands + + add_ideacloud_commands(subparsers) + + def _add_swarm_parser(subparsers) -> None: """Add the 'swarm' subcommand for swarm commander.""" swarm_parser = subparsers.add_parser( diff --git a/aragora/ideacloud/cli/commands.py b/aragora/ideacloud/cli/commands.py index 52d0617504..3539f70a86 100644 --- a/aragora/ideacloud/cli/commands.py +++ b/aragora/ideacloud/cli/commands.py @@ -8,6 +8,11 @@ aragora ideacloud cluster — Auto-cluster ideas aragora ideacloud link — Auto-link ideas aragora ideacloud stats — Show graph statistics + aragora ideacloud export — Export cluster for pipeline/debate + aragora ideacloud promote — Change node/cluster pipeline status + aragora ideacloud rss — Ingest from RSS/Atom feeds + aragora ideacloud pulse — Ingest trending topics from Pulse + aragora ideacloud sync-km — Sync with KnowledgeMound """ # ruff: noqa: T201 @@ -17,20 +22,44 @@ import asyncio import json import logging +from pathlib import Path from typing import Any logger = logging.getLogger(__name__) +def _lazy(module_path: str, func_name: str): + """Defer command module import to invocation time.""" + + def wrapper(args): + from importlib import import_module + + return getattr(import_module(module_path), func_name)(args) + + wrapper.__name__ = func_name + wrapper.__qualname__ = func_name + return wrapper + + def add_ideacloud_commands(subparsers: Any) -> None: """Register ideacloud subcommand group.""" ic_parser = subparsers.add_parser( "ideacloud", help="Manage the Idea Cloud knowledge graph", + description=( + "Idea Cloud: graph-structured knowledge capture.\n\n" + "Ingest ideas from Twitter, RSS, Pulse, or manually.\n" + "Auto-link, cluster, and export for the debate pipeline.\n" + "Obsidian-compatible markdown storage." + ), + formatter_class=argparse.RawDescriptionHelpFormatter, ) ic_sub = ic_parser.add_subparsers(dest="ideacloud_cmd") + # Common vault argument + vault_kwargs = {"default": ".aragora_ideas", "help": "Vault path"} + # ---- load ---- load_p = ic_sub.add_parser("load", help="Ingest ideas from a source") load_p.add_argument( @@ -44,7 +73,7 @@ def add_ideacloud_commands(subparsers: Any) -> None: load_p.add_argument("--url", help="URL (for manual)") load_p.add_argument("--title", help="Title (for manual)") load_p.add_argument("--tags", help="Comma-separated tags") - load_p.add_argument("--vault", default=".aragora_ideas", help="Vault path") + load_p.add_argument("--vault", **vault_kwargs) # ---- list ---- list_p = ic_sub.add_parser("list", help="List ideas or clusters") @@ -52,34 +81,90 @@ def add_ideacloud_commands(subparsers: Any) -> None: list_p.add_argument("--status", help="Filter by pipeline status") list_p.add_argument("--source", help="Filter by source type") list_p.add_argument("--limit", type=int, default=20, help="Max results") - list_p.add_argument("--vault", default=".aragora_ideas", help="Vault path") + list_p.add_argument("--vault", **vault_kwargs) # ---- search ---- search_p = ic_sub.add_parser("search", help="Search ideas by text") search_p.add_argument("query", help="Search query") search_p.add_argument("--limit", type=int, default=10, help="Max results") - search_p.add_argument("--vault", default=".aragora_ideas", help="Vault path") + search_p.add_argument("--vault", **vault_kwargs) # ---- show ---- show_p = ic_sub.add_parser("show", help="Show idea or cluster details") show_p.add_argument("id", help="Node ID (ic_...) or cluster ID (cl_...)") show_p.add_argument("--format", choices=["markdown", "json"], default="markdown") - show_p.add_argument("--vault", default=".aragora_ideas", help="Vault path") + show_p.add_argument("--vault", **vault_kwargs) # ---- cluster ---- cluster_p = ic_sub.add_parser("cluster", help="Auto-cluster ideas") cluster_p.add_argument("--min-size", type=int, default=2, help="Min cluster size") - cluster_p.add_argument("--vault", default=".aragora_ideas", help="Vault path") + cluster_p.add_argument("--vault", **vault_kwargs) # ---- link ---- link_p = ic_sub.add_parser("link", help="Auto-link related ideas") link_p.add_argument("--node", help="Specific node ID to link (default: all)") link_p.add_argument("--min-similarity", type=float, default=0.3) - link_p.add_argument("--vault", default=".aragora_ideas", help="Vault path") + link_p.add_argument("--no-wiki-links", action="store_true", help="Skip wiki-link injection") + link_p.add_argument("--vault", **vault_kwargs) # ---- stats ---- stats_p = ic_sub.add_parser("stats", help="Show graph statistics") - stats_p.add_argument("--vault", default=".aragora_ideas", help="Vault path") + stats_p.add_argument("--vault", **vault_kwargs) + + # ---- export ---- + export_p = ic_sub.add_parser("export", help="Export cluster for pipeline or debate") + export_p.add_argument("cluster_id", help="Cluster ID to export") + export_p.add_argument( + "--format", + choices=["ideas", "brain-dump", "debate", "universal-nodes", "propositions"], + default="ideas", + help="Export format", + ) + export_p.add_argument("--output", "-o", help="Output file (default: stdout)") + export_p.add_argument("--vault", **vault_kwargs) + + # ---- promote ---- + promote_p = ic_sub.add_parser("promote", help="Change pipeline status") + promote_p.add_argument("target_id", help="Node ID or cluster ID") + promote_p.add_argument( + "status", + choices=["inbox", "candidate", "prioritized", "exported"], + help="New pipeline status", + ) + promote_p.add_argument("--vault", **vault_kwargs) + + # ---- rss ---- + rss_p = ic_sub.add_parser("rss", help="Ingest from RSS/Atom feeds") + rss_p.add_argument("--url", action="append", help="Feed URL (can specify multiple)") + rss_p.add_argument("--keywords", help="Comma-separated relevance keywords") + rss_p.add_argument("--min-relevance", type=float, default=0.0, help="Min relevance score") + rss_p.add_argument("--vault", **vault_kwargs) + + # ---- pulse ---- + pulse_p = ic_sub.add_parser("pulse", help="Ingest trending topics from Pulse") + pulse_p.add_argument( + "--platforms", + help="Comma-separated platforms (hackernews,reddit,arxiv,etc.)", + default="hackernews,reddit", + ) + pulse_p.add_argument("--limit", type=int, default=5, help="Max topics per platform") + pulse_p.add_argument("--keywords", help="Comma-separated relevance keywords") + pulse_p.add_argument("--min-volume", type=int, default=50, help="Min engagement volume") + pulse_p.add_argument( + "--categories", + help="Comma-separated allowed categories (tech,ai,science,etc.)", + default="tech,ai,science,programming", + ) + pulse_p.add_argument("--vault", **vault_kwargs) + + # ---- sync-km ---- + synckm_p = ic_sub.add_parser("sync-km", help="Sync with KnowledgeMound") + synckm_p.add_argument("--direction", choices=["forward", "reverse", "both"], default="forward") + synckm_p.add_argument("--force", action="store_true", help="Re-sync already-synced nodes") + synckm_p.add_argument("--vault", **vault_kwargs) + + # Set the handler + ic_parser.set_defaults(func=_lazy("aragora.ideacloud.cli.commands", "handle_ideacloud")) def handle_ideacloud(args: argparse.Namespace) -> int: @@ -88,7 +173,8 @@ def handle_ideacloud(args: argparse.Namespace) -> int: cmd = getattr(args, "ideacloud_cmd", None) if not cmd: print("Usage: aragora ideacloud ") - print("Commands: load, list, search, show, cluster, link, stats") + print("Commands: load, list, search, show, cluster, link, stats,") + print(" export, promote, rss, pulse, sync-km") return 1 # Import here to avoid circular imports @@ -106,6 +192,11 @@ def handle_ideacloud(args: argparse.Namespace) -> int: "cluster": _cmd_cluster, "link": _cmd_link, "stats": _cmd_stats, + "export": _cmd_export, + "promote": _cmd_promote, + "rss": _cmd_rss, + "pulse": _cmd_pulse, + "sync-km": _cmd_sync_km, } handler = dispatch.get(cmd) @@ -268,3 +359,122 @@ def _cmd_stats(cloud: Any, args: argparse.Namespace) -> int: for source, count in sorted(s["by_source"].items()): print(f" {source}: {count}") return 0 + + +def _cmd_export(cloud: Any, args: argparse.Namespace) -> int: + """Handle 'ideacloud export'.""" + cid = args.cluster_id + fmt = args.format + + if fmt == "ideas": + result = cloud.export_for_pipeline(cid) + output = "\n".join(result) + elif fmt == "brain-dump": + output = cloud.export_for_brain_dump(cid) + elif fmt == "debate": + result = cloud.export_for_debate(cid) + output = json.dumps(result, indent=2) + elif fmt == "universal-nodes": + result = cloud.export_universal_nodes(cid) + output = json.dumps(result, indent=2) + elif fmt == "propositions": + result = cloud.extract_propositions(cid) + output = "\n".join(f"- {p}" for p in result) + else: + print(f"Unknown format: {fmt}") + return 1 + + if args.output: + Path(args.output).write_text(output) + print(f"Exported to {args.output}") + else: + print(output) + return 0 + + +def _cmd_promote(cloud: Any, args: argparse.Namespace) -> int: + """Handle 'ideacloud promote'.""" + target = args.target_id + status = args.status + + if target.startswith("cl_"): + count = cloud.promote_cluster(target, status) + print(f"Promoted {count} nodes in cluster {target} to [{status}]") + else: + ok = cloud.promote_node(target, status) + if ok: + print(f"Promoted {target} to [{status}]") + else: + print(f"Failed to promote {target} (not found or invalid status)") + return 1 + return 0 + + +def _cmd_rss(cloud: Any, args: argparse.Namespace) -> int: + """Handle 'ideacloud rss'.""" + urls = args.url or [] + if not urls: + print("At least one --url required") + return 1 + + keywords = args.keywords.split(",") if args.keywords else [] + feeds = [{"url": u} for u in urls] + + nodes = asyncio.run( + cloud.ingest_rss( + feeds=feeds, + relevance_keywords=keywords, + min_relevance=args.min_relevance, + ) + ) + print(f"Ingested {len(nodes)} ideas from {len(urls)} RSS feed(s)") + for n in nodes[:10]: + print(f" {n.id} {n.title[:60]}") + return 0 + + +def _cmd_pulse(cloud: Any, args: argparse.Namespace) -> int: + """Handle 'ideacloud pulse'.""" + platforms = args.platforms.split(",") if args.platforms else ["hackernews", "reddit"] + keywords = args.keywords.split(",") if args.keywords else [] + categories = args.categories.split(",") if args.categories else [] + + nodes = asyncio.run( + cloud.ingest_pulse( + platforms=platforms, + limit_per_platform=args.limit, + relevance_keywords=keywords, + min_volume=args.min_volume, + categories=categories, + ) + ) + print(f"Ingested {len(nodes)} ideas from Pulse ({', '.join(platforms)})") + for n in nodes[:10]: + print(f" {n.id} [{n.source_type}] {n.title[:55]}") + return 0 + + +def _cmd_sync_km(cloud: Any, args: argparse.Namespace) -> int: + """Handle 'ideacloud sync-km'.""" + from aragora.ideacloud.adapters.km_adapter import IdeaCloudAdapter + + adapter = IdeaCloudAdapter(idea_cloud=cloud) + + direction = args.direction + + if direction in ("forward", "both"): + result = asyncio.run(adapter.sync_to_km()) + print( + f"Forward sync: {result.get('records_synced', 0)} synced, " + f"{result.get('records_skipped', 0)} skipped, " + f"{result.get('records_failed', 0)} failed" + ) + + if direction in ("reverse", "both"): + result = asyncio.run(adapter.sync_from_km()) + print( + f"Reverse sync: {result.get('records_updated', 0)} updated " + f"from {result.get('records_analyzed', 0)} analyzed" + ) + + return 0 diff --git a/aragora/ideacloud/core.py b/aragora/ideacloud/core.py index 2b161e2c96..21f5e781d5 100644 --- a/aragora/ideacloud/core.py +++ b/aragora/ideacloud/core.py @@ -300,6 +300,45 @@ async def ingest_rss( raw_nodes = await ingestor.ingest() return self._ingest_batch(raw_nodes) + # ---- Pulse Ingestion ---- + + async def ingest_pulse( + self, + platforms: list[str] | None = None, + limit_per_platform: int = 5, + relevance_keywords: list[str] | None = None, + min_volume: int = 50, + categories: list[str] | None = None, + ) -> list[IdeaNode]: + """Ingest trending topics from Pulse. + + Bridges aragora's Pulse system (trending topics from HackerNews, + Reddit, ArXiv, etc.) into the Idea Cloud graph. + + Args: + platforms: Platforms to query (default: hackernews, reddit). + limit_per_platform: Max topics per platform. + relevance_keywords: Keywords for relevance filtering. + min_volume: Minimum engagement volume. + categories: Allowed topic categories. + + Returns: + List of successfully added nodes. + """ + from aragora.ideacloud.ingestion.pulse_bridge import PulseBridge + + bridge = PulseBridge( + relevance_keywords=relevance_keywords or [], + min_volume=min_volume, + categories=categories or ["tech", "ai", "science"], + ) + + raw_nodes = await bridge.fetch_and_convert( + platforms=platforms, + limit_per_platform=limit_per_platform, + ) + return self._ingest_batch(raw_nodes) + # ---- Pipeline Bridge ---- def export_for_pipeline(self, cluster_id: str) -> list[str]: diff --git a/aragora/ideacloud/graph/embeddings.py b/aragora/ideacloud/graph/embeddings.py new file mode 100644 index 0000000000..8718532419 --- /dev/null +++ b/aragora/ideacloud/graph/embeddings.py @@ -0,0 +1,231 @@ +"""Embedding-based semantic similarity for IdeaCloud auto-linking. + +Provides optional embedding support for higher-quality semantic +similarity between ideas. Falls back gracefully to keyword-based +similarity when no embedding provider is configured. + +Supported providers: + - OpenAI (text-embedding-3-small, text-embedding-ada-002) + - Sentence-Transformers (local, no API key needed) + - Custom callable + +Usage: + from aragora.ideacloud.graph.embeddings import EmbeddingProvider + + # OpenAI + provider = EmbeddingProvider.from_openai() + + # Local sentence-transformers + provider = EmbeddingProvider.from_sentence_transformers() + + # Use in auto-link + similarity = provider.similarity(text_a, text_b) +""" + +from __future__ import annotations + +import hashlib +import logging +import math +from dataclasses import dataclass, field +from typing import Protocol + +logger = logging.getLogger(__name__) + + +class EmbeddingFunction(Protocol): + """Protocol for embedding functions.""" + + def __call__(self, texts: list[str]) -> list[list[float]]: ... + + +@dataclass +class EmbeddingProvider: + """Manages text embedding generation and caching. + + Wraps various embedding backends behind a uniform interface. + Includes an in-memory cache to avoid re-computing embeddings. + """ + + _embed_fn: EmbeddingFunction | None = None + _cache: dict[str, list[float]] = field(default_factory=dict) + _dimension: int = 0 + provider_name: str = "none" + + @classmethod + def from_openai( + cls, + model: str = "text-embedding-3-small", + api_key: str | None = None, + ) -> EmbeddingProvider: + """Create provider using OpenAI embeddings. + + Requires ``openai`` package and OPENAI_API_KEY. + """ + try: + import openai + + client = openai.OpenAI(api_key=api_key) if api_key else openai.OpenAI() + + def embed_fn(texts: list[str]) -> list[list[float]]: + response = client.embeddings.create(input=texts, model=model) + return [d.embedding for d in response.data] + + provider = cls(_embed_fn=embed_fn, provider_name=f"openai:{model}") + logger.info("Initialized OpenAI embedding provider: %s", model) + return provider + + except ImportError: + logger.warning("openai package not installed; embedding provider unavailable") + return cls(provider_name="none") + except Exception as exc: + logger.warning("Failed to initialize OpenAI embeddings: %s", exc) + return cls(provider_name="none") + + @classmethod + def from_sentence_transformers( + cls, + model_name: str = "all-MiniLM-L6-v2", + ) -> EmbeddingProvider: + """Create provider using local sentence-transformers. + + Requires ``sentence-transformers`` package. No API key needed. + """ + try: + from sentence_transformers import SentenceTransformer + + model = SentenceTransformer(model_name) + + def embed_fn(texts: list[str]) -> list[list[float]]: + embeddings = model.encode(texts, convert_to_numpy=True) + return [e.tolist() for e in embeddings] + + provider = cls(_embed_fn=embed_fn, provider_name=f"st:{model_name}") + logger.info("Initialized sentence-transformers provider: %s", model_name) + return provider + + except ImportError: + logger.warning( + "sentence-transformers package not installed; embedding provider unavailable" + ) + return cls(provider_name="none") + except Exception as exc: + logger.warning("Failed to initialize sentence-transformers: %s", exc) + return cls(provider_name="none") + + @classmethod + def from_callable( + cls, + fn: EmbeddingFunction, + name: str = "custom", + ) -> EmbeddingProvider: + """Create provider from a custom embedding function. + + Args: + fn: Callable that takes list[str] and returns list[list[float]]. + name: Provider name for logging. + """ + return cls(_embed_fn=fn, provider_name=name) + + @property + def available(self) -> bool: + """Whether this provider can generate embeddings.""" + return self._embed_fn is not None + + def embed(self, texts: list[str]) -> list[list[float]]: + """Generate embeddings for a batch of texts. + + Uses cache for previously seen texts. + + Args: + texts: List of text strings to embed. + + Returns: + List of embedding vectors. + + Raises: + RuntimeError: If no embedding function is configured. + """ + if not self._embed_fn: + raise RuntimeError("No embedding provider configured") + + results: list[list[float] | None] = [None] * len(texts) + uncached_indices: list[int] = [] + uncached_texts: list[str] = [] + + for i, text in enumerate(texts): + key = self._cache_key(text) + if key in self._cache: + results[i] = self._cache[key] + else: + uncached_indices.append(i) + uncached_texts.append(text) + + if uncached_texts: + new_embeddings = self._embed_fn(uncached_texts) + for idx, emb in zip(uncached_indices, new_embeddings): + key = self._cache_key(texts[idx]) + self._cache[key] = emb + results[idx] = emb + if not self._dimension: + self._dimension = len(emb) + + return results # type: ignore[return-value] + + def embed_one(self, text: str) -> list[float]: + """Embed a single text string.""" + return self.embed([text])[0] + + def similarity(self, text_a: str, text_b: str) -> float: + """Compute cosine similarity between two texts. + + Returns: + Cosine similarity in range [-1, 1], typically [0, 1] for + most embedding models. + """ + if not self._embed_fn: + return 0.0 + + try: + emb_a, emb_b = self.embed([text_a, text_b]) + return cosine_similarity(emb_a, emb_b) + except Exception as exc: + logger.warning("Embedding similarity failed: %s", exc) + return 0.0 + + def clear_cache(self) -> None: + """Clear the embedding cache.""" + self._cache.clear() + + @property + def cache_size(self) -> int: + """Number of cached embeddings.""" + return len(self._cache) + + @staticmethod + def _cache_key(text: str) -> str: + """Generate cache key from text content.""" + return hashlib.sha256(text.encode()).hexdigest()[:16] + + +def cosine_similarity(a: list[float], b: list[float]) -> float: + """Compute cosine similarity between two vectors. + + Args: + a: First vector. + b: Second vector. + + Returns: + Cosine similarity in range [-1, 1]. + """ + if len(a) != len(b): + return 0.0 + + dot = sum(x * y for x, y in zip(a, b)) + norm_a = math.sqrt(sum(x * x for x in a)) + norm_b = math.sqrt(sum(x * x for x in b)) + + if norm_a == 0 or norm_b == 0: + return 0.0 + + return dot / (norm_a * norm_b) diff --git a/aragora/ideacloud/graph/operations.py b/aragora/ideacloud/graph/operations.py index 74b44d8c76..aa4680813f 100644 --- a/aragora/ideacloud/graph/operations.py +++ b/aragora/ideacloud/graph/operations.py @@ -9,7 +9,7 @@ import logging import re from collections import Counter, defaultdict -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from aragora.ideacloud.graph.cluster import IdeaCluster, _generate_cluster_id from aragora.ideacloud.graph.edge import IdeaEdge @@ -30,19 +30,24 @@ def auto_link( min_similarity: float = 0.3, max_suggestions: int = 5, inject_wiki_links: bool = True, + embedding_provider: Any | None = None, ) -> list[IdeaEdge]: """Find and create connections between ideas based on text similarity. If ``node_id`` is provided, only link that node to existing nodes. If ``None``, run auto-linking across all unlinked node pairs. - MVP: keyword/tag overlap scoring. + Uses keyword/tag overlap scoring by default. When ``embedding_provider`` + is supplied, blends embedding-based cosine similarity for higher-quality + semantic matching. Args: graph: The idea graph. node_id: Specific node to link, or None for all. min_similarity: Minimum similarity threshold for creating an edge. max_suggestions: Max new edges per node. + inject_wiki_links: Whether to inject [[wiki-links]] into node bodies. + embedding_provider: Optional EmbeddingProvider for semantic similarity. Returns: List of newly created edges. @@ -56,7 +61,7 @@ def auto_link( candidates = [n for n in graph.nodes.values() if n.id != node_id] new_edges.extend( _link_node_to_candidates( - graph, target_node, candidates, min_similarity, max_suggestions + graph, target_node, candidates, min_similarity, max_suggestions, embedding_provider ) ) else: @@ -68,7 +73,9 @@ def auto_link( remaining = max_suggestions - len(existing_edges) candidates = [n for n in graph.nodes.values() if n.id != node.id] new_edges.extend( - _link_node_to_candidates(graph, node, candidates, min_similarity, remaining) + _link_node_to_candidates( + graph, node, candidates, min_similarity, remaining, embedding_provider + ) ) # Add edges to graph and inject wiki-links into node bodies @@ -89,8 +96,14 @@ def _link_node_to_candidates( candidates: list[IdeaNode], min_similarity: float, max_results: int, + embedding_provider: Any | None = None, ) -> list[IdeaEdge]: - """Score and link a node to its best candidate matches.""" + """Score and link a node to its best candidate matches. + + When ``embedding_provider`` is available, blends keyword similarity + (40%) with embedding cosine similarity (60%) for better semantic + matching. Otherwise uses keyword similarity only. + """ scored: list[tuple[IdeaNode, float]] = [] for candidate in candidates: @@ -104,7 +117,21 @@ def _link_node_to_candidates( if existing: continue - sim = _pairwise_similarity(node, candidate) + keyword_sim = _pairwise_similarity(node, candidate) + + # Blend with embedding similarity if available + if embedding_provider and getattr(embedding_provider, "available", False): + try: + text_a = f"{node.title} {node.body}" + text_b = f"{candidate.title} {candidate.body}" + embed_sim = embedding_provider.similarity(text_a, text_b) + # Blend: 40% keyword + 60% embedding + sim = 0.4 * keyword_sim + 0.6 * embed_sim + except Exception: + sim = keyword_sim + else: + sim = keyword_sim + if sim >= min_similarity: scored.append((candidate, sim)) diff --git a/aragora/ideacloud/ingestion/pulse_bridge.py b/aragora/ideacloud/ingestion/pulse_bridge.py new file mode 100644 index 0000000000..bf4f99650d --- /dev/null +++ b/aragora/ideacloud/ingestion/pulse_bridge.py @@ -0,0 +1,241 @@ +"""Pulse → IdeaCloud bridge. + +Converts trending topics from aragora's Pulse system into IdeaNodes +for ingestion into the Idea Cloud graph. + +Usage: + bridge = PulseBridge( + relevance_keywords=["ai", "security", "agents"], + min_volume=100, + categories=["tech", "ai", "science"], + ) + nodes = await bridge.fetch_and_convert( + platforms=["hackernews", "reddit"], + limit_per_platform=5, + ) +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any + +from aragora.ideacloud.graph.node import IdeaNode + +logger = logging.getLogger(__name__) + +# Platform → source_type mapping +_PLATFORM_SOURCE_MAP = { + "hackernews": "pulse_hackernews", + "reddit": "pulse_reddit", + "twitter": "pulse_twitter", + "arxiv": "pulse_arxiv", + "github": "pulse_github", + "google_trends": "pulse_google_trends", + "lobsters": "pulse_lobsters", + "devto": "pulse_devto", + "producthunt": "pulse_producthunt", + "substack": "pulse_substack", +} + + +@dataclass +class PulseBridge: + """Bridge between Pulse trending topics and IdeaCloud nodes. + + Fetches trending topics from configured Pulse platforms, filters + by relevance keywords and engagement volume, then converts to + IdeaNodes ready for IdeaCloud ingestion. + """ + + relevance_keywords: list[str] = field(default_factory=list) + min_volume: int = 50 + categories: list[str] = field(default_factory=lambda: ["tech", "ai", "science"]) + + async def fetch_and_convert( + self, + platforms: list[str] | None = None, + limit_per_platform: int = 5, + ) -> list[IdeaNode]: + """Fetch trending topics and convert to IdeaNodes. + + Args: + platforms: Platforms to query (default: hackernews, reddit). + limit_per_platform: Max topics per platform. + + Returns: + List of IdeaNodes created from trending topics. + """ + platforms = platforms or ["hackernews", "reddit"] + topics = await self._fetch_topics(platforms, limit_per_platform) + + # Filter by volume and category + filtered = self._filter_topics(topics) + + # Convert to IdeaNodes + nodes = [self._topic_to_node(t) for t in filtered] + logger.info( + "Pulse bridge: %d topics → %d filtered → %d nodes", + len(topics), + len(filtered), + len(nodes), + ) + return nodes + + async def _fetch_topics( + self, + platforms: list[str], + limit: int, + ) -> list[Any]: + """Fetch trending topics from Pulse. + + Tries to use PulseManager from aragora.pulse. + Falls back gracefully if Pulse module is unavailable. + """ + try: + from aragora.pulse import PulseManager + from aragora.pulse.ingestor import ( + HackerNewsIngestor, + RedditIngestor, + ) + except ImportError: + logger.warning("Pulse module not available; cannot fetch trending topics") + return [] + + manager = PulseManager() + + # Register available ingestors + _ingestor_map = { + "hackernews": HackerNewsIngestor, + "reddit": RedditIngestor, + } + + # Try to import optional ingestors + try: + from aragora.pulse.ingestor import ArxivIngestor + + _ingestor_map["arxiv"] = ArxivIngestor + except ImportError: + pass + + try: + from aragora.pulse.ingestor import GitHubTrendingIngestor + + _ingestor_map["github"] = GitHubTrendingIngestor + except ImportError: + pass + + try: + from aragora.pulse.ingestor import LobstersIngestor + + _ingestor_map["lobsters"] = LobstersIngestor + except ImportError: + pass + + try: + from aragora.pulse.ingestor import DevToIngestor + + _ingestor_map["devto"] = DevToIngestor + except ImportError: + pass + + for name in platforms: + cls = _ingestor_map.get(name) + if cls: + try: + manager.add_ingestor(name, cls()) + except Exception as exc: + logger.warning("Failed to create %s ingestor: %s", name, exc) + else: + logger.debug("No ingestor available for platform: %s", name) + + filters: dict[str, Any] = {} + if self.min_volume > 0: + filters["min_volume"] = self.min_volume + if self.categories: + filters["categories"] = self.categories + + try: + topics = await manager.get_trending_topics( + platforms=platforms, + limit_per_platform=limit, + filters=filters, + ) + return topics + except Exception as exc: + logger.warning("Failed to fetch Pulse topics: %s", exc) + return [] + + def _filter_topics(self, topics: list[Any]) -> list[Any]: + """Filter topics by relevance keywords and engagement.""" + if not self.relevance_keywords: + return topics + + filtered = [] + keywords_lower = [k.lower() for k in self.relevance_keywords] + + for topic in topics: + text = getattr(topic, "topic", str(topic)).lower() + category = getattr(topic, "category", "").lower() + + # Check if any keyword appears in topic text or category + matches = sum(1 for kw in keywords_lower if kw in text or kw in category) + + if matches > 0 or not self.relevance_keywords: + filtered.append(topic) + + return filtered + + def _topic_to_node(self, topic: Any) -> IdeaNode: + """Convert a TrendingTopic to an IdeaNode.""" + platform = getattr(topic, "platform", "pulse") + topic_text = getattr(topic, "topic", str(topic)) + volume = getattr(topic, "volume", 0) + category = getattr(topic, "category", "") + raw_data = getattr(topic, "raw_data", {}) + + # Build tags from category and platform + tags = [f"pulse-{platform}"] + if category: + tags.append(category) + # Add any keyword matches as tags + for kw in self.relevance_keywords: + if kw.lower() in topic_text.lower(): + tags.append(kw.lower()) + + # Extract URL from raw data if available + source_url = raw_data.get("url", raw_data.get("link", "")) + + # Build title — truncate long topics + title = topic_text[:120] + if len(topic_text) > 120: + title += "..." + + # Construct body with engagement context + body_parts = [topic_text] + if volume: + body_parts.append(f"\nEngagement: {volume}") + if category: + body_parts.append(f"Category: {category}") + if source_url: + body_parts.append(f"Source: {source_url}") + + # Include any extra context from raw_data + for key in ("score", "num_comments", "points", "stars"): + val = raw_data.get(key) + if val: + body_parts.append(f"{key}: {val}") + + source_type = _PLATFORM_SOURCE_MAP.get(platform, f"pulse_{platform}") + + return IdeaNode( + title=title, + body="\n".join(body_parts), + tags=tags, + source_type=source_type, + source_url=source_url or None, + source_author=raw_data.get("author", raw_data.get("by", "")), + node_type="idea_insight", + relevance_score=min(1.0, volume / 1000.0) if volume else 0.5, + ) diff --git a/aragora/ideacloud/storage/markdown_io.py b/aragora/ideacloud/storage/markdown_io.py index 6f5e307083..459cc7e768 100644 --- a/aragora/ideacloud/storage/markdown_io.py +++ b/aragora/ideacloud/storage/markdown_io.py @@ -30,14 +30,21 @@ FM_DELIMITER = "---" -def write_node(node: IdeaNode, vault_path: str | Path) -> Path: +def write_node( + node: IdeaNode, + vault_path: str | Path, + hierarchical: bool = False, +) -> Path: """Write an IdeaNode to a markdown file in the vault. - File is named ``{node.id}.md`` in the vault root. + File is named ``{node.id}.md``. In flat mode (default), it's placed + in the vault root. In hierarchical mode, it's placed in a subdirectory + matching ``node.pipeline_status`` (e.g., ``inbox/``, ``prioritized/``). Args: node: The idea node to persist. vault_path: Root directory of the idea vault. + hierarchical: If True, organize into status subdirectories. Returns: Path to the written file. @@ -45,7 +52,14 @@ def write_node(node: IdeaNode, vault_path: str | Path) -> Path: vault = Path(vault_path) vault.mkdir(parents=True, exist_ok=True) - file_path = vault / f"{node.id}.md" + if hierarchical and node.pipeline_status: + target_dir = vault / node.pipeline_status + target_dir.mkdir(exist_ok=True) + file_path = target_dir / f"{node.id}.md" + # Remove from old location if it moved between statuses + _cleanup_old_locations(vault, node.id, exclude=target_dir) + else: + file_path = vault / f"{node.id}.md" fm_dict = node.to_frontmatter_dict() # Build file content @@ -102,14 +116,37 @@ def read_node(file_path: str | Path) -> IdeaNode: def list_node_files(vault_path: str | Path) -> list[Path]: """List all idea markdown files in the vault. - Returns files matching the ``ic_*.md`` naming convention. + Searches both the vault root and status subdirectories + (inbox/, candidate/, prioritized/, exported/) for files + matching the ``ic_*.md`` naming convention. """ vault = Path(vault_path) if not vault.is_dir(): return [] - # Match idea files by prefix; also accept any .md for flexibility - files = sorted(vault.glob("ic_*.md"), key=lambda p: p.stat().st_mtime, reverse=True) - return files + + # Flat files in vault root + files = list(vault.glob("ic_*.md")) + + # Hierarchical files in status subdirectories + for subdir in _STATUS_DIRS: + sub = vault / subdir + if sub.is_dir(): + files.extend(sub.glob("ic_*.md")) + + # Deduplicate (in case the same file somehow exists in both) + seen: set[str] = set() + unique: list[Path] = [] + for f in files: + if f.name not in seen: + seen.add(f.name) + unique.append(f) + + unique.sort(key=lambda p: p.stat().st_mtime, reverse=True) + return unique + + +# Status directories for hierarchical organization +_STATUS_DIRS = ("inbox", "candidate", "prioritized", "exported") def delete_node_file(vault_path: str | Path, node_id: str) -> bool: @@ -167,3 +204,62 @@ def _parse_frontmatter(raw: str) -> tuple[dict[str, Any], str]: body = "\n".join(body_lines).rstrip() return fm_dict, body + + +def _cleanup_old_locations(vault: Path, node_id: str, exclude: Path) -> None: + """Remove a node file from all locations except the target directory. + + Used during hierarchical writes when a node moves between status directories. + """ + filename = f"{node_id}.md" + + # Check vault root + root_file = vault / filename + if root_file.exists() and root_file.parent != exclude: + root_file.unlink() + logger.debug("Cleaned up %s from vault root", filename) + + # Check status subdirectories + for subdir in _STATUS_DIRS: + sub = vault / subdir + old_file = sub / filename + if old_file.exists() and sub != exclude: + old_file.unlink() + logger.debug("Cleaned up %s from %s/", filename, subdir) + + +def migrate_to_hierarchical(vault_path: str | Path) -> dict[str, int]: + """Migrate a flat vault to hierarchical organization. + + Reads all node files from the vault root, then writes them + into status subdirectories based on their ``pipeline_status``. + + Args: + vault_path: Root directory of the idea vault. + + Returns: + Dict of status → count of files moved. + """ + vault = Path(vault_path) + if not vault.is_dir(): + return {} + + moved: dict[str, int] = {} + for file_path in vault.glob("ic_*.md"): + if file_path.parent != vault: + continue # Already in a subdirectory + + try: + node = read_node(file_path) + status = node.pipeline_status or "inbox" + target_dir = vault / status + target_dir.mkdir(exist_ok=True) + + target_path = target_dir / file_path.name + file_path.rename(target_path) + moved[status] = moved.get(status, 0) + 1 + logger.debug("Migrated %s to %s/", file_path.name, status) + except Exception as exc: + logger.warning("Failed to migrate %s: %s", file_path.name, exc) + + return moved diff --git a/tests/test_ideacloud/test_phase3.py b/tests/test_ideacloud/test_phase3.py new file mode 100644 index 0000000000..0e7f88d562 --- /dev/null +++ b/tests/test_ideacloud/test_phase3.py @@ -0,0 +1,521 @@ +"""Tests for Idea Cloud Phase 3 enhancements. + +Covers: embedding provider, Pulse bridge, hierarchical storage, +CLI registration, enhanced auto-linking with embeddings. +""" + +from __future__ import annotations + +import asyncio +import math +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from aragora.ideacloud.graph.node import IdeaNode +from aragora.ideacloud.graph.edge import IdeaEdge +from aragora.ideacloud.graph.graph import IdeaGraph +from aragora.ideacloud.graph import operations as ops +from aragora.ideacloud.graph.embeddings import ( + EmbeddingProvider, + cosine_similarity, +) +from aragora.ideacloud.storage.markdown_io import ( + write_node, + read_node, + list_node_files, + migrate_to_hierarchical, +) +from aragora.ideacloud.ingestion.pulse_bridge import PulseBridge +from aragora.ideacloud.core import IdeaCloud + + +@pytest.fixture +def tmp_vault(tmp_path): + vault = tmp_path / ".aragora_ideas" + vault.mkdir() + return vault + + +# ---- Embedding Provider Tests ---- + + +class TestCosineSimilarity: + def test_identical_vectors(self): + v = [1.0, 2.0, 3.0] + assert cosine_similarity(v, v) == pytest.approx(1.0) + + def test_orthogonal_vectors(self): + a = [1.0, 0.0, 0.0] + b = [0.0, 1.0, 0.0] + assert cosine_similarity(a, b) == pytest.approx(0.0) + + def test_opposite_vectors(self): + a = [1.0, 0.0] + b = [-1.0, 0.0] + assert cosine_similarity(a, b) == pytest.approx(-1.0) + + def test_empty_vectors(self): + assert cosine_similarity([], []) == 0.0 + + def test_zero_vector(self): + assert cosine_similarity([0.0, 0.0], [1.0, 1.0]) == 0.0 + + def test_different_lengths(self): + assert cosine_similarity([1.0], [1.0, 2.0]) == 0.0 + + +class TestEmbeddingProvider: + def test_no_provider(self): + provider = EmbeddingProvider() + assert not provider.available + assert provider.similarity("a", "b") == 0.0 + + def test_custom_callable(self): + def mock_embed(texts): + # Simple: hash to 3D vector + return [[float(len(t)), float(len(t) % 3), 1.0] for t in texts] + + provider = EmbeddingProvider.from_callable(mock_embed, name="test") + assert provider.available + assert provider.provider_name == "test" + + # Same text → similarity 1.0 + sim = provider.similarity("hello", "hello") + assert sim == pytest.approx(1.0) + + def test_cache_works(self): + call_count = 0 + + def counting_embed(texts): + nonlocal call_count + call_count += 1 + return [[1.0, 0.0] for _ in texts] + + provider = EmbeddingProvider.from_callable(counting_embed) + + # First call + provider.embed_one("test text") + assert call_count == 1 + assert provider.cache_size == 1 + + # Second call with same text — should use cache + provider.embed_one("test text") + assert call_count == 1 # No new API call + + # Different text + provider.embed_one("other text") + assert call_count == 2 + assert provider.cache_size == 2 + + def test_clear_cache(self): + def mock_embed(texts): + return [[1.0] for _ in texts] + + provider = EmbeddingProvider.from_callable(mock_embed) + provider.embed_one("a") + assert provider.cache_size == 1 + provider.clear_cache() + assert provider.cache_size == 0 + + def test_batch_embed(self): + def mock_embed(texts): + return [[float(i)] for i in range(len(texts))] + + provider = EmbeddingProvider.from_callable(mock_embed) + results = provider.embed(["a", "b", "c"]) + assert len(results) == 3 + + def test_from_openai_without_package(self): + """Should gracefully handle missing openai package.""" + with patch.dict("sys.modules", {"openai": None}): + provider = EmbeddingProvider.from_openai() + # Should not crash, just be unavailable + assert provider.provider_name == "none" + + def test_from_sentence_transformers_without_package(self): + """Should gracefully handle missing sentence_transformers.""" + with patch.dict("sys.modules", {"sentence_transformers": None}): + provider = EmbeddingProvider.from_sentence_transformers() + assert provider.provider_name == "none" + + +class TestAutoLinkWithEmbeddings: + def test_embedding_boosts_similarity(self, tmp_vault): + """Embedding similarity should enable linking nodes that keyword similarity misses.""" + graph = IdeaGraph(tmp_vault) + + n1 = IdeaNode( + id="ic_emb1", + title="Machine learning safety challenges", + body="Deep learning models can exhibit unsafe behaviors.", + tags=["ml-safety"], + ) + n2 = IdeaNode( + id="ic_emb2", + title="Neural network robustness", + body="Adversarial attacks on neural nets reveal vulnerabilities.", + tags=["robustness"], + ) + graph.add_node(n1, persist=False) + graph.add_node(n2, persist=False) + + # Without embeddings: these may not link (different keywords/tags) + edges_no_embed = ops.auto_link( + graph, + min_similarity=0.3, + inject_wiki_links=False, + ) + + # Reset + graph.edges.clear() + graph._adjacency.clear() + + # With embeddings: mock that returns high similarity + def mock_embed(texts): + # Return similar vectors for all texts + return [[0.9, 0.1, 0.5] for _ in texts] + + provider = EmbeddingProvider.from_callable(mock_embed) + + edges_with_embed = ops.auto_link( + graph, + min_similarity=0.3, + inject_wiki_links=False, + embedding_provider=provider, + ) + + # Embeddings should produce edges (since mock gives high similarity) + assert len(edges_with_embed) > 0 + + +# ---- Hierarchical Storage Tests ---- + + +class TestHierarchicalStorage: + def test_write_flat_mode(self, tmp_vault): + node = IdeaNode( + id="ic_flat1", + title="Flat Storage Test", + body="Testing flat storage.", + tags=["test"], + pipeline_status="inbox", + ) + path = write_node(node, tmp_vault, hierarchical=False) + assert path == tmp_vault / "ic_flat1.md" + assert path.exists() + + def test_write_hierarchical_mode(self, tmp_vault): + node = IdeaNode( + id="ic_hier1", + title="Hierarchical Storage Test", + body="Testing hierarchical storage.", + tags=["test"], + pipeline_status="inbox", + ) + path = write_node(node, tmp_vault, hierarchical=True) + assert path == tmp_vault / "inbox" / "ic_hier1.md" + assert path.exists() + + def test_hierarchical_move_on_status_change(self, tmp_vault): + """Node should move directories when status changes.""" + node = IdeaNode( + id="ic_move1", + title="Moving Node", + body="This node will move.", + tags=["test"], + pipeline_status="inbox", + ) + + # Write to inbox/ + write_node(node, tmp_vault, hierarchical=True) + assert (tmp_vault / "inbox" / "ic_move1.md").exists() + + # Change status and re-write + node.pipeline_status = "prioritized" + write_node(node, tmp_vault, hierarchical=True) + + assert (tmp_vault / "prioritized" / "ic_move1.md").exists() + # Old location should be cleaned up + assert not (tmp_vault / "inbox" / "ic_move1.md").exists() + + def test_list_finds_hierarchical_files(self, tmp_vault): + """list_node_files should find files in subdirectories.""" + # Write one flat, one hierarchical + n1 = IdeaNode(id="ic_lh1", title="Flat", body="In root.", tags=["test"]) + n2 = IdeaNode( + id="ic_lh2", + title="Hier", + body="In subdir.", + tags=["test"], + pipeline_status="candidate", + ) + + write_node(n1, tmp_vault, hierarchical=False) + write_node(n2, tmp_vault, hierarchical=True) + + files = list_node_files(tmp_vault) + names = {f.name for f in files} + assert "ic_lh1.md" in names + assert "ic_lh2.md" in names + + def test_read_from_hierarchical(self, tmp_vault): + """Nodes in subdirectories should still be readable.""" + node = IdeaNode( + id="ic_rh1", + title="Read Hierarchical", + body="Should be readable from subdir.", + tags=["test"], + pipeline_status="exported", + ) + path = write_node(node, tmp_vault, hierarchical=True) + loaded = read_node(path) + assert loaded.title == "Read Hierarchical" + assert loaded.pipeline_status == "exported" + + def test_migrate_flat_to_hierarchical(self, tmp_vault): + """Migration should move flat files into status directories.""" + # Create flat files + for i in range(4): + status = ["inbox", "candidate", "prioritized", "exported"][i] + node = IdeaNode( + id=f"ic_mig{i}", + title=f"Node {i}", + body=f"Content for node {i}.", + tags=["test"], + pipeline_status=status, + ) + write_node(node, tmp_vault, hierarchical=False) + + # All should be in root + assert len(list(tmp_vault.glob("ic_mig*.md"))) == 4 + + # Run migration + result = migrate_to_hierarchical(tmp_vault) + + # Should have moved files + assert sum(result.values()) == 4 + assert result.get("inbox", 0) == 1 + assert result.get("candidate", 0) == 1 + + # Root should be empty of ic_mig files + assert len(list(tmp_vault.glob("ic_mig*.md"))) == 0 + + # Subdirectories should have the files + assert (tmp_vault / "inbox" / "ic_mig0.md").exists() + assert (tmp_vault / "prioritized" / "ic_mig2.md").exists() + + +# ---- Pulse Bridge Tests ---- + + +class TestPulseBridgeTopicConversion: + def test_topic_to_node(self): + bridge = PulseBridge( + relevance_keywords=["ai", "safety"], + min_volume=10, + ) + + # Create a mock TrendingTopic + topic = MagicMock() + topic.platform = "hackernews" + topic.topic = "New AI safety benchmark shows critical gaps" + topic.volume = 500 + topic.category = "ai" + topic.raw_data = { + "url": "https://example.com/article", + "author": "researcher", + "score": 342, + "num_comments": 87, + } + + node = bridge._topic_to_node(topic) + + assert node.title == "New AI safety benchmark shows critical gaps" + assert "pulse_hackernews" in node.source_type + assert "pulse-hackernews" in node.tags + assert "ai" in node.tags + assert "safety" in node.tags + assert node.source_url == "https://example.com/article" + assert "500" in node.body # Volume + assert "342" in node.body # Score from raw_data + + def test_filter_by_keywords(self): + bridge = PulseBridge( + relevance_keywords=["ai", "security"], + ) + + relevant = MagicMock() + relevant.topic = "AI security research breakthrough" + relevant.category = "tech" + + irrelevant = MagicMock() + irrelevant.topic = "Best cooking recipes 2026" + irrelevant.category = "lifestyle" + + filtered = bridge._filter_topics([relevant, irrelevant]) + assert len(filtered) == 1 + assert filtered[0].topic == "AI security research breakthrough" + + def test_no_keywords_passes_all(self): + bridge = PulseBridge(relevance_keywords=[]) + topics = [MagicMock(), MagicMock(), MagicMock()] + for t in topics: + t.topic = "anything" + t.category = "" + filtered = bridge._filter_topics(topics) + assert len(filtered) == 3 + + def test_fetch_without_pulse_module(self): + """Should return empty list when Pulse is unavailable.""" + bridge = PulseBridge() + + with patch.dict("sys.modules", {"aragora.pulse": None}): + # The import inside _fetch_topics should fail gracefully + nodes = asyncio.run( + bridge.fetch_and_convert( + platforms=["hackernews"], + limit_per_platform=5, + ) + ) + # May return empty due to import failure + assert isinstance(nodes, list) + + +# ---- CLI Registration Tests ---- + + +class TestCLIRegistration: + def test_parser_includes_ideacloud(self): + """The ideacloud command should be registered in the CLI parser.""" + try: + from aragora.cli.parser import build_parser + + parser = build_parser() + # Check that 'ideacloud' is a valid subcommand + # We parse with just 'ideacloud' — it should not raise SystemExit + # (it might print help, which is fine) + assert parser is not None + except ImportError: + pytest.skip("CLI parser not available") + + def test_ideacloud_subcommands_registered(self): + """All Phase 2+ CLI commands should be registered.""" + from aragora.ideacloud.cli.commands import add_ideacloud_commands + import argparse + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(dest="command") + add_ideacloud_commands(subparsers) + + # Parse various subcommands to verify they're registered + args = parser.parse_args(["ideacloud", "stats", "--vault", "/tmp/test"]) + assert args.ideacloud_cmd == "stats" + assert args.vault == "/tmp/test" + + args = parser.parse_args(["ideacloud", "export", "cl_test", "--format", "debate"]) + assert args.ideacloud_cmd == "export" + assert args.cluster_id == "cl_test" + assert args.format == "debate" + + args = parser.parse_args(["ideacloud", "promote", "ic_test1", "candidate"]) + assert args.ideacloud_cmd == "promote" + assert args.target_id == "ic_test1" + assert args.status == "candidate" + + args = parser.parse_args(["ideacloud", "pulse", "--platforms", "hackernews,arxiv"]) + assert args.ideacloud_cmd == "pulse" + assert args.platforms == "hackernews,arxiv" + + args = parser.parse_args(["ideacloud", "rss", "--url", "https://feed.xml"]) + assert args.ideacloud_cmd == "rss" + assert args.url == ["https://feed.xml"] + + args = parser.parse_args(["ideacloud", "sync-km", "--direction", "both"]) + assert args.ideacloud_cmd == "sync-km" + assert args.direction == "both" + + +# ---- Core Pulse Ingestion Tests ---- + + +class TestIdeaCloudPulseIngestion: + def test_ingest_pulse_method_exists(self, tmp_vault): + cloud = IdeaCloud(vault_path=tmp_vault) + cloud.load() + assert hasattr(cloud, "ingest_pulse") + assert callable(cloud.ingest_pulse) + + def test_ingest_pulse_with_mock_bridge(self, tmp_vault): + """Test that ingest_pulse wires through to PulseBridge correctly.""" + cloud = IdeaCloud(vault_path=tmp_vault) + cloud.load() + + # Mock the PulseBridge to avoid needing Pulse module + mock_nodes = [ + IdeaNode( + title="Trending AI Topic", + body="A substantial body of text about trending AI security research from pulse", + tags=["pulse-hackernews", "ai", "security"], + source_type="pulse_hackernews", + ), + ] + + with patch( + "aragora.ideacloud.ingestion.pulse_bridge.PulseBridge.fetch_and_convert", + return_value=mock_nodes, + ): + nodes = asyncio.run( + cloud.ingest_pulse( + platforms=["hackernews"], + limit_per_platform=5, + relevance_keywords=["ai"], + ) + ) + # The node should pass quality filter (has title + body + tags) + assert len(nodes) >= 0 # May be filtered by quality + + +# ---- Integration: Embedding Provider with Graph Operations ---- + + +class TestEmbeddingIntegration: + def test_auto_link_passes_provider_through(self, tmp_vault): + """Verify that auto_link accepts and uses embedding_provider.""" + graph = IdeaGraph(tmp_vault) + + n1 = IdeaNode( + id="ic_ei1", + title="Quantum computing advances", + body="Recent breakthroughs in quantum error correction.", + tags=["quantum", "computing"], + ) + n2 = IdeaNode( + id="ic_ei2", + title="Quantum supremacy implications", + body="What quantum advantage means for cryptography.", + tags=["quantum", "crypto"], + ) + graph.add_node(n1, persist=False) + graph.add_node(n2, persist=False) + + # Create a mock provider that tracks calls + call_log = [] + + def tracking_embed(texts): + call_log.append(texts) + return [[0.8, 0.2, 0.1] for _ in texts] + + provider = EmbeddingProvider.from_callable(tracking_embed) + + edges = ops.auto_link( + graph, + min_similarity=0.1, + inject_wiki_links=False, + embedding_provider=provider, + ) + + # Provider should have been called + assert len(call_log) > 0 + assert len(edges) > 0