diff --git a/apps/base_rag_example.py b/apps/base_rag_example.py index 25e3b594..761717a3 100644 --- a/apps/base_rag_example.py +++ b/apps/base_rag_example.py @@ -6,7 +6,7 @@ import argparse from abc import ABC, abstractmethod from pathlib import Path -from typing import Any +from typing import Any, Callable import dotenv from leann.api import LeannBuilder, LeannChat @@ -55,10 +55,12 @@ def __init__( name: str, description: str, default_index_name: str, + example_queries: list[str] | None = None, ): self.name = name self.description = description self.default_index_name = default_index_name + self.example_queries = example_queries or [] self.parser = self._create_parser() def _create_parser(self) -> argparse.ArgumentParser: @@ -282,6 +284,50 @@ def get_llm_config(self, args) -> dict[str, Any]: return config + def _foreach_source( + self, + sources: list, + args, + load: Callable[[Any, int], list | None], + *, + source_label: str = "source", + start_total: int = 0, + ) -> tuple[list, int]: + """Process sources with max_items tracking and error handling. + + Args: + sources: List of source paths/identifiers to iterate. + args: Parsed argparse namespace (must have ``max_items``). + load: Callable ``(source, max_count) -> list | None`` that loads + documents for a single source. Return None/empty to skip. + source_label: Label used in progress messages. + start_total: Starting count of already-processed documents. + + Returns: + ``(all_documents, total_processed)``. + """ + all_docs = [] + total = start_total + for i, source in enumerate(sources): + print(f"\nProcessing {source_label} {i + 1}/{len(sources)}: {source}") + try: + max_count = -1 + if args.max_items > 0: + remaining = args.max_items - total + if remaining <= 0: + print(f"Reached max_items limit ({args.max_items})") + break + max_count = remaining + docs = load(source, max_count) + if docs: + all_docs.extend(docs) + total += len(docs) + print(f"Processed {len(docs)} items from this {source_label}") + except Exception as e: + print(f"Error processing {source}: {e}") + continue + return all_docs, total + async def build_index(self, args, texts: list[dict[str, Any]]) -> str: """Build LEANN index from text chunks (dicts with 'text' and 'metadata' keys).""" index_path = str(Path(args.index_dir) / f"{self.default_index_name}.leann") @@ -411,3 +457,25 @@ async def run(self): await self.run_single_query(args, index_path, args.query) else: await self.run_interactive_chat(args, index_path) + + def _print_header(self): + """Print a header with name, example queries, and usage hint. + + Override in subclasses to add platform warnings or extra help text. + """ + print(f"\n{self.name} RAG Example") + print("=" * 50) + if self.example_queries: + print("\nExample queries you can try:") + for q in self.example_queries: + print(f"- '{q}'") + print("\nOr run without --query for interactive mode\n") + + @classmethod + def main(cls): + """Standard __main__ entry point. Prints header, then runs the app.""" + import asyncio + + app = cls() + app._print_header() + asyncio.run(app.run()) diff --git a/apps/browser_rag.py b/apps/browser_rag.py index 00bb3f54..20c74ac5 100644 --- a/apps/browser_rag.py +++ b/apps/browser_rag.py @@ -30,8 +30,18 @@ def __init__(self): name="Browser History", description="Process and query Chrome browser history with LEANN", default_index_name="google_history_index", + example_queries=[ + "What websites did I visit about machine learning?", + "Find my search history about programming", + "What YouTube videos did I watch recently?", + "Show me websites about travel planning", + ], ) + def _print_header(self): + super()._print_header() + print("Note: Make sure Chrome is closed before running\n") + def _add_specific_arguments(self, parser): """Add browser-specific arguments.""" browser_group = parser.add_argument_group("Browser Parameters") @@ -111,35 +121,14 @@ async def load_data(self, args) -> list[dict[str, Any]]: reader = ChromeHistoryReader() # Process each profile - all_documents = [] - total_processed = 0 - - for i, profile_dir in enumerate(profile_dirs): - print(f"\nProcessing profile {i + 1}/{len(profile_dirs)}: {profile_dir.name}") - - try: - # Apply max_items limit per profile - max_per_profile = -1 - if args.max_items > 0: - remaining = args.max_items - total_processed - if remaining <= 0: - break - max_per_profile = remaining - - # Load history - documents = reader.load_data( - chrome_profile_path=str(profile_dir), - max_count=max_per_profile, - ) - - if documents: - all_documents.extend(documents) - total_processed += len(documents) - print(f"Processed {len(documents)} history entries from this profile") - - except Exception as e: - print(f"Error processing {profile_dir}: {e}") - continue + all_documents, _ = self._foreach_source( + profile_dirs, + args, + load=lambda src, mc: reader.load_data( + chrome_profile_path=str(src), max_count=mc + ), + source_label="profile", + ) if not all_documents: print("No browser history found to process!") @@ -156,17 +145,4 @@ async def load_data(self, args) -> list[dict[str, Any]]: if __name__ == "__main__": - import asyncio - - # Example queries for browser history RAG - print("\n🌐 Browser History RAG Example") - print("=" * 50) - print("\nExample queries you can try:") - print("- 'What websites did I visit about machine learning?'") - print("- 'Find my search history about programming'") - print("- 'What YouTube videos did I watch recently?'") - print("- 'Show me websites about travel planning'") - print("\nNote: Make sure Chrome is closed before running\n") - - rag = BrowserRAG() - asyncio.run(rag.run()) + BrowserRAG.main() diff --git a/apps/chat_export_rag.py b/apps/chat_export_rag.py new file mode 100644 index 00000000..07585f1a --- /dev/null +++ b/apps/chat_export_rag.py @@ -0,0 +1,134 @@ +""" +Shared base for chat export RAG apps (ChatGPT, Claude, etc.). +Unifies: find export files → load with reader → chunk → index. +""" +import sys +from pathlib import Path +from typing import Any, Callable + +sys.path.insert(0, str(Path(__file__).parent)) + +from base_rag_example import BaseRAGExample +from chunking import create_text_chunks + + +class ChatExportRAG(BaseRAGExample): + """Generic RAG app for chat export data (ChatGPT, Claude, etc.). + + No method overrides needed — just provide constructor args. + """ + + def __init__( + self, + name: str, + description: str, + default_index_name: str, + reader_factory: Callable[[bool], Any], + export_keyword: str, + file_extensions: list[str], + default_export_dir: str, + example_queries: list[str], + export_setup_instructions: list[str], + ): + self._reader_factory = reader_factory + self._export_keyword = export_keyword + self._file_extensions = file_extensions + self._default_export_dir = default_export_dir + self._export_setup_instructions = export_setup_instructions + + self.max_items_default = -1 + self.embedding_model_default = "sentence-transformers/all-MiniLM-L6-v2" + + super().__init__( + name=name, + description=description, + default_index_name=default_index_name, + example_queries=example_queries, + ) + + def _add_specific_arguments(self, parser): + group = parser.add_argument_group(f"{self.name} Parameters") + group.add_argument( + "--export-path", + type=str, + default=self._default_export_dir, + help=f"Path to {self.name} export file or directory (default: {self._default_export_dir})", + ) + group.add_argument( + "--concatenate-conversations", + action="store_true", + default=True, + help="Concatenate messages within conversations for better context (default: True)", + ) + group.add_argument( + "--separate-messages", + action="store_true", + help="Process each message as a separate document (overrides --concatenate-conversations)", + ) + group.add_argument( + "--chunk-size", type=int, default=512, help="Text chunk size (default: 512)" + ) + group.add_argument( + "--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)" + ) + + def _find_exports(self, export_path: Path) -> list[Path]: + export_files: list[Path] = [] + if export_path.is_file(): + if export_path.suffix.lower() in self._file_extensions: + export_files.append(export_path) + elif export_path.is_dir(): + for ext in self._file_extensions: + export_files.extend(export_path.glob(f"*{ext}")) + return export_files + + async def load_data(self, args) -> list[dict[str, Any]]: + export_path = Path(args.export_path) + + if not export_path.exists(): + print(f"{self.name} export path not found: {export_path}") + print("Please ensure you have exported your data and placed it in the correct location.") + for line in self._export_setup_instructions: + print(line) + return [] + + export_files = self._find_exports(export_path) + + if not export_files: + exts = ", ".join(self._file_extensions) + print(f"No {self.name} export files ({exts}) found in: {export_path}") + return [] + + print(f"Found {len(export_files)} {self.name} export files") + + concatenate = args.concatenate_conversations and not args.separate_messages + reader = self._reader_factory(concatenate) + + all_documents, _ = self._foreach_source( + export_files, + args, + load=lambda src, mc: reader.load_data( + **{ + f"{self._export_keyword}_export_path": str(src), + "max_count": mc, + "include_metadata": True, + } + ), + source_label="export file", + ) + + if not all_documents: + print("No conversations found to process!") + print("\nTroubleshooting:") + print("- Ensure the export file is a valid export") + return [] + + print(f"\nTotal conversations processed: {len(all_documents)}") + print("Now starting to split into text chunks... this may take some time") + + all_texts = create_text_chunks( + all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap + ) + + print(f"Created {len(all_texts)} text chunks from {len(all_documents)} conversations") + return all_texts diff --git a/apps/chatgpt_rag.py b/apps/chatgpt_rag.py index c97d2cdc..72a0ea6d 100644 --- a/apps/chatgpt_rag.py +++ b/apps/chatgpt_rag.py @@ -1,187 +1,36 @@ """ -ChatGPT RAG example using the unified interface. -Supports ChatGPT export data from chat.html files. +ChatGPT RAG example. Indexes ChatGPT export data (chat.html / .zip files). """ +from apps.chat_export_rag import ChatExportRAG +from apps.chatgpt_data.chatgpt_reader import ChatGPTReader -import sys -from pathlib import Path -from typing import Any - -# Add parent directory to path for imports -sys.path.insert(0, str(Path(__file__).parent)) - -from base_rag_example import BaseRAGExample -from chunking import create_text_chunks - -from .chatgpt_data.chatgpt_reader import ChatGPTReader - - -class ChatGPTRAG(BaseRAGExample): - """RAG example for ChatGPT conversation data.""" +class ChatGPTRAG(ChatExportRAG): def __init__(self): - # Set default values BEFORE calling super().__init__ - self.max_items_default = -1 # Process all conversations by default - self.embedding_model_default = ( - "sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model - ) - super().__init__( name="ChatGPT", description="Process and query ChatGPT conversation exports with LEANN", default_index_name="chatgpt_conversations_index", + reader_factory=lambda concat: ChatGPTReader(concatenate_conversations=concat), + export_keyword="chatgpt", + file_extensions=[".zip", ".html"], + default_export_dir="./chatgpt_export", + example_queries=[ + "What did I ask about Python programming?", + "Show me conversations about machine learning", + "Find discussions about travel planning", + "What advice did ChatGPT give me about career development?", + "Search for conversations about cooking recipes", + ], + export_setup_instructions=[ + "1. Sign in to ChatGPT", + "2. Click on your profile icon → Settings → Data Controls", + "3. Click 'Export' under Export Data", + "4. Download the zip file from the email link", + "5. Extract or place the file/directory at the specified path", + ], ) - def _add_specific_arguments(self, parser): - """Add ChatGPT-specific arguments.""" - chatgpt_group = parser.add_argument_group("ChatGPT Parameters") - chatgpt_group.add_argument( - "--export-path", - type=str, - default="./chatgpt_export", - help="Path to ChatGPT export file (.zip or .html) or directory containing exports (default: ./chatgpt_export)", - ) - chatgpt_group.add_argument( - "--concatenate-conversations", - action="store_true", - default=True, - help="Concatenate messages within conversations for better context (default: True)", - ) - chatgpt_group.add_argument( - "--separate-messages", - action="store_true", - help="Process each message as a separate document (overrides --concatenate-conversations)", - ) - chatgpt_group.add_argument( - "--chunk-size", type=int, default=512, help="Text chunk size (default: 512)" - ) - chatgpt_group.add_argument( - "--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)" - ) - - def _find_chatgpt_exports(self, export_path: Path) -> list[Path]: - """ - Find ChatGPT export files in the given path. - - Args: - export_path: Path to search for exports - - Returns: - List of paths to ChatGPT export files - """ - export_files = [] - - if export_path.is_file(): - if export_path.suffix.lower() in [".zip", ".html"]: - export_files.append(export_path) - elif export_path.is_dir(): - # Look for zip and html files - export_files.extend(export_path.glob("*.zip")) - export_files.extend(export_path.glob("*.html")) - - return export_files - - async def load_data(self, args) -> list[dict[str, Any]]: - """Load ChatGPT export data and convert to text chunks.""" - export_path = Path(args.export_path) - - if not export_path.exists(): - print(f"ChatGPT export path not found: {export_path}") - print( - "Please ensure you have exported your ChatGPT data and placed it in the correct location." - ) - print("\nTo export your ChatGPT data:") - print("1. Sign in to ChatGPT") - print("2. Click on your profile icon → Settings → Data Controls") - print("3. Click 'Export' under Export Data") - print("4. Download the zip file from the email link") - print("5. Extract or place the file/directory at the specified path") - return [] - - # Find export files - export_files = self._find_chatgpt_exports(export_path) - - if not export_files: - print(f"No ChatGPT export files (.zip or .html) found in: {export_path}") - return [] - - print(f"Found {len(export_files)} ChatGPT export files") - - # Create reader with appropriate settings - concatenate = args.concatenate_conversations and not args.separate_messages - reader = ChatGPTReader(concatenate_conversations=concatenate) - - # Process each export file - all_documents = [] - total_processed = 0 - - for i, export_file in enumerate(export_files): - print(f"\nProcessing export file {i + 1}/{len(export_files)}: {export_file.name}") - - try: - # Apply max_items limit per file - max_per_file = -1 - if args.max_items > 0: - remaining = args.max_items - total_processed - if remaining <= 0: - break - max_per_file = remaining - - # Load conversations - documents = reader.load_data( - chatgpt_export_path=str(export_file), - max_count=max_per_file, - include_metadata=True, - ) - - if documents: - all_documents.extend(documents) - total_processed += len(documents) - print(f"Processed {len(documents)} conversations from this file") - else: - print(f"No conversations loaded from {export_file}") - - except Exception as e: - print(f"Error processing {export_file}: {e}") - continue - - if not all_documents: - print("No conversations found to process!") - print("\nTroubleshooting:") - print("- Ensure the export file is a valid ChatGPT export") - print("- Check that the HTML file contains conversation data") - print("- Try extracting the zip file and pointing to the HTML file directly") - return [] - - print(f"\nTotal conversations processed: {len(all_documents)}") - print("Now starting to split into text chunks... this may take some time") - - # Convert to text chunks - all_texts = create_text_chunks( - all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap - ) - - print(f"Created {len(all_texts)} text chunks from {len(all_documents)} conversations") - return all_texts - if __name__ == "__main__": - import asyncio - - # Example queries for ChatGPT RAG - print("\n🤖 ChatGPT RAG Example") - print("=" * 50) - print("\nExample queries you can try:") - print("- 'What did I ask about Python programming?'") - print("- 'Show me conversations about machine learning'") - print("- 'Find discussions about travel planning'") - print("- 'What advice did ChatGPT give me about career development?'") - print("- 'Search for conversations about cooking recipes'") - print("\nTo get started:") - print("1. Export your ChatGPT data from Settings → Data Controls → Export") - print("2. Place the downloaded zip file or extracted HTML in ./chatgpt_export/") - print("3. Run this script to build your personal ChatGPT knowledge base!") - print("\nOr run without --query for interactive mode\n") - - rag = ChatGPTRAG() - asyncio.run(rag.run()) + ChatGPTRAG.main() diff --git a/apps/claude_rag.py b/apps/claude_rag.py index 2cc80dd3..f7d105f9 100644 --- a/apps/claude_rag.py +++ b/apps/claude_rag.py @@ -1,190 +1,37 @@ """ -Claude RAG example using the unified interface. -Supports Claude export data from JSON files. +Claude RAG example. Indexes Claude conversation export data (.json / .zip files). """ +from apps.chat_export_rag import ChatExportRAG +from apps.claude_data.claude_reader import ClaudeReader -import sys -from pathlib import Path -from typing import Any - -# Add parent directory to path for imports -sys.path.insert(0, str(Path(__file__).parent)) - -from base_rag_example import BaseRAGExample -from chunking import create_text_chunks - -from .claude_data.claude_reader import ClaudeReader - - -class ClaudeRAG(BaseRAGExample): - """RAG example for Claude conversation data.""" +class ClaudeRAG(ChatExportRAG): def __init__(self): - # Set default values BEFORE calling super().__init__ - self.max_items_default = -1 # Process all conversations by default - self.embedding_model_default = ( - "sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model - ) - super().__init__( name="Claude", description="Process and query Claude conversation exports with LEANN", default_index_name="claude_conversations_index", + reader_factory=lambda concat: ClaudeReader(concatenate_conversations=concat), + export_keyword="claude", + file_extensions=[".zip", ".json"], + default_export_dir="./claude_export", + example_queries=[ + "What did I ask Claude about Python programming?", + "Show me conversations about machine learning", + "Find discussions about code optimization", + "What advice did Claude give me about software design?", + "Search for conversations about debugging techniques", + ], + export_setup_instructions=[ + "1. Open Claude in your browser", + "2. Look for export/download options in settings or conversation menu", + "3. Download the conversation data (usually in JSON format)", + "4. Place the file/directory at the specified path", + "", + "Note: Claude export methods may vary. Check Claude's help documentation for current instructions.", + ], ) - def _add_specific_arguments(self, parser): - """Add Claude-specific arguments.""" - claude_group = parser.add_argument_group("Claude Parameters") - claude_group.add_argument( - "--export-path", - type=str, - default="./claude_export", - help="Path to Claude export file (.json or .zip) or directory containing exports (default: ./claude_export)", - ) - claude_group.add_argument( - "--concatenate-conversations", - action="store_true", - default=True, - help="Concatenate messages within conversations for better context (default: True)", - ) - claude_group.add_argument( - "--separate-messages", - action="store_true", - help="Process each message as a separate document (overrides --concatenate-conversations)", - ) - claude_group.add_argument( - "--chunk-size", type=int, default=512, help="Text chunk size (default: 512)" - ) - claude_group.add_argument( - "--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)" - ) - - def _find_claude_exports(self, export_path: Path) -> list[Path]: - """ - Find Claude export files in the given path. - - Args: - export_path: Path to search for exports - - Returns: - List of paths to Claude export files - """ - export_files = [] - - if export_path.is_file(): - if export_path.suffix.lower() in [".zip", ".json"]: - export_files.append(export_path) - elif export_path.is_dir(): - # Look for zip and json files - export_files.extend(export_path.glob("*.zip")) - export_files.extend(export_path.glob("*.json")) - - return export_files - - async def load_data(self, args) -> list[dict[str, Any]]: - """Load Claude export data and convert to text chunks.""" - export_path = Path(args.export_path) - - if not export_path.exists(): - print(f"Claude export path not found: {export_path}") - print( - "Please ensure you have exported your Claude data and placed it in the correct location." - ) - print("\nTo export your Claude data:") - print("1. Open Claude in your browser") - print("2. Look for export/download options in settings or conversation menu") - print("3. Download the conversation data (usually in JSON format)") - print("4. Place the file/directory at the specified path") - print( - "\nNote: Claude export methods may vary. Check Claude's help documentation for current instructions." - ) - return [] - - # Find export files - export_files = self._find_claude_exports(export_path) - - if not export_files: - print(f"No Claude export files (.json or .zip) found in: {export_path}") - return [] - - print(f"Found {len(export_files)} Claude export files") - - # Create reader with appropriate settings - concatenate = args.concatenate_conversations and not args.separate_messages - reader = ClaudeReader(concatenate_conversations=concatenate) - - # Process each export file - all_documents = [] - total_processed = 0 - - for i, export_file in enumerate(export_files): - print(f"\nProcessing export file {i + 1}/{len(export_files)}: {export_file.name}") - - try: - # Apply max_items limit per file - max_per_file = -1 - if args.max_items > 0: - remaining = args.max_items - total_processed - if remaining <= 0: - break - max_per_file = remaining - - # Load conversations - documents = reader.load_data( - claude_export_path=str(export_file), - max_count=max_per_file, - include_metadata=True, - ) - - if documents: - all_documents.extend(documents) - total_processed += len(documents) - print(f"Processed {len(documents)} conversations from this file") - else: - print(f"No conversations loaded from {export_file}") - - except Exception as e: - print(f"Error processing {export_file}: {e}") - continue - - if not all_documents: - print("No conversations found to process!") - print("\nTroubleshooting:") - print("- Ensure the export file is a valid Claude export") - print("- Check that the JSON file contains conversation data") - print("- Try using a different export format or method") - print("- Check Claude's documentation for current export procedures") - return [] - - print(f"\nTotal conversations processed: {len(all_documents)}") - print("Now starting to split into text chunks... this may take some time") - - # Convert to text chunks - all_texts = create_text_chunks( - all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap - ) - - print(f"Created {len(all_texts)} text chunks from {len(all_documents)} conversations") - return all_texts - if __name__ == "__main__": - import asyncio - - # Example queries for Claude RAG - print("\n🤖 Claude RAG Example") - print("=" * 50) - print("\nExample queries you can try:") - print("- 'What did I ask Claude about Python programming?'") - print("- 'Show me conversations about machine learning'") - print("- 'Find discussions about code optimization'") - print("- 'What advice did Claude give me about software design?'") - print("- 'Search for conversations about debugging techniques'") - print("\nTo get started:") - print("1. Export your Claude conversation data") - print("2. Place the JSON/ZIP file in ./claude_export/") - print("3. Run this script to build your personal Claude knowledge base!") - print("\nOr run without --query for interactive mode\n") - - rag = ClaudeRAG() - asyncio.run(rag.run()) + ClaudeRAG.main() diff --git a/apps/code_rag.py b/apps/code_rag.py index 452e0a63..a0845af9 100644 --- a/apps/code_rag.py +++ b/apps/code_rag.py @@ -24,11 +24,30 @@ def __init__(self): name="Code", description="Process and query code repositories with AST-aware chunking", default_index_name="code_index", + example_queries=[ + "How does the embedding computation work?", + "What are the main classes in this codebase?", + "Show me the search implementation", + "How is error handling implemented?", + "What design patterns are used?", + "Explain the chunking logic", + ], ) # Override defaults for code-specific usage self.embedding_model_default = "facebook/contriever" # Good for code self.max_items_default = -1 # Process all code files by default + def _print_header(self): + super()._print_header() + print("Features:") + print(" - AST-aware chunking preserves code structure") + print(" - Automatic language detection") + print(" - Smart filtering of large files and common excludes") + print(" - Optimized for code understanding") + print("\nUsage examples:") + print(" python -m apps.code_rag --repo-dir ./my_project") + print(" python -m apps.code_rag --include-extensions .py .js --query 'How does authentication work?'") + def _add_specific_arguments(self, parser): """Add code-specific arguments.""" code_group = parser.add_argument_group("Code Repository Parameters") @@ -179,29 +198,4 @@ def file_filter(file_path: str) -> bool: if __name__ == "__main__": - import asyncio - - # Example queries for code RAG - print("\n💻 Code RAG Example") - print("=" * 50) - print("\nExample queries you can try:") - print("- 'How does the embedding computation work?'") - print("- 'What are the main classes in this codebase?'") - print("- 'Show me the search implementation'") - print("- 'How is error handling implemented?'") - print("- 'What design patterns are used?'") - print("- 'Explain the chunking logic'") - print("\n🚀 Features:") - print("- ✅ AST-aware chunking preserves code structure") - print("- ✅ Automatic language detection") - print("- ✅ Smart filtering of large files and common excludes") - print("- ✅ Optimized for code understanding") - print("\nUsage examples:") - print(" python -m apps.code_rag --repo-dir ./my_project") - print( - " python -m apps.code_rag --include-extensions .py .js --query 'How does authentication work?'" - ) - print("\nOr run without --query for interactive mode\n") - - rag = CodeRAG() - asyncio.run(rag.run()) + CodeRAG.main() diff --git a/apps/document_rag.py b/apps/document_rag.py index fae1860d..5be6e8f9 100644 --- a/apps/document_rag.py +++ b/apps/document_rag.py @@ -23,8 +23,21 @@ def __init__(self): name="Document", description="Process and query documents (PDF, TXT, MD, etc.) with LEANN", default_index_name="test_doc_files", + example_queries=[ + "What are the main techniques LEANN uses?", + "What is the technique DLPM?", + "Who does Elizabeth Bennet marry?", + "What challenges did Huawei face while developing the Pangu model?", + ], ) + def _print_header(self): + super()._print_header() + print("NEW: Code-aware chunking available!") + print("- Use --enable-code-chunking to enable AST-aware chunking for code files") + print("- Supports Python, Java, C#, TypeScript files") + print("- Better semantic understanding of code structure") + def _add_specific_arguments(self, parser): """Add document-specific arguments.""" doc_group = parser.add_argument_group("Document Parameters") @@ -106,21 +119,4 @@ async def load_data(self, args) -> list[dict[str, Any]]: if __name__ == "__main__": - import asyncio - - # Example queries for document RAG - print("\nDocument RAG Example") - print("=" * 50) - print("\nExample queries you can try:") - print("- 'What are the main techniques LEANN uses?'") - print("- 'What is the technique DLPM?'") - print("- 'Who does Elizabeth Bennet marry?'") - print("- 'What challenges did Huawei face while developing the Pangu model?'") - print("\nNEW: Code-aware chunking available!") - print("- Use --enable-code-chunking to enable AST-aware chunking for code files") - print("- Supports Python, Java, C#, TypeScript files") - print("- Better semantic understanding of code structure") - print("\nOr run without --query for interactive mode\n") - - rag = DocumentRAG() - asyncio.run(rag.run()) + DocumentRAG.main() diff --git a/apps/email_rag.py b/apps/email_rag.py index 05586783..bcf19772 100644 --- a/apps/email_rag.py +++ b/apps/email_rag.py @@ -30,8 +30,21 @@ def __init__(self): name="Email", description="Process and query Apple Mail emails with LEANN", default_index_name="mail_index", + example_queries=[ + "What did my boss say about deadlines?", + "Find emails about travel expenses", + "Show me emails from last month about the project", + "What food did I order from DoorDash?", + ], ) + def _print_header(self): + if sys.platform != "darwin": + print("\n⚠️ Warning: This example is designed for macOS (Apple Mail)") + print(" Windows/Linux support coming soon!\n") + super()._print_header() + print("Note: You may need to grant Full Disk Access to your terminal\n") + def _add_specific_arguments(self, parser): """Add email-specific arguments.""" email_group = parser.add_argument_group("Email Parameters") @@ -85,40 +98,14 @@ async def load_data(self, args) -> list[dict[str, Any]]: reader = EmlxReader(include_html=args.include_html) # Process each directory - all_documents = [] - total_processed = 0 - - for i, messages_dir in enumerate(messages_dirs): - print(f"\nProcessing directory {i + 1}/{len(messages_dirs)}: {messages_dir}") - - try: - # Count emlx files - emlx_files = list(messages_dir.glob("*.emlx")) - print(f"Found {len(emlx_files)} email files") - - # Apply max_items limit per directory - max_per_dir = -1 # Default to process all - if args.max_items > 0: - remaining = args.max_items - total_processed - if remaining <= 0: - break - max_per_dir = remaining - # If args.max_items == -1, max_per_dir stays -1 (process all) - - # Load emails - fix the parameter passing - documents = reader.load_data( - input_dir=str(messages_dir), - max_count=max_per_dir, - ) - - if documents: - all_documents.extend(documents) - total_processed += len(documents) - print(f"Processed {len(documents)} emails from this directory") - - except Exception as e: - print(f"Error processing {messages_dir}: {e}") - continue + all_documents, _ = self._foreach_source( + messages_dirs, + args, + load=lambda src, mc: reader.load_data( + input_dir=str(src), max_count=mc + ), + source_label="directory", + ) if not all_documents: print("No emails found to process!") @@ -137,22 +124,4 @@ async def load_data(self, args) -> list[dict[str, Any]]: if __name__ == "__main__": - import asyncio - - # Check platform - if sys.platform != "darwin": - print("\n⚠️ Warning: This example is designed for macOS (Apple Mail)") - print(" Windows/Linux support coming soon!\n") - - # Example queries for email RAG - print("\n📧 Email RAG Example") - print("=" * 50) - print("\nExample queries you can try:") - print("- 'What did my boss say about deadlines?'") - print("- 'Find emails about travel expenses'") - print("- 'Show me emails from last month about the project'") - print("- 'What food did I order from DoorDash?'") - print("\nNote: You may need to grant Full Disk Access to your terminal\n") - - rag = EmailRAG() - asyncio.run(rag.run()) + EmailRAG.main() diff --git a/apps/gemini_rag.py b/apps/gemini_rag.py index 95788509..23097ecc 100644 --- a/apps/gemini_rag.py +++ b/apps/gemini_rag.py @@ -60,10 +60,4 @@ async def load_data(self, args) -> list[dict[str, Any]]: if __name__ == "__main__": - import asyncio - - print("\n✨ Gemini CLI RAG") - print("=" * 50) - - rag = GeminiRAG() - asyncio.run(rag.run()) + GeminiRAG.main() diff --git a/apps/image_rag.py b/apps/image_rag.py index 8dcd62b7..571da2f6 100644 --- a/apps/image_rag.py +++ b/apps/image_rag.py @@ -207,13 +207,5 @@ async def build_index(self, args, texts: list[dict[str, Any]]) -> str: Path(pkl_path).unlink() -def main(): - """Main entry point for the image RAG application.""" - import asyncio - - app = ImageRAG() - asyncio.run(app.run()) - - if __name__ == "__main__": - main() + ImageRAG.main() diff --git a/apps/imessage_rag.py b/apps/imessage_rag.py index bd4ab686..986be036 100644 --- a/apps/imessage_rag.py +++ b/apps/imessage_rag.py @@ -116,11 +116,5 @@ async def load_data(self, args) -> list[dict[str, Any]]: return all_texts -async def main(): - """Main entry point.""" - app = IMessageRAG() - await app.run() - - if __name__ == "__main__": - asyncio.run(main()) + IMessageRAG.main() diff --git a/apps/qwen_rag.py b/apps/qwen_rag.py index ae665be9..1547bc8b 100644 --- a/apps/qwen_rag.py +++ b/apps/qwen_rag.py @@ -60,10 +60,4 @@ async def load_data(self, args) -> list[dict[str, Any]]: if __name__ == "__main__": - import asyncio - - print("\n✨ Qwen Code RAG") - print("=" * 50) - - rag = QwenRAG() - asyncio.run(rag.run()) + QwenRAG.main() diff --git a/apps/slack_rag.py b/apps/slack_rag.py index 89804574..f4371409 100644 --- a/apps/slack_rag.py +++ b/apps/slack_rag.py @@ -219,11 +219,5 @@ async def run(self): await super().run() -async def main(): - """Main entry point for the Slack MCP RAG application.""" - app = SlackMCPRAG() - await app.run() - - if __name__ == "__main__": - asyncio.run(main()) + SlackMCPRAG.main() diff --git a/apps/twitter_rag.py b/apps/twitter_rag.py index 5446a5aa..a3c37f0b 100644 --- a/apps/twitter_rag.py +++ b/apps/twitter_rag.py @@ -187,11 +187,5 @@ async def run(self): await super().run() -async def main(): - """Main entry point for the Twitter MCP RAG application.""" - app = TwitterMCPRAG() - await app.run() - - if __name__ == "__main__": - asyncio.run(main()) + TwitterMCPRAG.main() diff --git a/apps/wechat_rag.py b/apps/wechat_rag.py index 1e5dd319..0951be71 100644 --- a/apps/wechat_rag.py +++ b/apps/wechat_rag.py @@ -14,6 +14,7 @@ from base_rag_example import BaseRAGExample from .history_data.wechat_history import WeChatHistoryReader +from llama_index.core.node_parser import SentenceSplitter class WeChatRAG(BaseRAGExample): @@ -30,8 +31,21 @@ def __init__(self): name="WeChat History", description="Process and query WeChat chat history with LEANN", default_index_name="wechat_history_magic_test_11Debug_new", + example_queries=[ + "Show me conversations about travel plans", + "Find group chats about weekend activities", + "我想买魔术师约翰逊的球衣,给我一些对应聊天记录?", + "What did we discuss about the project last month?", + ], ) + def _print_header(self): + if sys.platform != "darwin": + print("\n⚠️ Warning: WeChat export is only supported on macOS") + print(" You can still query existing exports on other platforms\n") + super()._print_header() + print("Note: WeChat must be running for export to work\n") + def _add_specific_arguments(self, parser): """Add WeChat-specific arguments.""" wechat_group = parser.add_argument_group("WeChat Parameters") @@ -108,37 +122,16 @@ async def load_data(self, args) -> list[dict[str, Any]]: return [] # Load documents from all found export directories - all_documents = [] - total_processed = 0 - - for i, export_dir in enumerate(export_dirs): - print(f"\nProcessing WeChat export {i + 1}/{len(export_dirs)}: {export_dir}") - - try: - # Apply max_items limit per export - max_per_export = -1 - if args.max_items > 0: - remaining = args.max_items - total_processed - if remaining <= 0: - break - max_per_export = remaining - - documents = reader.load_data( - wechat_export_dir=str(export_dir), - max_count=max_per_export, - concatenate_messages=True, # Enable message concatenation for better context - ) - - if documents: - print(f"Loaded {len(documents)} chat documents from {export_dir}") - all_documents.extend(documents) - total_processed += len(documents) - else: - print(f"No documents loaded from {export_dir}") - - except Exception as e: - print(f"Error processing {export_dir}: {e}") - continue + all_documents, _ = self._foreach_source( + export_dirs, + args, + load=lambda src, mc: reader.load_data( + wechat_export_dir=str(src), + max_count=mc, + concatenate_messages=True, + ), + source_label="export", + ) if not all_documents: print("No documents loaded from any source. Exiting.") @@ -151,8 +144,6 @@ async def load_data(self, args) -> list[dict[str, Any]]: all_texts = [] for doc in all_documents: # Split the document into chunks - from llama_index.core.node_parser import SentenceSplitter - text_splitter = SentenceSplitter( chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap ) @@ -169,22 +160,4 @@ async def load_data(self, args) -> list[dict[str, Any]]: if __name__ == "__main__": - import asyncio - - # Check platform - if sys.platform != "darwin": - print("\n⚠️ Warning: WeChat export is only supported on macOS") - print(" You can still query existing exports on other platforms\n") - - # Example queries for WeChat RAG - print("\n💬 WeChat History RAG Example") - print("=" * 50) - print("\nExample queries you can try:") - print("- 'Show me conversations about travel plans'") - print("- 'Find group chats about weekend activities'") - print("- '我想买魔术师约翰逊的球衣,给我一些对应聊天记录?'") - print("- 'What did we discuss about the project last month?'") - print("\nNote: WeChat must be running for export to work\n") - - rag = WeChatRAG() - asyncio.run(rag.run()) + WeChatRAG.main()