diff --git a/CLAUDE.md b/CLAUDE.md index 4e9c2830..2ad07a8a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -58,6 +58,7 @@ return render_template('page.html', settings=public_settings) ## Version Management +- Its important to update the version at the end of every plan - Version is stored in `config.py`: `VERSION = "X.XXX.XXX"` - When incrementing, only change the third segment (e.g., `0.238.024` -> `0.238.025`) - Include the current version in functional test file headers and documentation files @@ -83,7 +84,7 @@ return render_template('page.html', settings=public_settings) ## Release Notes -After completing code changes, offer to update `docs/explanation/release_notes.md`. +After completing plans and code changes, offer to update `docs/explanation/release_notes.md`. - Add entries under the current version from `config.py` - If the version was bumped, create a new section at the top: `### **(vX.XXX.XXX)**` diff --git a/application/single_app/config.py b/application/single_app/config.py index 1e83e045..bec5513b 100644 --- a/application/single_app/config.py +++ b/application/single_app/config.py @@ -257,6 +257,8 @@ def get_redis_cache_infrastructure_endpoint(redis_hostname: str) -> str: storage_account_user_documents_container_name = "user-documents" storage_account_group_documents_container_name = "group-documents" storage_account_public_documents_container_name = "public-documents" +storage_account_personal_chat_container_name = "personal-chat" +storage_account_group_chat_container_name = "group-chat" # Initialize Azure Cosmos DB client cosmos_endpoint = os.getenv("AZURE_COSMOS_ENDPOINT") @@ -757,9 +759,11 @@ def initialize_clients(settings): # This addresses the issue where the application assumes containers exist if blob_service_client: for container_name in [ - storage_account_user_documents_container_name, - storage_account_group_documents_container_name, - storage_account_public_documents_container_name + storage_account_user_documents_container_name, + storage_account_group_documents_container_name, + storage_account_public_documents_container_name, + storage_account_personal_chat_container_name, + storage_account_group_chat_container_name ]: try: container_client = blob_service_client.get_container_client(container_name) diff --git a/application/single_app/functions_content.py b/application/single_app/functions_content.py index 376d23f4..3116ed82 100644 --- a/application/single_app/functions_content.py +++ b/application/single_app/functions_content.py @@ -352,7 +352,7 @@ def generate_embedding( embedding_model = selected_embedding_model['deploymentName'] while True: - random_delay = random.uniform(0.5, 2.0) + random_delay = random.uniform(0.05, 0.2) time.sleep(random_delay) try: @@ -385,3 +385,102 @@ def generate_embedding( except Exception as e: raise + +def generate_embeddings_batch( + texts, + batch_size=16, + max_retries=5, + initial_delay=1.0, + delay_multiplier=2.0 +): + """Generate embeddings for multiple texts in batches. + + Azure OpenAI embeddings API accepts a list of strings as input. + This reduces per-call overhead and delay significantly. + + Args: + texts: List of text strings to embed. + batch_size: Number of texts per API call (default 16). + max_retries: Max retries on rate limit errors. + initial_delay: Initial retry delay in seconds. + delay_multiplier: Multiplier for exponential backoff. + + Returns: + list of (embedding, token_usage) tuples, one per input text. + """ + settings = get_settings() + + enable_embedding_apim = settings.get('enable_embedding_apim', False) + + if enable_embedding_apim: + embedding_model = settings.get('azure_apim_embedding_deployment') + embedding_client = AzureOpenAI( + api_version=settings.get('azure_apim_embedding_api_version'), + azure_endpoint=settings.get('azure_apim_embedding_endpoint'), + api_key=settings.get('azure_apim_embedding_subscription_key')) + else: + if (settings.get('azure_openai_embedding_authentication_type') == 'managed_identity'): + token_provider = get_bearer_token_provider(DefaultAzureCredential(), cognitive_services_scope) + + embedding_client = AzureOpenAI( + api_version=settings.get('azure_openai_embedding_api_version'), + azure_endpoint=settings.get('azure_openai_embedding_endpoint'), + azure_ad_token_provider=token_provider + ) + + embedding_model_obj = settings.get('embedding_model', {}) + if embedding_model_obj and embedding_model_obj.get('selected'): + selected_embedding_model = embedding_model_obj['selected'][0] + embedding_model = selected_embedding_model['deploymentName'] + else: + embedding_client = AzureOpenAI( + api_version=settings.get('azure_openai_embedding_api_version'), + azure_endpoint=settings.get('azure_openai_embedding_endpoint'), + api_key=settings.get('azure_openai_embedding_key') + ) + + embedding_model_obj = settings.get('embedding_model', {}) + if embedding_model_obj and embedding_model_obj.get('selected'): + selected_embedding_model = embedding_model_obj['selected'][0] + embedding_model = selected_embedding_model['deploymentName'] + + results = [] + for i in range(0, len(texts), batch_size): + batch = texts[i:i + batch_size] + retries = 0 + current_delay = initial_delay + + while True: + random_delay = random.uniform(0.05, 0.2) + time.sleep(random_delay) + + try: + response = embedding_client.embeddings.create( + model=embedding_model, + input=batch + ) + + for item in response.data: + token_usage = None + if hasattr(response, 'usage') and response.usage: + token_usage = { + 'prompt_tokens': response.usage.prompt_tokens // len(batch), + 'total_tokens': response.usage.total_tokens // len(batch), + 'model_deployment_name': embedding_model + } + results.append((item.embedding, token_usage)) + break + + except RateLimitError as e: + retries += 1 + if retries > max_retries: + raise + + wait_time = current_delay * random.uniform(1.0, 1.5) + time.sleep(wait_time) + current_delay *= delay_multiplier + + except Exception as e: + raise + + return results diff --git a/application/single_app/functions_documents.py b/application/single_app/functions_documents.py index ce08066d..110afbd2 100644 --- a/application/single_app/functions_documents.py +++ b/application/single_app/functions_documents.py @@ -1646,6 +1646,191 @@ def save_chunks(page_text_content, page_number, file_name, user_id, document_id, # Return token usage information for accumulation return token_usage +def save_chunks_batch(chunks_data, user_id, document_id, group_id=None, public_workspace_id=None): + """ + Save multiple chunks at once using batch embedding and batch AI Search upload. + Significantly faster than calling save_chunks() per chunk. + + Args: + chunks_data: list of dicts with keys: page_text_content, page_number, file_name + user_id: The user ID + document_id: The document ID + group_id: Optional group ID for group documents + public_workspace_id: Optional public workspace ID for public documents + + Returns: + dict with 'total_tokens', 'prompt_tokens', 'model_deployment_name' + """ + from functions_content import generate_embeddings_batch + + current_time = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + is_group = group_id is not None + is_public_workspace = public_workspace_id is not None + + # Retrieve metadata once for all chunks + try: + if is_public_workspace: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id, + public_workspace_id=public_workspace_id + ) + elif is_group: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id, + group_id=group_id + ) + else: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id + ) + + if not metadata: + raise ValueError(f"No metadata found for document {document_id}") + + version = metadata.get("version") if metadata.get("version") else 1 + except Exception as e: + log_event(f"[save_chunks_batch] Error retrieving metadata for document {document_id}: {repr(e)}", level=logging.ERROR) + raise + + # Generate all embeddings in batches + texts = [c['page_text_content'] for c in chunks_data] + try: + embedding_results = generate_embeddings_batch(texts) + except Exception as e: + log_event(f"[save_chunks_batch] Error generating batch embeddings for document {document_id}: {e}", level=logging.ERROR) + raise + + # Check for vision analysis once + vision_analysis = metadata.get('vision_analysis') + vision_text = "" + if vision_analysis: + vision_text_parts = [] + vision_text_parts.append("\n\n=== AI Vision Analysis ===") + vision_text_parts.append(f"Model: {vision_analysis.get('model', 'unknown')}") + if vision_analysis.get('description'): + vision_text_parts.append(f"\nDescription: {vision_analysis['description']}") + if vision_analysis.get('objects'): + objects_list = vision_analysis['objects'] + if isinstance(objects_list, list): + vision_text_parts.append(f"\nObjects Detected: {', '.join(objects_list)}") + else: + vision_text_parts.append(f"\nObjects Detected: {objects_list}") + if vision_analysis.get('text'): + vision_text_parts.append(f"\nVisible Text: {vision_analysis['text']}") + if vision_analysis.get('analysis'): + vision_text_parts.append(f"\nContextual Analysis: {vision_analysis['analysis']}") + vision_text = "\n".join(vision_text_parts) + + # Build all chunk documents + chunk_documents = [] + total_token_usage = {'total_tokens': 0, 'prompt_tokens': 0, 'model_deployment_name': None} + + for idx, chunk_info in enumerate(chunks_data): + embedding, token_usage = embedding_results[idx] + page_number = chunk_info['page_number'] + file_name = chunk_info['file_name'] + page_text_content = chunk_info['page_text_content'] + + if token_usage: + total_token_usage['total_tokens'] += token_usage.get('total_tokens', 0) + total_token_usage['prompt_tokens'] += token_usage.get('prompt_tokens', 0) + if not total_token_usage['model_deployment_name']: + total_token_usage['model_deployment_name'] = token_usage.get('model_deployment_name') + + chunk_id = f"{document_id}_{page_number}" + enhanced_chunk_text = page_text_content + vision_text if vision_text else page_text_content + + if is_public_workspace: + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "public_workspace_id": public_workspace_id + } + elif is_group: + shared_group_ids = metadata.get('shared_group_ids', []) if metadata else [] + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "group_id": group_id, + "shared_group_ids": shared_group_ids + } + else: + shared_user_ids = metadata.get('shared_user_ids', []) if metadata else [] + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "user_id": user_id, + "shared_user_ids": shared_user_ids + } + + chunk_documents.append(chunk_document) + + # Batch upload to AI Search + try: + if is_public_workspace: + search_client = CLIENTS["search_client_public"] + elif is_group: + search_client = CLIENTS["search_client_group"] + else: + search_client = CLIENTS["search_client_user"] + + # Upload in sub-batches of 32 to avoid request size limits + upload_batch_size = 32 + for i in range(0, len(chunk_documents), upload_batch_size): + sub_batch = chunk_documents[i:i + upload_batch_size] + search_client.upload_documents(documents=sub_batch) + + except Exception as e: + log_event(f"[save_chunks_batch] Error uploading batch to AI Search for document {document_id}: {e}", level=logging.ERROR) + raise + + return total_token_usage + def get_document_metadata_for_citations(document_id, user_id=None, group_id=None, public_workspace_id=None): """ Retrieve keywords and abstract from a document for creating metadata citations. @@ -4669,37 +4854,30 @@ def process_single_tabular_sheet(df, document_id, user_id, file_name, update_cal # Consider accumulating page count in the caller if needed. update_callback(number_of_pages=num_chunks_final) - # Save chunks, prepending the header to each + # Save chunks, prepending the header to each — use batch processing for speed + all_chunks = [] for idx, chunk_rows_content in enumerate(final_chunks_content, start=1): - # Prepend header - header length does not count towards chunk size limit chunk_with_header = header_string + chunk_rows_content - - update_callback( - current_file_chunk=idx, - status=f"Saving chunk {idx}/{num_chunks_final} from {file_name}..." - ) - - args = { + all_chunks.append({ "page_text_content": chunk_with_header, "page_number": idx, - "file_name": file_name, - "user_id": user_id, - "document_id": document_id - } + "file_name": file_name + }) - if is_public_workspace: - args["public_workspace_id"] = public_workspace_id - elif is_group: - args["group_id"] = group_id + if all_chunks: + update_callback( + current_file_chunk=1, + status=f"Batch processing {num_chunks_final} chunks from {file_name}..." + ) - token_usage = save_chunks(**args) - total_chunks_saved += 1 - - # Accumulate embedding tokens - if token_usage: - total_embedding_tokens += token_usage.get('total_tokens', 0) - if not embedding_model_name: - embedding_model_name = token_usage.get('model_deployment_name') + batch_token_usage = save_chunks_batch( + all_chunks, user_id, document_id, + group_id=group_id, public_workspace_id=public_workspace_id + ) + total_chunks_saved = len(all_chunks) + if batch_token_usage: + total_embedding_tokens = batch_token_usage.get('total_tokens', 0) + embedding_model_name = batch_token_usage.get('model_deployment_name') return total_chunks_saved, total_embedding_tokens, embedding_model_name @@ -4729,63 +4907,75 @@ def process_tabular(document_id, user_id, temp_file_path, original_filename, fil args["group_id"] = group_id upload_to_blob(**args) + update_callback(enhanced_citations=True, status=f"Enhanced citations enabled for {file_ext}") - try: - if file_ext == '.csv': - # Process CSV - # Read CSV, attempt to infer header, keep data as string initially - df = pandas.read_csv( - temp_file_path, - keep_default_na=False, - dtype=str + # When enhanced citations is on, index a single schema summary chunk + # instead of row-by-row chunking. The tabular processing plugin handles analysis. + if enable_enhanced_citations: + try: + if file_ext == '.csv': + df_preview = pandas.read_csv(temp_file_path, keep_default_na=False, dtype=str, nrows=5) + full_df = pandas.read_csv(temp_file_path, keep_default_na=False, dtype=str) + elif file_ext in ('.xlsx', '.xls', '.xlsm'): + engine = 'openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' + df_preview = pandas.read_excel(temp_file_path, engine=engine, keep_default_na=False, dtype=str, nrows=5) + full_df = pandas.read_excel(temp_file_path, engine=engine, keep_default_na=False, dtype=str) + else: + raise ValueError(f"Unsupported tabular file type: {file_ext}") + + row_count = len(full_df) + columns = list(df_preview.columns) + preview_rows = df_preview.head(5).to_string(index=False) + + schema_summary = ( + f"Tabular data file: {original_filename}\n" + f"Columns ({len(columns)}): {', '.join(columns)}\n" + f"Total rows: {row_count}\n" + f"Preview (first 5 rows):\n{preview_rows}\n\n" + f"This file is available for detailed analysis via the Tabular Processing plugin." ) - args = { - "df": df, - "document_id": document_id, - "user_id": user_id, + + update_callback(number_of_pages=1, status=f"Indexing schema summary for {original_filename}...") + + save_args = { + "page_text_content": schema_summary, + "page_number": 1, "file_name": original_filename, - "update_callback": update_callback + "user_id": user_id, + "document_id": document_id } - if is_public_workspace: - args["public_workspace_id"] = public_workspace_id + save_args["public_workspace_id"] = public_workspace_id elif is_group: - args["group_id"] = group_id + save_args["group_id"] = group_id - result = process_single_tabular_sheet(**args) - if isinstance(result, tuple) and len(result) == 3: - chunks, tokens, model = result - total_chunks_saved = chunks - total_embedding_tokens += tokens - if not embedding_model_name: - embedding_model_name = model - else: - total_chunks_saved = result - - elif file_ext in ('.xlsx', '.xls', '.xlsm'): - # Process Excel (potentially multiple sheets) - excel_file = pandas.ExcelFile( - temp_file_path, - engine='openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' - ) - sheet_names = excel_file.sheet_names - base_name, ext = os.path.splitext(original_filename) - - accumulated_total_chunks = 0 - for sheet_name in sheet_names: - update_callback(status=f"Processing sheet '{sheet_name}'...") - # Read specific sheet, get values (not formulas), keep data as string - # Note: pandas typically reads values, not formulas by default. - df = excel_file.parse(sheet_name, keep_default_na=False, dtype=str) + token_usage = save_chunks(**save_args) + total_chunks_saved = 1 + if token_usage: + total_embedding_tokens = token_usage.get('total_tokens', 0) + embedding_model_name = token_usage.get('model_deployment_name') - # Create effective filename for this sheet - effective_filename = f"{base_name}-{sheet_name}{ext}" if len(sheet_names) > 1 else original_filename + # Don't return here — fall through to metadata extraction below + except Exception as e: + log_event(f"[process_tabular] Error creating schema summary, falling back to row-by-row: {e}", level=logging.WARNING) + # Fall through to existing row-by-row processing + # Only do row-by-row chunking if schema-only didn't produce chunks + if total_chunks_saved == 0: + try: + if file_ext == '.csv': + # Process CSV + # Read CSV, attempt to infer header, keep data as string initially + df = pandas.read_csv( + temp_file_path, + keep_default_na=False, + dtype=str + ) args = { "df": df, "document_id": document_id, "user_id": user_id, - "file_name": effective_filename, + "file_name": original_filename, "update_callback": update_callback } @@ -4797,21 +4987,62 @@ def process_tabular(document_id, user_id, temp_file_path, original_filename, fil result = process_single_tabular_sheet(**args) if isinstance(result, tuple) and len(result) == 3: chunks, tokens, model = result - accumulated_total_chunks += chunks + total_chunks_saved = chunks total_embedding_tokens += tokens if not embedding_model_name: embedding_model_name = model else: - accumulated_total_chunks += result + total_chunks_saved = result - total_chunks_saved = accumulated_total_chunks # Total across all sheets + elif file_ext in ('.xlsx', '.xls', '.xlsm'): + # Process Excel (potentially multiple sheets) + excel_file = pandas.ExcelFile( + temp_file_path, + engine='openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' + ) + sheet_names = excel_file.sheet_names + base_name, ext = os.path.splitext(original_filename) + accumulated_total_chunks = 0 + for sheet_name in sheet_names: + update_callback(status=f"Processing sheet '{sheet_name}'...") + # Read specific sheet, get values (not formulas), keep data as string + # Note: pandas typically reads values, not formulas by default. + df = excel_file.parse(sheet_name, keep_default_na=False, dtype=str) - except pandas.errors.EmptyDataError: - print(f"Warning: Tabular file or sheet is empty: {original_filename}") - update_callback(status=f"Warning: File/sheet is empty - {original_filename}", number_of_pages=0) - except Exception as e: - raise Exception(f"Failed processing Tabular file {original_filename}: {e}") + # Create effective filename for this sheet + effective_filename = f"{base_name}-{sheet_name}{ext}" if len(sheet_names) > 1 else original_filename + + args = { + "df": df, + "document_id": document_id, + "user_id": user_id, + "file_name": effective_filename, + "update_callback": update_callback + } + + if is_public_workspace: + args["public_workspace_id"] = public_workspace_id + elif is_group: + args["group_id"] = group_id + + result = process_single_tabular_sheet(**args) + if isinstance(result, tuple) and len(result) == 3: + chunks, tokens, model = result + accumulated_total_chunks += chunks + total_embedding_tokens += tokens + if not embedding_model_name: + embedding_model_name = model + else: + accumulated_total_chunks += result + + total_chunks_saved = accumulated_total_chunks # Total across all sheets + + except pandas.errors.EmptyDataError: + log_event(f"[process_tabular] Warning: Tabular file or sheet is empty: {original_filename}", level=logging.WARNING) + update_callback(status=f"Warning: File/sheet is empty - {original_filename}", number_of_pages=0) + except Exception as e: + raise Exception(f"Failed processing Tabular file {original_filename}: {e}") # Extract metadata if enabled and chunks were processed settings = get_settings() diff --git a/application/single_app/functions_settings.py b/application/single_app/functions_settings.py index 4164fab4..f3dc59de 100644 --- a/application/single_app/functions_settings.py +++ b/application/single_app/functions_settings.py @@ -25,6 +25,7 @@ def get_settings(use_cosmos=False): 'enable_text_plugin': True, 'enable_default_embedding_model_plugin': False, 'enable_fact_memory_plugin': True, + 'enable_tabular_processing_plugin': False, 'enable_multi_agent_orchestration': False, 'max_rounds_per_agent': 1, 'enable_semantic_kernel': False, @@ -394,6 +395,9 @@ def update_settings(new_settings): # always fetch the latest settings doc, which includes your merges settings_item = get_settings() settings_item.update(new_settings) + # Dependency enforcement: tabular processing requires enhanced citations + if not settings_item.get('enable_enhanced_citations', False): + settings_item['enable_tabular_processing_plugin'] = False cosmos_settings_container.upsert_item(settings_item) cache_updater = getattr(app_settings_cache, "update_settings_cache", None) if callable(cache_updater): diff --git a/application/single_app/route_backend_chats.py b/application/single_app/route_backend_chats.py index abd7a9f4..8154b7d4 100644 --- a/application/single_app/route_backend_chats.py +++ b/application/single_app/route_backend_chats.py @@ -40,6 +40,185 @@ def get_kernel_agents(): log_event(f"[SKChat] get_kernel_agents - g.kernel_agents: {type(g_agents)} ({len(g_agents) if g_agents else 0} agents), builtins.kernel_agents: {type(builtins_agents)} ({len(builtins_agents) if builtins_agents else 0} agents)", level=logging.INFO) return g_agents or builtins_agents +async def run_tabular_sk_analysis(user_question, tabular_filenames, user_id, + conversation_id, gpt_model, settings, + source_hint="workspace", group_id=None, + public_workspace_id=None): + """Run lightweight SK with TabularProcessingPlugin to analyze tabular data. + + Creates a temporary Kernel with only the TabularProcessingPlugin, uses the + same chat model as the user's session, and returns computed analysis results. + Returns None on failure for graceful degradation. + """ + from semantic_kernel import Kernel as SKKernel + from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion + from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior + from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import AzureChatPromptExecutionSettings + from semantic_kernel.contents.chat_history import ChatHistory as SKChatHistory + from semantic_kernel_plugins.tabular_processing_plugin import TabularProcessingPlugin + + try: + log_event(f"[Tabular SK Analysis] Starting analysis for files: {tabular_filenames}", level=logging.INFO) + + # 1. Create lightweight kernel with only tabular plugin + kernel = SKKernel() + tabular_plugin = TabularProcessingPlugin() + kernel.add_plugin(tabular_plugin, plugin_name="tabular_processing") + + # 2. Create chat service using same config as main chat + enable_gpt_apim = settings.get('enable_gpt_apim', False) + if enable_gpt_apim: + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_apim_gpt_endpoint'), + api_key=settings.get('azure_apim_gpt_subscription_key'), + api_version=settings.get('azure_apim_gpt_api_version'), + ) + else: + auth_type = settings.get('azure_openai_gpt_authentication_type') + if auth_type == 'managed_identity': + token_provider = get_bearer_token_provider(DefaultAzureCredential(), cognitive_services_scope) + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_openai_gpt_endpoint'), + api_version=settings.get('azure_openai_gpt_api_version'), + ad_token_provider=token_provider, + ) + else: + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_openai_gpt_endpoint'), + api_key=settings.get('azure_openai_gpt_key'), + api_version=settings.get('azure_openai_gpt_api_version'), + ) + kernel.add_service(chat_service) + + # 3. Pre-dispatch: load file schemas to eliminate discovery LLM rounds + source_context = f"source='{source_hint}'" + if group_id: + source_context += f", group_id='{group_id}'" + if public_workspace_id: + source_context += f", public_workspace_id='{public_workspace_id}'" + + schema_parts = [] + for fname in tabular_filenames: + try: + container, blob_path = tabular_plugin._resolve_blob_location_with_fallback( + user_id, conversation_id, fname, source_hint, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = tabular_plugin._read_tabular_blob_to_dataframe(container, blob_path) + df_numeric = tabular_plugin._try_numeric_conversion(df.copy()) + schema_info = { + "filename": fname, + "row_count": len(df), + "columns": list(df.columns), + "dtypes": {col: str(dtype) for col, dtype in df_numeric.dtypes.items()}, + "preview": df.head(3).to_dict(orient='records') + } + schema_parts.append(json.dumps(schema_info, indent=2, default=str)) + log_event(f"[Tabular SK Analysis] Pre-loaded schema for {fname} ({len(df)} rows)", level=logging.DEBUG) + except Exception as e: + log_event(f"[Tabular SK Analysis] Failed to pre-load schema for {fname}: {e}", level=logging.WARNING) + schema_parts.append(json.dumps({"filename": fname, "error": f"Could not pre-load: {str(e)}"})) + + schema_context = "\n".join(schema_parts) + + # 4. Build chat history with pre-loaded schemas + chat_history = SKChatHistory() + chat_history.add_system_message( + "You are a data analyst. Use the tabular_processing plugin functions to " + "analyze the data and answer the user's question.\n\n" + f"FILE SCHEMAS (pre-loaded — do NOT call list_tabular_files or describe_tabular_file):\n" + f"{schema_context}\n\n" + "IMPORTANT: Batch multiple independent function calls in a SINGLE response. " + "For example, call multiple aggregate_column or group_by_aggregate functions " + "at once rather than one at a time.\n\n" + "Return the computed results clearly." + ) + + chat_history.add_user_message( + f"Analyze the tabular data to answer: {user_question}\n" + f"Use user_id='{user_id}', conversation_id='{conversation_id}', {source_context}." + ) + + # 5. Execute with auto function calling + execution_settings = AzureChatPromptExecutionSettings( + service_id="tabular-analysis", + function_choice_behavior=FunctionChoiceBehavior.Auto( + maximum_auto_invoke_attempts=5 + ), + ) + + result = await chat_service.get_chat_message_contents( + chat_history, execution_settings, kernel=kernel + ) + + if result and result[0].content: + analysis = result[0].content + # Cap at 20k characters to stay within token budget + if len(analysis) > 20000: + analysis = analysis[:20000] + "\n[Analysis truncated]" + log_event(f"[Tabular SK Analysis] Analysis complete, {len(analysis)} chars", level=logging.INFO) + return analysis + log_event("[Tabular SK Analysis] No content in SK response", level=logging.WARNING) + return None + + except Exception as e: + log_event(f"[Tabular SK Analysis] Error: {e}", level=logging.WARNING, exceptionTraceback=True) + return None + +def collect_tabular_sk_citations(user_id, conversation_id): + """Collect plugin invocations from the tabular SK analysis and convert to citation format.""" + from semantic_kernel_plugins.plugin_invocation_logger import get_plugin_logger + + plugin_logger = get_plugin_logger() + plugin_invocations = plugin_logger.get_invocations_for_conversation(user_id, conversation_id) + + if not plugin_invocations: + return [] + + def make_json_serializable(obj): + if obj is None: + return None + elif isinstance(obj, (str, int, float, bool)): + return obj + elif isinstance(obj, dict): + return {str(k): make_json_serializable(v) for k, v in obj.items()} + elif isinstance(obj, (list, tuple)): + return [make_json_serializable(item) for item in obj] + else: + return str(obj) + + citations = [] + for inv in plugin_invocations: + timestamp_str = None + if inv.timestamp: + if hasattr(inv.timestamp, 'isoformat'): + timestamp_str = inv.timestamp.isoformat() + else: + timestamp_str = str(inv.timestamp) + + citation = { + 'tool_name': f"{inv.plugin_name}.{inv.function_name}", + 'function_name': inv.function_name, + 'plugin_name': inv.plugin_name, + 'function_arguments': make_json_serializable(inv.parameters), + 'function_result': make_json_serializable(inv.result), + 'duration_ms': inv.duration_ms, + 'timestamp': timestamp_str, + 'success': inv.success, + 'error_message': make_json_serializable(inv.error_message), + 'user_id': inv.user_id + } + citations.append(citation) + + log_event(f"[Tabular SK Citations] Collected {len(citations)} tool execution citations", level=logging.INFO) + return citations + def register_route_backend_chats(app): @app.route('/api/chat', methods=['POST']) @swagger_route(security=get_auth_security()) @@ -970,6 +1149,70 @@ def result_requires_message_reload(result: Any) -> bool: 'documents': combined_documents # Keep track of docs used }) + # Auto-detect tabular files in search results and prompt the LLM to use the plugin + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + tabular_files_in_results = set() + for source_doc in combined_documents: + fname = source_doc.get('file_name', '') + if fname and any(fname.lower().endswith(ext) for ext in TABULAR_EXTENSIONS): + tabular_files_in_results.add(fname) + + if tabular_files_in_results: + # Determine source based on document_scope, not just active IDs + if document_scope == 'group' and active_group_id: + tabular_source_hint = "group" + elif document_scope == 'public' and active_public_workspace_id: + tabular_source_hint = "public" + else: + tabular_source_hint = "workspace" + + tabular_filenames_str = ", ".join(tabular_files_in_results) + + # Run SK tabular analysis to pre-compute results + tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=tabular_files_in_results, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint=tabular_source_hint, + group_id=active_group_id if tabular_source_hint == "group" else None, + public_workspace_id=active_public_workspace_id if tabular_source_hint == "public" else None, + )) + + if tabular_analysis: + # Inject pre-computed analysis results as context + tabular_system_msg = ( + f"The following analysis was computed from the tabular file(s) " + f"{tabular_filenames_str} using data analysis functions:\n\n" + f"{tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." + ) + else: + # Fallback: instruct LLM to use plugin functions (for agent mode) + tabular_system_msg = ( + f"IMPORTANT: The search results include data from tabular file(s): {tabular_filenames_str}. " + f"The search results contain only a schema summary (column names and a few sample rows), NOT the full data. " + f"You MUST use the tabular_processing plugin functions to answer ANY question about these files. " + f"Do NOT attempt to answer using the schema summary alone — it is incomplete. " + f"Available functions: describe_tabular_file, aggregate_column, filter_rows, query_tabular_data, group_by_aggregate. " + f"Use source='{tabular_source_hint}'" + + (f" and group_id='{active_group_id}'" if tabular_source_hint == "group" else "") + + (f" and public_workspace_id='{active_public_workspace_id}'" if tabular_source_hint == "public" else "") + + "." + ) + system_messages_for_augmentation.append({ + 'role': 'system', + 'content': tabular_system_msg + }) + + # Collect tool execution citations from SK tabular analysis + if tabular_analysis: + tabular_sk_citations = collect_tabular_sk_citations(user_id, conversation_id) + if tabular_sk_citations: + agent_citations_list.extend(tabular_sk_citations) + # Loop through each source document/chunk used for this message for source_doc in combined_documents: # 4. Create a citation dictionary, selecting the desired fields @@ -1155,8 +1398,8 @@ def result_requires_message_reload(result: Any) -> bool: """ # Update the system message with enhanced content and updated documents array if system_messages_for_augmentation: - system_messages_for_augmentation[-1]['content'] = system_prompt_search - system_messages_for_augmentation[-1]['documents'] = combined_documents + system_messages_for_augmentation[0]['content'] = system_prompt_search + system_messages_for_augmentation[0]['documents'] = combined_documents # --- END NEW METADATA CITATIONS --- # Update conversation classifications if new ones were found @@ -1670,6 +1913,7 @@ def result_requires_message_reload(result: Any) -> bool: allowed_roles_in_history = ['user', 'assistant'] # Add 'system' if you PERSIST general system messages not related to augmentation max_file_content_length_in_history = 50000 # Increased limit for all file content in history max_tabular_content_length_in_history = 50000 # Same limit for tabular data consistency + chat_tabular_files = set() # Track tabular files uploaded directly to chat for message in recent_messages: role = message.get('role') @@ -1705,25 +1949,38 @@ def result_requires_message_reload(result: Any) -> bool: filename = message.get('filename', 'uploaded_file') file_content = message.get('file_content', '') # Assuming file content is stored is_table = message.get('is_table', False) - - # Use higher limit for tabular data that needs complete analysis - content_limit = max_tabular_content_length_in_history if is_table else max_file_content_length_in_history - - display_content = file_content[:content_limit] - if len(file_content) > content_limit: - display_content += "..." - - # Enhanced message for tabular data - if is_table: + file_content_source = message.get('file_content_source', '') + + # Tabular files stored in blob (enhanced citations enabled) - reference plugin + if is_table and file_content_source == 'blob': + chat_tabular_files.add(filename) # Track for mini SK analysis conversation_history_for_api.append({ - 'role': 'system', # Represent file as system info - 'content': f"[User uploaded a tabular data file named '{filename}'. This is CSV format data for analysis:\n{display_content}]\nThis is complete tabular data in CSV format. You can perform calculations, analysis, and data operations on this dataset." + 'role': 'system', + 'content': f"[User uploaded a tabular data file named '{filename}'. " + f"The file is stored in blob storage and available for analysis. " + f"Use the tabular_processing plugin functions (list_tabular_files, describe_tabular_file, " + f"aggregate_column, filter_rows, query_tabular_data, group_by_aggregate) to analyze this data. " + f"The file source is 'chat'.]" }) else: - conversation_history_for_api.append({ - 'role': 'system', # Represent file as system info - 'content': f"[User uploaded a file named '{filename}'. Content preview:\n{display_content}]\nUse this file context if relevant." - }) + # Use higher limit for tabular data that needs complete analysis + content_limit = max_tabular_content_length_in_history if is_table else max_file_content_length_in_history + + display_content = file_content[:content_limit] + if len(file_content) > content_limit: + display_content += "..." + + # Enhanced message for tabular data + if is_table: + conversation_history_for_api.append({ + 'role': 'system', # Represent file as system info + 'content': f"[User uploaded a tabular data file named '{filename}'. This is CSV format data for analysis:\n{display_content}]\nThis is complete tabular data in CSV format. You can perform calculations, analysis, and data operations on this dataset." + }) + else: + conversation_history_for_api.append({ + 'role': 'system', # Represent file as system info + 'content': f"[User uploaded a file named '{filename}'. Content preview:\n{display_content}]\nUse this file context if relevant." + }) elif role == 'image': # Handle image uploads with extracted text and vision analysis filename = message.get('filename', 'uploaded_image') is_user_upload = message.get('metadata', {}).get('is_user_upload', False) @@ -1787,6 +2044,45 @@ def result_requires_message_reload(result: Any) -> bool: # Ignored roles: 'safety', 'blocked', 'system' (if they are only for augmentation/summary) + # --- Mini SK analysis for tabular files uploaded directly to chat --- + if chat_tabular_files and settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + chat_tabular_filenames_str = ", ".join(chat_tabular_files) + log_event( + f"[Chat Tabular SK] Detected {len(chat_tabular_files)} tabular file(s) uploaded to chat: {chat_tabular_filenames_str}", + level=logging.INFO + ) + + chat_tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=chat_tabular_files, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint="chat", + )) + + if chat_tabular_analysis: + # Inject pre-computed analysis results as context + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"The following analysis was computed from the chat-uploaded tabular file(s) " + f"{chat_tabular_filenames_str} using data analysis functions:\n\n" + f"{chat_tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." + ) + }) + + # Collect tool execution citations from SK tabular analysis + chat_tabular_sk_citations = collect_tabular_sk_citations(user_id, conversation_id) + if chat_tabular_sk_citations: + agent_citations_list.extend(chat_tabular_sk_citations) + + debug_print(f"[Chat Tabular SK] Analysis injected, {len(chat_tabular_analysis)} chars") + else: + debug_print("[Chat Tabular SK] Analysis returned None, relying on existing file context messages") + # Ensure the very last message is the current user's message (it should be if fetched correctly) if not conversation_history_for_api or conversation_history_for_api[-1]['role'] != 'user': debug_print("Warning: Last message in history is not the user's current message. Appending.") @@ -3390,7 +3686,55 @@ def emit_thought(step_type, content, detail=None): 'content': system_prompt_search, 'documents': combined_documents }) - + + # Auto-detect tabular files in search results and run SK analysis + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + tabular_files_in_results = set() + for source_doc in combined_documents: + fname = source_doc.get('file_name', '') + if fname and any(fname.lower().endswith(ext) for ext in TABULAR_EXTENSIONS): + tabular_files_in_results.add(fname) + + if tabular_files_in_results: + # Determine source based on document_scope, not just active IDs + if document_scope == 'group' and active_group_id: + tabular_source_hint = "group" + elif document_scope == 'public' and active_public_workspace_id: + tabular_source_hint = "public" + else: + tabular_source_hint = "workspace" + + tabular_filenames_str = ", ".join(tabular_files_in_results) + + # Run SK tabular analysis to pre-compute results + tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=tabular_files_in_results, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint=tabular_source_hint, + group_id=active_group_id if tabular_source_hint == "group" else None, + public_workspace_id=active_public_workspace_id if tabular_source_hint == "public" else None, + )) + + if tabular_analysis: + system_messages_for_augmentation.append({ + 'role': 'system', + 'content': ( + f"The following analysis was computed from the tabular file(s) " + f"{tabular_filenames_str} using data analysis functions:\n\n" + f"{tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." + ) + }) + + # Collect tool execution citations from SK tabular analysis + tabular_sk_citations = collect_tabular_sk_citations(user_id, conversation_id) + if tabular_sk_citations: + agent_citations_list.extend(tabular_sk_citations) + # Reorder hybrid citations list in descending order based on page_number hybrid_citations_list.sort(key=lambda x: x.get('page_number', 0), reverse=True) @@ -3455,15 +3799,108 @@ def emit_thought(step_type, content, detail=None): 'content': aug_msg['content'] }) - # Add recent messages + # Add recent messages (with file role handling) allowed_roles_in_history = ['user', 'assistant'] + max_file_content_length_in_history = 50000 + max_tabular_content_length_in_history = 50000 + chat_tabular_files = set() # Track tabular files uploaded directly to chat + for message in recent_messages: - if message.get('role') in allowed_roles_in_history: + role = message.get('role') + content = message.get('content', '') + + if role in allowed_roles_in_history: conversation_history_for_api.append({ - 'role': message['role'], - 'content': message.get('content', '') + 'role': role, + 'content': content }) - + elif role == 'file': + filename = message.get('filename', 'uploaded_file') + file_content = message.get('file_content', '') + is_table = message.get('is_table', False) + file_content_source = message.get('file_content_source', '') + + # Tabular files stored in blob - track for mini SK analysis + if is_table and file_content_source == 'blob': + chat_tabular_files.add(filename) + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"[User uploaded a tabular data file named '{filename}'. " + f"The file is stored in blob storage and available for analysis. " + f"Use the tabular_processing plugin functions (list_tabular_files, " + f"describe_tabular_file, aggregate_column, filter_rows, " + f"query_tabular_data, group_by_aggregate) to analyze this data. " + f"The file source is 'chat'.]" + ) + }) + else: + content_limit = ( + max_tabular_content_length_in_history if is_table + else max_file_content_length_in_history + ) + display_content = file_content[:content_limit] + if len(file_content) > content_limit: + display_content += "..." + + if is_table: + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"[User uploaded a tabular data file named '{filename}'. " + f"This is CSV format data for analysis:\n{display_content}]\n" + f"This is complete tabular data in CSV format. You can perform " + f"calculations, analysis, and data operations on this dataset." + ) + }) + else: + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"[User uploaded a file named '{filename}'. " + f"Content preview:\n{display_content}]\n" + f"Use this file context if relevant." + ) + }) + + # --- Mini SK analysis for tabular files uploaded directly to chat --- + if chat_tabular_files and settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + chat_tabular_filenames_str = ", ".join(chat_tabular_files) + log_event( + f"[Chat Tabular SK] Streaming: Detected {len(chat_tabular_files)} tabular file(s) uploaded to chat: {chat_tabular_filenames_str}", + level=logging.INFO + ) + + chat_tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=chat_tabular_files, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint="chat", + )) + + if chat_tabular_analysis: + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"The following analysis was computed from the chat-uploaded tabular file(s) " + f"{chat_tabular_filenames_str} using data analysis functions:\n\n" + f"{chat_tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." + ) + }) + + # Collect tool execution citations + chat_tabular_sk_citations = collect_tabular_sk_citations(user_id, conversation_id) + if chat_tabular_sk_citations: + agent_citations_list.extend(chat_tabular_sk_citations) + + debug_print(f"[Chat Tabular SK] Streaming: Analysis injected, {len(chat_tabular_analysis)} chars") + else: + debug_print("[Chat Tabular SK] Streaming: Analysis returned None, relying on existing file context") + except Exception as e: yield f"data: {json.dumps({'error': f'History error: {str(e)}'})}\n\n" return @@ -3613,7 +4050,6 @@ async def stream_agent_async(): return chunks, usage_data # Execute async streaming - import asyncio try: # Try to get existing event loop loop = asyncio.get_event_loop() diff --git a/application/single_app/route_backend_documents.py b/application/single_app/route_backend_documents.py index 28d4ef69..0e9d490b 100644 --- a/application/single_app/route_backend_documents.py +++ b/application/single_app/route_backend_documents.py @@ -7,6 +7,7 @@ from utils_cache import invalidate_personal_search_cache from functions_debug import * from functions_activity_logging import log_document_upload, log_document_metadata_update_transaction +import io import os import requests from flask import current_app @@ -72,7 +73,58 @@ def get_file_content(): filename = items_sorted[0].get('filename', 'Untitled') is_table = items_sorted[0].get('is_table', False) - debug_print(f"[GET_FILE_CONTENT] Filename: {filename}, is_table: {is_table}") + file_content_source = items_sorted[0].get('file_content_source', '') + debug_print(f"[GET_FILE_CONTENT] Filename: {filename}, is_table: {is_table}, source: {file_content_source}") + + # Handle blob-stored tabular files (enhanced citations enabled) + if file_content_source == 'blob': + blob_container = items_sorted[0].get('blob_container', '') + blob_path = items_sorted[0].get('blob_path', '') + debug_print(f"[GET_FILE_CONTENT] Blob-stored file: container={blob_container}, path={blob_path}") + + if not blob_container or not blob_path: + return jsonify({'error': 'Blob storage reference is incomplete'}), 500 + + try: + blob_service_client = CLIENTS.get("storage_account_office_docs_client") + if not blob_service_client: + return jsonify({'error': 'Blob storage client not available'}), 500 + + blob_client = blob_service_client.get_blob_client( + container=blob_container, + blob=blob_path + ) + stream = blob_client.download_blob() + blob_data = stream.readall() + + # Convert to CSV using pandas for display + file_ext = os.path.splitext(filename)[1].lower() + if file_ext == '.csv': + import pandas + df = pandas.read_csv(io.BytesIO(blob_data)) + combined_content = df.to_csv(index=False) + elif file_ext in ['.xlsx', '.xlsm']: + import pandas + df = pandas.read_excel(io.BytesIO(blob_data), engine='openpyxl') + combined_content = df.to_csv(index=False) + elif file_ext == '.xls': + import pandas + df = pandas.read_excel(io.BytesIO(blob_data), engine='xlrd') + combined_content = df.to_csv(index=False) + else: + combined_content = blob_data.decode('utf-8', errors='replace') + + debug_print(f"[GET_FILE_CONTENT] Successfully read blob content, length: {len(combined_content)}") + return jsonify({ + 'file_content': combined_content, + 'filename': filename, + 'is_table': is_table, + 'file_content_source': 'blob' + }), 200 + + except Exception as blob_err: + debug_print(f"[GET_FILE_CONTENT] Error reading from blob: {blob_err}") + return jsonify({'error': f'Error reading file from storage: {str(blob_err)}'}), 500 add_file_task_to_file_processing_log(document_id=file_id, user_id=user_id, content="Combining file content from chunks, filename: " + filename + ", is_table: " + str(is_table)) combined_parts = [] diff --git a/application/single_app/route_backend_plugins.py b/application/single_app/route_backend_plugins.py index 63c7854e..f4d4dca0 100644 --- a/application/single_app/route_backend_plugins.py +++ b/application/single_app/route_backend_plugins.py @@ -624,6 +624,8 @@ def get_core_plugin_settings(): 'enable_text_plugin': bool(settings.get('enable_text_plugin', True)), 'enable_default_embedding_model_plugin': bool(settings.get('enable_default_embedding_model_plugin', True)), 'enable_fact_memory_plugin': bool(settings.get('enable_fact_memory_plugin', True)), + 'enable_tabular_processing_plugin': bool(settings.get('enable_tabular_processing_plugin', False)), + 'enable_enhanced_citations': bool(settings.get('enable_enhanced_citations', False)), 'enable_semantic_kernel': bool(settings.get('enable_semantic_kernel', False)), 'allow_user_plugins': bool(settings.get('allow_user_plugins', True)), 'allow_group_plugins': bool(settings.get('allow_group_plugins', True)), @@ -646,6 +648,7 @@ def update_core_plugin_settings(): 'enable_text_plugin', 'enable_default_embedding_model_plugin', 'enable_fact_memory_plugin', + 'enable_tabular_processing_plugin', 'allow_user_plugins', 'allow_group_plugins' ] @@ -663,6 +666,11 @@ def update_core_plugin_settings(): return jsonify({'error': f"Field '{key}' must be a boolean."}), 400 updates[key] = data[key] logging.info("Validated plugin settings: %s", updates) + # Dependency: tabular processing requires enhanced citations + if updates.get('enable_tabular_processing_plugin', False): + full_settings = get_settings() + if not full_settings.get('enable_enhanced_citations', False): + return jsonify({'error': 'Tabular Processing requires Enhanced Citations to be enabled.'}), 400 # Update settings success = update_settings(updates) if success: diff --git a/application/single_app/route_enhanced_citations.py b/application/single_app/route_enhanced_citations.py index c81ef225..44a35223 100644 --- a/application/single_app/route_enhanced_citations.py +++ b/application/single_app/route_enhanced_citations.py @@ -8,6 +8,7 @@ import requests import mimetypes import io +import pandas from functions_authentication import login_required, user_required, get_current_user_id from functions_settings import get_settings, enabled_required @@ -15,7 +16,7 @@ from functions_group import get_user_groups from functions_public_workspaces import get_user_visible_public_workspace_ids_from_settings from swagger_wrapper import swagger_route, get_auth_security -from config import CLIENTS, storage_account_user_documents_container_name, storage_account_group_documents_container_name, storage_account_public_documents_container_name, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS, AUDIO_EXTENSIONS +from config import CLIENTS, storage_account_user_documents_container_name, storage_account_group_documents_container_name, storage_account_public_documents_container_name, storage_account_personal_chat_container_name, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS, AUDIO_EXTENSIONS, TABULAR_EXTENSIONS, cosmos_messages_container from functions_debug import debug_print def register_enhanced_citations_routes(app): @@ -183,6 +184,189 @@ def get_enhanced_citation_pdf(): except Exception as e: return jsonify({"error": str(e)}), 500 + @app.route("/api/enhanced_citations/tabular", methods=["GET"]) + @swagger_route(security=get_auth_security()) + @login_required + @user_required + @enabled_required("enable_enhanced_citations") + def get_enhanced_citation_tabular(): + """ + Serve original tabular file (CSV, XLSX, etc.) from blob storage for download. + Used for chat-uploaded tabular files stored in blob storage. + """ + conversation_id = request.args.get("conversation_id") + file_id = request.args.get("file_id") + + if not conversation_id or not file_id: + return jsonify({"error": "conversation_id and file_id are required"}), 400 + + user_id = get_current_user_id() + if not user_id: + return jsonify({"error": "User not authenticated"}), 401 + + try: + # Look up the file message in Cosmos to get blob reference + query_str = """ + SELECT * FROM c + WHERE c.conversation_id = @conversation_id + AND c.id = @file_id + """ + items = list(cosmos_messages_container.query_items( + query=query_str, + parameters=[ + {'name': '@conversation_id', 'value': conversation_id}, + {'name': '@file_id', 'value': file_id} + ], + partition_key=conversation_id + )) + + if not items: + return jsonify({"error": "File not found"}), 404 + + file_msg = items[0] + file_content_source = file_msg.get('file_content_source', '') + + if file_content_source != 'blob': + return jsonify({"error": "File is not stored in blob storage"}), 400 + + blob_container = file_msg.get('blob_container', '') + blob_path = file_msg.get('blob_path', '') + filename = file_msg.get('filename', 'download') + + if not blob_container or not blob_path: + return jsonify({"error": "Blob reference is incomplete"}), 500 + + blob_service_client = CLIENTS.get("storage_account_office_docs_client") + if not blob_service_client: + return jsonify({"error": "Storage not available"}), 500 + + blob_client = blob_service_client.get_blob_client( + container=blob_container, + blob=blob_path + ) + stream = blob_client.download_blob() + content = stream.readall() + + # Determine content type + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + content_type = 'application/octet-stream' + + return Response( + content, + content_type=content_type, + headers={ + 'Content-Length': str(len(content)), + 'Content-Disposition': f'attachment; filename="{filename}"', + 'Cache-Control': 'private, max-age=300', + } + ) + + except Exception as e: + debug_print(f"Error serving tabular citation: {e}") + return jsonify({"error": str(e)}), 500 + + @app.route("/api/enhanced_citations/tabular_workspace", methods=["GET"]) + @swagger_route(security=get_auth_security()) + @login_required + @user_required + @enabled_required("enable_enhanced_citations") + def get_enhanced_citation_tabular_workspace(): + """ + Serve tabular file (CSV, XLSX, etc.) from blob storage for workspace documents. + Uses doc_id to look up the document across personal, group, and public workspaces. + """ + doc_id = request.args.get("doc_id") + if not doc_id: + return jsonify({"error": "doc_id is required"}), 400 + + user_id = get_current_user_id() + if not user_id: + return jsonify({"error": "User not authenticated"}), 401 + + try: + doc_response, status_code = get_document(user_id, doc_id) + if status_code != 200: + return doc_response, status_code + + raw_doc = doc_response.get_json() + file_name = raw_doc.get('file_name', '') + ext = file_name.lower().split('.')[-1] if '.' in file_name else '' + + if ext not in ('csv', 'xlsx', 'xls', 'xlsm'): + return jsonify({"error": "File is not a tabular file"}), 400 + + return serve_enhanced_citation_content(raw_doc, force_download=True) + + except Exception as e: + debug_print(f"Error serving tabular workspace citation: {e}") + return jsonify({"error": str(e)}), 500 + + @app.route("/api/enhanced_citations/tabular_preview", methods=["GET"]) + @swagger_route(security=get_auth_security()) + @login_required + @user_required + @enabled_required("enable_enhanced_citations") + def get_enhanced_citation_tabular_preview(): + """ + Return JSON preview of a tabular file for rendering as an HTML table. + Reads the file into a pandas DataFrame and returns columns + rows as JSON. + """ + doc_id = request.args.get("doc_id") + max_rows = min(int(request.args.get("max_rows", 200)), 500) + if not doc_id: + return jsonify({"error": "doc_id is required"}), 400 + + user_id = get_current_user_id() + if not user_id: + return jsonify({"error": "User not authenticated"}), 401 + + try: + doc_response, status_code = get_document(user_id, doc_id) + if status_code != 200: + return doc_response, status_code + + raw_doc = doc_response.get_json() + file_name = raw_doc.get('file_name', '') + ext = file_name.lower().rsplit('.', 1)[-1] if '.' in file_name else '' + if ext not in ('csv', 'xlsx', 'xls', 'xlsm'): + return jsonify({"error": "File is not a tabular file"}), 400 + + # Download blob + workspace_type, container_name = determine_workspace_type_and_container(raw_doc) + blob_name = get_blob_name(raw_doc, workspace_type) + blob_service_client = CLIENTS.get("storage_account_office_docs_client") + if not blob_service_client: + return jsonify({"error": "Blob storage client not available"}), 500 + blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name) + data = blob_client.download_blob().readall() + + # Read into DataFrame + if ext == 'csv': + df = pandas.read_csv(io.BytesIO(data), keep_default_na=False, dtype=str) + elif ext in ('xlsx', 'xlsm'): + df = pandas.read_excel(io.BytesIO(data), engine='openpyxl', keep_default_na=False, dtype=str) + elif ext == 'xls': + df = pandas.read_excel(io.BytesIO(data), engine='xlrd', keep_default_na=False, dtype=str) + else: + return jsonify({"error": f"Unsupported file type: {ext}"}), 400 + + total_rows = len(df) + preview = df.head(max_rows) + + return jsonify({ + "filename": file_name, + "total_rows": total_rows, + "total_columns": len(df.columns), + "columns": list(df.columns), + "rows": preview.values.tolist(), + "truncated": total_rows > max_rows + }) + + except Exception as e: + debug_print(f"Error generating tabular preview: {e}") + return jsonify({"error": str(e)}), 500 + def get_document(user_id, doc_id): """ Get document metadata - searches across all enabled workspace types diff --git a/application/single_app/route_frontend_admin_settings.py b/application/single_app/route_frontend_admin_settings.py index 8c49522b..fdd77ce5 100644 --- a/application/single_app/route_frontend_admin_settings.py +++ b/application/single_app/route_frontend_admin_settings.py @@ -98,6 +98,8 @@ def admin_settings(): settings['enable_text_plugin'] = False if 'enable_fact_memory_plugin' not in settings: settings['enable_fact_memory_plugin'] = False + if 'enable_tabular_processing_plugin' not in settings: + settings['enable_tabular_processing_plugin'] = False if 'enable_default_embedding_model_plugin' not in settings: settings['enable_default_embedding_model_plugin'] = False if 'enable_multi_agent_orchestration' not in settings: diff --git a/application/single_app/route_frontend_chats.py b/application/single_app/route_frontend_chats.py index ca0feb1a..67a41879 100644 --- a/application/single_app/route_frontend_chats.py +++ b/application/single_app/route_frontend_chats.py @@ -237,8 +237,33 @@ def upload_file(): # Handle XML, YAML, and LOG files as text for inline chat extracted_content = extract_text_file(temp_file_path) elif file_ext_nodot in TABULAR_EXTENSIONS: - extracted_content = extract_table_file(temp_file_path, file_ext) is_table = True + + # Upload tabular file to blob storage for tabular processing plugin access + if settings.get('enable_enhanced_citations', False): + try: + blob_service_client = CLIENTS.get("storage_account_office_docs_client") + if blob_service_client: + blob_path = f"{user_id}/{conversation_id}/{filename}" + blob_client = blob_service_client.get_blob_client( + container=storage_account_personal_chat_container_name, + blob=blob_path + ) + metadata = { + "conversation_id": str(conversation_id), + "user_id": str(user_id) + } + with open(temp_file_path, "rb") as blob_f: + blob_client.upload_blob(blob_f, overwrite=True, metadata=metadata) + log_event(f"Uploaded chat tabular file to blob storage: {blob_path}") + except Exception as blob_err: + log_event( + f"Warning: Failed to upload chat tabular file to blob storage: {blob_err}", + level=logging.WARNING + ) + else: + # Only extract content for Cosmos storage when enhanced citations is disabled + extracted_content = extract_table_file(temp_file_path, file_ext) else: return jsonify({'error': 'Unsupported file type'}), 400 @@ -395,25 +420,50 @@ def upload_file(): current_thread_id = str(uuid.uuid4()) - file_message = { - 'id': file_message_id, - 'conversation_id': conversation_id, - 'role': 'file', - 'filename': filename, - 'file_content': extracted_content, - 'is_table': is_table, - 'timestamp': datetime.utcnow().isoformat(), - 'model_deployment_name': None, - 'metadata': { - 'thread_info': { - 'thread_id': current_thread_id, - 'previous_thread_id': previous_thread_id, - 'active_thread': True, - 'thread_attempt': 1 + # When enhanced citations is enabled and file is tabular, store a lightweight + # reference without file_content to avoid Cosmos DB size limits. + # The tabular data lives in blob storage and is served from there. + if is_table and settings.get('enable_enhanced_citations', False): + file_message = { + 'id': file_message_id, + 'conversation_id': conversation_id, + 'role': 'file', + 'filename': filename, + 'is_table': is_table, + 'file_content_source': 'blob', + 'blob_container': storage_account_personal_chat_container_name, + 'blob_path': f"{user_id}/{conversation_id}/{filename}", + 'timestamp': datetime.utcnow().isoformat(), + 'model_deployment_name': None, + 'metadata': { + 'thread_info': { + 'thread_id': current_thread_id, + 'previous_thread_id': previous_thread_id, + 'active_thread': True, + 'thread_attempt': 1 + } } } - } - + else: + file_message = { + 'id': file_message_id, + 'conversation_id': conversation_id, + 'role': 'file', + 'filename': filename, + 'file_content': extracted_content, + 'is_table': is_table, + 'timestamp': datetime.utcnow().isoformat(), + 'model_deployment_name': None, + 'metadata': { + 'thread_info': { + 'thread_id': current_thread_id, + 'previous_thread_id': previous_thread_id, + 'active_thread': True, + 'thread_attempt': 1 + } + } + } + # Add vision analysis if available if vision_analysis: file_message['vision_analysis'] = vision_analysis diff --git a/application/single_app/semantic_kernel_loader.py b/application/single_app/semantic_kernel_loader.py index 78f54203..eb9685bc 100644 --- a/application/single_app/semantic_kernel_loader.py +++ b/application/single_app/semantic_kernel_loader.py @@ -19,6 +19,7 @@ from semantic_kernel.functions.kernel_plugin import KernelPlugin from semantic_kernel_plugins.embedding_model_plugin import EmbeddingModelPlugin from semantic_kernel_plugins.fact_memory_plugin import FactMemoryPlugin +from semantic_kernel_plugins.tabular_processing_plugin import TabularProcessingPlugin from functions_settings import get_settings, get_user_settings from foundry_agent_runtime import AzureAIFoundryChatCompletionAgent from functions_appinsights import log_event, get_appinsights_logger @@ -408,6 +409,13 @@ def load_embedding_model_plugin(kernel: Kernel, settings): description="Provides text embedding functions using the configured embedding model." ) +def load_tabular_processing_plugin(kernel: Kernel): + kernel.add_plugin( + TabularProcessingPlugin(), + plugin_name="tabular_processing", + description="Provides data analysis on tabular files (CSV, XLSX) stored in blob storage. Can list files, describe schemas, aggregate columns, filter rows, run queries, and perform group-by operations." + ) + def load_core_plugins_only(kernel: Kernel, settings): """Load only core plugins for model-only conversations without agents.""" debug_print(f"[SK Loader] Loading core plugins only for model-only mode...") @@ -429,6 +437,10 @@ def load_core_plugins_only(kernel: Kernel, settings): load_text_plugin(kernel) log_event("[SK Loader] Loaded Text plugin.", level=logging.INFO) + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + load_tabular_processing_plugin(kernel) + log_event("[SK Loader] Loaded Tabular Processing plugin.", level=logging.INFO) + # =================== Semantic Kernel Initialization =================== def initialize_semantic_kernel(user_id: str=None, redis_client=None): debug_print(f"[SK Loader] Initializing Semantic Kernel and plugins...") @@ -1013,6 +1025,14 @@ def load_plugins_for_kernel(kernel, plugin_manifests, settings, mode_label="glob except Exception as e: log_event(f"[SK Loader] Failed to load Fact Memory Plugin: {e}", level=logging.WARNING) + # Register Tabular Processing Plugin if enabled (requires enhanced citations) + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + try: + load_tabular_processing_plugin(kernel) + log_event("[SK Loader] Loaded Tabular Processing plugin.", level=logging.INFO) + except Exception as e: + log_event(f"[SK Loader] Failed to load Tabular Processing plugin: {e}", level=logging.WARNING) + # Conditionally load static embedding model plugin if settings.get('enable_default_embedding_model_plugin', True): try: @@ -1357,7 +1377,11 @@ def load_user_semantic_kernel(kernel: Kernel, settings, user_id: str, redis_clie load_embedding_model_plugin(kernel, settings) print(f"[SK Loader] Loaded Default Embedding Model plugin.") log_event("[SK Loader] Loaded Default Embedding Model plugin.", level=logging.INFO) - + + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + load_tabular_processing_plugin(kernel) + log_event("[SK Loader] Loaded Tabular Processing plugin.", level=logging.INFO) + # Get selected agent from user settings (this still needs to be in user settings for UI state) user_settings = get_user_settings(user_id).get('settings', {}) selected_agent = user_settings.get('selected_agent') diff --git a/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py b/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py new file mode 100644 index 00000000..a525250b --- /dev/null +++ b/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py @@ -0,0 +1,515 @@ +# tabular_processing_plugin.py +""" +TabularProcessingPlugin for Semantic Kernel: provides data analysis operations +on tabular files (CSV, XLSX, XLS, XLSM) stored in Azure Blob Storage. + +Works with workspace documents (user-documents, group-documents, public-documents) +and chat-uploaded documents (personal-chat container). +""" +import asyncio +import io +import json +import logging +import pandas +from typing import Annotated, Optional, List +from semantic_kernel.functions import kernel_function +from semantic_kernel_plugins.plugin_invocation_logger import plugin_function_logger +from functions_appinsights import log_event +from config import ( + CLIENTS, + TABULAR_EXTENSIONS, + storage_account_user_documents_container_name, + storage_account_personal_chat_container_name, + storage_account_group_documents_container_name, + storage_account_public_documents_container_name, +) + + +class TabularProcessingPlugin: + """Provides data analysis functions on tabular files stored in blob storage.""" + + SUPPORTED_EXTENSIONS = {'.csv', '.xlsx', '.xls', '.xlsm'} + + def __init__(self): + self._df_cache = {} # Per-instance cache: (container, blob_name) -> DataFrame + + def _get_blob_service_client(self): + """Get the blob service client from CLIENTS cache.""" + client = CLIENTS.get("storage_account_office_docs_client") + if not client: + raise RuntimeError("Blob storage client not available. Enhanced citations must be enabled.") + return client + + def _list_tabular_blobs(self, container_name: str, prefix: str) -> List[str]: + """List all tabular file blobs under a given prefix.""" + client = self._get_blob_service_client() + container_client = client.get_container_client(container_name) + blobs = [] + for blob in container_client.list_blobs(name_starts_with=prefix): + name_lower = blob['name'].lower() + if any(name_lower.endswith(ext) for ext in self.SUPPORTED_EXTENSIONS): + blobs.append(blob['name']) + return blobs + + def _read_tabular_blob_to_dataframe(self, container_name: str, blob_name: str) -> pandas.DataFrame: + """Download a blob and read it into a pandas DataFrame. Uses per-instance cache.""" + cache_key = (container_name, blob_name) + if cache_key in self._df_cache: + log_event(f"[TabularProcessingPlugin] Cache hit for {blob_name}", level=logging.DEBUG) + return self._df_cache[cache_key].copy() + + client = self._get_blob_service_client() + blob_client = client.get_blob_client(container=container_name, blob=blob_name) + stream = blob_client.download_blob() + data = stream.readall() + + name_lower = blob_name.lower() + if name_lower.endswith('.csv'): + df = pandas.read_csv(io.BytesIO(data), keep_default_na=False, dtype=str) + elif name_lower.endswith('.xlsx') or name_lower.endswith('.xlsm'): + df = pandas.read_excel(io.BytesIO(data), engine='openpyxl', keep_default_na=False, dtype=str) + elif name_lower.endswith('.xls'): + df = pandas.read_excel(io.BytesIO(data), engine='xlrd', keep_default_na=False, dtype=str) + else: + raise ValueError(f"Unsupported tabular file type: {blob_name}") + + self._df_cache[cache_key] = df + log_event(f"[TabularProcessingPlugin] Cached DataFrame for {blob_name} ({len(df)} rows)", level=logging.DEBUG) + return df.copy() + + def _try_numeric_conversion(self, df: pandas.DataFrame) -> pandas.DataFrame: + """Attempt to convert string columns to numeric where possible.""" + for col in df.columns: + try: + df[col] = pandas.to_numeric(df[col]) + except (ValueError, TypeError): + pass + return df + + def _resolve_blob_location(self, user_id: str, conversation_id: str, filename: str, source: str, + group_id: str = None, public_workspace_id: str = None) -> tuple: + """Resolve container name and blob path from source type.""" + source = source.lower().strip() + if source == 'chat': + container = storage_account_personal_chat_container_name + blob_path = f"{user_id}/{conversation_id}/{filename}" + elif source == 'workspace': + container = storage_account_user_documents_container_name + blob_path = f"{user_id}/{filename}" + elif source == 'group': + if not group_id: + raise ValueError("group_id is required for source='group'") + container = storage_account_group_documents_container_name + blob_path = f"{group_id}/{filename}" + elif source == 'public': + if not public_workspace_id: + raise ValueError("public_workspace_id is required for source='public'") + container = storage_account_public_documents_container_name + blob_path = f"{public_workspace_id}/{filename}" + else: + raise ValueError(f"Unknown source '{source}'. Use 'workspace', 'chat', 'group', or 'public'.") + return container, blob_path + + def _resolve_blob_location_with_fallback(self, user_id: str, conversation_id: str, filename: str, source: str, + group_id: str = None, public_workspace_id: str = None) -> tuple: + """Try primary source first, then fall back to other containers if blob not found.""" + source = source.lower().strip() + attempts = [] + + # Primary attempt based on specified source + try: + primary = self._resolve_blob_location(user_id, conversation_id, filename, source, group_id, public_workspace_id) + attempts.append(primary) + except ValueError: + pass + + # Fallback attempts in priority order (skip the primary source) + if source != 'workspace': + attempts.append((storage_account_user_documents_container_name, f"{user_id}/{filename}")) + if source != 'group' and group_id: + attempts.append((storage_account_group_documents_container_name, f"{group_id}/{filename}")) + if source != 'public' and public_workspace_id: + attempts.append((storage_account_public_documents_container_name, f"{public_workspace_id}/{filename}")) + if source != 'chat': + attempts.append((storage_account_personal_chat_container_name, f"{user_id}/{conversation_id}/{filename}")) + + client = self._get_blob_service_client() + for container, blob_path in attempts: + try: + blob_client = client.get_blob_client(container=container, blob=blob_path) + if blob_client.exists(): + log_event(f"[TabularProcessingPlugin] Found blob at {container}/{blob_path}", level=logging.DEBUG) + return container, blob_path + except Exception: + continue + + # If nothing found, return primary for the original error message + if attempts: + return attempts[0] + raise ValueError(f"Could not resolve blob location for {filename}") + + @kernel_function( + description=( + "List all tabular data files available for a user. Checks workspace documents " + "(user-documents container), chat-uploaded documents (personal-chat container), " + "and optionally group or public workspace documents. " + "Returns a JSON list of available files with their source." + ), + name="list_tabular_files" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def list_tabular_files( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON list of available tabular files"]: + """List all tabular files available for the user across all accessible containers.""" + def _sync_work(): + results = [] + try: + workspace_prefix = f"{user_id}/" + workspace_blobs = self._list_tabular_blobs( + storage_account_user_documents_container_name, workspace_prefix + ) + for blob in workspace_blobs: + filename = blob.split('/')[-1] + results.append({ + "filename": filename, + "blob_path": blob, + "source": "workspace", + "container": storage_account_user_documents_container_name + }) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error listing workspace blobs: {e}", level=logging.WARNING) + + try: + chat_prefix = f"{user_id}/{conversation_id}/" + chat_blobs = self._list_tabular_blobs( + storage_account_personal_chat_container_name, chat_prefix + ) + for blob in chat_blobs: + filename = blob.split('/')[-1] + results.append({ + "filename": filename, + "blob_path": blob, + "source": "chat", + "container": storage_account_personal_chat_container_name + }) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error listing chat blobs: {e}", level=logging.WARNING) + + if group_id: + try: + group_prefix = f"{group_id}/" + group_blobs = self._list_tabular_blobs( + storage_account_group_documents_container_name, group_prefix + ) + for blob in group_blobs: + filename = blob.split('/')[-1] + results.append({ + "filename": filename, + "blob_path": blob, + "source": "group", + "container": storage_account_group_documents_container_name + }) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error listing group blobs: {e}", level=logging.WARNING) + + if public_workspace_id: + try: + public_prefix = f"{public_workspace_id}/" + public_blobs = self._list_tabular_blobs( + storage_account_public_documents_container_name, public_prefix + ) + for blob in public_blobs: + filename = blob.split('/')[-1] + results.append({ + "filename": filename, + "blob_path": blob, + "source": "public", + "container": storage_account_public_documents_container_name + }) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error listing public blobs: {e}", level=logging.WARNING) + + return json.dumps(results, indent=2) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Get a summary of a tabular file including column names, row count, data types, " + "and a preview of the first few rows." + ), + name="describe_tabular_file" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def describe_tabular_file( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON summary of the tabular file"]: + """Get schema and preview of a tabular file.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df_numeric = self._try_numeric_conversion(df.copy()) + + summary = { + "filename": filename, + "row_count": len(df), + "column_count": len(df.columns), + "columns": list(df.columns), + "dtypes": {col: str(dtype) for col, dtype in df_numeric.dtypes.items()}, + "preview": df.head(5).to_dict(orient='records'), + "null_counts": df.isnull().sum().to_dict() + } + return json.dumps(summary, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error describing file: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Execute an aggregation operation on a column of a tabular file. " + "Supported operations: sum, mean, count, min, max, median, std, nunique, value_counts." + ), + name="aggregate_column" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def aggregate_column( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + column: Annotated[str, "The column name to aggregate"], + operation: Annotated[str, "Aggregation: sum, mean, count, min, max, median, std, nunique, value_counts"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result of the aggregation"]: + """Execute an aggregation operation on a column.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df = self._try_numeric_conversion(df) + + if column not in df.columns: + return json.dumps({"error": f"Column '{column}' not found. Available: {list(df.columns)}"}) + + series = df[column] + op = operation.lower().strip() + + if op == 'sum': + result = series.sum() + elif op == 'mean': + result = series.mean() + elif op == 'count': + result = series.count() + elif op == 'min': + result = series.min() + elif op == 'max': + result = series.max() + elif op == 'median': + result = series.median() + elif op == 'std': + result = series.std() + elif op == 'nunique': + result = series.nunique() + elif op == 'value_counts': + result = series.value_counts().to_dict() + else: + return json.dumps({"error": f"Unsupported operation: {operation}. Use sum, mean, count, min, max, median, std, nunique, value_counts."}) + + return json.dumps({"column": column, "operation": op, "result": result}, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error aggregating column: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Filter rows in a tabular file based on conditions and return matching rows. " + "Supports operators: ==, !=, >, <, >=, <=, contains, startswith, endswith." + ), + name="filter_rows" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def filter_rows( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + column: Annotated[str, "The column to filter on"], + operator: Annotated[str, "Operator: ==, !=, >, <, >=, <=, contains, startswith, endswith"], + value: Annotated[str, "The value to compare against"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + max_rows: Annotated[str, "Maximum rows to return"] = "100", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON list of matching rows"]: + """Filter rows based on a condition.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df = self._try_numeric_conversion(df) + + if column not in df.columns: + return json.dumps({"error": f"Column '{column}' not found. Available: {list(df.columns)}"}) + + series = df[column] + op = operator.strip().lower() + + numeric_value = None + try: + numeric_value = float(value) + except (ValueError, TypeError): + pass + + if op == '==' or op == 'equals': + if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): + mask = series == numeric_value + else: + mask = series.astype(str).str.lower() == value.lower() + elif op == '!=': + if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): + mask = series != numeric_value + else: + mask = series.astype(str).str.lower() != value.lower() + elif op == '>': + mask = series > numeric_value + elif op == '<': + mask = series < numeric_value + elif op == '>=': + mask = series >= numeric_value + elif op == '<=': + mask = series <= numeric_value + elif op == 'contains': + mask = series.astype(str).str.contains(value, case=False, na=False) + elif op == 'startswith': + mask = series.astype(str).str.lower().str.startswith(value.lower()) + elif op == 'endswith': + mask = series.astype(str).str.lower().str.endswith(value.lower()) + else: + return json.dumps({"error": f"Unsupported operator: {operator}"}) + + limit = int(max_rows) + filtered = df[mask].head(limit) + return json.dumps({ + "total_matches": int(mask.sum()), + "returned_rows": len(filtered), + "data": filtered.to_dict(orient='records') + }, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error filtering rows: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Execute a pandas query expression against a tabular file for advanced analysis. " + "The query string uses pandas DataFrame.query() syntax. " + "Examples: 'Age > 30 and State == \"CA\"', 'Price < 100'" + ), + name="query_tabular_data" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def query_tabular_data( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + query_expression: Annotated[str, "Pandas query expression (e.g. 'Age > 30 and State == \"CA\"')"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + max_rows: Annotated[str, "Maximum rows to return"] = "100", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result of the query"]: + """Execute a pandas query expression against a tabular file.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df = self._try_numeric_conversion(df) + + result_df = df.query(query_expression) + limit = int(max_rows) + return json.dumps({ + "total_matches": len(result_df), + "returned_rows": min(len(result_df), limit), + "data": result_df.head(limit).to_dict(orient='records') + }, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error querying data: {e}", level=logging.WARNING) + return json.dumps({"error": f"Query error: {str(e)}. Ensure column names and values are correct."}) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Perform a group-by aggregation on a tabular file. " + "Groups data by one column and aggregates another column. " + "Supported operations: sum, mean, count, min, max." + ), + name="group_by_aggregate" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def group_by_aggregate( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + group_by_column: Annotated[str, "The column to group by"], + aggregate_column: Annotated[str, "The column to aggregate"], + operation: Annotated[str, "Aggregation operation: sum, mean, count, min, max"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result of the group-by aggregation"]: + """Group by one column and aggregate another.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df = self._try_numeric_conversion(df) + + for col in [group_by_column, aggregate_column]: + if col not in df.columns: + return json.dumps({"error": f"Column '{col}' not found. Available: {list(df.columns)}"}) + + op = operation.lower().strip() + grouped = df.groupby(group_by_column)[aggregate_column].agg(op) + return json.dumps({ + "group_by": group_by_column, + "aggregate_column": aggregate_column, + "operation": op, + "groups": len(grouped), + "result": grouped.to_dict() + }, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error in group-by: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + return await asyncio.to_thread(_sync_work) diff --git a/application/single_app/static/js/admin/admin_settings.js b/application/single_app/static/js/admin/admin_settings.js index 87478db0..9cf4580d 100644 --- a/application/single_app/static/js/admin/admin_settings.js +++ b/application/single_app/static/js/admin/admin_settings.js @@ -1237,10 +1237,11 @@ function setupToggles() { const mathToggle = document.getElementById('toggle-math-plugin'); const textToggle = document.getElementById('toggle-text-plugin'); const factMemoryToggle = document.getElementById('toggle-fact-memory-plugin'); + const tabularProcessingToggle = document.getElementById('toggle-tabular-processing-plugin'); const embeddingToggle = document.getElementById('toggle-default-embedding-model-plugin'); const allowUserPluginsToggle = document.getElementById('toggle-allow-user-plugins'); const allowGroupPluginsToggle = document.getElementById('toggle-allow-group-plugins'); - const toggles = [timeToggle, httpToggle, waitToggle, mathToggle, textToggle, factMemoryToggle, embeddingToggle, allowUserPluginsToggle, allowGroupPluginsToggle]; + const toggles = [timeToggle, httpToggle, waitToggle, mathToggle, textToggle, factMemoryToggle, tabularProcessingToggle, embeddingToggle, allowUserPluginsToggle, allowGroupPluginsToggle]; // Feedback area let feedbackDiv = document.getElementById('core-plugin-toggles-feedback'); if (!feedbackDiv) { @@ -1270,6 +1271,16 @@ function setupToggles() { if (textToggle) textToggle.checked = !!settings.enable_text_plugin; if (embeddingToggle) embeddingToggle.checked = !!settings.enable_default_embedding_model_plugin; if (factMemoryToggle) factMemoryToggle.checked = !!settings.enable_fact_memory_plugin; + if (tabularProcessingToggle) { + tabularProcessingToggle.checked = !!settings.enable_tabular_processing_plugin; + const ecEnabled = !!settings.enable_enhanced_citations; + tabularProcessingToggle.disabled = !ecEnabled; + const depNote = document.getElementById('tabular-processing-dependency-note'); + if (depNote) { + depNote.textContent = ecEnabled ? 'Requires Enhanced Citations' : 'Requires Enhanced Citations (currently disabled)'; + depNote.className = ecEnabled ? 'text-muted d-block ms-4' : 'text-danger d-block ms-4'; + } + } if (allowUserPluginsToggle) allowUserPluginsToggle.checked = !!settings.allow_user_plugins; if (allowGroupPluginsToggle) allowGroupPluginsToggle.checked = !!settings.allow_group_plugins; } catch (err) { @@ -1291,6 +1302,7 @@ function setupToggles() { enable_text_plugin: textToggle ? textToggle.checked : false, enable_default_embedding_model_plugin: embeddingToggle ? embeddingToggle.checked : false, enable_fact_memory_plugin: factMemoryToggle ? factMemoryToggle.checked : false, + enable_tabular_processing_plugin: tabularProcessingToggle ? tabularProcessingToggle.checked : false, allow_user_plugins: allowUserPluginsToggle ? allowUserPluginsToggle.checked : false, allow_group_plugins: allowGroupPluginsToggle ? allowGroupPluginsToggle.checked : false }; diff --git a/application/single_app/static/js/chat/chat-enhanced-citations.js b/application/single_app/static/js/chat/chat-enhanced-citations.js index dcda708b..9d4344bb 100644 --- a/application/single_app/static/js/chat/chat-enhanced-citations.js +++ b/application/single_app/static/js/chat/chat-enhanced-citations.js @@ -18,11 +18,13 @@ export function getFileType(fileName) { const imageExtensions = ['jpg', 'jpeg', 'png', 'bmp', 'tiff', 'tif']; const videoExtensions = ['mp4', 'mov', 'avi', 'mkv', 'flv', 'webm', 'wmv', 'm4v', '3gp']; const audioExtensions = ['mp3', 'wav', 'ogg', 'aac', 'flac', 'm4a']; - + const tabularExtensions = ['csv', 'xlsx', 'xls', 'xlsm']; + if (imageExtensions.includes(ext)) return 'image'; if (ext === 'pdf') return 'pdf'; if (videoExtensions.includes(ext)) return 'video'; if (audioExtensions.includes(ext)) return 'audio'; + if (tabularExtensions.includes(ext)) return 'tabular'; return 'other'; } @@ -66,6 +68,9 @@ export function showEnhancedCitationModal(docId, pageNumberOrTimestamp, citation const audioTimestamp = convertTimestampToSeconds(pageNumberOrTimestamp); showAudioModal(docId, audioTimestamp, docMetadata.file_name); break; + case 'tabular': + showTabularDownloadModal(docId, docMetadata.file_name); + break; default: // Fall back to text citation for unsupported types import('./chat-citations.js').then(module => { @@ -291,6 +296,119 @@ export function showAudioModal(docId, timestamp, fileName) { modalInstance.show(); } +/** + * Show tabular file preview modal with data table + * @param {string} docId - Document ID + * @param {string} fileName - File name + */ +export function showTabularDownloadModal(docId, fileName) { + console.log(`Showing tabular preview modal for docId: ${docId}, fileName: ${fileName}`); + showLoadingIndicator(); + + // Create or get tabular modal + let tabularModal = document.getElementById("enhanced-tabular-modal"); + if (!tabularModal) { + tabularModal = createTabularModal(); + } + + const title = tabularModal.querySelector(".modal-title"); + const tableContainer = tabularModal.querySelector("#enhanced-tabular-table-container"); + const rowInfo = tabularModal.querySelector("#enhanced-tabular-row-info"); + const downloadBtn = tabularModal.querySelector("#enhanced-tabular-download"); + const errorContainer = tabularModal.querySelector("#enhanced-tabular-error"); + + title.textContent = `Tabular Data: ${fileName}`; + tableContainer.innerHTML = '
Loading...

