diff --git a/CLAUDE.md b/CLAUDE.md index 4e9c2830..2ad07a8a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -58,6 +58,7 @@ return render_template('page.html', settings=public_settings) ## Version Management +- It's important to update the version at the end of every plan - Version is stored in `config.py`: `VERSION = "X.XXX.XXX"` - When incrementing, only change the third segment (e.g., `0.238.024` -> `0.238.025`) - Include the current version in functional test file headers and documentation files @@ -83,7 +84,7 @@ return render_template('page.html', settings=public_settings) ## Release Notes -After completing code changes, offer to update `docs/explanation/release_notes.md`. +After completing plans and code changes, offer to update `docs/explanation/release_notes.md`. - Add entries under the current version from `config.py` - If the version was bumped, create a new section at the top: `### **(vX.XXX.XXX)**` diff --git a/README.md b/README.md index 31ea020b..cffbeedc 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,7 @@ azd up - **Metadata Extraction (Optional)**: Apply an AI model (configurable GPT model via Admin Settings) to automatically generate keywords, two-sentence summaries, and infer author/date for uploaded documents. Allows manual override for richer search context. - **File Processing Logs (Optional)**: Enable verbose logging for all ingestion pipelines (workspaces and ephemeral chat uploads) to aid in debugging, monitoring, and auditing file processing steps. - **Redis Cache (Optional)**: Integrate Azure Cache for Redis to provide a distributed, high-performance session store. This enables true horizontal scaling and high availability by decoupling user sessions from individual app instances. +- **SQL Database Agents (Optional)**: Connect agents to Azure SQL or other SQL databases through configurable SQL Query and SQL Schema plugins.
Database schema is automatically discovered and injected into agent instructions at load time, enabling agents to answer natural language questions by generating and executing SQL queries without requiring users to know table or column names. - **Authentication & RBAC**: Secure access via Azure Active Directory (Entra ID) using MSAL. Supports Managed Identities for Azure service authentication, group-based controls, and custom application roles (`Admin`, `User`, `CreateGroup`, `SafetyAdmin`, `FeedbackAdmin`). - **Supported File Types**: diff --git a/application/single_app/app.py b/application/single_app/app.py index 805137d4..594d245d 100644 --- a/application/single_app/app.py +++ b/application/single_app/app.py @@ -75,6 +75,7 @@ from route_backend_public_prompts import * from route_backend_user_agreement import register_route_backend_user_agreement from route_backend_conversation_export import register_route_backend_conversation_export +from route_backend_thoughts import register_route_backend_thoughts from route_backend_speech import register_route_backend_speech from route_backend_tts import register_route_backend_tts from route_enhanced_citations import register_enhanced_citations_routes @@ -657,6 +658,9 @@ def list_semantic_kernel_plugins(): # ------------------- API User Agreement Routes ---------- register_route_backend_user_agreement(app) +# ------------------- API Thoughts Routes ---------------- +register_route_backend_thoughts(app) + # ------------------- Extenral Health Routes ---------- register_route_external_health(app) diff --git a/application/single_app/config.py b/application/single_app/config.py index 08c0adf1..741d8b55 100644 --- a/application/single_app/config.py +++ b/application/single_app/config.py @@ -94,7 +94,7 @@ EXECUTOR_TYPE = 'thread' EXECUTOR_MAX_WORKERS = 30 SESSION_TYPE = 'filesystem' -VERSION = "0.239.005" +VERSION = "0.239.031" SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production') @@ -257,6 +257,8 @@ def 
get_redis_cache_infrastructure_endpoint(redis_hostname: str) -> str: storage_account_user_documents_container_name = "user-documents" storage_account_group_documents_container_name = "group-documents" storage_account_public_documents_container_name = "public-documents" +storage_account_personal_chat_container_name = "personal-chat" +storage_account_group_chat_container_name = "group-chat" # Initialize Azure Cosmos DB client cosmos_endpoint = os.getenv("AZURE_COSMOS_ENDPOINT") @@ -459,6 +461,18 @@ def get_redis_cache_infrastructure_endpoint(redis_hostname: str) -> str: default_ttl=-1 # TTL disabled by default, enabled per-document for auto-cleanup ) +cosmos_thoughts_container_name = "thoughts" +cosmos_thoughts_container = cosmos_database.create_container_if_not_exists( + id=cosmos_thoughts_container_name, + partition_key=PartitionKey(path="/user_id") +) + +cosmos_archived_thoughts_container_name = "archive_thoughts" +cosmos_archived_thoughts_container = cosmos_database.create_container_if_not_exists( + id=cosmos_archived_thoughts_container_name, + partition_key=PartitionKey(path="/user_id") +) + def ensure_custom_logo_file_exists(app, settings): """ If custom_logo_base64 or custom_logo_dark_base64 is present in settings, ensure the appropriate @@ -745,9 +759,11 @@ def initialize_clients(settings): # This addresses the issue where the application assumes containers exist if blob_service_client: for container_name in [ - storage_account_user_documents_container_name, - storage_account_group_documents_container_name, - storage_account_public_documents_container_name + storage_account_user_documents_container_name, + storage_account_group_documents_container_name, + storage_account_public_documents_container_name, + storage_account_personal_chat_container_name, + storage_account_group_chat_container_name ]: try: container_client = blob_service_client.get_container_client(container_name) diff --git a/application/single_app/functions_activity_logging.py 
b/application/single_app/functions_activity_logging.py index 2a653a47..efb6e780 100644 --- a/application/single_app/functions_activity_logging.py +++ b/application/single_app/functions_activity_logging.py @@ -1393,3 +1393,332 @@ def log_retention_policy_force_push( level=logging.ERROR ) debug_print(f"⚠️ Warning: Failed to log retention policy force push: {str(e)}") + + +# === AGENT & ACTION ACTIVITY LOGGING === + +def log_agent_creation( + user_id: str, + agent_id: str, + agent_name: str, + agent_display_name: Optional[str] = None, + scope: str = 'personal', + group_id: Optional[str] = None +) -> None: + """ + Log an agent creation activity. + + Args: + user_id: The ID of the user who created the agent + agent_id: The unique ID of the new agent + agent_name: The name of the agent + agent_display_name: The display name of the agent + scope: 'personal', 'group', or 'global' + group_id: The group ID (only for group scope) + """ + try: + activity_record = { + 'id': str(uuid.uuid4()), + 'user_id': user_id, + 'activity_type': 'agent_creation', + 'timestamp': datetime.utcnow().isoformat(), + 'created_at': datetime.utcnow().isoformat(), + 'entity_type': 'agent', + 'operation': 'create', + 'entity': { + 'id': agent_id, + 'name': agent_name, + 'display_name': agent_display_name or agent_name + }, + 'workspace_type': scope, + 'workspace_context': {} + } + if scope == 'group' and group_id: + activity_record['workspace_context']['group_id'] = group_id + + cosmos_activity_logs_container.create_item(body=activity_record) + log_event( + message=f"Agent created: {agent_name} ({scope}) by user {user_id}", + extra=activity_record, + level=logging.INFO + ) + debug_print(f"✅ Agent creation logged: {agent_name} ({scope})") + except Exception as e: + log_event( + message=f"Error logging agent creation: {str(e)}", + extra={'user_id': user_id, 'agent_id': agent_id, 'scope': scope, 'error': str(e)}, + level=logging.ERROR + ) + debug_print(f"⚠️ Warning: Failed to log agent creation: 
{str(e)}") + + +def log_agent_update( + user_id: str, + agent_id: str, + agent_name: str, + agent_display_name: Optional[str] = None, + scope: str = 'personal', + group_id: Optional[str] = None +) -> None: + """ + Log an agent update activity. + + Args: + user_id: The ID of the user who updated the agent + agent_id: The unique ID of the agent + agent_name: The name of the agent + agent_display_name: The display name of the agent + scope: 'personal', 'group', or 'global' + group_id: The group ID (only for group scope) + """ + try: + activity_record = { + 'id': str(uuid.uuid4()), + 'user_id': user_id, + 'activity_type': 'agent_update', + 'timestamp': datetime.utcnow().isoformat(), + 'created_at': datetime.utcnow().isoformat(), + 'entity_type': 'agent', + 'operation': 'update', + 'entity': { + 'id': agent_id, + 'name': agent_name, + 'display_name': agent_display_name or agent_name + }, + 'workspace_type': scope, + 'workspace_context': {} + } + if scope == 'group' and group_id: + activity_record['workspace_context']['group_id'] = group_id + + cosmos_activity_logs_container.create_item(body=activity_record) + log_event( + message=f"Agent updated: {agent_name} ({scope}) by user {user_id}", + extra=activity_record, + level=logging.INFO + ) + debug_print(f"✅ Agent update logged: {agent_name} ({scope})") + except Exception as e: + log_event( + message=f"Error logging agent update: {str(e)}", + extra={'user_id': user_id, 'agent_id': agent_id, 'scope': scope, 'error': str(e)}, + level=logging.ERROR + ) + debug_print(f"⚠️ Warning: Failed to log agent update: {str(e)}") + + +def log_agent_deletion( + user_id: str, + agent_id: str, + agent_name: str, + scope: str = 'personal', + group_id: Optional[str] = None +) -> None: + """ + Log an agent deletion activity. 
+ + Args: + user_id: The ID of the user who deleted the agent + agent_id: The unique ID of the agent + agent_name: The name of the agent + scope: 'personal', 'group', or 'global' + group_id: The group ID (only for group scope) + """ + try: + activity_record = { + 'id': str(uuid.uuid4()), + 'user_id': user_id, + 'activity_type': 'agent_deletion', + 'timestamp': datetime.utcnow().isoformat(), + 'created_at': datetime.utcnow().isoformat(), + 'entity_type': 'agent', + 'operation': 'delete', + 'entity': { + 'id': agent_id, + 'name': agent_name + }, + 'workspace_type': scope, + 'workspace_context': {} + } + if scope == 'group' and group_id: + activity_record['workspace_context']['group_id'] = group_id + + cosmos_activity_logs_container.create_item(body=activity_record) + log_event( + message=f"Agent deleted: {agent_name} ({scope}) by user {user_id}", + extra=activity_record, + level=logging.INFO + ) + debug_print(f"✅ Agent deletion logged: {agent_name} ({scope})") + except Exception as e: + log_event( + message=f"Error logging agent deletion: {str(e)}", + extra={'user_id': user_id, 'agent_id': agent_id, 'scope': scope, 'error': str(e)}, + level=logging.ERROR + ) + debug_print(f"⚠️ Warning: Failed to log agent deletion: {str(e)}") + + +def log_action_creation( + user_id: str, + action_id: str, + action_name: str, + action_type: Optional[str] = None, + scope: str = 'personal', + group_id: Optional[str] = None +) -> None: + """ + Log an action/plugin creation activity. 
+ + Args: + user_id: The ID of the user who created the action + action_id: The unique ID of the new action + action_name: The name of the action + action_type: The type of the action (e.g., 'openapi', 'sql_query') + scope: 'personal', 'group', or 'global' + group_id: The group ID (only for group scope) + """ + try: + activity_record = { + 'id': str(uuid.uuid4()), + 'user_id': user_id, + 'activity_type': 'action_creation', + 'timestamp': datetime.utcnow().isoformat(), + 'created_at': datetime.utcnow().isoformat(), + 'entity_type': 'action', + 'operation': 'create', + 'entity': { + 'id': action_id, + 'name': action_name, + 'type': action_type + }, + 'workspace_type': scope, + 'workspace_context': {} + } + if scope == 'group' and group_id: + activity_record['workspace_context']['group_id'] = group_id + + cosmos_activity_logs_container.create_item(body=activity_record) + log_event( + message=f"Action created: {action_name} ({scope}) by user {user_id}", + extra=activity_record, + level=logging.INFO + ) + debug_print(f"✅ Action creation logged: {action_name} ({scope})") + except Exception as e: + log_event( + message=f"Error logging action creation: {str(e)}", + extra={'user_id': user_id, 'action_id': action_id, 'scope': scope, 'error': str(e)}, + level=logging.ERROR + ) + debug_print(f"⚠️ Warning: Failed to log action creation: {str(e)}") + + +def log_action_update( + user_id: str, + action_id: str, + action_name: str, + action_type: Optional[str] = None, + scope: str = 'personal', + group_id: Optional[str] = None +) -> None: + """ + Log an action/plugin update activity. 
+ + Args: + user_id: The ID of the user who updated the action + action_id: The unique ID of the action + action_name: The name of the action + action_type: The type of the action + scope: 'personal', 'group', or 'global' + group_id: The group ID (only for group scope) + """ + try: + activity_record = { + 'id': str(uuid.uuid4()), + 'user_id': user_id, + 'activity_type': 'action_update', + 'timestamp': datetime.utcnow().isoformat(), + 'created_at': datetime.utcnow().isoformat(), + 'entity_type': 'action', + 'operation': 'update', + 'entity': { + 'id': action_id, + 'name': action_name, + 'type': action_type + }, + 'workspace_type': scope, + 'workspace_context': {} + } + if scope == 'group' and group_id: + activity_record['workspace_context']['group_id'] = group_id + + cosmos_activity_logs_container.create_item(body=activity_record) + log_event( + message=f"Action updated: {action_name} ({scope}) by user {user_id}", + extra=activity_record, + level=logging.INFO + ) + debug_print(f"✅ Action update logged: {action_name} ({scope})") + except Exception as e: + log_event( + message=f"Error logging action update: {str(e)}", + extra={'user_id': user_id, 'action_id': action_id, 'scope': scope, 'error': str(e)}, + level=logging.ERROR + ) + debug_print(f"⚠️ Warning: Failed to log action update: {str(e)}") + + +def log_action_deletion( + user_id: str, + action_id: str, + action_name: str, + action_type: Optional[str] = None, + scope: str = 'personal', + group_id: Optional[str] = None +) -> None: + """ + Log an action/plugin deletion activity. 
+ + Args: + user_id: The ID of the user who deleted the action + action_id: The unique ID of the action + action_name: The name of the action + action_type: The type of the action + scope: 'personal', 'group', or 'global' + group_id: The group ID (only for group scope) + """ + try: + activity_record = { + 'id': str(uuid.uuid4()), + 'user_id': user_id, + 'activity_type': 'action_deletion', + 'timestamp': datetime.utcnow().isoformat(), + 'created_at': datetime.utcnow().isoformat(), + 'entity_type': 'action', + 'operation': 'delete', + 'entity': { + 'id': action_id, + 'name': action_name, + 'type': action_type + }, + 'workspace_type': scope, + 'workspace_context': {} + } + if scope == 'group' and group_id: + activity_record['workspace_context']['group_id'] = group_id + + cosmos_activity_logs_container.create_item(body=activity_record) + log_event( + message=f"Action deleted: {action_name} ({scope}) by user {user_id}", + extra=activity_record, + level=logging.INFO + ) + debug_print(f"✅ Action deletion logged: {action_name} ({scope})") + except Exception as e: + log_event( + message=f"Error logging action deletion: {str(e)}", + extra={'user_id': user_id, 'action_id': action_id, 'scope': scope, 'error': str(e)}, + level=logging.ERROR + ) + debug_print(f"⚠️ Warning: Failed to log action deletion: {str(e)}") diff --git a/application/single_app/functions_content.py b/application/single_app/functions_content.py index 376d23f4..3116ed82 100644 --- a/application/single_app/functions_content.py +++ b/application/single_app/functions_content.py @@ -352,7 +352,7 @@ def generate_embedding( embedding_model = selected_embedding_model['deploymentName'] while True: - random_delay = random.uniform(0.5, 2.0) + random_delay = random.uniform(0.05, 0.2) time.sleep(random_delay) try: @@ -385,3 +385,102 @@ def generate_embedding( except Exception as e: raise + +def generate_embeddings_batch( + texts, + batch_size=16, + max_retries=5, + initial_delay=1.0, + delay_multiplier=2.0 +): + 
"""Generate embeddings for multiple texts in batches. + + Azure OpenAI embeddings API accepts a list of strings as input. + This reduces per-call overhead and delay significantly. + + Args: + texts: List of text strings to embed. + batch_size: Number of texts per API call (default 16). + max_retries: Max retries on rate limit errors. + initial_delay: Initial retry delay in seconds. + delay_multiplier: Multiplier for exponential backoff. + + Returns: + list of (embedding, token_usage) tuples, one per input text. + """ + settings = get_settings() + + enable_embedding_apim = settings.get('enable_embedding_apim', False) + + if enable_embedding_apim: + embedding_model = settings.get('azure_apim_embedding_deployment') + embedding_client = AzureOpenAI( + api_version=settings.get('azure_apim_embedding_api_version'), + azure_endpoint=settings.get('azure_apim_embedding_endpoint'), + api_key=settings.get('azure_apim_embedding_subscription_key')) + else: + if (settings.get('azure_openai_embedding_authentication_type') == 'managed_identity'): + token_provider = get_bearer_token_provider(DefaultAzureCredential(), cognitive_services_scope) + + embedding_client = AzureOpenAI( + api_version=settings.get('azure_openai_embedding_api_version'), + azure_endpoint=settings.get('azure_openai_embedding_endpoint'), + azure_ad_token_provider=token_provider + ) + + embedding_model_obj = settings.get('embedding_model', {}) + if embedding_model_obj and embedding_model_obj.get('selected'): + selected_embedding_model = embedding_model_obj['selected'][0] + embedding_model = selected_embedding_model['deploymentName'] + else: + embedding_client = AzureOpenAI( + api_version=settings.get('azure_openai_embedding_api_version'), + azure_endpoint=settings.get('azure_openai_embedding_endpoint'), + api_key=settings.get('azure_openai_embedding_key') + ) + + embedding_model_obj = settings.get('embedding_model', {}) + if embedding_model_obj and embedding_model_obj.get('selected'): + selected_embedding_model = 
embedding_model_obj['selected'][0] + embedding_model = selected_embedding_model['deploymentName'] + + results = [] + for i in range(0, len(texts), batch_size): + batch = texts[i:i + batch_size] + retries = 0 + current_delay = initial_delay + + while True: + random_delay = random.uniform(0.05, 0.2) + time.sleep(random_delay) + + try: + response = embedding_client.embeddings.create( + model=embedding_model, + input=batch + ) + + for item in response.data: + token_usage = None + if hasattr(response, 'usage') and response.usage: + token_usage = { + 'prompt_tokens': response.usage.prompt_tokens // len(batch), + 'total_tokens': response.usage.total_tokens // len(batch), + 'model_deployment_name': embedding_model + } + results.append((item.embedding, token_usage)) + break + + except RateLimitError as e: + retries += 1 + if retries > max_retries: + raise + + wait_time = current_delay * random.uniform(1.0, 1.5) + time.sleep(wait_time) + current_delay *= delay_multiplier + + except Exception as e: + raise + + return results diff --git a/application/single_app/functions_documents.py b/application/single_app/functions_documents.py index ce08066d..110afbd2 100644 --- a/application/single_app/functions_documents.py +++ b/application/single_app/functions_documents.py @@ -1646,6 +1646,191 @@ def save_chunks(page_text_content, page_number, file_name, user_id, document_id, # Return token usage information for accumulation return token_usage +def save_chunks_batch(chunks_data, user_id, document_id, group_id=None, public_workspace_id=None): + """ + Save multiple chunks at once using batch embedding and batch AI Search upload. + Significantly faster than calling save_chunks() per chunk. 
+ + Args: + chunks_data: list of dicts with keys: page_text_content, page_number, file_name + user_id: The user ID + document_id: The document ID + group_id: Optional group ID for group documents + public_workspace_id: Optional public workspace ID for public documents + + Returns: + dict with 'total_tokens', 'prompt_tokens', 'model_deployment_name' + """ + from functions_content import generate_embeddings_batch + + current_time = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + is_group = group_id is not None + is_public_workspace = public_workspace_id is not None + + # Retrieve metadata once for all chunks + try: + if is_public_workspace: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id, + public_workspace_id=public_workspace_id + ) + elif is_group: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id, + group_id=group_id + ) + else: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id + ) + + if not metadata: + raise ValueError(f"No metadata found for document {document_id}") + + version = metadata.get("version") if metadata.get("version") else 1 + except Exception as e: + log_event(f"[save_chunks_batch] Error retrieving metadata for document {document_id}: {repr(e)}", level=logging.ERROR) + raise + + # Generate all embeddings in batches + texts = [c['page_text_content'] for c in chunks_data] + try: + embedding_results = generate_embeddings_batch(texts) + except Exception as e: + log_event(f"[save_chunks_batch] Error generating batch embeddings for document {document_id}: {e}", level=logging.ERROR) + raise + + # Check for vision analysis once + vision_analysis = metadata.get('vision_analysis') + vision_text = "" + if vision_analysis: + vision_text_parts = [] + vision_text_parts.append("\n\n=== AI Vision Analysis ===") + vision_text_parts.append(f"Model: {vision_analysis.get('model', 'unknown')}") + if vision_analysis.get('description'): + 
vision_text_parts.append(f"\nDescription: {vision_analysis['description']}") + if vision_analysis.get('objects'): + objects_list = vision_analysis['objects'] + if isinstance(objects_list, list): + vision_text_parts.append(f"\nObjects Detected: {', '.join(objects_list)}") + else: + vision_text_parts.append(f"\nObjects Detected: {objects_list}") + if vision_analysis.get('text'): + vision_text_parts.append(f"\nVisible Text: {vision_analysis['text']}") + if vision_analysis.get('analysis'): + vision_text_parts.append(f"\nContextual Analysis: {vision_analysis['analysis']}") + vision_text = "\n".join(vision_text_parts) + + # Build all chunk documents + chunk_documents = [] + total_token_usage = {'total_tokens': 0, 'prompt_tokens': 0, 'model_deployment_name': None} + + for idx, chunk_info in enumerate(chunks_data): + embedding, token_usage = embedding_results[idx] + page_number = chunk_info['page_number'] + file_name = chunk_info['file_name'] + page_text_content = chunk_info['page_text_content'] + + if token_usage: + total_token_usage['total_tokens'] += token_usage.get('total_tokens', 0) + total_token_usage['prompt_tokens'] += token_usage.get('prompt_tokens', 0) + if not total_token_usage['model_deployment_name']: + total_token_usage['model_deployment_name'] = token_usage.get('model_deployment_name') + + chunk_id = f"{document_id}_{page_number}" + enhanced_chunk_text = page_text_content + vision_text if vision_text else page_text_content + + if is_public_workspace: + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "public_workspace_id": public_workspace_id + } + 
elif is_group: + shared_group_ids = metadata.get('shared_group_ids', []) if metadata else [] + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "group_id": group_id, + "shared_group_ids": shared_group_ids + } + else: + shared_user_ids = metadata.get('shared_user_ids', []) if metadata else [] + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "user_id": user_id, + "shared_user_ids": shared_user_ids + } + + chunk_documents.append(chunk_document) + + # Batch upload to AI Search + try: + if is_public_workspace: + search_client = CLIENTS["search_client_public"] + elif is_group: + search_client = CLIENTS["search_client_group"] + else: + search_client = CLIENTS["search_client_user"] + + # Upload in sub-batches of 32 to avoid request size limits + upload_batch_size = 32 + for i in range(0, len(chunk_documents), upload_batch_size): + sub_batch = chunk_documents[i:i + upload_batch_size] + search_client.upload_documents(documents=sub_batch) + + except Exception as e: + log_event(f"[save_chunks_batch] Error uploading batch to AI Search for document {document_id}: {e}", level=logging.ERROR) + raise + + return total_token_usage + def 
get_document_metadata_for_citations(document_id, user_id=None, group_id=None, public_workspace_id=None): """ Retrieve keywords and abstract from a document for creating metadata citations. @@ -4669,37 +4854,30 @@ def process_single_tabular_sheet(df, document_id, user_id, file_name, update_cal # Consider accumulating page count in the caller if needed. update_callback(number_of_pages=num_chunks_final) - # Save chunks, prepending the header to each + # Save chunks, prepending the header to each — use batch processing for speed + all_chunks = [] for idx, chunk_rows_content in enumerate(final_chunks_content, start=1): - # Prepend header - header length does not count towards chunk size limit chunk_with_header = header_string + chunk_rows_content - - update_callback( - current_file_chunk=idx, - status=f"Saving chunk {idx}/{num_chunks_final} from {file_name}..." - ) - - args = { + all_chunks.append({ "page_text_content": chunk_with_header, "page_number": idx, - "file_name": file_name, - "user_id": user_id, - "document_id": document_id - } + "file_name": file_name + }) - if is_public_workspace: - args["public_workspace_id"] = public_workspace_id - elif is_group: - args["group_id"] = group_id + if all_chunks: + update_callback( + current_file_chunk=1, + status=f"Batch processing {num_chunks_final} chunks from {file_name}..." 
+ ) - token_usage = save_chunks(**args) - total_chunks_saved += 1 - - # Accumulate embedding tokens - if token_usage: - total_embedding_tokens += token_usage.get('total_tokens', 0) - if not embedding_model_name: - embedding_model_name = token_usage.get('model_deployment_name') + batch_token_usage = save_chunks_batch( + all_chunks, user_id, document_id, + group_id=group_id, public_workspace_id=public_workspace_id + ) + total_chunks_saved = len(all_chunks) + if batch_token_usage: + total_embedding_tokens = batch_token_usage.get('total_tokens', 0) + embedding_model_name = batch_token_usage.get('model_deployment_name') return total_chunks_saved, total_embedding_tokens, embedding_model_name @@ -4729,63 +4907,75 @@ def process_tabular(document_id, user_id, temp_file_path, original_filename, fil args["group_id"] = group_id upload_to_blob(**args) + update_callback(enhanced_citations=True, status=f"Enhanced citations enabled for {file_ext}") - try: - if file_ext == '.csv': - # Process CSV - # Read CSV, attempt to infer header, keep data as string initially - df = pandas.read_csv( - temp_file_path, - keep_default_na=False, - dtype=str + # When enhanced citations is on, index a single schema summary chunk + # instead of row-by-row chunking. The tabular processing plugin handles analysis. 
+ if enable_enhanced_citations: + try: + if file_ext == '.csv': + df_preview = pandas.read_csv(temp_file_path, keep_default_na=False, dtype=str, nrows=5) + full_df = pandas.read_csv(temp_file_path, keep_default_na=False, dtype=str) + elif file_ext in ('.xlsx', '.xls', '.xlsm'): + engine = 'openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' + df_preview = pandas.read_excel(temp_file_path, engine=engine, keep_default_na=False, dtype=str, nrows=5) + full_df = pandas.read_excel(temp_file_path, engine=engine, keep_default_na=False, dtype=str) + else: + raise ValueError(f"Unsupported tabular file type: {file_ext}") + + row_count = len(full_df) + columns = list(df_preview.columns) + preview_rows = df_preview.head(5).to_string(index=False) + + schema_summary = ( + f"Tabular data file: {original_filename}\n" + f"Columns ({len(columns)}): {', '.join(columns)}\n" + f"Total rows: {row_count}\n" + f"Preview (first 5 rows):\n{preview_rows}\n\n" + f"This file is available for detailed analysis via the Tabular Processing plugin." 
) - args = { - "df": df, - "document_id": document_id, - "user_id": user_id, + + update_callback(number_of_pages=1, status=f"Indexing schema summary for {original_filename}...") + + save_args = { + "page_text_content": schema_summary, + "page_number": 1, "file_name": original_filename, - "update_callback": update_callback + "user_id": user_id, + "document_id": document_id } - if is_public_workspace: - args["public_workspace_id"] = public_workspace_id + save_args["public_workspace_id"] = public_workspace_id elif is_group: - args["group_id"] = group_id + save_args["group_id"] = group_id - result = process_single_tabular_sheet(**args) - if isinstance(result, tuple) and len(result) == 3: - chunks, tokens, model = result - total_chunks_saved = chunks - total_embedding_tokens += tokens - if not embedding_model_name: - embedding_model_name = model - else: - total_chunks_saved = result - - elif file_ext in ('.xlsx', '.xls', '.xlsm'): - # Process Excel (potentially multiple sheets) - excel_file = pandas.ExcelFile( - temp_file_path, - engine='openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' - ) - sheet_names = excel_file.sheet_names - base_name, ext = os.path.splitext(original_filename) - - accumulated_total_chunks = 0 - for sheet_name in sheet_names: - update_callback(status=f"Processing sheet '{sheet_name}'...") - # Read specific sheet, get values (not formulas), keep data as string - # Note: pandas typically reads values, not formulas by default. 
- df = excel_file.parse(sheet_name, keep_default_na=False, dtype=str) + token_usage = save_chunks(**save_args) + total_chunks_saved = 1 + if token_usage: + total_embedding_tokens = token_usage.get('total_tokens', 0) + embedding_model_name = token_usage.get('model_deployment_name') - # Create effective filename for this sheet - effective_filename = f"{base_name}-{sheet_name}{ext}" if len(sheet_names) > 1 else original_filename + # Don't return here — fall through to metadata extraction below + except Exception as e: + log_event(f"[process_tabular] Error creating schema summary, falling back to row-by-row: {e}", level=logging.WARNING) + # Fall through to existing row-by-row processing + # Only do row-by-row chunking if schema-only didn't produce chunks + if total_chunks_saved == 0: + try: + if file_ext == '.csv': + # Process CSV + # Read CSV, attempt to infer header, keep data as string initially + df = pandas.read_csv( + temp_file_path, + keep_default_na=False, + dtype=str + ) args = { "df": df, "document_id": document_id, "user_id": user_id, - "file_name": effective_filename, + "file_name": original_filename, "update_callback": update_callback } @@ -4797,21 +4987,62 @@ def process_tabular(document_id, user_id, temp_file_path, original_filename, fil result = process_single_tabular_sheet(**args) if isinstance(result, tuple) and len(result) == 3: chunks, tokens, model = result - accumulated_total_chunks += chunks + total_chunks_saved = chunks total_embedding_tokens += tokens if not embedding_model_name: embedding_model_name = model else: - accumulated_total_chunks += result + total_chunks_saved = result - total_chunks_saved = accumulated_total_chunks # Total across all sheets + elif file_ext in ('.xlsx', '.xls', '.xlsm'): + # Process Excel (potentially multiple sheets) + excel_file = pandas.ExcelFile( + temp_file_path, + engine='openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' + ) + sheet_names = excel_file.sheet_names + base_name, ext = 
os.path.splitext(original_filename) + accumulated_total_chunks = 0 + for sheet_name in sheet_names: + update_callback(status=f"Processing sheet '{sheet_name}'...") + # Read specific sheet, get values (not formulas), keep data as string + # Note: pandas typically reads values, not formulas by default. + df = excel_file.parse(sheet_name, keep_default_na=False, dtype=str) - except pandas.errors.EmptyDataError: - print(f"Warning: Tabular file or sheet is empty: {original_filename}") - update_callback(status=f"Warning: File/sheet is empty - {original_filename}", number_of_pages=0) - except Exception as e: - raise Exception(f"Failed processing Tabular file {original_filename}: {e}") + # Create effective filename for this sheet + effective_filename = f"{base_name}-{sheet_name}{ext}" if len(sheet_names) > 1 else original_filename + + args = { + "df": df, + "document_id": document_id, + "user_id": user_id, + "file_name": effective_filename, + "update_callback": update_callback + } + + if is_public_workspace: + args["public_workspace_id"] = public_workspace_id + elif is_group: + args["group_id"] = group_id + + result = process_single_tabular_sheet(**args) + if isinstance(result, tuple) and len(result) == 3: + chunks, tokens, model = result + accumulated_total_chunks += chunks + total_embedding_tokens += tokens + if not embedding_model_name: + embedding_model_name = model + else: + accumulated_total_chunks += result + + total_chunks_saved = accumulated_total_chunks # Total across all sheets + + except pandas.errors.EmptyDataError: + log_event(f"[process_tabular] Warning: Tabular file or sheet is empty: {original_filename}", level=logging.WARNING) + update_callback(status=f"Warning: File/sheet is empty - {original_filename}", number_of_pages=0) + except Exception as e: + raise Exception(f"Failed processing Tabular file {original_filename}: {e}") # Extract metadata if enabled and chunks were processed settings = get_settings() diff --git 
a/application/single_app/functions_global_actions.py b/application/single_app/functions_global_actions.py index 91f0d9f9..4d7293cd 100644 --- a/application/single_app/functions_global_actions.py +++ b/application/single_app/functions_global_actions.py @@ -60,12 +60,13 @@ def get_global_action(action_id, return_type=SecretReturnType.TRIGGER): return None -def save_global_action(action_data): +def save_global_action(action_data, user_id=None): """ Save or update a global action. Args: action_data (dict): Action data to save + user_id (str, optional): The user ID of the person performing the action Returns: dict: Saved action data or None if failed @@ -76,8 +77,27 @@ def save_global_action(action_data): action_data['id'] = str(uuid.uuid4()) # Add metadata action_data['is_global'] = True - action_data['created_at'] = datetime.utcnow().isoformat() - action_data['updated_at'] = datetime.utcnow().isoformat() + now = datetime.utcnow().isoformat() + + # Check if this is a new action or an update to preserve created_by/created_at + existing_action = None + try: + existing_action = cosmos_global_actions_container.read_item( + item=action_data['id'], + partition_key=action_data['id'] + ) + except Exception: + pass + + if existing_action: + action_data['created_by'] = existing_action.get('created_by', user_id) + action_data['created_at'] = existing_action.get('created_at', now) + else: + action_data['created_by'] = user_id + action_data['created_at'] = now + action_data['modified_by'] = user_id + action_data['modified_at'] = now + action_data['updated_at'] = now print(f"💾 Saving global action: {action_data.get('name', 'Unknown')}") # Store secrets in Key Vault before upsert action_data = keyvault_plugin_save_helper(action_data, scope_value=action_data.get('id'), scope="global") diff --git a/application/single_app/functions_global_agents.py b/application/single_app/functions_global_agents.py index 5cf6a3d4..87976510 100644 --- a/application/single_app/functions_global_agents.py 
+++ b/application/single_app/functions_global_agents.py @@ -163,25 +163,46 @@ def get_global_agent(agent_id): return None -def save_global_agent(agent_data): +def save_global_agent(agent_data, user_id=None): """ Save or update a global agent. Args: agent_data (dict): Agent data to save + user_id (str, optional): The user ID of the person performing the action Returns: dict: Saved agent data or None if failed """ try: - user_id = get_current_user_id() + if user_id is None: + user_id = get_current_user_id() cleaned_agent = sanitize_agent_payload(agent_data) if 'id' not in cleaned_agent: cleaned_agent['id'] = str(uuid.uuid4()) cleaned_agent['is_global'] = True cleaned_agent['is_group'] = False - cleaned_agent['created_at'] = datetime.utcnow().isoformat() - cleaned_agent['updated_at'] = datetime.utcnow().isoformat() + now = datetime.utcnow().isoformat() + + # Check if this is a new agent or an update to preserve created_by/created_at + existing_agent = None + try: + existing_agent = cosmos_global_agents_container.read_item( + item=cleaned_agent['id'], + partition_key=cleaned_agent['id'] + ) + except Exception: + pass + + if existing_agent: + cleaned_agent['created_by'] = existing_agent.get('created_by', user_id) + cleaned_agent['created_at'] = existing_agent.get('created_at', now) + else: + cleaned_agent['created_by'] = user_id + cleaned_agent['created_at'] = now + cleaned_agent['modified_by'] = user_id + cleaned_agent['modified_at'] = now + cleaned_agent['updated_at'] = now log_event( "Saving global agent.", extra={"agent_name": cleaned_agent.get('name', 'Unknown')}, diff --git a/application/single_app/functions_group_actions.py b/application/single_app/functions_group_actions.py index bc6aa4ea..c0d264b1 100644 --- a/application/single_app/functions_group_actions.py +++ b/application/single_app/functions_group_actions.py @@ -82,14 +82,36 @@ def get_group_action( return _clean_action(action, group_id, return_type) -def save_group_action(group_id: str, action_data: 
Dict[str, Any]) -> Dict[str, Any]: +def save_group_action(group_id: str, action_data: Dict[str, Any], user_id: Optional[str] = None) -> Dict[str, Any]: """Create or update a group action entry.""" payload = dict(action_data) action_id = payload.get("id") or str(uuid.uuid4()) payload["id"] = action_id payload["group_id"] = group_id - payload["last_updated"] = datetime.utcnow().isoformat() + now = datetime.utcnow().isoformat() + payload["last_updated"] = now + + # Track who created/modified this action + existing_action = None + try: + existing_action = cosmos_group_actions_container.read_item( + item=action_id, + partition_key=group_id, + ) + except exceptions.CosmosResourceNotFoundError: + pass + except Exception: + pass + + if existing_action: + payload["created_by"] = existing_action.get("created_by", user_id) + payload["created_at"] = existing_action.get("created_at", now) + else: + payload["created_by"] = user_id + payload["created_at"] = now + payload["modified_by"] = user_id + payload["modified_at"] = now payload.setdefault("name", "") payload.setdefault("displayName", payload.get("name", "")) diff --git a/application/single_app/functions_group_agents.py b/application/single_app/functions_group_agents.py index 8bf6f87c..7cbb8324 100644 --- a/application/single_app/functions_group_agents.py +++ b/application/single_app/functions_group_agents.py @@ -63,16 +63,38 @@ def get_group_agent(group_id: str, agent_id: str) -> Optional[Dict[str, Any]]: return None -def save_group_agent(group_id: str, agent_data: Dict[str, Any]) -> Dict[str, Any]: +def save_group_agent(group_id: str, agent_data: Dict[str, Any], user_id: Optional[str] = None) -> Dict[str, Any]: """Create or update a group agent entry.""" payload = sanitize_agent_payload(agent_data) agent_id = payload.get("id") or str(uuid.uuid4()) payload["id"] = agent_id payload["group_id"] = group_id - payload["last_updated"] = datetime.utcnow().isoformat() + now = datetime.utcnow().isoformat() + payload["last_updated"] 
= now payload["is_global"] = False payload["is_group"] = True + # Track who created/modified this agent + existing_agent = None + try: + existing_agent = cosmos_group_agents_container.read_item( + item=agent_id, + partition_key=group_id, + ) + except exceptions.CosmosResourceNotFoundError: + pass + except Exception: + pass + + if existing_agent: + payload["created_by"] = existing_agent.get("created_by", user_id) + payload["created_at"] = existing_agent.get("created_at", now) + else: + payload["created_by"] = user_id + payload["created_at"] = now + payload["modified_by"] = user_id + payload["modified_at"] = now + # Required/defaulted fields payload.setdefault("name", "") payload.setdefault("display_name", payload.get("name", "")) diff --git a/application/single_app/functions_personal_actions.py b/application/single_app/functions_personal_actions.py index 6345438e..91d849f3 100644 --- a/application/single_app/functions_personal_actions.py +++ b/application/single_app/functions_personal_actions.py @@ -113,15 +113,26 @@ def save_personal_action(user_id, action_data): existing_action = get_personal_action(user_id, action_data['name']) # Preserve existing ID if updating, or generate new ID if creating + now = datetime.utcnow().isoformat() if existing_action: - # Update existing action - preserve the original ID + # Update existing action - preserve the original ID and creation tracking action_data['id'] = existing_action['id'] + action_data['created_by'] = existing_action.get('created_by', user_id) + action_data['created_at'] = existing_action.get('created_at', now) elif 'id' not in action_data or not action_data['id']: # New action - generate UUID for ID action_data['id'] = str(uuid.uuid4()) - + action_data['created_by'] = user_id + action_data['created_at'] = now + else: + # Has an ID but no existing action found - treat as new + action_data['created_by'] = user_id + action_data['created_at'] = now + action_data['modified_by'] = user_id + action_data['modified_at'] = 
now + action_data['user_id'] = user_id - action_data['last_updated'] = datetime.utcnow().isoformat() + action_data['last_updated'] = now # Validate required fields required_fields = ['name', 'displayName', 'type', 'description'] diff --git a/application/single_app/functions_personal_agents.py b/application/single_app/functions_personal_agents.py index a4a5e47d..3c6c275e 100644 --- a/application/single_app/functions_personal_agents.py +++ b/application/single_app/functions_personal_agents.py @@ -128,9 +128,33 @@ def save_personal_agent(user_id, agent_data): cleaned_agent.setdefault(field, '') if 'id' not in cleaned_agent: cleaned_agent['id'] = str(f"{user_id}_{cleaned_agent.get('name', 'default')}") - + + # Check if this is a new agent or an update to preserve created_by/created_at + existing_agent = None + try: + existing_agent = cosmos_personal_agents_container.read_item( + item=cleaned_agent['id'], + partition_key=user_id + ) + except exceptions.CosmosResourceNotFoundError: + pass + except Exception: + pass + + now = datetime.utcnow().isoformat() + if existing_agent: + # Preserve original creation tracking + cleaned_agent['created_by'] = existing_agent.get('created_by', user_id) + cleaned_agent['created_at'] = existing_agent.get('created_at', now) + else: + # New agent + cleaned_agent['created_by'] = user_id + cleaned_agent['created_at'] = now + cleaned_agent['modified_by'] = user_id + cleaned_agent['modified_at'] = now + cleaned_agent['user_id'] = user_id - cleaned_agent['last_updated'] = datetime.utcnow().isoformat() + cleaned_agent['last_updated'] = now cleaned_agent['is_global'] = False cleaned_agent['is_group'] = False diff --git a/application/single_app/functions_settings.py b/application/single_app/functions_settings.py index 8176939d..f3dc59de 100644 --- a/application/single_app/functions_settings.py +++ b/application/single_app/functions_settings.py @@ -25,6 +25,7 @@ def get_settings(use_cosmos=False): 'enable_text_plugin': True, 
'enable_default_embedding_model_plugin': False, 'enable_fact_memory_plugin': True, + 'enable_tabular_processing_plugin': False, 'enable_multi_agent_orchestration': False, 'max_rounds_per_agent': 1, 'enable_semantic_kernel': False, @@ -205,6 +206,9 @@ def get_settings(use_cosmos=False): 'require_member_of_feedback_admin': False, 'enable_conversation_archiving': False, + # Processing Thoughts + 'enable_thoughts': False, + # Search and Extract 'azure_ai_search_endpoint': '', 'azure_ai_search_key': '', @@ -391,6 +395,9 @@ def update_settings(new_settings): # always fetch the latest settings doc, which includes your merges settings_item = get_settings() settings_item.update(new_settings) + # Dependency enforcement: tabular processing requires enhanced citations + if not settings_item.get('enable_enhanced_citations', False): + settings_item['enable_tabular_processing_plugin'] = False cosmos_settings_container.upsert_item(settings_item) cache_updater = getattr(app_settings_cache, "update_settings_cache", None) if callable(cache_updater): diff --git a/application/single_app/functions_thoughts.py b/application/single_app/functions_thoughts.py new file mode 100644 index 00000000..c6ffe9dd --- /dev/null +++ b/application/single_app/functions_thoughts.py @@ -0,0 +1,256 @@ +# functions_thoughts.py + +import uuid +import time +from datetime import datetime, timezone +from config import cosmos_thoughts_container, cosmos_archived_thoughts_container, cosmos_messages_container +from functions_appinsights import log_event +from functions_settings import get_settings + + +class ThoughtTracker: + """Stateful per-request tracker that writes processing step records to Cosmos DB. + + Each add_thought() call immediately upserts a document so that polling + clients can see partial progress before the final response is sent. + + All Cosmos writes are wrapped in try/except so thought errors never + interrupt the chat processing flow. 
+ """ + + def __init__(self, conversation_id, message_id, thread_id, user_id): + self.conversation_id = conversation_id + self.message_id = message_id + self.thread_id = thread_id + self.user_id = user_id + self.current_index = 0 + settings = get_settings() + self.enabled = settings.get('enable_thoughts', False) + + def add_thought(self, step_type, content, detail=None): + """Write a thought step to Cosmos immediately. + + Args: + step_type: One of search, tabular_analysis, web_search, + agent_tool_call, generation, content_safety. + content: Short human-readable description of the step. + detail: Optional technical detail (function names, params, etc.). + + Returns: + The thought document id, or None if disabled/failed. + """ + if not self.enabled: + return None + + thought_id = str(uuid.uuid4()) + thought_doc = { + 'id': thought_id, + 'conversation_id': self.conversation_id, + 'message_id': self.message_id, + 'thread_id': self.thread_id, + 'user_id': self.user_id, + 'step_index': self.current_index, + 'step_type': step_type, + 'content': content, + 'detail': detail, + 'duration_ms': None, + 'timestamp': datetime.now(timezone.utc).isoformat() + } + self.current_index += 1 + + try: + cosmos_thoughts_container.upsert_item(thought_doc) + except Exception as e: + log_event(f"ThoughtTracker.add_thought failed: {e}", level="WARNING") + return None + + return thought_id + + def complete_thought(self, thought_id, duration_ms): + """Patch an existing thought with its duration after the step finishes.""" + if not self.enabled or not thought_id: + return + + try: + thought_doc = cosmos_thoughts_container.read_item( + item=thought_id, + partition_key=self.user_id + ) + thought_doc['duration_ms'] = duration_ms + cosmos_thoughts_container.upsert_item(thought_doc) + except Exception as e: + log_event(f"ThoughtTracker.complete_thought failed: {e}", level="WARNING") + + def timed_thought(self, step_type, content, detail=None): + """Convenience: add a thought and return a timer 
helper. + + Usage: + timer = tracker.timed_thought('search', 'Searching documents...') + # ... do work ... + timer.stop() + """ + start = time.time() + thought_id = self.add_thought(step_type, content, detail) + return _ThoughtTimer(self, thought_id, start) + + +class _ThoughtTimer: + """Helper returned by ThoughtTracker.timed_thought() for auto-duration capture.""" + + def __init__(self, tracker, thought_id, start_time): + self._tracker = tracker + self._thought_id = thought_id + self._start = start_time + + def stop(self): + elapsed_ms = int((time.time() - self._start) * 1000) + self._tracker.complete_thought(self._thought_id, elapsed_ms) + return elapsed_ms + + +# --------------------------------------------------------------------------- +# CRUD helpers +# --------------------------------------------------------------------------- + +def get_thoughts_for_message(conversation_id, message_id, user_id): + """Return all thoughts for a specific assistant message, ordered by step_index.""" + try: + query = ( + "SELECT * FROM c " + "WHERE c.conversation_id = @conv_id " + "AND c.message_id = @msg_id " + "ORDER BY c.step_index ASC" + ) + params = [ + {"name": "@conv_id", "value": conversation_id}, + {"name": "@msg_id", "value": message_id}, + ] + results = list(cosmos_thoughts_container.query_items( + query=query, + parameters=params, + partition_key=user_id + )) + return results + except Exception as e: + log_event(f"get_thoughts_for_message failed: {e}", level="WARNING") + return [] + + +def get_pending_thoughts(conversation_id, user_id): + """Return the latest thoughts for a conversation that are still in-progress. + + Used by the polling endpoint. Retrieves thoughts created within the last + 5 minutes for the conversation, grouped by the most recent message_id. 
+ """ + try: + five_minutes_ago = datetime.now(timezone.utc) + from datetime import timedelta + five_minutes_ago = (five_minutes_ago - timedelta(minutes=5)).isoformat() + + query = ( + "SELECT * FROM c " + "WHERE c.conversation_id = @conv_id " + "AND c.timestamp >= @since " + "ORDER BY c.timestamp DESC" + ) + params = [ + {"name": "@conv_id", "value": conversation_id}, + {"name": "@since", "value": five_minutes_ago}, + ] + results = list(cosmos_thoughts_container.query_items( + query=query, + parameters=params, + partition_key=user_id + )) + + if not results: + return [] + + # Group by the most recent message_id + latest_message_id = results[0].get('message_id') + latest_thoughts = [ + t for t in results if t.get('message_id') == latest_message_id + ] + # Return in ascending step_index order + latest_thoughts.sort(key=lambda t: t.get('step_index', 0)) + return latest_thoughts + except Exception as e: + log_event(f"get_pending_thoughts failed: {e}", level="WARNING") + return [] + + +def get_thoughts_for_conversation(conversation_id, user_id): + """Return all thoughts for a conversation.""" + try: + query = ( + "SELECT * FROM c " + "WHERE c.conversation_id = @conv_id " + "ORDER BY c.timestamp ASC" + ) + params = [ + {"name": "@conv_id", "value": conversation_id}, + ] + results = list(cosmos_thoughts_container.query_items( + query=query, + parameters=params, + partition_key=user_id + )) + return results + except Exception as e: + log_event(f"get_thoughts_for_conversation failed: {e}", level="WARNING") + return [] + + +def archive_thoughts_for_conversation(conversation_id, user_id): + """Copy all thoughts for a conversation to the archive container, then delete originals.""" + try: + thoughts = get_thoughts_for_conversation(conversation_id, user_id) + for thought in thoughts: + archived = dict(thought) + archived['archived_at'] = datetime.now(timezone.utc).isoformat() + cosmos_archived_thoughts_container.upsert_item(archived) + + for thought in thoughts: + 
cosmos_thoughts_container.delete_item( + item=thought['id'], + partition_key=user_id + ) + except Exception as e: + log_event(f"archive_thoughts_for_conversation failed: {e}", level="WARNING") + + +def delete_thoughts_for_conversation(conversation_id, user_id): + """Delete all thoughts for a conversation.""" + try: + thoughts = get_thoughts_for_conversation(conversation_id, user_id) + for thought in thoughts: + cosmos_thoughts_container.delete_item( + item=thought['id'], + partition_key=user_id + ) + except Exception as e: + log_event(f"delete_thoughts_for_conversation failed: {e}", level="WARNING") + + +def delete_thoughts_for_message(message_id, user_id): + """Delete all thoughts associated with a specific assistant message.""" + try: + query = ( + "SELECT * FROM c " + "WHERE c.message_id = @msg_id" + ) + params = [ + {"name": "@msg_id", "value": message_id}, + ] + results = list(cosmos_thoughts_container.query_items( + query=query, + parameters=params, + partition_key=user_id + )) + for thought in results: + cosmos_thoughts_container.delete_item( + item=thought['id'], + partition_key=user_id + ) + except Exception as e: + log_event(f"delete_thoughts_for_message failed: {e}", level="WARNING") diff --git a/application/single_app/route_backend_agents.py b/application/single_app/route_backend_agents.py index 57097ee5..2f631af7 100644 --- a/application/single_app/route_backend_agents.py +++ b/application/single_app/route_backend_agents.py @@ -23,6 +23,11 @@ from functions_appinsights import log_event from json_schema_validation import validate_agent from swagger_wrapper import swagger_route, get_auth_security +from functions_activity_logging import ( + log_agent_creation, + log_agent_update, + log_agent_deletion, +) bpa = Blueprint('admin_agents', __name__) @@ -147,6 +152,18 @@ def set_user_agents(): for agent_name in agents_to_delete: delete_personal_agent(user_id, agent_name) + # Log individual agent activities + for agent in filtered_agents: + a_name = 
agent.get('name', '') + a_id = agent.get('id', '') + a_display = agent.get('display_name', a_name) + if a_name in current_agent_names: + log_agent_update(user_id=user_id, agent_id=a_id, agent_name=a_name, agent_display_name=a_display, scope='personal') + else: + log_agent_creation(user_id=user_id, agent_id=a_id, agent_name=a_name, agent_display_name=a_display, scope='personal') + for agent_name in agents_to_delete: + log_agent_deletion(user_id=user_id, agent_id=agent_name, agent_name=agent_name, scope='personal') + log_event("User agents updated", extra={"user_id": user_id, "agents_count": len(filtered_agents)}) return jsonify({'success': True}) @@ -175,6 +192,9 @@ def delete_user_agent(agent_name): # Delete from personal_agents container delete_personal_agent(user_id, agent_name) + # Log agent deletion activity + log_agent_deletion(user_id=user_id, agent_id=agent_to_delete.get('id', agent_name), agent_name=agent_name, scope='personal') + # Check if there are any agents left and if they match global_selected_agent remaining_agents = get_personal_agents(user_id) if len(remaining_agents) > 0: @@ -270,11 +290,12 @@ def create_group_agent_route(): cleaned_payload.pop(key, None) try: - saved = save_group_agent(active_group, cleaned_payload) + saved = save_group_agent(active_group, cleaned_payload, user_id=user_id) except Exception as exc: debug_print('Failed to save group agent: %s', exc) return jsonify({'error': 'Unable to save agent'}), 500 + log_agent_creation(user_id=user_id, agent_id=saved.get('id', ''), agent_name=saved.get('name', ''), agent_display_name=saved.get('display_name', ''), scope='group', group_id=active_group) return jsonify(saved), 201 @@ -325,11 +346,12 @@ def update_group_agent_route(agent_id): return jsonify({'error': str(exc)}), 400 try: - saved = save_group_agent(active_group, cleaned_payload) + saved = save_group_agent(active_group, cleaned_payload, user_id=user_id) except Exception as exc: debug_print('Failed to update group agent %s: %s', 
agent_id, exc) return jsonify({'error': 'Unable to update agent'}), 500 + log_agent_update(user_id=user_id, agent_id=agent_id, agent_name=saved.get('name', ''), agent_display_name=saved.get('display_name', ''), scope='group', group_id=active_group) return jsonify(saved), 200 @@ -360,6 +382,7 @@ def delete_group_agent_route(agent_id): if not removed: return jsonify({'error': 'Agent not found'}), 404 + log_agent_deletion(user_id=user_id, agent_id=agent_id, agent_name=agent_id, scope='group', group_id=active_group) return jsonify({'message': 'Agent deleted'}), 200 # User endpoint to set selected agent (new model, not legacy default_agent) @@ -504,10 +527,11 @@ def add_agent(): cleaned_agent['id'] = '15b0c92a-741d-42ff-ba0b-367c7ee0c848' # Save to global agents container - result = save_global_agent(cleaned_agent) + result = save_global_agent(cleaned_agent, user_id=str(get_current_user_id())) if not result: return jsonify({'error': 'Failed to save agent.'}), 500 + log_agent_creation(user_id=str(get_current_user_id()), agent_id=cleaned_agent.get('id', ''), agent_name=cleaned_agent.get('name', ''), agent_display_name=cleaned_agent.get('display_name', ''), scope='global') log_event("Agent added", extra={"action": "add", "agent": {k: v for k, v in cleaned_agent.items() if k != 'id'}, "user": str(get_current_user_id())}) # --- HOT RELOAD TRIGGER --- setattr(builtins, "kernel_reload_needed", True) @@ -615,10 +639,11 @@ def edit_agent(agent_name): return jsonify({'error': 'Agent not found.'}), 404 # Save the updated agent - result = save_global_agent(cleaned_agent) + result = save_global_agent(cleaned_agent, user_id=str(get_current_user_id())) if not result: return jsonify({'error': 'Failed to save agent.'}), 500 + log_agent_update(user_id=str(get_current_user_id()), agent_id=cleaned_agent.get('id', ''), agent_name=agent_name, agent_display_name=cleaned_agent.get('display_name', ''), scope='global') log_event( f"Agent {agent_name} edited", extra={ @@ -660,6 +685,7 @@ def 
delete_agent(agent_name): if not success: return jsonify({'error': 'Failed to delete agent.'}), 500 + log_agent_deletion(user_id=str(get_current_user_id()), agent_id=agent_to_delete.get('id', ''), agent_name=agent_name, scope='global') log_event("Agent deleted", extra={"action": "delete", "agent_name": agent_name, "user": str(get_current_user_id())}) # --- HOT RELOAD TRIGGER --- setattr(builtins, "kernel_reload_needed", True) diff --git a/application/single_app/route_backend_chats.py b/application/single_app/route_backend_chats.py index e452fed4..f923dd1b 100644 --- a/application/single_app/route_backend_chats.py +++ b/application/single_app/route_backend_chats.py @@ -28,6 +28,7 @@ from functions_activity_logging import log_chat_activity, log_conversation_creation, log_token_usage from flask import current_app from swagger_wrapper import swagger_route, get_auth_security +from functions_thoughts import ThoughtTracker def get_kernel(): @@ -39,6 +40,185 @@ def get_kernel_agents(): log_event(f"[SKChat] get_kernel_agents - g.kernel_agents: {type(g_agents)} ({len(g_agents) if g_agents else 0} agents), builtins.kernel_agents: {type(builtins_agents)} ({len(builtins_agents) if builtins_agents else 0} agents)", level=logging.INFO) return g_agents or builtins_agents +async def run_tabular_sk_analysis(user_question, tabular_filenames, user_id, + conversation_id, gpt_model, settings, + source_hint="workspace", group_id=None, + public_workspace_id=None): + """Run lightweight SK with TabularProcessingPlugin to analyze tabular data. + + Creates a temporary Kernel with only the TabularProcessingPlugin, uses the + same chat model as the user's session, and returns computed analysis results. + Returns None on failure for graceful degradation. 
+ """ + from semantic_kernel import Kernel as SKKernel + from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion + from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior + from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import AzureChatPromptExecutionSettings + from semantic_kernel.contents.chat_history import ChatHistory as SKChatHistory + from semantic_kernel_plugins.tabular_processing_plugin import TabularProcessingPlugin + + try: + log_event(f"[Tabular SK Analysis] Starting analysis for files: {tabular_filenames}", level=logging.INFO) + + # 1. Create lightweight kernel with only tabular plugin + kernel = SKKernel() + tabular_plugin = TabularProcessingPlugin() + kernel.add_plugin(tabular_plugin, plugin_name="tabular_processing") + + # 2. Create chat service using same config as main chat + enable_gpt_apim = settings.get('enable_gpt_apim', False) + if enable_gpt_apim: + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_apim_gpt_endpoint'), + api_key=settings.get('azure_apim_gpt_subscription_key'), + api_version=settings.get('azure_apim_gpt_api_version'), + ) + else: + auth_type = settings.get('azure_openai_gpt_authentication_type') + if auth_type == 'managed_identity': + token_provider = get_bearer_token_provider(DefaultAzureCredential(), cognitive_services_scope) + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_openai_gpt_endpoint'), + api_version=settings.get('azure_openai_gpt_api_version'), + ad_token_provider=token_provider, + ) + else: + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_openai_gpt_endpoint'), + api_key=settings.get('azure_openai_gpt_key'), + api_version=settings.get('azure_openai_gpt_api_version'), + 
) + kernel.add_service(chat_service) + + # 3. Pre-dispatch: load file schemas to eliminate discovery LLM rounds + source_context = f"source='{source_hint}'" + if group_id: + source_context += f", group_id='{group_id}'" + if public_workspace_id: + source_context += f", public_workspace_id='{public_workspace_id}'" + + schema_parts = [] + for fname in tabular_filenames: + try: + container, blob_path = tabular_plugin._resolve_blob_location_with_fallback( + user_id, conversation_id, fname, source_hint, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = tabular_plugin._read_tabular_blob_to_dataframe(container, blob_path) + df_numeric = tabular_plugin._try_numeric_conversion(df.copy()) + schema_info = { + "filename": fname, + "row_count": len(df), + "columns": list(df.columns), + "dtypes": {col: str(dtype) for col, dtype in df_numeric.dtypes.items()}, + "preview": df.head(3).to_dict(orient='records') + } + schema_parts.append(json.dumps(schema_info, indent=2, default=str)) + log_event(f"[Tabular SK Analysis] Pre-loaded schema for {fname} ({len(df)} rows)", level=logging.DEBUG) + except Exception as e: + log_event(f"[Tabular SK Analysis] Failed to pre-load schema for {fname}: {e}", level=logging.WARNING) + schema_parts.append(json.dumps({"filename": fname, "error": f"Could not pre-load: {str(e)}"})) + + schema_context = "\n".join(schema_parts) + + # 4. Build chat history with pre-loaded schemas + chat_history = SKChatHistory() + chat_history.add_system_message( + "You are a data analyst. Use the tabular_processing plugin functions to " + "analyze the data and answer the user's question.\n\n" + f"FILE SCHEMAS (pre-loaded — do NOT call list_tabular_files or describe_tabular_file):\n" + f"{schema_context}\n\n" + "IMPORTANT: Batch multiple independent function calls in a SINGLE response. " + "For example, call multiple aggregate_column or group_by_aggregate functions " + "at once rather than one at a time.\n\n" + "Return the computed results clearly." 
+ ) + + chat_history.add_user_message( + f"Analyze the tabular data to answer: {user_question}\n" + f"Use user_id='{user_id}', conversation_id='{conversation_id}', {source_context}." + ) + + # 5. Execute with auto function calling + execution_settings = AzureChatPromptExecutionSettings( + service_id="tabular-analysis", + function_choice_behavior=FunctionChoiceBehavior.Auto( + maximum_auto_invoke_attempts=5 + ), + ) + + result = await chat_service.get_chat_message_contents( + chat_history, execution_settings, kernel=kernel + ) + + if result and result[0].content: + analysis = result[0].content + # Cap at 20k characters to stay within token budget + if len(analysis) > 20000: + analysis = analysis[:20000] + "\n[Analysis truncated]" + log_event(f"[Tabular SK Analysis] Analysis complete, {len(analysis)} chars", level=logging.INFO) + return analysis + log_event("[Tabular SK Analysis] No content in SK response", level=logging.WARNING) + return None + + except Exception as e: + log_event(f"[Tabular SK Analysis] Error: {e}", level=logging.WARNING, exceptionTraceback=True) + return None + +def collect_tabular_sk_citations(user_id, conversation_id): + """Collect plugin invocations from the tabular SK analysis and convert to citation format.""" + from semantic_kernel_plugins.plugin_invocation_logger import get_plugin_logger + + plugin_logger = get_plugin_logger() + plugin_invocations = plugin_logger.get_invocations_for_conversation(user_id, conversation_id) + + if not plugin_invocations: + return [] + + def make_json_serializable(obj): + if obj is None: + return None + elif isinstance(obj, (str, int, float, bool)): + return obj + elif isinstance(obj, dict): + return {str(k): make_json_serializable(v) for k, v in obj.items()} + elif isinstance(obj, (list, tuple)): + return [make_json_serializable(item) for item in obj] + else: + return str(obj) + + citations = [] + for inv in plugin_invocations: + timestamp_str = None + if inv.timestamp: + if hasattr(inv.timestamp, 
'isoformat'): + timestamp_str = inv.timestamp.isoformat() + else: + timestamp_str = str(inv.timestamp) + + citation = { + 'tool_name': f"{inv.plugin_name}.{inv.function_name}", + 'function_name': inv.function_name, + 'plugin_name': inv.plugin_name, + 'function_arguments': make_json_serializable(inv.parameters), + 'function_result': make_json_serializable(inv.result), + 'duration_ms': inv.duration_ms, + 'timestamp': timestamp_str, + 'success': inv.success, + 'error_message': make_json_serializable(inv.error_message), + 'user_id': inv.user_id + } + citations.append(citation) + + log_event(f"[Tabular SK Citations] Collected {len(citations)} tool execution citations", level=logging.INFO) + return citations + def register_route_backend_chats(app): @app.route('/api/chat', methods=['POST']) @swagger_route(security=get_auth_security()) @@ -46,6 +226,7 @@ def register_route_backend_chats(app): @user_required def chat_api(): try: + request_start_time = time.time() settings = get_settings() data = request.get_json() user_id = get_current_user_id() @@ -668,6 +849,18 @@ def result_requires_message_reload(result: Any) -> bool: conversation_item['last_updated'] = datetime.utcnow().isoformat() cosmos_conversations_container.upsert_item(conversation_item) # Update timestamp and potentially title + + # Generate assistant_message_id early for thought tracking + assistant_message_id = f"{conversation_id}_assistant_{int(time.time())}_{random.randint(1000,9999)}" + + # Initialize thought tracker + thought_tracker = ThoughtTracker( + conversation_id=conversation_id, + message_id=assistant_message_id, + thread_id=current_user_thread_id, + user_id=user_id + ) + # region 3 - Content Safety # --------------------------------------------------------------------- # 3) Check Content Safety (but DO NOT return 403). 
@@ -679,6 +872,7 @@ def result_requires_message_reload(result: Any) -> bool: blocklist_matches = [] if settings.get('enable_content_safety') and "content_safety_client" in CLIENTS: + thought_tracker.add_thought('content_safety', 'Checking content safety...') try: content_safety_client = CLIENTS["content_safety_client"] request_obj = AnalyzeTextOptions(text=user_message) @@ -836,6 +1030,7 @@ def result_requires_message_reload(result: Any) -> bool: # Perform the search + thought_tracker.add_thought('search', f"Searching {document_scope or 'personal'} workspace documents for '{(search_query or user_message)[:50]}'") try: # Prepare search arguments # Set default and maximum values for top_n @@ -899,6 +1094,8 @@ def result_requires_message_reload(result: Any) -> bool: }), 500 if search_results: + unique_doc_names = set(doc.get('file_name', 'Unknown') for doc in search_results) + thought_tracker.add_thought('search', f"Found {len(search_results)} results from {len(unique_doc_names)} documents") retrieved_texts = [] combined_documents = [] classifications_found = set(conversation_item.get('classification', [])) # Load existing @@ -953,6 +1150,70 @@ def result_requires_message_reload(result: Any) -> bool: 'documents': combined_documents # Keep track of docs used }) + # Auto-detect tabular files in search results and prompt the LLM to use the plugin + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + tabular_files_in_results = set() + for source_doc in combined_documents: + fname = source_doc.get('file_name', '') + if fname and any(fname.lower().endswith(ext) for ext in TABULAR_EXTENSIONS): + tabular_files_in_results.add(fname) + + if tabular_files_in_results: + # Determine source based on document_scope, not just active IDs + if document_scope == 'group' and active_group_id: + tabular_source_hint = "group" + elif document_scope == 'public' and active_public_workspace_id: + tabular_source_hint = "public" + 
else: + tabular_source_hint = "workspace" + + tabular_filenames_str = ", ".join(tabular_files_in_results) + + # Run SK tabular analysis to pre-compute results + tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=tabular_files_in_results, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint=tabular_source_hint, + group_id=active_group_id if tabular_source_hint == "group" else None, + public_workspace_id=active_public_workspace_id if tabular_source_hint == "public" else None, + )) + + if tabular_analysis: + # Inject pre-computed analysis results as context + tabular_system_msg = ( + f"The following analysis was computed from the tabular file(s) " + f"{tabular_filenames_str} using data analysis functions:\n\n" + f"{tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." + ) + else: + # Fallback: instruct LLM to use plugin functions (for agent mode) + tabular_system_msg = ( + f"IMPORTANT: The search results include data from tabular file(s): {tabular_filenames_str}. " + f"The search results contain only a schema summary (column names and a few sample rows), NOT the full data. " + f"You MUST use the tabular_processing plugin functions to answer ANY question about these files. " + f"Do NOT attempt to answer using the schema summary alone — it is incomplete. " + f"Available functions: describe_tabular_file, aggregate_column, filter_rows, query_tabular_data, group_by_aggregate. " + f"Use source='{tabular_source_hint}'" + + (f" and group_id='{active_group_id}'" if tabular_source_hint == "group" else "") + + (f" and public_workspace_id='{active_public_workspace_id}'" if tabular_source_hint == "public" else "") + + "." 
+ ) + system_messages_for_augmentation.append({ + 'role': 'system', + 'content': tabular_system_msg + }) + + # Collect tool execution citations from SK tabular analysis + if tabular_analysis: + tabular_sk_citations = collect_tabular_sk_citations(user_id, conversation_id) + if tabular_sk_citations: + agent_citations_list.extend(tabular_sk_citations) + # Loop through each source document/chunk used for this message for source_doc in combined_documents: # 4. Create a citation dictionary, selecting the desired fields @@ -1138,8 +1399,8 @@ def result_requires_message_reload(result: Any) -> bool: """ # Update the system message with enhanced content and updated documents array if system_messages_for_augmentation: - system_messages_for_augmentation[-1]['content'] = system_prompt_search - system_messages_for_augmentation[-1]['documents'] = combined_documents + system_messages_for_augmentation[0]['content'] = system_prompt_search + system_messages_for_augmentation[0]['documents'] = combined_documents # --- END NEW METADATA CITATIONS --- # Update conversation classifications if new ones were found @@ -1489,6 +1750,7 @@ def result_requires_message_reload(result: Any) -> bool: }), status_code if web_search_enabled: + thought_tracker.add_thought('web_search', f"Searching the web for '{(search_query or user_message)[:50]}'") perform_web_search( settings=settings, conversation_id=conversation_id, @@ -1504,7 +1766,9 @@ def result_requires_message_reload(result: Any) -> bool: agent_citations_list=agent_citations_list, web_search_citations_list=web_search_citations_list, ) - + if web_search_citations_list: + thought_tracker.add_thought('web_search', f"Got {len(web_search_citations_list)} web search results") + # region 5 - FINAL conversation history preparation # --------------------------------------------------------------------- # 5) Prepare FINAL conversation history for GPT (including summarization) @@ -1650,6 +1914,7 @@ def result_requires_message_reload(result: Any) -> bool: 
allowed_roles_in_history = ['user', 'assistant'] # Add 'system' if you PERSIST general system messages not related to augmentation max_file_content_length_in_history = 50000 # Increased limit for all file content in history max_tabular_content_length_in_history = 50000 # Same limit for tabular data consistency + chat_tabular_files = set() # Track tabular files uploaded directly to chat for message in recent_messages: role = message.get('role') @@ -1685,25 +1950,38 @@ def result_requires_message_reload(result: Any) -> bool: filename = message.get('filename', 'uploaded_file') file_content = message.get('file_content', '') # Assuming file content is stored is_table = message.get('is_table', False) - - # Use higher limit for tabular data that needs complete analysis - content_limit = max_tabular_content_length_in_history if is_table else max_file_content_length_in_history - - display_content = file_content[:content_limit] - if len(file_content) > content_limit: - display_content += "..." - - # Enhanced message for tabular data - if is_table: + file_content_source = message.get('file_content_source', '') + + # Tabular files stored in blob (enhanced citations enabled) - reference plugin + if is_table and file_content_source == 'blob': + chat_tabular_files.add(filename) # Track for mini SK analysis conversation_history_for_api.append({ - 'role': 'system', # Represent file as system info - 'content': f"[User uploaded a tabular data file named '{filename}'. This is CSV format data for analysis:\n{display_content}]\nThis is complete tabular data in CSV format. You can perform calculations, analysis, and data operations on this dataset." + 'role': 'system', + 'content': f"[User uploaded a tabular data file named '{filename}'. " + f"The file is stored in blob storage and available for analysis. 
" + f"Use the tabular_processing plugin functions (list_tabular_files, describe_tabular_file, " + f"aggregate_column, filter_rows, query_tabular_data, group_by_aggregate) to analyze this data. " + f"The file source is 'chat'.]" }) else: - conversation_history_for_api.append({ - 'role': 'system', # Represent file as system info - 'content': f"[User uploaded a file named '{filename}'. Content preview:\n{display_content}]\nUse this file context if relevant." - }) + # Use higher limit for tabular data that needs complete analysis + content_limit = max_tabular_content_length_in_history if is_table else max_file_content_length_in_history + + display_content = file_content[:content_limit] + if len(file_content) > content_limit: + display_content += "..." + + # Enhanced message for tabular data + if is_table: + conversation_history_for_api.append({ + 'role': 'system', # Represent file as system info + 'content': f"[User uploaded a tabular data file named '{filename}'. This is CSV format data for analysis:\n{display_content}]\nThis is complete tabular data in CSV format. You can perform calculations, analysis, and data operations on this dataset." + }) + else: + conversation_history_for_api.append({ + 'role': 'system', # Represent file as system info + 'content': f"[User uploaded a file named '{filename}'. Content preview:\n{display_content}]\nUse this file context if relevant." 
+ }) elif role == 'image': # Handle image uploads with extracted text and vision analysis filename = message.get('filename', 'uploaded_image') is_user_upload = message.get('metadata', {}).get('is_user_upload', False) @@ -1767,6 +2045,45 @@ def result_requires_message_reload(result: Any) -> bool: # Ignored roles: 'safety', 'blocked', 'system' (if they are only for augmentation/summary) + # --- Mini SK analysis for tabular files uploaded directly to chat --- + if chat_tabular_files and settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + chat_tabular_filenames_str = ", ".join(chat_tabular_files) + log_event( + f"[Chat Tabular SK] Detected {len(chat_tabular_files)} tabular file(s) uploaded to chat: {chat_tabular_filenames_str}", + level=logging.INFO + ) + + chat_tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=chat_tabular_files, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint="chat", + )) + + if chat_tabular_analysis: + # Inject pre-computed analysis results as context + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"The following analysis was computed from the chat-uploaded tabular file(s) " + f"{chat_tabular_filenames_str} using data analysis functions:\n\n" + f"{chat_tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." 
def on_plugin_invocation(inv):
    """Record one plugin invocation as an 'agent_tool_call' thought.

    The thought text names the plugin function and, when ``inv.duration_ms``
    is truthy, appends the duration in whole milliseconds. The invocation's
    success flag is passed along as the thought's ``detail``.
    """
    elapsed_suffix = ""
    if inv.duration_ms:
        elapsed_suffix = f" ({int(inv.duration_ms)}ms)"
    qualified_name = f"{inv.plugin_name}.{inv.function_name}"
    summary = f"Agent called {qualified_name}{elapsed_suffix}"
    thought_tracker.add_thought('agent_tool_call', summary, detail=f"success={inv.success}")
message + agent_total_duration_s = round(time.time() - request_start_time, 1) + thought_tracker.add_thought('generation', f"'{agent_deployment_name}' responded ({agent_total_duration_s}s from initial message)") + + # Deregister real-time thought callback + plugin_logger.deregister_callbacks(callback_key) + # Get the actual model deployment used by the agent actual_model_deployment = getattr(selected_agent, 'deployment_name', None) or agent_used debug_print(f"Agent '{agent_used}' using deployment: {actual_model_deployment}") - + # Extract detailed plugin invocations for enhanced agent citations - plugin_logger = get_plugin_logger() - # CRITICAL FIX: Filter by user_id and conversation_id to prevent cross-conversation contamination + # (Thoughts already written to Cosmos in real-time by callback) plugin_invocations = plugin_logger.get_invocations_for_conversation(user_id, conversation_id) - + # Convert plugin invocations to citation format with detailed information detailed_citations = [] for inv in plugin_invocations: @@ -2204,6 +2548,7 @@ def make_json_serializable(obj): ) return (msg, actual_model_deployment, "agent", notice) def agent_error(e): + plugin_logger.deregister_callbacks(callback_key) debug_print(f"Error during Semantic Kernel Agent invocation: {str(e)}") log_event( f"Error during Semantic Kernel Agent invocation: {str(e)}", @@ -2244,8 +2589,21 @@ def foundry_agent_success(result): or agent_used ) + # Emit responded thought with total duration from user message + foundry_total_duration_s = round(time.time() - request_start_time, 1) + thought_tracker.add_thought('generation', f"'{actual_model_deployment}' responded ({foundry_total_duration_s}s from initial message)") + + # Deregister real-time thought callback + plugin_logger.deregister_callbacks(callback_key) + foundry_citations = getattr(selected_agent, 'last_run_citations', []) or [] if foundry_citations: + # Emit thoughts for Foundry agent citations/tool calls + for citation in foundry_citations: + 
thought_tracker.add_thought( + 'agent_tool_call', + f"Agent retrieved citation from Azure AI Foundry" + ) for citation in foundry_citations: try: serializable = json.loads(json.dumps(citation, default=str)) @@ -2282,6 +2640,7 @@ def foundry_agent_success(result): return (msg, actual_model_deployment, 'agent', notice) def foundry_agent_error(e): + plugin_logger.deregister_callbacks(callback_key) log_event( f"Error during Azure AI Foundry agent invocation: {str(e)}", extra={ @@ -2360,6 +2719,7 @@ def kernel_error(e): 'on_error': kernel_error }) + thought_tracker.add_thought('generation', f"Sending to '{gpt_model}'") def invoke_gpt_fallback(): if not conversation_history_for_api: raise Exception('Cannot generate response: No conversation history available.') @@ -2443,12 +2803,18 @@ def gpt_error(e): }) fallback_result = try_fallback_chain(fallback_steps) + # Unpack result - handle both 4-tuple (SK) and 5-tuple (GPT with tokens) if len(fallback_result) == 5: ai_message, final_model_used, chat_mode, kernel_fallback_notice, token_usage_data = fallback_result else: ai_message, final_model_used, chat_mode, kernel_fallback_notice = fallback_result token_usage_data = None + + # Emit responded thought for non-agent paths (agent paths emit their own inside callbacks) + if not selected_agent: + gpt_total_duration_s = round(time.time() - request_start_time, 1) + thought_tracker.add_thought('generation', f"'{final_model_used}' responded ({gpt_total_duration_s}s from initial message)") # Collect token usage from Semantic Kernel services if available if kernel and not token_usage_data: @@ -2510,8 +2876,8 @@ def gpt_error(e): if hasattr(selected_agent, 'name'): agent_name = selected_agent.name - assistant_message_id = f"{conversation_id}_assistant_{int(time.time())}_{random.randint(1000,9999)}" - + # assistant_message_id was generated earlier for thought tracking + # Get user_info and thread_id from the user message for ownership tracking and threading user_info_for_assistant = None 
def emit_thought(step_type, content, detail=None):
    """Record a thought on the tracker and return it as an SSE event string.

    ``detail`` is forwarded to ``thought_tracker.add_thought`` only; it is
    not part of the returned event. The event's ``step_index`` is the
    tracker's ``current_index`` minus one, read after the thought was added.
    """
    thought_tracker.add_thought(step_type, content, detail)
    event_payload = {
        'type': 'thought',
        'step_index': thought_tracker.current_index - 1,
        'step_type': step_type,
        'content': content,
    }
    # SSE framing: a "data: " prefix and a blank line terminating the event.
    return "data: " + json.dumps(event_payload) + "\n\n"