Loading data preview...

'; + rowInfo.textContent = ''; + errorContainer.classList.add('d-none'); + + const downloadUrl = `/api/enhanced_citations/tabular_workspace?doc_id=${encodeURIComponent(docId)}`; + downloadBtn.href = downloadUrl; + downloadBtn.download = fileName; + + // Show modal immediately with loading state + const modalInstance = new bootstrap.Modal(tabularModal); + modalInstance.show(); + + // Fetch preview data + const previewUrl = `/api/enhanced_citations/tabular_preview?doc_id=${encodeURIComponent(docId)}`; + fetch(previewUrl) + .then(response => { + if (!response.ok) throw new Error(`HTTP ${response.status}`); + return response.json(); + }) + .then(data => { + hideLoadingIndicator(); + if (data.error) { + showTabularError(tableContainer, errorContainer, data.error); + return; + } + renderTabularPreview(tableContainer, rowInfo, data); + }) + .catch(error => { + hideLoadingIndicator(); + console.error('Error loading tabular preview:', error); + showTabularError(tableContainer, errorContainer, 'Could not load data preview.'); + }); +} + +/** + * Render tabular data as an HTML table + * @param {HTMLElement} container - Table container element + * @param {HTMLElement} rowInfo - Row info display element + * @param {Object} data - Preview data from API + */ +function renderTabularPreview(container, rowInfo, data) { + const { columns, rows, total_rows, truncated } = data; + + // Build table HTML + let html = ''; + + // Header + html += ''; + for (const col of columns) { + const escaped = col.replace(/&/g, '&').replace(//g, '>'); + html += ``; + } + html += ''; + + // Body + html += ''; + for (const row of rows) { + html += ''; + for (const cell of row) { + const val = cell === null || cell === undefined ? '' : String(cell); + const escaped = val.replace(/&/g, '&').replace(//g, '>'); + html += ``; + } + html += ''; + } + html += '
${escaped}
${escaped}
'; + + container.innerHTML = html; + + // Row info + const displayedRows = rows.length; + const totalFormatted = total_rows.toLocaleString(); + if (truncated) { + rowInfo.textContent = `Showing ${displayedRows.toLocaleString()} of ${totalFormatted} rows`; + } else { + rowInfo.textContent = `${totalFormatted} rows, ${columns.length} columns`; + } +} + +/** + * Show error state in tabular modal with download fallback + * @param {HTMLElement} tableContainer - Table container element + * @param {HTMLElement} errorContainer - Error display element + * @param {string} message - Error message + */ +function showTabularError(tableContainer, errorContainer, message) { + tableContainer.innerHTML = '
'; + errorContainer.textContent = message + ' You can still download the file below.'; + errorContainer.classList.remove('d-none'); +} + /** * Convert timestamp string to seconds * @param {string|number} timestamp - Timestamp in various formats @@ -445,3 +563,36 @@ function createPdfModal() { document.body.appendChild(modal); return modal; } + +/** + * Create tabular file preview modal HTML structure + * @returns {HTMLElement} - Modal element + */ +function createTabularModal() { + const modal = document.createElement("div"); + modal.id = "enhanced-tabular-modal"; + modal.classList.add("modal", "fade"); + modal.tabIndex = -1; + modal.innerHTML = ` + + `; + document.body.appendChild(modal); + return modal; +} diff --git a/application/single_app/static/js/chat/chat-input-actions.js b/application/single_app/static/js/chat/chat-input-actions.js index 77851319..c0c7832b 100644 --- a/application/single_app/static/js/chat/chat-input-actions.js +++ b/application/single_app/static/js/chat/chat-input-actions.js @@ -127,11 +127,11 @@ export function fetchFileContent(conversationId, fileId) { hideLoadingIndicator(); if (data.file_content && data.filename) { - showFileContentPopup(data.file_content, data.filename, data.is_table); + showFileContentPopup(data.file_content, data.filename, data.is_table, data.file_content_source, conversationId, fileId); } else if (data.error) { showToast(data.error, "danger"); } else { - ashowToastlert("Unexpected response from server.", "danger"); + showToast("Unexpected response from server.", "danger"); } }) .catch((error) => { @@ -141,7 +141,7 @@ export function fetchFileContent(conversationId, fileId) { }); } -export function showFileContentPopup(fileContent, filename, isTable) { +export function showFileContentPopup(fileContent, filename, isTable, fileContentSource, conversationId, fileId) { let modalContainer = document.getElementById("file-modal"); if (!modalContainer) { modalContainer = document.createElement("div"); @@ -155,6 +155,7 @@ export function showFileContentPopup(fileContent, filename, isTable) { diff --git a/application/single_app/tmp2xmw9c75.xlsx b/application/single_app/tmp2xmw9c75.xlsx new file mode 100644 index 00000000..e2603075 Binary files /dev/null and b/application/single_app/tmp2xmw9c75.xlsx differ diff --git a/docs/explanation/fixes/v0.239.008/CHAT_TABULAR_SK_TRIGGER_FIX.md b/docs/explanation/fixes/v0.239.008/CHAT_TABULAR_SK_TRIGGER_FIX.md new file mode 100644 index 00000000..d525e2d1 --- /dev/null +++ b/docs/explanation/fixes/v0.239.008/CHAT_TABULAR_SK_TRIGGER_FIX.md @@ -0,0 +1,67 @@ +# Chat-Uploaded Tabular File SK Mini-Agent Trigger Fix + +## Issue Description + +When a user uploads a tabular file (CSV, XLSX, XLS, XLSM) directly to a chat conversation and asks a question in model-only mode (no agent selected), the SK mini-agent (`run_tabular_sk_analysis`) did not trigger. The model would see instructions to "use plugin functions" but could not call them without an agent, resulting in the model describing what it would do instead of providing actual analysis results. + +The full agent mode worked correctly because the agent has direct access to the `TabularProcessingPlugin` and can call its functions. + +## Root Cause + +Three gaps prevented the mini SK agent from activating for chat-uploaded tabular files: + +1. **Streaming path ignored `file` role messages**: The streaming conversation history loop (`/api/chat/stream`) only processed `user` and `assistant` roles, making chat-uploaded files completely invisible to the model. + +2. **Mini SK only triggered from search results**: Both streaming and non-streaming paths only invoked `run_tabular_sk_analysis()` when tabular files appeared in hybrid search results (`combined_documents`). Chat-uploaded files are stored in blob storage as `file` role messages and are not indexed in Azure AI Search, so they never appeared in search results. + +3. **Model-only mode can't call plugin functions**: The non-streaming path's file handler injected "Use the tabular_processing plugin functions" as a system message, but in model-only mode the model has no function-calling capability. + +## Technical Details + +### Files Modified + +- `application/single_app/route_backend_chats.py` — All code changes +- `application/single_app/config.py` — Version bump to 0.239.008 + +### Code Changes + +#### Non-streaming path (`/api/chat`) + +1. Added `chat_tabular_files = set()` tracker before the conversation history loop (~line 1896) +2. Added `chat_tabular_files.add(filename)` inside the `if is_table and file_content_source == 'blob':` block (~line 1936) +3. After the history loop, added a block that checks `chat_tabular_files` and calls `run_tabular_sk_analysis(source_hint="chat")`, injecting pre-computed results as a system message (~line 2027) + +#### Streaming path (`/api/chat/stream`) + +4. Replaced the simple 8-line history loop (which only handled `user`/`assistant`) with expanded logic that mirrors the non-streaming path's `file` role handling, including blob tabular file tracking (~line 3687) +5. Added the same mini SK trigger block after the expanded loop (~line 3751) + +### How It Works After Fix + +1. User uploads `sales.xlsx` to chat, asks "analyze sales/profit" +2. During conversation history building, the `file` role message with `is_table=True` and `file_content_source='blob'` is detected +3. The filename is collected into `chat_tabular_files` +4. After the history loop, `run_tabular_sk_analysis()` is called with `source_hint="chat"`, which resolves the file from the `personal-chat` blob container +5. The mini SK agent pre-loads the file schema, calls plugin functions (aggregate, filter, etc.), and returns computed results +6. Results are injected as a system message so the model can present accurate numbers +7. Plugin invocation citations are collected for transparency + +## Testing + +1. Upload a tabular file (xlsx/csv) directly to chat +2. With no agent selected, send a data analysis question +3. Verify the response contains actual computed data (not just a description of steps) +4. Check logs for `[Chat Tabular SK]` entries confirming the mini SK trigger +5. Verify agent mode still works as before + +## Impact + +- Enables tabular data analysis in model-only chat mode for chat-uploaded files +- No changes to existing search-result-based tabular detection +- No changes to full agent mode behavior +- Streaming and non-streaming paths both fixed + +## Version + +- **Version**: 0.239.008 +- **Implemented in**: 0.239.008 diff --git a/docs/explanation/release_notes.md b/docs/explanation/release_notes.md index cbd6dfc2..bba394db 100644 --- a/docs/explanation/release_notes.md +++ b/docs/explanation/release_notes.md @@ -2,6 +2,17 @@ # Feature Release +### **(v0.239.008)** + +#### Bug Fixes + +* **Chat-Uploaded Tabular Files Now Trigger SK Mini-Agent in Model-Only Mode** + * Fixed an issue where tabular files (CSV, XLSX, XLS, XLSM) uploaded directly to a chat conversation were not analyzed by the SK mini-agent when no agent was selected. The model would describe what analysis it would perform instead of returning actual computed results. + * **Root Cause**: The mini SK agent only triggered from search results, but chat-uploaded files are stored in blob storage and not indexed in Azure AI Search. Additionally, the streaming path completely ignored `file` role messages in conversation history. + * **Fix**: Both streaming and non-streaming chat paths now detect chat-uploaded tabular files during conversation history building and trigger `run_tabular_sk_analysis(source_hint="chat")` to pre-compute results. The streaming path also now properly handles `file` role messages (tabular and non-tabular) matching the non-streaming path's behavior. + * (Ref: `route_backend_chats.py`, `run_tabular_sk_analysis()`, `collect_tabular_sk_citations()`) + +--- ### **(v0.239.013)** #### New Features @@ -38,6 +49,36 @@ #### New Features +* **Tabular Data Analysis — SK Mini-Agent for Normal Chat** + * Tabular files (CSV, XLSX, XLS, XLSM) detected in search results now trigger a lightweight Semantic Kernel mini-agent that pre-computes data analysis before the main LLM response. This brings the same analytical depth previously only available in full agent mode to every normal chat conversation. + * **Automatic Detection**: When AI Search results include tabular files from any workspace (personal, group, or public) or chat-uploaded documents, the system automatically identifies them via the `TABULAR_EXTENSIONS` configuration and routes the query through the SK mini-agent pipeline. + * **Unified Workspace and Chat Handling**: Tabular files are processed identically regardless of their storage location. The plugin resolves blob paths across all four container types (`user-documents`, `group-documents`, `public-documents`, `personal-chat`) with automatic fallback resolution if the primary source lookup fails. A user asking about an Excel file in their personal workspace gets the same analytical treatment as one asking about a CSV uploaded directly to a chat. + * **Six Data Analysis Functions**: The `TabularProcessingPlugin` exposes `describe_tabular_file`, `aggregate_column` (sum, mean, count, min, max, median, std, nunique, value_counts), `filter_rows` (==, !=, >, <, >=, <=, contains, startswith, endswith), `query_tabular_data` (pandas query syntax), `group_by_aggregate`, and `list_tabular_files` — all registered as Semantic Kernel functions that the mini-agent orchestrates autonomously. + * **Pre-Computed Results Injected as Context**: The mini-agent's computed analysis (exact numerical results, aggregations, filtered data) is injected into the main LLM's system context so it can present accurate, citation-backed answers without hallucinating numbers. + * **Graceful Degradation**: If the mini-agent analysis fails for any reason, the system falls back to instructing the main LLM to use the tabular processing plugin functions directly, preserving full functionality. + * **Non-Streaming and Streaming Support**: Both chat modes are supported. The mini-agent runs synchronously before the main LLM call in both paths. + * **Requires Enhanced Citations**: The tabular processing plugin depends on the blob storage client initialized by the enhanced citations system. The `enable_enhanced_citations` admin setting must be enabled for tabular data analysis to activate. + * **Files Modified**: `route_backend_chats.py`, `semantic_kernel_plugins/tabular_processing_plugin.py`, `config.py`. + * (Ref: `run_tabular_sk_analysis()`, `TabularProcessingPlugin`, `collect_tabular_sk_citations()`, `TABULAR_EXTENSIONS`) + +* **Tabular Tool Execution Citations** + * Every tool call made by the SK mini-agent during tabular analysis is captured and surfaced as an agent citation, providing full transparency into the data analysis pipeline. + * **Automatic Capture**: The existing `@plugin_function_logger` decorator on all `TabularProcessingPlugin` functions records each invocation including function name, input parameters, returned results, execution duration, and success/failure status. + * **Citation Format**: Tool execution citations appear in the same "Agent Tool Execution" modal used by full agent mode, showing `tool_name` (e.g., `TabularProcessingPlugin.aggregate_column`), `function_arguments` (the exact parameters passed), and `function_result` (the computed data returned). + * **End-to-End Auditability**: Users can verify exactly which aggregations, filters, or queries were run against their data, what parameters were used, and what raw results were returned — before the LLM summarized them into the final response. + * **Files Modified**: `route_backend_chats.py`. + * (Ref: `collect_tabular_sk_citations()`, `plugin_invocation_logger.py`) + +* **SK Mini-Agent Performance Optimization** + * Reduced typical tabular analysis time from ~74 seconds to an estimated ~30-33 seconds (55-60% reduction) through three complementary optimizations. + * **DataFrame Caching**: Per-request in-memory cache eliminates redundant blob downloads. Previously, each of the ~8 tool calls in a typical analysis downloaded and parsed the same file independently. Now the file is downloaded once and subsequent calls read from cache. Cache is automatically scoped to the request (new plugin instance per analysis) and garbage-collected afterward. + * **Pre-Dispatch Schema Injection**: File schemas (columns, data types, row counts, and a 3-row preview) are pre-loaded and injected into the SK mini-agent's system prompt before execution begins. This eliminates 2 LLM round-trips that were previously spent on file discovery (`list_tabular_files`) and schema inspection (`describe_tabular_file`), allowing the model to jump directly to analysis tool calls. + * **Async Plugin Functions**: All six `@kernel_function` methods converted to `async def` using `asyncio.to_thread()`. This enables Semantic Kernel's built-in `asyncio.gather()` to truly parallelize batched tool calls (e.g., 3 simultaneous `aggregate_column` calls) instead of executing them serially on the event loop. + * **Batching Instructions**: The system prompt now instructs the model to batch multiple independent function calls in a single response, reducing LLM round-trips further. + * **Files Modified**: `tabular_processing_plugin.py`, `route_backend_chats.py`, `config.py`. + * (Ref: `_df_cache`, `asyncio.to_thread`, pre-dispatch schema injection in `run_tabular_sk_analysis()`) + +--- * **Chat with Agent Button for Group Agents** * Added a "Chat" button to each group agent row, allowing users to quickly select a group agent and navigate to the chat page. * (Ref: `group_agents.js`, `group_workspaces.html`) diff --git a/functional_tests/test_chat_tabular_sk_trigger.py b/functional_tests/test_chat_tabular_sk_trigger.py new file mode 100644 index 00000000..9de91b39 --- /dev/null +++ b/functional_tests/test_chat_tabular_sk_trigger.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +# test_chat_tabular_sk_trigger.py +""" +Functional test for chat-uploaded tabular file SK mini-agent trigger fix. +Version: 0.239.008 +Implemented in: 0.239.008 + +This test ensures that when tabular data files (CSV, XLSX) are uploaded directly +to a chat conversation, the SK mini-agent (`run_tabular_sk_analysis`) is properly +triggered in model-only mode (no agent selected) to pre-compute analysis results. + +Previously, the mini SK agent only triggered from search results, missing +chat-uploaded files entirely. The streaming path also ignored file role messages. + +This test validates: +1. Chat-uploaded tabular files are detected in conversation history (file role) +2. Filenames are collected into chat_tabular_files set for both blob and inline cases +3. The streaming path properly handles file role messages (not just user/assistant) +4. The mini SK trigger block activates when chat_tabular_files is non-empty +""" + +import sys +import os + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'application', 'single_app')) + + +def test_chat_tabular_file_detection(): + """Test that chat-uploaded tabular files are detected from file role messages.""" + print("Testing chat-uploaded tabular file detection...") + + try: + # Simulate conversation messages as they would appear from Cosmos DB + recent_messages = [ + { + 'role': 'file', + 'filename': 'Sample_-_Superstore_1.xlsx', + 'file_content': '', + 'is_table': True, + 'file_content_source': 'blob', + 'metadata': {}, + }, + { + 'role': 'user', + 'content': 'analyze sales/profit', + 'metadata': {}, + }, + ] + + # Simulate the conversation history building logic (streaming path) + conversation_history_for_api = [] + allowed_roles_in_history = ['user', 'assistant'] + max_file_content_length_in_history = 50000 + max_tabular_content_length_in_history = 50000 + chat_tabular_files = set() + + for message in recent_messages: + role = message.get('role') + content = message.get('content', '') + + if role in allowed_roles_in_history: + conversation_history_for_api.append({ + 'role': role, + 'content': content + }) + elif role == 'file': + filename = message.get('filename', 'uploaded_file') + file_content = message.get('file_content', '') + is_table = message.get('is_table', False) + file_content_source = message.get('file_content_source', '') + + if is_table and file_content_source == 'blob': + chat_tabular_files.add(filename) + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"[User uploaded a tabular data file named '{filename}'. " + f"The file is stored in blob storage and available for analysis. " + f"The file source is 'chat'.]" + ) + }) + + # Verify file was detected + assert 'Sample_-_Superstore_1.xlsx' in chat_tabular_files, \ + f"Expected xlsx file in chat_tabular_files, got: {chat_tabular_files}" + + # Verify system message was added for the file + system_msgs = [m for m in conversation_history_for_api if m['role'] == 'system'] + assert len(system_msgs) == 1, f"Expected 1 system message, got {len(system_msgs)}" + assert 'Sample_-_Superstore_1.xlsx' in system_msgs[0]['content'], \ + "System message should reference the uploaded file" + + # Verify user message was included + user_msgs = [m for m in conversation_history_for_api if m['role'] == 'user'] + assert len(user_msgs) == 1, f"Expected 1 user message, got {len(user_msgs)}" + assert user_msgs[0]['content'] == 'analyze sales/profit', \ + "User message content should be preserved" + + print("Test passed!") + return True + + except Exception as e: + print(f"Test failed: {e}") + import traceback + traceback.print_exc() + return False + + +def test_non_tabular_file_handling(): + """Test that non-tabular files are handled with content preview.""" + print("Testing non-tabular file handling in streaming path...") + + try: + recent_messages = [ + { + 'role': 'file', + 'filename': 'report.txt', + 'file_content': 'This is the report content.', + 'is_table': False, + 'file_content_source': 'inline', + 'metadata': {}, + }, + { + 'role': 'user', + 'content': 'summarize the report', + 'metadata': {}, + }, + ] + + conversation_history_for_api = [] + allowed_roles_in_history = ['user', 'assistant'] + max_file_content_length_in_history = 50000 + max_tabular_content_length_in_history = 50000 + chat_tabular_files = set() + + for message in recent_messages: + role = message.get('role') + content = message.get('content', '') + + if role in allowed_roles_in_history: + conversation_history_for_api.append({ + 'role': role, + 'content': content + }) + elif role == 'file': + filename = message.get('filename', 'uploaded_file') + file_content = message.get('file_content', '') + is_table = message.get('is_table', False) + file_content_source = message.get('file_content_source', '') + + if is_table and file_content_source == 'blob': + chat_tabular_files.add(filename) + conversation_history_for_api.append({ + 'role': 'system', + 'content': f"[User uploaded a tabular data file named '{filename}'.]" + }) + else: + content_limit = ( + max_tabular_content_length_in_history if is_table + else max_file_content_length_in_history + ) + display_content = file_content[:content_limit] + if len(file_content) > content_limit: + display_content += "..." + + conversation_history_for_api.append({ + 'role': 'system', + 'content': ( + f"[User uploaded a file named '{filename}'. " + f"Content preview:\n{display_content}]\n" + f"Use this file context if relevant." + ) + }) + + # Non-tabular files should NOT be in chat_tabular_files + assert len(chat_tabular_files) == 0, \ + f"Non-tabular files should not be tracked, got: {chat_tabular_files}" + + # System message should contain file content preview + system_msgs = [m for m in conversation_history_for_api if m['role'] == 'system'] + assert len(system_msgs) == 1, f"Expected 1 system message, got {len(system_msgs)}" + assert 'report.txt' in system_msgs[0]['content'], \ + "System message should reference the file" + assert 'This is the report content.' in system_msgs[0]['content'], \ + "System message should include file content preview" + + print("Test passed!") + return True + + except Exception as e: + print(f"Test failed: {e}") + import traceback + traceback.print_exc() + return False + + +def test_multiple_tabular_files(): + """Test detection of multiple tabular files uploaded to chat.""" + print("Testing multiple tabular file detection...") + + try: + recent_messages = [ + { + 'role': 'file', + 'filename': 'sales_2024.xlsx', + 'file_content': '', + 'is_table': True, + 'file_content_source': 'blob', + 'metadata': {}, + }, + { + 'role': 'file', + 'filename': 'inventory.csv', + 'file_content': 'item,count\nwidget,100', + 'is_table': True, + 'file_content_source': 'blob', + 'metadata': {}, + }, + { + 'role': 'user', + 'content': 'compare sales to inventory', + 'metadata': {}, + }, + ] + + chat_tabular_files = set() + conversation_history_for_api = [] + allowed_roles_in_history = ['user', 'assistant'] + + for message in recent_messages: + role = message.get('role') + content = message.get('content', '') + + if role in allowed_roles_in_history: + conversation_history_for_api.append({'role': role, 'content': content}) + elif role == 'file': + filename = message.get('filename', 'uploaded_file') + is_table = message.get('is_table', False) + file_content_source = message.get('file_content_source', '') + + if is_table and file_content_source == 'blob': + chat_tabular_files.add(filename) + conversation_history_for_api.append({ + 'role': 'system', + 'content': f"[Tabular file '{filename}' available for analysis.]" + }) + + assert len(chat_tabular_files) == 2, \ + f"Expected 2 tabular files, got {len(chat_tabular_files)}" + assert 'sales_2024.xlsx' in chat_tabular_files, "Missing sales_2024.xlsx" + assert 'inventory.csv' in chat_tabular_files, "Missing inventory.csv" + + print("Test passed!") + return True + + except Exception as e: + print(f"Test failed: {e}") + import traceback + traceback.print_exc() + return False + + +def test_inline_tabular_not_tracked_for_sk(): + """Test that inline tabular files (not blob) are NOT tracked for SK analysis.""" + print("Testing inline tabular files are not tracked for SK analysis...") + + try: + recent_messages = [ + { + 'role': 'file', + 'filename': 'data.csv', + 'file_content': 'col1,col2\n1,2\n3,4', + 'is_table': True, + 'file_content_source': 'inline', # Not blob + 'metadata': {}, + }, + ] + + chat_tabular_files = set() + + for message in recent_messages: + role = message.get('role') + if role == 'file': + is_table = message.get('is_table', False) + file_content_source = message.get('file_content_source', '') + filename = message.get('filename', '') + + if is_table and file_content_source == 'blob': + chat_tabular_files.add(filename) + + assert len(chat_tabular_files) == 0, \ + f"Inline tabular files should not be tracked for SK, got: {chat_tabular_files}" + + print("Test passed!") + return True + + except Exception as e: + print(f"Test failed: {e}") + import traceback + traceback.print_exc() + return False + + +if __name__ == "__main__": + results = [] + results.append(test_chat_tabular_file_detection()) + results.append(test_non_tabular_file_handling()) + results.append(test_multiple_tabular_files()) + results.append(test_inline_tabular_not_tracked_for_sk()) + + print(f"\n{'='*50}") + print(f"Results: {sum(results)}/{len(results)} tests passed") + if all(results): + print("All tests passed!") + sys.exit(0) + else: + print("Some tests failed!") + sys.exit(1)