+ block_reasons = [] + + for cat_result in cs_response.categories_analysis: + triggered_categories.append({ + "category": cat_result.category, + "severity": cat_result.severity + }) + if cat_result.severity > max_severity: + max_severity = cat_result.severity + + if cs_response.blocklists_match: + for match in cs_response.blocklists_match: + blocklist_matches.append({ + "blocklistName": match.blocklist_name, + "blocklistItemId": match.blocklist_item_id, + "blocklistItemText": match.blocklist_item_text + }) + + if max_severity >= 4: + blocked = True + block_reasons.append("Max severity >= 4") + if len(blocklist_matches) > 0: + blocked = True + block_reasons.append("Blocklist match") + + if blocked: + # Upsert to safety container + safety_item = { + 'id': str(uuid.uuid4()), + 'user_id': user_id, + 'conversation_id': conversation_id, + 'message': user_message, + 'triggered_categories': triggered_categories, + 'blocklist_matches': blocklist_matches, + 'timestamp': datetime.utcnow().isoformat(), + 'reason': "; ".join(block_reasons), + 'metadata': {} + } + cosmos_safety_container.upsert_item(safety_item) + + # Build blocked message + blocked_msg_content = ( + "Your message was blocked by Content Safety.\n\n" + f"**Reason**: {', '.join(block_reasons)}\n" + "Triggered categories:\n" + ) + for cat in triggered_categories: + blocked_msg_content += ( + f" - {cat['category']} (severity={cat['severity']})\n" + ) + if blocklist_matches: + blocked_msg_content += ( + "\nBlocklist Matches:\n" + + "\n".join([f" - {m['blocklistItemText']} (in {m['blocklistName']})" + for m in blocklist_matches]) + ) + + # Insert safety message + safety_message_id = f"{conversation_id}_safety_{int(time.time())}_{random.randint(1000,9999)}" + safety_doc = { + 'id': safety_message_id, + 'conversation_id': conversation_id, + 'role': 'safety', + 'content': blocked_msg_content.strip(), + 'timestamp': datetime.utcnow().isoformat(), + 'model_deployment_name': None, + 'metadata': {}, + } + 
cosmos_messages_container.upsert_item(safety_doc) + + conversation_item['last_updated'] = datetime.utcnow().isoformat() + cosmos_conversations_container.upsert_item(conversation_item) + + # Stream the blocked response and stop + yield f"data: {json.dumps({'content': blocked_msg_content.strip(), 'blocked': True})}\n\n" + yield "data: [DONE]\n\n" + return + + except HttpResponseError as e: + debug_print(f"[Content Safety Error - Streaming] {e}") + except Exception as ex: + debug_print(f"[Content Safety - Streaming] Unexpected error: {ex}") + # Hybrid search (if enabled) combined_documents = [] if hybrid_search_enabled: + yield emit_thought('search', f"Searching {document_scope or 'personal'} workspace documents for '{(search_query or user_message)[:50]}'") try: search_args = { "query": search_query, @@ -3144,8 +3624,10 @@ def generate(): search_results = hybrid_search(**search_args) except Exception as e: debug_print(f"Error during hybrid search: {e}") - + if search_results: + unique_doc_names_stream = set(doc.get('file_name', 'Unknown') for doc in search_results) + yield emit_thought('search', f"Found {len(search_results)} results from {len(unique_doc_names_stream)} documents") retrieved_texts = [] for doc in search_results: @@ -3319,11 +3801,60 @@ def generate(): 'content': system_prompt_search, 'documents': combined_documents }) - + + # Auto-detect tabular files in search results and run SK analysis + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + tabular_files_in_results = set() + for source_doc in combined_documents: + fname = source_doc.get('file_name', '') + if fname and any(fname.lower().endswith(ext) for ext in TABULAR_EXTENSIONS): + tabular_files_in_results.add(fname) + + if tabular_files_in_results: + # Determine source based on document_scope, not just active IDs + if document_scope == 'group' and active_group_id: + tabular_source_hint = "group" + elif document_scope == 'public' and 
active_public_workspace_id: + tabular_source_hint = "public" + else: + tabular_source_hint = "workspace" + + tabular_filenames_str = ", ".join(tabular_files_in_results) + + # Run SK tabular analysis to pre-compute results + tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=tabular_files_in_results, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint=tabular_source_hint, + group_id=active_group_id if tabular_source_hint == "group" else None, + public_workspace_id=active_public_workspace_id if tabular_source_hint == "public" else None, + )) + + if tabular_analysis: + system_messages_for_augmentation.append({ + 'role': 'system', + 'content': ( + f"The following analysis was computed from the tabular file(s) " + f"{tabular_filenames_str} using data analysis functions:\n\n" + f"{tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." + ) + }) + + # Collect tool execution citations from SK tabular analysis + tabular_sk_citations = collect_tabular_sk_citations(user_id, conversation_id) + if tabular_sk_citations: + agent_citations_list.extend(tabular_sk_citations) + # Reorder hybrid citations list in descending order based on page_number hybrid_citations_list.sort(key=lambda x: x.get('page_number', 0), reverse=True) if web_search_enabled: + yield emit_thought('web_search', f"Searching the web for '{(search_query or user_message)[:50]}'") perform_web_search( settings=settings, conversation_id=conversation_id, @@ -3339,6 +3870,8 @@ def generate(): agent_citations_list=agent_citations_list, web_search_citations_list=web_search_citations_list, ) + if web_search_citations_list: + yield emit_thought('web_search', f"Got {len(web_search_citations_list)} web search results") # Update message chat type message_chat_type = None @@ -3381,15 +3914,108 @@ def generate(): 'content': aug_msg['content'] }) - # Add recent messages + # 
Add recent messages (with file role handling) allowed_roles_in_history = ['user', 'assistant'] + max_file_content_length_in_history = 50000 + max_tabular_content_length_in_history = 50000 + chat_tabular_files = set() # Track tabular files uploaded directly to chat + for message in recent_messages: - if message.get('role') in allowed_roles_in_history: + role = message.get('role') + content = message.get('content', '') + + if role in allowed_roles_in_history: conversation_history_for_api.append({ - 'role': message['role'], - 'content': message.get('content', '') + 'role': role, + 'content': content }) - + elif role == 'file': + filename = message.get('filename', 'uploaded_file') + file_content = message.get('file_content', '') + is_table = message.get('is_table', False) + file_content_source = message.get('file_content_source', '') + + # Tabular files stored in blob - track for mini SK analysis + if is_table and file_content_source == 'blob': + chat_tabular_files.add(filename) + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"[User uploaded a tabular data file named '{filename}'. " + f"The file is stored in blob storage and available for analysis. " + f"Use the tabular_processing plugin functions (list_tabular_files, " + f"describe_tabular_file, aggregate_column, filter_rows, " + f"query_tabular_data, group_by_aggregate) to analyze this data. " + f"The file source is 'chat'.]" + ) + }) + else: + content_limit = ( + max_tabular_content_length_in_history if is_table + else max_file_content_length_in_history + ) + display_content = file_content[:content_limit] + if len(file_content) > content_limit: + display_content += "..." + + if is_table: + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"[User uploaded a tabular data file named '{filename}'. " + f"This is CSV format data for analysis:\n{display_content}]\n" + f"This is complete tabular data in CSV format. 
You can perform " + f"calculations, analysis, and data operations on this dataset." + ) + }) + else: + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"[User uploaded a file named '{filename}'. " + f"Content preview:\n{display_content}]\n" + f"Use this file context if relevant." + ) + }) + + # --- Mini SK analysis for tabular files uploaded directly to chat --- + if chat_tabular_files and settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + chat_tabular_filenames_str = ", ".join(chat_tabular_files) + log_event( + f"[Chat Tabular SK] Streaming: Detected {len(chat_tabular_files)} tabular file(s) uploaded to chat: {chat_tabular_filenames_str}", + level=logging.INFO + ) + + chat_tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=chat_tabular_files, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint="chat", + )) + + if chat_tabular_analysis: + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"The following analysis was computed from the chat-uploaded tabular file(s) " + f"{chat_tabular_filenames_str} using data analysis functions:\n\n" + f"{chat_tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." 
def on_plugin_invocation_streaming(inv):
    """Record one plugin invocation as an 'agent_tool_call' thought (streaming path).

    The thought text names the plugin function and, when ``inv.duration_ms``
    is truthy, appends the duration in whole milliseconds. No ``detail`` is
    attached.
    """
    elapsed_suffix = ""
    if inv.duration_ms:
        elapsed_suffix = f" ({int(inv.duration_ms)}ms)"
    qualified_name = f"{inv.plugin_name}.{inv.function_name}"
    thought_tracker.add_thought('agent_tool_call', f"Agent called {qualified_name}{elapsed_suffix}")
-3497,6 +4139,8 @@ def generate(): for msg in conversation_history_for_api ] + agent_stream_start_time = time.time() + # Stream agent responses - collect chunks first then yield async def stream_agent_async(): """Collect all streaming chunks from agent""" @@ -3524,7 +4168,6 @@ async def stream_agent_async(): return chunks, usage_data # Execute async streaming - import asyncio try: # Try to get existing event loop loop = asyncio.get_event_loop() @@ -3539,36 +4182,53 @@ async def stream_agent_async(): try: # Run streaming and collect chunks and usage chunks, stream_usage = loop.run_until_complete(stream_agent_async()) - - # Yield chunks to frontend - for chunk_content in chunks: - accumulated_content += chunk_content - yield f"data: {json.dumps({'content': chunk_content})}\n\n" - - # Try to capture token usage from stream metadata - if stream_usage: - # stream_usage is a CompletionUsage object, not a dict - prompt_tokens = getattr(stream_usage, 'prompt_tokens', 0) - completion_tokens = getattr(stream_usage, 'completion_tokens', 0) - total_tokens = getattr(stream_usage, 'total_tokens', None) - - # Calculate total if not provided - if total_tokens is None or total_tokens == 0: - total_tokens = prompt_tokens + completion_tokens - - token_usage_data = { - 'prompt_tokens': prompt_tokens, - 'completion_tokens': completion_tokens, - 'total_tokens': total_tokens, - 'captured_at': datetime.utcnow().isoformat() - } - debug_print(f"[Agent Streaming Tokens] From metadata - prompt: {prompt_tokens}, completion: {completion_tokens}, total: {total_tokens}") except Exception as stream_error: + plugin_logger_cb.deregister_callbacks(callback_key) debug_print(f"❌ Agent streaming error: {stream_error}") import traceback traceback.print_exc() yield f"data: {json.dumps({'error': f'Agent streaming failed: {str(stream_error)}'})}\n\n" return + + # Emit responded thought with total duration from user message + agent_stream_total_duration_s = round(time.time() - request_start_time, 1) + yield 
emit_thought('generation', f"'{actual_model_used}' responded ({agent_stream_total_duration_s}s from initial message)") + + # Deregister callback (agent completed successfully) + plugin_logger_cb.deregister_callbacks(callback_key) + + # Emit SSE-only events for streaming UI (Cosmos writes already done by callback) + agent_plugin_invocations = plugin_logger_cb.get_invocations_for_conversation(user_id, conversation_id) + for inv in agent_plugin_invocations: + duration_str = f" ({int(inv.duration_ms)}ms)" if inv.duration_ms else "" + tool_name = f"{inv.plugin_name}.{inv.function_name}" + content = f"Agent called {tool_name}{duration_str}" + yield f"data: {json.dumps({'type': 'thought', 'step_index': thought_tracker.current_index, 'step_type': 'agent_tool_call', 'content': content})}\n\n" + thought_tracker.current_index += 1 + + # Yield chunks to frontend + for chunk_content in chunks: + accumulated_content += chunk_content + yield f"data: {json.dumps({'content': chunk_content})}\n\n" + + # Try to capture token usage from stream metadata + if stream_usage: + # stream_usage is a CompletionUsage object, not a dict + prompt_tokens = getattr(stream_usage, 'prompt_tokens', 0) + completion_tokens = getattr(stream_usage, 'completion_tokens', 0) + total_tokens = getattr(stream_usage, 'total_tokens', None) + + # Calculate total if not provided + if total_tokens is None or total_tokens == 0: + total_tokens = prompt_tokens + completion_tokens + + token_usage_data = { + 'prompt_tokens': prompt_tokens, + 'completion_tokens': completion_tokens, + 'total_tokens': total_tokens, + 'captured_at': datetime.utcnow().isoformat() + } + debug_print(f"[Agent Streaming Tokens] From metadata - prompt: {prompt_tokens}, completion: {completion_tokens}, total: {total_tokens}") # Collect token usage from kernel services if not captured from stream if not token_usage_data: @@ -3650,6 +4310,7 @@ def make_json_serializable(obj): else: # Stream from regular GPT model (non-agent) + yield 
emit_thought('generation', f"Sending to '{gpt_model}'") debug_print(f"--- Streaming from GPT ({gpt_model}) ---") # Prepare stream parameters @@ -3700,6 +4361,10 @@ def make_json_serializable(obj): 'captured_at': datetime.utcnow().isoformat() } debug_print(f"[Streaming Tokens] Captured usage - prompt: {chunk.usage.prompt_tokens}, completion: {chunk.usage.completion_tokens}, total: {chunk.usage.total_tokens}") + + # Emit responded thought for regular LLM streaming + gpt_stream_total_duration_s = round(time.time() - request_start_time, 1) + yield emit_thought('generation', f"'{gpt_model}' responded ({gpt_stream_total_duration_s}s from initial message)") # Stream complete - save message and send final metadata # Get user thread info to maintain thread consistency @@ -3818,7 +4483,8 @@ def make_json_serializable(obj): 'agent_citations': agent_citations_list, 'agent_display_name': agent_display_name_used if use_agent_streaming else None, 'agent_name': agent_name_used if use_agent_streaming else None, - 'full_content': accumulated_content + 'full_content': accumulated_content, + 'thoughts_enabled': thought_tracker.enabled } yield f"data: {json.dumps(final_data)}\n\n" diff --git a/application/single_app/route_backend_conversation_export.py b/application/single_app/route_backend_conversation_export.py index aad750e4..abd6490f 100644 --- a/application/single_app/route_backend_conversation_export.py +++ b/application/single_app/route_backend_conversation_export.py @@ -2,17 +2,31 @@ import io import json +import markdown2 +import re +import tempfile import zipfile +from collections import Counter, defaultdict from datetime import datetime +from html import escape as _escape_html +from typing import Any, Dict, List, Optional from config import * +from flask import jsonify, make_response, request +from functions_appinsights import log_event from functions_authentication import * -from functions_settings import * -from flask import Response, jsonify, request, make_response +from 
functions_chat import sort_messages_by_thread +from functions_conversation_metadata import update_conversation_with_metadata from functions_debug import debug_print +from functions_settings import * +from functions_thoughts import get_thoughts_for_conversation from swagger_wrapper import swagger_route, get_auth_security +TRANSCRIPT_ROLES = {'user', 'assistant'} +SUMMARY_SOURCE_CHAR_LIMIT = 60000 + + def register_route_backend_conversation_export(app): """Register conversation export API routes.""" @@ -29,32 +43,36 @@ def api_export_conversations(): conversation_ids (list): List of conversation IDs to export. format (str): Export format — "json" or "markdown". packaging (str): Output packaging — "single" or "zip". + include_summary_intro (bool): Whether to generate a per-conversation intro. + summary_model_deployment (str): Optional model deployment for summary generation. """ user_id = get_current_user_id() if not user_id: return jsonify({'error': 'User not authenticated'}), 401 - data = request.get_json() + data = request.get_json(silent=True) if not data: return jsonify({'error': 'Request body is required'}), 400 conversation_ids = data.get('conversation_ids', []) - export_format = data.get('format', 'json').lower() - packaging = data.get('packaging', 'single').lower() + export_format = str(data.get('format', 'json')).lower() + packaging = str(data.get('packaging', 'single')).lower() + include_summary_intro = bool(data.get('include_summary_intro', False)) + summary_model_deployment = str(data.get('summary_model_deployment', '') or '').strip() if not conversation_ids or not isinstance(conversation_ids, list): return jsonify({'error': 'At least one conversation_id is required'}), 400 - if export_format not in ('json', 'markdown'): - return jsonify({'error': 'Format must be "json" or "markdown"'}), 400 + if export_format not in ('json', 'markdown', 'pdf'): + return jsonify({'error': 'Format must be "json", "markdown", or "pdf"'}), 400 if packaging not in ('single', 
'zip'): return jsonify({'error': 'Packaging must be "single" or "zip"'}), 400 try: + settings = get_settings() exported = [] for conv_id in conversation_ids: - # Verify ownership and fetch conversation try: conversation = cosmos_conversations_container.read_item( item=conv_id, @@ -64,225 +82,1597 @@ def api_export_conversations(): debug_print(f"Export: conversation {conv_id} not found or access denied") continue - # Verify user owns this conversation if conversation.get('user_id') != user_id: debug_print(f"Export: user {user_id} does not own conversation {conv_id}") continue - # Fetch messages ordered by timestamp - message_query = f""" + message_query = """ SELECT * FROM c - WHERE c.conversation_id = '{conv_id}' + WHERE c.conversation_id = @conversation_id ORDER BY c.timestamp ASC """ messages = list(cosmos_messages_container.query_items( query=message_query, + parameters=[{'name': '@conversation_id', 'value': conv_id}], partition_key=conv_id )) - # Filter for active thread messages only - filtered_messages = [] - for msg in messages: - thread_info = msg.get('metadata', {}).get('thread_info', {}) - active = thread_info.get('active_thread') - if active is True or active is None or 'active_thread' not in thread_info: - filtered_messages.append(msg) - - exported.append({ - 'conversation': _sanitize_conversation(conversation), - 'messages': [_sanitize_message(m) for m in filtered_messages] - }) + exported.append( + _build_export_entry( + conversation=conversation, + raw_messages=messages, + user_id=user_id, + settings=settings, + include_summary_intro=include_summary_intro, + summary_model_deployment=summary_model_deployment + ) + ) if not exported: return jsonify({'error': 'No accessible conversations found'}), 404 - # Generate export content timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S') if packaging == 'zip': return _build_zip_response(exported, export_format, timestamp_str) - else: - return _build_single_file_response(exported, export_format, 
timestamp_str) - - except Exception as e: - debug_print(f"Export error: {str(e)}") - return jsonify({'error': f'Export failed: {str(e)}'}), 500 - - def _sanitize_conversation(conv): - """Return only user-facing conversation fields.""" - return { - 'id': conv.get('id'), - 'title': conv.get('title', 'Untitled'), - 'last_updated': conv.get('last_updated', ''), - 'chat_type': conv.get('chat_type', 'personal'), - 'tags': conv.get('tags', []), - 'is_pinned': conv.get('is_pinned', False), - 'context': conv.get('context', []) - } - - def _sanitize_message(msg): - """Return only user-facing message fields.""" - result = { - 'role': msg.get('role', ''), - 'content': msg.get('content', ''), - 'timestamp': msg.get('timestamp', ''), - } - # Include citations if present - if msg.get('citations'): - result['citations'] = msg['citations'] - # Include context/tool info if present - if msg.get('context'): - result['context'] = msg['context'] - return result - - def _build_single_file_response(exported, export_format, timestamp_str): - """Build a single-file download response.""" - if export_format == 'json': - content = json.dumps(exported, indent=2, ensure_ascii=False, default=str) - filename = f"conversations_export_{timestamp_str}.json" - content_type = 'application/json; charset=utf-8' + + return _build_single_file_response(exported, export_format, timestamp_str) + + except Exception as exc: + debug_print(f"Export error: {str(exc)}") + log_event(f"Conversation export failed: {exc}", level="WARNING") + return jsonify({'error': f'Export failed: {str(exc)}'}), 500 + + +def _build_export_entry( + conversation: Dict[str, Any], + raw_messages: List[Dict[str, Any]], + user_id: str, + settings: Dict[str, Any], + include_summary_intro: bool = False, + summary_model_deployment: str = '' +) -> Dict[str, Any]: + filtered_messages = _filter_messages_for_export(raw_messages) + ordered_messages = sort_messages_by_thread(filtered_messages) + + raw_thoughts = 
get_thoughts_for_conversation(conversation.get('id'), user_id) + thoughts_by_message = defaultdict(list) + for thought in raw_thoughts: + thoughts_by_message[thought.get('message_id')].append(_sanitize_thought(thought)) + + exported_messages = [] + role_counts = Counter() + total_citation_counts = Counter({'document': 0, 'web': 0, 'agent_tool': 0, 'legacy': 0, 'total': 0}) + transcript_index = 0 + total_thoughts = 0 + + for sequence_index, message in enumerate(ordered_messages, start=1): + role = message.get('role', 'unknown') + role_counts[role] += 1 + + message_transcript_index = None + if role in TRANSCRIPT_ROLES: + transcript_index += 1 + message_transcript_index = transcript_index + + thoughts = thoughts_by_message.get(message.get('id'), []) + exported_message = _sanitize_message( + message, + sequence_index=sequence_index, + transcript_index=message_transcript_index, + thoughts=thoughts + ) + exported_messages.append(exported_message) + + counts = exported_message.get('citation_counts', {}) + for key in total_citation_counts: + total_citation_counts[key] += counts.get(key, 0) + total_thoughts += len(thoughts) + + # Compute message time range for summary caching + message_time_start = None + message_time_end = None + if ordered_messages: + message_time_start = ordered_messages[0].get('timestamp') + message_time_end = ordered_messages[-1].get('timestamp') + + sanitized_conversation = _sanitize_conversation( + conversation, + messages=exported_messages, + role_counts=role_counts, + citation_counts=total_citation_counts, + thought_count=total_thoughts + ) + summary_intro = _build_summary_intro( + messages=exported_messages, + conversation=conversation, + sanitized_conversation=sanitized_conversation, + settings=settings, + enabled=include_summary_intro, + summary_model_deployment=summary_model_deployment, + message_time_start=message_time_start, + message_time_end=message_time_end + ) + + return { + 'conversation': sanitized_conversation, + 'summary_intro': 
summary_intro, + 'messages': exported_messages + } + + +def _filter_messages_for_export(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + filtered_messages = [] + for message in messages: + metadata = message.get('metadata', {}) or {} + if metadata.get('is_deleted') is True: + continue + + thread_info = metadata.get('thread_info', {}) or {} + active = thread_info.get('active_thread') + if active is True or active is None or 'active_thread' not in thread_info: + filtered_messages.append(message) + + return filtered_messages + + +def _sanitize_conversation( + conversation: Dict[str, Any], + messages: List[Dict[str, Any]], + role_counts: Counter, + citation_counts: Counter, + thought_count: int +) -> Dict[str, Any]: + transcript_count = sum(1 for message in messages if message.get('is_transcript_message')) + return { + 'id': conversation.get('id'), + 'title': conversation.get('title', 'Untitled'), + 'last_updated': conversation.get('last_updated', ''), + 'chat_type': conversation.get('chat_type', 'personal'), + 'tags': conversation.get('tags', []), + 'context': conversation.get('context', []), + 'classification': conversation.get('classification', []), + 'strict': conversation.get('strict', False), + 'is_pinned': conversation.get('is_pinned', False), + 'scope_locked': conversation.get('scope_locked'), + 'locked_contexts': conversation.get('locked_contexts', []), + 'message_count': len(messages), + 'transcript_message_count': transcript_count, + 'message_counts_by_role': dict(role_counts), + 'citation_counts': dict(citation_counts), + 'thought_count': thought_count + } + + +def _sanitize_message( + message: Dict[str, Any], + sequence_index: int, + transcript_index: Optional[int], + thoughts: List[Dict[str, Any]] +) -> Dict[str, Any]: + role = message.get('role', '') + content = message.get('content', '') + raw_citation_buckets = _collect_raw_citation_buckets(message) + normalized_citations = _normalize_citations(raw_citation_buckets) + citation_counts = 
_build_citation_counts(normalized_citations) + details = _curate_message_details(message, citation_counts, len(thoughts)) + + return { + 'id': message.get('id'), + 'role': role, + 'speaker_label': _role_to_label(role), + 'sequence_index': sequence_index, + 'transcript_index': transcript_index, + 'label': f"Turn {transcript_index}" if transcript_index else f"Message {sequence_index}", + 'is_transcript_message': role in TRANSCRIPT_ROLES, + 'timestamp': message.get('timestamp', ''), + 'content': content, + 'content_text': _normalize_content(content), + 'details': details, + 'citations': normalized_citations, + 'citation_counts': citation_counts, + 'thoughts': thoughts, + 'legacy_citations': raw_citation_buckets['legacy'], + 'hybrid_citations': raw_citation_buckets['hybrid'], + 'web_search_citations': raw_citation_buckets['web'], + 'agent_citations': raw_citation_buckets['agent'] + } + + +def _sanitize_thought(thought: Dict[str, Any]) -> Dict[str, Any]: + return { + 'step_index': thought.get('step_index'), + 'step_type': thought.get('step_type'), + 'content': thought.get('content'), + 'detail': thought.get('detail'), + 'duration_ms': thought.get('duration_ms'), + 'timestamp': thought.get('timestamp') + } + + +def _collect_raw_citation_buckets(message: Dict[str, Any]) -> Dict[str, List[Any]]: + def ensure_list(value: Any) -> List[Any]: + if not value: + return [] + return value if isinstance(value, list) else [value] + + return { + 'legacy': ensure_list(message.get('citations')), + 'hybrid': ensure_list(message.get('hybrid_citations')), + 'web': ensure_list(message.get('web_search_citations')), + 'agent': ensure_list(message.get('agent_citations')) + } + + +def _normalize_citations(raw_citation_buckets: Dict[str, List[Any]]) -> List[Dict[str, Any]]: + normalized = [] + + for citation in raw_citation_buckets.get('hybrid', []): + if isinstance(citation, dict): + normalized.append({ + 'citation_type': 'document', + 'label': _build_document_citation_label(citation), + 
'file_name': citation.get('file_name'), + 'title': citation.get('title') or citation.get('file_name'), + 'page_number': citation.get('page_number'), + 'citation_id': citation.get('citation_id'), + 'chunk_id': citation.get('chunk_id'), + 'metadata_type': citation.get('metadata_type'), + 'metadata_content': citation.get('metadata_content'), + 'score': citation.get('score'), + 'classification': citation.get('classification'), + 'url': citation.get('url') + }) else: - parts = [] - for entry in exported: - parts.append(_conversation_to_markdown(entry)) - content = '\n\n---\n\n'.join(parts) - filename = f"conversations_export_{timestamp_str}.md" - content_type = 'text/markdown; charset=utf-8' - - response = make_response(content) - response.headers['Content-Type'] = content_type - response.headers['Content-Disposition'] = f'attachment; filename="{filename}"' - return response - - def _build_zip_response(exported, export_format, timestamp_str): - """Build a ZIP archive containing one file per conversation.""" - buffer = io.BytesIO() - with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf: - for entry in exported: - conv = entry['conversation'] - safe_title = _safe_filename(conv.get('title', 'Untitled')) - conv_id_short = conv.get('id', 'unknown')[:8] - - if export_format == 'json': - file_content = json.dumps(entry, indent=2, ensure_ascii=False, default=str) - ext = 'json' - else: - file_content = _conversation_to_markdown(entry) - ext = 'md' + normalized.append({ + 'citation_type': 'document', + 'label': str(citation), + 'value': str(citation) + }) + + for citation in raw_citation_buckets.get('web', []): + if isinstance(citation, dict): + title = citation.get('title') or citation.get('url') or 'Web source' + normalized.append({ + 'citation_type': 'web', + 'label': title, + 'title': title, + 'url': citation.get('url') + }) + else: + normalized.append({ + 'citation_type': 'web', + 'label': str(citation), + 'value': str(citation) + }) + + for citation in 
raw_citation_buckets.get('agent', []): + if isinstance(citation, dict): + tool_name = citation.get('tool_name') or citation.get('function_name') or 'Tool invocation' + normalized.append({ + 'citation_type': 'agent_tool', + 'label': tool_name, + 'tool_name': citation.get('tool_name'), + 'function_name': citation.get('function_name'), + 'plugin_name': citation.get('plugin_name'), + 'success': citation.get('success'), + 'timestamp': citation.get('timestamp') + }) + else: + normalized.append({ + 'citation_type': 'agent_tool', + 'label': str(citation), + 'value': str(citation) + }) + + for citation in raw_citation_buckets.get('legacy', []): + if isinstance(citation, dict): + title = citation.get('title') or citation.get('filepath') or citation.get('url') or 'Legacy citation' + normalized.append({ + 'citation_type': 'legacy', + 'label': title, + 'title': title, + 'url': citation.get('url'), + 'filepath': citation.get('filepath') + }) + else: + normalized.append({ + 'citation_type': 'legacy', + 'label': str(citation), + 'value': str(citation) + }) + + return normalized + + +def _build_document_citation_label(citation: Dict[str, Any]) -> str: + file_name = citation.get('file_name') or citation.get('title') or 'Document source' + metadata_type = citation.get('metadata_type') + page_number = citation.get('page_number') + + if metadata_type: + return f"{file_name} — {metadata_type.replace('_', ' ').title()}" + if page_number not in (None, ''): + return f"{file_name} — Page {page_number}" + return file_name + + +def _build_citation_counts(citations: List[Dict[str, Any]]) -> Dict[str, int]: + counts = { + 'document': 0, + 'web': 0, + 'agent_tool': 0, + 'legacy': 0, + 'total': len(citations) + } + for citation in citations: + citation_type = citation.get('citation_type') + if citation_type in counts: + counts[citation_type] += 1 + return counts + + +def _curate_message_details( + message: Dict[str, Any], + citation_counts: Dict[str, int], + thought_count: int +) -> Dict[str, 
Any]: + role = message.get('role', '') + metadata = message.get('metadata', {}) or {} + details: Dict[str, Any] = {} + + if role == 'user': + details['interaction_mode'] = _remove_empty_values({ + 'button_states': metadata.get('button_states'), + 'workspace_search': _curate_workspace_search(metadata.get('workspace_search')), + 'prompt_selection': _curate_prompt_selection(metadata.get('prompt_selection')), + 'agent_selection': _curate_agent_selection(metadata.get('agent_selection')), + 'model_selection': _curate_model_selection(metadata.get('model_selection')) + }) + elif role == 'assistant': + details['generation'] = _remove_empty_values({ + 'augmented': message.get('augmented'), + 'model_deployment': message.get('model_deployment_name'), + 'agent_name': message.get('agent_name'), + 'agent_display_name': message.get('agent_display_name'), + 'reasoning_effort': metadata.get('reasoning_effort'), + 'hybrid_search_query': message.get('hybridsearch_query'), + 'token_usage': _curate_token_usage(metadata.get('token_usage')), + 'citation_counts': citation_counts, + 'thought_count': thought_count + }) + else: + details['message_context'] = _remove_empty_values({ + 'filename': message.get('filename'), + 'prompt': message.get('prompt'), + 'is_table': message.get('is_table'), + 'model_deployment': message.get('model_deployment_name') + }) + + return _remove_empty_values(details) + + +def _curate_workspace_search(workspace_search: Optional[Dict[str, Any]]) -> Dict[str, Any]: + if not isinstance(workspace_search, dict): + return {} + return _remove_empty_values({ + 'search_enabled': workspace_search.get('search_enabled'), + 'document_scope': workspace_search.get('document_scope'), + 'document_name': workspace_search.get('document_name'), + 'document_filename': workspace_search.get('document_filename'), + 'group_name': workspace_search.get('group_name'), + 'classification': workspace_search.get('classification'), + 'public_workspace_id': 
workspace_search.get('active_public_workspace_id') + }) - file_name = f"{safe_title}_{conv_id_short}.{ext}" - zf.writestr(file_name, file_content) - buffer.seek(0) - filename = f"conversations_export_{timestamp_str}.zip" +def _curate_prompt_selection(prompt_selection: Optional[Dict[str, Any]]) -> Dict[str, Any]: + if not isinstance(prompt_selection, dict): + return {} + return _remove_empty_values({ + 'prompt_name': prompt_selection.get('prompt_name'), + 'selected_prompt_index': prompt_selection.get('selected_prompt_index'), + 'selected_prompt_text': prompt_selection.get('selected_prompt_text') + }) - response = make_response(buffer.read()) - response.headers['Content-Type'] = 'application/zip' - response.headers['Content-Disposition'] = f'attachment; filename="{filename}"' - return response - def _conversation_to_markdown(entry): - """Convert a conversation + messages entry to Markdown format.""" - conv = entry['conversation'] - messages = entry['messages'] +def _curate_agent_selection(agent_selection: Optional[Dict[str, Any]]) -> Dict[str, Any]: + if not isinstance(agent_selection, dict): + return {} + return _remove_empty_values({ + 'selected_agent': agent_selection.get('selected_agent'), + 'agent_display_name': agent_selection.get('agent_display_name'), + 'is_global': agent_selection.get('is_global'), + 'is_group': agent_selection.get('is_group'), + 'group_name': agent_selection.get('group_name') + }) + + +def _curate_model_selection(model_selection: Optional[Dict[str, Any]]) -> Dict[str, Any]: + if not isinstance(model_selection, dict): + return {} + return _remove_empty_values({ + 'selected_model': model_selection.get('selected_model'), + 'frontend_requested_model': model_selection.get('frontend_requested_model'), + 'reasoning_effort': model_selection.get('reasoning_effort'), + 'streaming': model_selection.get('streaming') + }) + + +def _curate_token_usage(token_usage: Any) -> Dict[str, Any]: + if not isinstance(token_usage, dict): + return {} + return 
_remove_empty_values({ + 'prompt_tokens': token_usage.get('prompt_tokens'), + 'completion_tokens': token_usage.get('completion_tokens'), + 'total_tokens': token_usage.get('total_tokens') + }) + + +def _remove_empty_values(value: Any) -> Any: + if isinstance(value, dict): + cleaned = {} + for key, item in value.items(): + cleaned_item = _remove_empty_values(item) + if cleaned_item in (None, '', [], {}): + continue + cleaned[key] = cleaned_item + return cleaned + + if isinstance(value, list): + cleaned_list = [] + for item in value: + cleaned_item = _remove_empty_values(item) + if cleaned_item in (None, '', [], {}): + continue + cleaned_list.append(cleaned_item) + return cleaned_list + + return value + + +def generate_conversation_summary( + messages: List[Dict[str, Any]], + conversation_title: str, + settings: Dict[str, Any], + model_deployment: str, + message_time_start: str = None, + message_time_end: str = None, + conversation_id: str = None +) -> Dict[str, Any]: + """Generate a conversation summary using the LLM and optionally persist it. + + This is the shared helper used by both the export pipeline and the + on-demand summary API endpoint. Returns a summary dict suitable for + storage in conversation metadata. + + Raises ValueError when there is no content to summarise and + RuntimeError on model errors. + """ + transcript_lines = [] + for message in messages: + content_text = message.get('content_text', '') + if not content_text: + continue + role = message.get('role', 'unknown') + speaker = message.get('speaker_label', role).upper() + transcript_lines.append(f"{speaker}: {content_text}") + + transcript_text = '\n\n'.join(transcript_lines).strip() + if not transcript_text: + raise ValueError('No message content was available to summarize.') + + transcript_text = _truncate_for_summary(transcript_text) + + gpt_client, gpt_model = _initialize_gpt_client(settings, model_deployment) + summary_prompt = ( + "You are summarizing a conversation for an export document. 
" + "Read the full conversation below and write a concise summary. " + "Use your judgement on length: for short conversations write one brief paragraph, " + "for longer or more detailed conversations write two paragraphs. " + "If you need refer to the user, use their name, but do not refer to the user too often." + "Cover the goals, the key topics discussed, any data or tools referenced, " + "and the main outcomes or answers provided. " + "Be factual and neutral. Return plain text only — no headings, no bullet points, no markdown formatting." + ) + + model_lower = gpt_model.lower() + is_reasoning_model = ( + 'o1' in model_lower or 'o3' in model_lower or 'gpt-5' in model_lower + ) + instruction_role = 'developer' if is_reasoning_model else 'system' + + debug_print(f"Summary generation: sending {len(transcript_lines)} messages " + f"({len(transcript_text)} chars) to {gpt_model} (role={instruction_role})") + + summary_response = gpt_client.chat.completions.create( + model=gpt_model, + messages=[ + { + 'role': instruction_role, + 'content': summary_prompt + }, + { + 'role': 'user', + 'content': ( + f"Conversation Title: {conversation_title}\n\n" + f"{transcript_text}" + ) + } + ] + ) + + debug_print(f"Summary generation: response choices=" + f"{len(summary_response.choices) if summary_response.choices else 0}, " + f"finish_reason={summary_response.choices[0].finish_reason if summary_response.choices else 'N/A'}") + + summary_text = (summary_response.choices[0].message.content or '').strip() if summary_response.choices else '' + if not summary_text: + debug_print('Summary generation: model returned an empty response') + log_event('Conversation summary generation returned empty response', level='WARNING') + raise RuntimeError('Summary model returned an empty response.') + + summary_data = { + 'content': summary_text, + 'model_deployment': gpt_model, + 'generated_at': datetime.utcnow().isoformat(), + 'message_time_start': message_time_start, + 'message_time_end': 
message_time_end + } + + # Persist to Cosmos when a conversation_id is available + if conversation_id: + try: + update_conversation_with_metadata(conversation_id, {'summary': summary_data}) + debug_print(f"Summary persisted to conversation {conversation_id}") + except Exception as persist_exc: + debug_print(f"Failed to persist summary to Cosmos: {persist_exc}") + log_event(f"Failed to persist conversation summary: {persist_exc}", level="WARNING") + + return summary_data + + +def _build_summary_intro( + messages: List[Dict[str, Any]], + conversation: Dict[str, Any], + sanitized_conversation: Dict[str, Any], + settings: Dict[str, Any], + enabled: bool, + summary_model_deployment: str, + message_time_start: str = None, + message_time_end: str = None +) -> Dict[str, Any]: + """Build the summary_intro block for the export payload. + + Uses cached summary from conversation metadata when present and + still current (no newer messages). Otherwise generates a fresh + summary via ``generate_conversation_summary`` and persists it. 
+ """ + summary_intro = { + 'enabled': enabled, + 'generated': False, + 'model_deployment': summary_model_deployment or None, + 'generated_at': None, + 'content': '', + 'error': None + } + + if not enabled: + return summary_intro + + # Check for a cached summary stored in the conversation document + existing_summary = conversation.get('summary') + if existing_summary and isinstance(existing_summary, dict): + cached_end = existing_summary.get('message_time_end') + if cached_end and message_time_end and cached_end >= message_time_end: + debug_print('Export summary: using cached summary from conversation metadata') + summary_intro.update({ + 'generated': True, + 'model_deployment': existing_summary.get('model_deployment'), + 'generated_at': existing_summary.get('generated_at'), + 'content': existing_summary.get('content', ''), + 'error': None + }) + return summary_intro + debug_print('Export summary: cached summary is stale, regenerating') + + try: + conversation_id = conversation.get('id') + conversation_title = sanitized_conversation.get('title', 'Untitled') + + summary_data = generate_conversation_summary( + messages=messages, + conversation_title=conversation_title, + settings=settings, + model_deployment=summary_model_deployment, + message_time_start=message_time_start, + message_time_end=message_time_end, + conversation_id=conversation_id + ) + + summary_intro.update({ + 'generated': True, + 'model_deployment': summary_data.get('model_deployment'), + 'generated_at': summary_data.get('generated_at'), + 'content': summary_data.get('content', ''), + 'error': None + }) + return summary_intro + + except (ValueError, RuntimeError) as known_exc: + debug_print(f"Export summary generation issue: {known_exc}") + summary_intro['error'] = str(known_exc) + if hasattr(known_exc, 'model_deployment'): + summary_intro['model_deployment'] = known_exc.model_deployment + return summary_intro + + except Exception as exc: + debug_print(f"Export summary generation failed: {exc}") + 
log_event(f"Conversation export summary generation failed: {exc}", level="WARNING") + summary_intro['error'] = str(exc) + return summary_intro + + +def _truncate_for_summary(transcript_text: str) -> str: + if len(transcript_text) <= SUMMARY_SOURCE_CHAR_LIMIT: + return transcript_text + + head_chars = SUMMARY_SOURCE_CHAR_LIMIT // 2 + tail_chars = SUMMARY_SOURCE_CHAR_LIMIT - head_chars + return ( + transcript_text[:head_chars] + + "\n\n[... transcript truncated for export summary generation ...]\n\n" + + transcript_text[-tail_chars:] + ) + + +def _initialize_gpt_client(settings: Dict[str, Any], requested_model: str = ''): + enable_gpt_apim = settings.get('enable_gpt_apim', False) + + if enable_gpt_apim: + raw_models = settings.get('azure_apim_gpt_deployment', '') or '' + apim_models = [model.strip() for model in raw_models.split(',') if model.strip()] + if not apim_models: + raise ValueError('APIM GPT deployment name is not configured.') + + if requested_model and requested_model not in apim_models: + raise ValueError(f"Requested summary model '{requested_model}' is not configured for APIM.") + + gpt_model = requested_model or apim_models[0] + gpt_client = AzureOpenAI( + api_version=settings.get('azure_apim_gpt_api_version'), + azure_endpoint=settings.get('azure_apim_gpt_endpoint'), + api_key=settings.get('azure_apim_gpt_subscription_key') + ) + return gpt_client, gpt_model + + auth_type = settings.get('azure_openai_gpt_authentication_type') + endpoint = settings.get('azure_openai_gpt_endpoint') + api_version = settings.get('azure_openai_gpt_api_version') + gpt_model_obj = settings.get('gpt_model', {}) or {} + + if requested_model: + gpt_model = requested_model + elif gpt_model_obj.get('selected'): + gpt_model = gpt_model_obj['selected'][0]['deploymentName'] + else: + raise ValueError('No GPT model selected or configured for export summary generation.') + + if auth_type == 'managed_identity': + token_provider = get_bearer_token_provider(DefaultAzureCredential(), 
cognitive_services_scope) + gpt_client = AzureOpenAI( + api_version=api_version, + azure_endpoint=endpoint, + azure_ad_token_provider=token_provider + ) + else: + api_key = settings.get('azure_openai_gpt_key') + if not api_key: + raise ValueError('Azure OpenAI API Key not configured.') + gpt_client = AzureOpenAI( + api_version=api_version, + azure_endpoint=endpoint, + api_key=api_key + ) + + return gpt_client, gpt_model + + +def _build_single_file_response(exported: List[Dict[str, Any]], export_format: str, timestamp_str: str): + """Build a single-file download response.""" + if export_format == 'json': + content = json.dumps(exported, indent=2, ensure_ascii=False, default=str) + filename = f"conversations_export_{timestamp_str}.json" + content_type = 'application/json; charset=utf-8' + elif export_format == 'pdf': + if len(exported) == 1: + content = _conversation_to_pdf_bytes(exported[0]) + else: + combined_parts = [] + for idx, entry in enumerate(exported): + if idx > 0: + combined_parts.append( + '
' + ) + combined_parts.append(_build_pdf_html_body(entry)) + content = _html_body_to_pdf_bytes('\n'.join(combined_parts)) + filename = f"conversations_export_{timestamp_str}.pdf" + content_type = 'application/pdf' + else: + parts = [] + for entry in exported: + parts.append(_conversation_to_markdown(entry)) + content = '\n\n---\n\n'.join(parts) + filename = f"conversations_export_{timestamp_str}.md" + content_type = 'text/markdown; charset=utf-8' + + response = make_response(content) + response.headers['Content-Type'] = content_type + response.headers['Content-Disposition'] = f'attachment; filename="{filename}"' + return response + + +def _build_zip_response(exported: List[Dict[str, Any]], export_format: str, timestamp_str: str): + """Build a ZIP archive containing one file per conversation.""" + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf: + for entry in exported: + conversation = entry['conversation'] + safe_title = _safe_filename(conversation.get('title', 'Untitled')) + conversation_id_short = conversation.get('id', 'unknown')[:8] + + if export_format == 'json': + file_content = json.dumps(entry, indent=2, ensure_ascii=False, default=str) + ext = 'json' + elif export_format == 'pdf': + file_content = _conversation_to_pdf_bytes(entry) + ext = 'pdf' + else: + file_content = _conversation_to_markdown(entry) + ext = 'md' - lines = [] - title = conv.get('title', 'Untitled') - lines.append(f"# {title}") + file_name = f"{safe_title}_{conversation_id_short}.{ext}" + zf.writestr(file_name, file_content) + + buffer.seek(0) + filename = f"conversations_export_{timestamp_str}.zip" + + response = make_response(buffer.read()) + response.headers['Content-Type'] = 'application/zip' + response.headers['Content-Disposition'] = f'attachment; filename="{filename}"' + return response + + +def _conversation_to_markdown(entry: Dict[str, Any]) -> str: + """Convert a conversation + messages entry to Markdown format.""" + conversation = 
entry['conversation'] + messages = entry['messages'] + summary_intro = entry.get('summary_intro', {}) or {} + + transcript_messages = [message for message in messages if message.get('is_transcript_message')] + detail_messages = [message for message in messages if message.get('details')] + reference_messages = [message for message in messages if message.get('citations')] + thought_messages = [message for message in messages if message.get('thoughts')] + supplemental_messages = [message for message in messages if not message.get('is_transcript_message')] + + lines: List[str] = [] + lines.append(f"# {conversation.get('title', 'Untitled')}") + lines.append('') + lines.append(f"**Last Updated:** {conversation.get('last_updated', '')} ") + lines.append(f"**Chat Type:** {conversation.get('chat_type', 'personal')} ") + lines.append(f"**Messages:** {conversation.get('message_count', len(messages))} ") + if conversation.get('tags'): + lines.append(f"**Tags:** {', '.join(_format_tag(tag) for tag in conversation.get('tags', []))} ") + if conversation.get('classification'): + lines.append(f"**Classification:** {', '.join(_format_tag(item) for item in conversation.get('classification', []))} ") + lines.append('') + + if summary_intro.get('enabled') and summary_intro.get('generated') and summary_intro.get('content'): + lines.append('## Abstract') lines.append('') + lines.append(summary_intro.get('content', '')) + lines.append('') + lines.append(f"_Generated with {summary_intro.get('model_deployment') or 'configured model'} on {summary_intro.get('generated_at')}_") + lines.append('') + elif summary_intro.get('enabled') and summary_intro.get('error'): + lines.append('> _A summary intro was requested, but it could not be generated for this export._') + lines.append(f"> _Error: {summary_intro.get('error')}_") + lines.append('') + + lines.append('## Transcript') + lines.append('') + if not transcript_messages: + lines.append('_No user or assistant transcript messages were available 
for export._') + lines.append('') + else: + for message in transcript_messages: + lines.append(f"### {message.get('label')} — {message.get('speaker_label')}") + if message.get('timestamp'): + lines.append(f"*{message.get('timestamp')}*") + lines.append('') + lines.append(message.get('content_text') or '_No content recorded._') + lines.append('') - # Metadata - last_updated = conv.get('last_updated', '') - chat_type = conv.get('chat_type', 'personal') - tags = conv.get('tags', []) - - lines.append(f"**Last Updated:** {last_updated} ") - lines.append(f"**Chat Type:** {chat_type} ") - if tags: - tag_strs = [str(t) for t in tags] - lines.append(f"**Tags:** {', '.join(tag_strs)} ") - lines.append(f"**Messages:** {len(messages)} ") + lines.append('## Appendix A — Conversation Metadata') + lines.append('') + metadata_to_render = _remove_empty_values({ + 'context': conversation.get('context'), + 'classification': conversation.get('classification'), + 'strict': conversation.get('strict'), + 'is_pinned': conversation.get('is_pinned'), + 'scope_locked': conversation.get('scope_locked'), + 'locked_contexts': conversation.get('locked_contexts'), + 'message_counts_by_role': conversation.get('message_counts_by_role'), + 'citation_counts': conversation.get('citation_counts'), + 'thought_count': conversation.get('thought_count') + }) + _append_markdown_mapping(lines, metadata_to_render) + lines.append('') + + if detail_messages: + lines.append('## Appendix B — Message Details') lines.append('') - lines.append('---') + for message in detail_messages: + lines.append(f"### {message.get('label')} — {message.get('speaker_label')}") + if message.get('timestamp'): + lines.append(f"*{message.get('timestamp')}*") + lines.append('') + _append_markdown_mapping(lines, message.get('details', {})) + lines.append('') + + if reference_messages: + lines.append('## Appendix C — References') lines.append('') + for message in reference_messages: + lines.append(f"### {message.get('label')} — 
{message.get('speaker_label')}") + if message.get('timestamp'): + lines.append(f"*{message.get('timestamp')}*") + lines.append('') + _append_citations_markdown(lines, message) + lines.append('') - # Messages - for msg in messages: - role = msg.get('role', 'unknown') - timestamp = msg.get('timestamp', '') - raw_content = msg.get('content', '') - content = _normalize_content(raw_content) - - role_label = role.capitalize() - if role == 'assistant': - role_label = 'Assistant' - elif role == 'user': - role_label = 'User' - elif role == 'system': - role_label = 'System' - elif role == 'tool': - role_label = 'Tool' - - lines.append(f"### {role_label}") - if timestamp: - lines.append(f"*{timestamp}*") + if thought_messages: + lines.append('## Appendix D — Processing Thoughts') + lines.append('') + for message in thought_messages: + lines.append(f"### {message.get('label')} — {message.get('speaker_label')}") + if message.get('timestamp'): + lines.append(f"*{message.get('timestamp')}*") lines.append('') - lines.append(content) + for thought in message.get('thoughts', []): + thought_label = thought.get('step_type', 'step').replace('_', ' ').title() + lines.append(f"1. 
**{thought_label}:** {thought.get('content') or 'No content recorded.'}") + if thought.get('duration_ms') is not None: + lines.append(f" - **Duration:** {thought.get('duration_ms')} ms") + if thought.get('timestamp'): + lines.append(f" - **Timestamp:** {thought.get('timestamp')}") + if thought.get('detail'): + lines.append(' - **Detail:**') + _append_code_block(lines, thought.get('detail'), indent=' ') lines.append('') - # Citations - citations = msg.get('citations') - if citations: - lines.append('**Citations:**') - if isinstance(citations, list): - for cit in citations: - if isinstance(cit, dict): - source = cit.get('title') or cit.get('filepath') or cit.get('url', 'Unknown') - lines.append(f"- {source}") - else: - lines.append(f"- {cit}") - lines.append('') - - lines.append('---') + if supplemental_messages: + lines.append('## Appendix E — Supplemental Messages') + lines.append('') + for message in supplemental_messages: + lines.append(f"### {message.get('label')} — {message.get('speaker_label')}") + if message.get('timestamp'): + lines.append(f"*{message.get('timestamp')}*") + lines.append('') + lines.append(message.get('content_text') or '_No content recorded._') lines.append('') - return '\n'.join(lines) + return '\n'.join(lines).strip() - def _normalize_content(content): - """Normalize message content to a plain string. - - Content may be a string, a list of content-part dicts - (e.g. [{"type": "text", "text": "..."}, ...]), or a dict. 
- """ - if isinstance(content, str): - return content - if isinstance(content, list): - parts = [] - for item in content: - if isinstance(item, dict): - if item.get('type') == 'text': - parts.append(item.get('text', '')) - elif item.get('type') == 'image_url': - parts.append('[Image]') + +def _append_citations_markdown(lines: List[str], message: Dict[str, Any]): + document_citations = [citation for citation in message.get('citations', []) if citation.get('citation_type') == 'document'] + web_citations = [citation for citation in message.get('citations', []) if citation.get('citation_type') == 'web'] + agent_citations = message.get('agent_citations', []) or [] + legacy_citations = [citation for citation in message.get('citations', []) if citation.get('citation_type') == 'legacy'] + + if not any([document_citations, web_citations, agent_citations, legacy_citations]): + lines.append('_No citations were recorded for this message._') + return + + if document_citations: + lines.append('#### Document Sources') + lines.append('') + for index, citation in enumerate(document_citations, start=1): + lines.append(f"{index}. **{citation.get('label', 'Document source')}**") + detail_mapping = _remove_empty_values({ + 'citation_id': citation.get('citation_id'), + 'page_number': citation.get('page_number'), + 'classification': citation.get('classification'), + 'score': citation.get('score'), + 'metadata_type': citation.get('metadata_type') + }) + _append_markdown_mapping(lines, detail_mapping, indent=1) + if citation.get('metadata_content'): + lines.append(' - **Metadata Content:**') + _append_code_block(lines, citation.get('metadata_content'), indent=' ') + lines.append('') + + if web_citations: + lines.append('#### Web Sources') + lines.append('') + for index, citation in enumerate(web_citations, start=1): + title = citation.get('title') or citation.get('label') or 'Web source' + url = citation.get('url') + if url: + lines.append(f"{index}. 
[{title}]({url})") + else: + lines.append(f"{index}. {title}") + lines.append('') + + if agent_citations: + lines.append('#### Tool Invocations') + lines.append('') + for index, citation in enumerate(agent_citations, start=1): + label = citation.get('tool_name') or citation.get('function_name') or f"Tool {index}" + lines.append(f"{index}. **{label}**") + detail_mapping = _remove_empty_values({ + 'function_name': citation.get('function_name'), + 'plugin_name': citation.get('plugin_name'), + 'success': citation.get('success'), + 'timestamp': citation.get('timestamp') + }) + _append_markdown_mapping(lines, detail_mapping, indent=1) + if citation.get('function_arguments') not in (None, '', [], {}): + lines.append(' - **Arguments:**') + _append_code_block(lines, citation.get('function_arguments'), indent=' ') + if citation.get('function_result') not in (None, '', [], {}): + lines.append(' - **Result:**') + _append_code_block(lines, citation.get('function_result'), indent=' ') + lines.append('') + + if legacy_citations: + lines.append('#### Legacy Citation Records') + lines.append('') + for index, citation in enumerate(legacy_citations, start=1): + lines.append(f"{index}. 
{citation.get('label', 'Legacy citation')}") + lines.append('') + + +def _append_markdown_mapping(lines: List[str], mapping: Dict[str, Any], indent: int = 0): + if not isinstance(mapping, dict) or not mapping: + return + + prefix = ' ' * indent + for key, value in mapping.items(): + label = _format_markdown_key(key) + if isinstance(value, dict): + lines.append(f"{prefix}- **{label}:**") + _append_markdown_mapping(lines, value, indent + 1) + elif isinstance(value, list): + if not value: + continue + if all(not isinstance(item, (dict, list)) for item in value): + lines.append(f"{prefix}- **{label}:** {', '.join(_stringify_markdown_value(item) for item in value)}") + else: + lines.append(f"{prefix}- **{label}:**") + for item in value: + if isinstance(item, dict): + lines.append(f"{prefix} -") + _append_markdown_mapping(lines, item, indent + 2) else: - parts.append(str(item)) + lines.append(f"{prefix} - {_stringify_markdown_value(item)}") + else: + lines.append(f"{prefix}- **{label}:** {_stringify_markdown_value(value)}") + + +def _append_code_block(lines: List[str], value: Any, indent: str = ''): + if isinstance(value, (dict, list)): + code_block = json.dumps(value, indent=2, ensure_ascii=False, default=str) + language = 'json' + else: + code_block = str(value) + language = 'text' + + lines.append(f"{indent}```{language}") + for line in code_block.splitlines() or ['']: + lines.append(f"{indent}{line}") + lines.append(f"{indent}```") + + +def _format_markdown_key(key: str) -> str: + return str(key).replace('_', ' ').title() + + +def _stringify_markdown_value(value: Any) -> str: + if isinstance(value, bool): + return 'Yes' if value else 'No' + return str(value) + + +def _format_tag(tag: Any) -> str: + """Format a tag or classification entry for display. + + Tags in Cosmos are stored as dicts such as + ``{'category': 'model', 'value': 'gpt-5'}`` or + ``{'category': 'participant', 'name': 'Alice', 'user_id': '...'}`` + but they can also be plain strings in older data. 
+ """ + if isinstance(tag, dict): + category = tag.get('category', '') + # Participant tags carry a readable name / email + name = tag.get('name') or tag.get('email') or tag.get('display_name') + if name: + return f"{category}: {name}" if category else str(name) + # Document tags carry a title + title = tag.get('title') or tag.get('document_id') + if title: + return f"{category}: {title}" if category else str(title) + # Generic category/value tags + value = tag.get('value') + if value: + return f"{category}: {value}" if category else str(value) + return category or str(tag) + return str(tag) + + +def _role_to_label(role: str) -> str: + role_map = { + 'assistant': 'Assistant', + 'user': 'User', + 'system': 'System', + 'tool': 'Tool', + 'file': 'File', + 'image': 'Image', + 'safety': 'Safety', + 'blocked': 'Blocked' + } + return role_map.get(role, str(role).capitalize() or 'Message') + + +def _normalize_content(content: Any) -> str: + """Normalize message content to a plain string.""" + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for item in content: + if isinstance(item, dict): + if item.get('type') == 'text': + parts.append(item.get('text', '')) + elif item.get('type') == 'image_url': + parts.append('[Image]') else: parts.append(str(item)) - return '\n'.join(parts) - if isinstance(content, dict): - if content.get('type') == 'text': - return content.get('text', '') - return str(content) - return str(content) if content else '' - - def _safe_filename(title): - """Create a filesystem-safe filename from a conversation title.""" - import re - # Remove or replace unsafe characters - safe = re.sub(r'[<>:"/\\|?*]', '_', title) - safe = re.sub(r'\s+', '_', safe) - safe = safe.strip('_. 
') - # Truncate to reasonable length - if len(safe) > 50: - safe = safe[:50] - return safe or 'Untitled' + else: + parts.append(str(item)) + return '\n'.join(parts) + if isinstance(content, dict): + if content.get('type') == 'text': + return content.get('text', '') + return str(content) + return str(content) if content else '' + + +def _safe_filename(title: str) -> str: + """Create a filesystem-safe filename from a conversation title.""" + safe = re.sub(r'[<>:"/\\|?*]', '_', title) + safe = re.sub(r'\s+', '_', safe) + safe = safe.strip('_. ') + if len(safe) > 50: + safe = safe[:50] + return safe or 'Untitled' + + +# --------------------------------------------------------------------------- +# PDF Export — HTML generation and PyMuPDF Story rendering +# --------------------------------------------------------------------------- + +_PDF_CSS = """ +body { + font-family: sans-serif; + font-size: 10pt; + color: #222; + line-height: 1.4; +} +h1 { + font-size: 16pt; + color: #1a1a2e; + margin-bottom: 2pt; +} +h2 { + font-size: 13pt; + color: #16213e; + margin-top: 16pt; + margin-bottom: 6pt; + border-bottom: 1px solid #ccc; + padding-bottom: 4pt; +} +h3 { + font-size: 11pt; + color: #0f3460; + margin-top: 10pt; + margin-bottom: 4pt; +} +h4 { + font-size: 10pt; + color: #333; + margin-top: 8pt; + margin-bottom: 4pt; +} +p { + margin-top: 2pt; + margin-bottom: 4pt; +} +.metadata { + font-size: 8pt; + color: #666; +} +.abstract { + background-color: #f8f9fa; + padding: 8pt; + margin-bottom: 8pt; +} +.note { + font-size: 9pt; + color: #856404; + background-color: #fff3cd; + padding: 6pt; +} +.bubble { + padding: 8pt 12pt; + margin-bottom: 8pt; +} +.bubble-header { + font-size: 8pt; + color: #444; + margin-bottom: 2pt; +} +.ts { + font-weight: normal; + color: #888; +} +.user-bubble { + background-color: #c8e0fa; + margin-left: 60pt; +} +.assistant-bubble { + background-color: #f1f0f0; + margin-right: 60pt; +} +.system-bubble { + background-color: #fff3cd; + margin-left: 30pt; 
+ margin-right: 30pt; + font-size: 9pt; +} +.file-bubble { + background-color: #e8f5e9; + margin-right: 60pt; + font-size: 9pt; +} +.other-bubble { + background-color: #f5f5f5; + margin-left: 30pt; + margin-right: 30pt; + font-size: 9pt; +} +table { + border-collapse: collapse; + width: 100%; + font-size: 9pt; + margin-bottom: 8pt; +} +th, td { + border: 1px solid #ddd; + padding: 4pt 6pt; + text-align: left; +} +th { + background-color: #f5f5f5; + font-weight: bold; +} +pre { + background-color: #f5f5f5; + padding: 6pt; + font-size: 8pt; + font-family: monospace; +} +code { + font-family: monospace; + font-size: 9pt; + background-color: #f0f0f0; + padding: 1pt 3pt; +} +ol, ul { + margin-top: 4pt; + margin-bottom: 8pt; +} +li { + margin-bottom: 4pt; +} +small { + font-size: 8pt; + color: #666; +} +a { + color: #0066cc; +} +""" + + +def _pdf_bubble_class(role: str) -> str: + """Return the CSS class for a chat bubble based on message role.""" + role_classes = { + 'user': 'user-bubble', + 'assistant': 'assistant-bubble', + 'system': 'system-bubble', + 'file': 'file-bubble', + 'image': 'file-bubble' + } + return role_classes.get(role, 'other-bubble') + + +def _build_pdf_html_body(entry: Dict[str, Any]) -> str: + """Build the HTML body content for a single conversation PDF.""" + conversation = entry['conversation'] + messages = entry['messages'] + summary_intro = entry.get('summary_intro', {}) or {} + + transcript_messages = [m for m in messages if m.get('is_transcript_message')] + detail_messages = [m for m in messages if m.get('details')] + reference_messages = [m for m in messages if m.get('citations')] + thought_messages = [m for m in messages if m.get('thoughts')] + supplemental_messages = [m for m in messages if not m.get('is_transcript_message')] + + parts: List[str] = [] + + # --- Title and metadata --- + parts.append(f'A summary intro was requested, '
+ 'but could not be generated for this export.
'
+ f'Error: {error_text}
No user or assistant transcript messages were available for export.
' + ) + else: + for message in transcript_messages: + role = message.get('role', '') + bubble_class = _pdf_bubble_class(role) + label = message.get('label', '') + speaker = message.get('speaker_label', '') + timestamp = message.get('timestamp', '') + content = message.get('content_text', '') or 'No content recorded.' + + parts.append(f'{_escape_html(label)} — ' + f'{_escape_html(speaker)}{ts_str}
' + ) + content_html = markdown2.markdown( + content, + extras=['fenced-code-blocks', 'tables', 'break-on-newline'] + ) + parts.append(content_html) + parts.append('No data available.
') + return + + parts.append('| Property | Value |
|---|---|
| {_escape_html(label)} | {formatted} |
No citations were recorded for this message.
') + return + + doc_citations = [c for c in citations if c.get('citation_type') == 'document'] + web_citations = [c for c in citations if c.get('citation_type') == 'web'] + agent_citations = [c for c in citations if c.get('citation_type') == 'agent_tool'] + legacy_citations = [c for c in citations if c.get('citation_type') == 'legacy'] + + if doc_citations: + parts.append('{_escape_html(code_text)}')
diff --git a/application/single_app/route_backend_conversations.py b/application/single_app/route_backend_conversations.py
index ed15cb91..d90d7e4a 100644
--- a/application/single_app/route_backend_conversations.py
+++ b/application/single_app/route_backend_conversations.py
@@ -3,11 +3,12 @@
from config import *
from functions_authentication import *
from functions_settings import *
-from functions_conversation_metadata import get_conversation_metadata
+from functions_conversation_metadata import get_conversation_metadata, update_conversation_with_metadata
from flask import Response, request
from functions_debug import debug_print
from swagger_wrapper import swagger_route, get_auth_security
from functions_activity_logging import log_conversation_creation, log_conversation_deletion, log_conversation_archival
+from functions_thoughts import archive_thoughts_for_conversation, delete_thoughts_for_conversation
def register_route_backend_conversations(app):
@@ -430,7 +431,14 @@ def delete_conversation(conversation_id):
cosmos_archived_messages_container.upsert_item(archived_doc)
cosmos_messages_container.delete_item(doc['id'], partition_key=conversation_id)
-
+
+ # Archive/delete thoughts for conversation
+ user_id_for_thoughts = conversation_item.get('user_id')
+ if archiving_enabled:
+ archive_thoughts_for_conversation(conversation_id, user_id_for_thoughts)
+ else:
+ delete_thoughts_for_conversation(conversation_id, user_id_for_thoughts)
+
# Log conversation deletion before actual deletion
log_conversation_deletion(
user_id=conversation_item.get('user_id'),
@@ -530,7 +538,13 @@ def delete_multiple_conversations():
cosmos_archived_messages_container.upsert_item(archived_message)
cosmos_messages_container.delete_item(message['id'], partition_key=conversation_id)
-
+
+ # Archive/delete thoughts for conversation
+ if archiving_enabled:
+ archive_thoughts_for_conversation(conversation_id, user_id)
+ else:
+ delete_thoughts_for_conversation(conversation_id, user_id)
+
# Log conversation deletion before actual deletion
log_conversation_deletion(
user_id=user_id,
@@ -798,7 +812,8 @@ def get_conversation_metadata_api(conversation_id):
"is_hidden": conversation_item.get('is_hidden', False),
"scope_locked": conversation_item.get('scope_locked'),
"locked_contexts": conversation_item.get('locked_contexts', []),
- "chat_type": conversation_item.get('chat_type', 'personal') # Default to 'personal' if chat_type is not defined (legacy conversations)
+ "chat_type": conversation_item.get('chat_type'),
+ "summary": conversation_item.get('summary')
}), 200
except CosmosResourceNotFoundError:
@@ -807,6 +822,95 @@ def get_conversation_metadata_api(conversation_id):
print(f"Error retrieving conversation metadata: {e}")
return jsonify({'error': 'Failed to retrieve conversation metadata'}), 500
+ @app.route('/api/conversations/${escapeHtml(summary.content)}
+No summary has been generated for this conversation yet.
+Loading data preview...
| ${escaped} | `; + } + html += '
|---|
| ${escaped} | `; + } + html += '
Select the format for your exported conversations.
Print-ready format with chat bubbles. Ideal for archiving and printing.
+Add a short abstract before the exported transcript. ${perConversationText}
+Advanced settings are typically not required. Expand below if you need to customize metadata or additional fields.
| Display Name | -Description | -Actions | -
|---|---|---|
|
-
- Loading...
-
- Select a group to load agents.
- |
- ||
| Display Name | +Description | +Actions | +
|---|---|---|
|
+
+ Loading...
+
+ Select a group to load agents.
+ |
+ ||
| Display Name | -Description | -Actions | -
|---|---|---|
|
-
- Loading...
-
- Select a group to load actions.
- |
- ||
| Display Name | +Description | +Actions | +
|---|---|---|
|
+
+ Loading...
+
+ Select a group to load actions.
+ |
+ ||
| Display Name | Description | Actions |
|---|---|---|
|
- Loading...
- Loading agents...
- |
- ||
| Display Name | Description | Actions |
|---|---|---|
|
+ Loading...
+ Loading agents...
+ |
+ ||
| Display Name | Description | Actions |
|---|
| Display Name | Description | Actions |
|---|