Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions application/single_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from route_backend_public_prompts import *
from route_backend_user_agreement import register_route_backend_user_agreement
from route_backend_conversation_export import register_route_backend_conversation_export
from route_backend_thoughts import register_route_backend_thoughts
from route_backend_speech import register_route_backend_speech
from route_backend_tts import register_route_backend_tts
from route_enhanced_citations import register_enhanced_citations_routes
Expand Down Expand Up @@ -641,6 +642,9 @@ def list_semantic_kernel_plugins():
# ------------------- API User Agreement Routes ----------
register_route_backend_user_agreement(app)

# ------------------- API Thoughts Routes ----------------
register_route_backend_thoughts(app)

# ------------------- External Health Routes ----------
register_route_external_health(app)

Expand Down
12 changes: 12 additions & 0 deletions application/single_app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,18 @@ def get_redis_cache_infrastructure_endpoint(redis_hostname: str) -> str:
default_ttl=-1 # TTL disabled by default, enabled per-document for auto-cleanup
)

# Container for live per-step "thought" records. It is created here if it does
# not already exist and is partitioned by /user_id, so reads, queries and
# deletes against it must supply the owning user's id as the partition key.
cosmos_thoughts_container_name = "thoughts"
cosmos_thoughts_container = cosmos_database.create_container_if_not_exists(
id=cosmos_thoughts_container_name,
partition_key=PartitionKey(path="/user_id")
)

# Archive counterpart of the thoughts container, using the same /user_id
# partition key. Created here if it does not already exist.
cosmos_archived_thoughts_container_name = "archive_thoughts"
cosmos_archived_thoughts_container = cosmos_database.create_container_if_not_exists(
id=cosmos_archived_thoughts_container_name,
partition_key=PartitionKey(path="/user_id")
)

def ensure_custom_logo_file_exists(app, settings):
"""
If custom_logo_base64 or custom_logo_dark_base64 is present in settings, ensure the appropriate
Expand Down
3 changes: 3 additions & 0 deletions application/single_app/functions_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ def get_settings(use_cosmos=False):
'require_member_of_feedback_admin': False,
'enable_conversation_archiving': False,

# Processing Thoughts
'enable_thoughts': False,

# Search and Extract
'azure_ai_search_endpoint': '',
'azure_ai_search_key': '',
Expand Down
256 changes: 256 additions & 0 deletions application/single_app/functions_thoughts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
# functions_thoughts.py

import uuid
import time
from datetime import datetime, timezone
from config import cosmos_thoughts_container, cosmos_archived_thoughts_container, cosmos_messages_container
from functions_appinsights import log_event
from functions_settings import get_settings


class ThoughtTracker:
    """Stateful per-request tracker that writes processing step records to Cosmos DB.

    Each add_thought() call immediately upserts a document so that polling
    clients can see partial progress before the final response is sent.

    All Cosmos writes are wrapped in try/except so thought errors never
    interrupt the chat processing flow.
    """

    def __init__(self, conversation_id, message_id, thread_id, user_id):
        """Bind the tracker to one conversation/message/thread/user.

        The ``enable_thoughts`` setting is read once here; when it is falsy
        every method on the tracker becomes a no-op.
        """
        self.conversation_id = conversation_id
        self.message_id = message_id
        self.thread_id = thread_id
        self.user_id = user_id
        # Next step_index to assign; advanced once per add_thought() call.
        self.current_index = 0
        settings = get_settings()
        self.enabled = settings.get('enable_thoughts', False)

    def add_thought(self, step_type, content, detail=None):
        """Write a thought step to Cosmos immediately.

        Args:
            step_type: One of search, tabular_analysis, web_search,
                agent_tool_call, generation, content_safety.
            content: Short human-readable description of the step.
            detail: Optional technical detail (function names, params, etc.).

        Returns:
            The thought document id, or None if disabled/failed.
        """
        if not self.enabled:
            return None

        thought_id = str(uuid.uuid4())
        thought_doc = {
            'id': thought_id,
            'conversation_id': self.conversation_id,
            'message_id': self.message_id,
            'thread_id': self.thread_id,
            'user_id': self.user_id,
            'step_index': self.current_index,
            'step_type': step_type,
            'content': content,
            'detail': detail,
            'duration_ms': None,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }
        # The index advances even if the write below fails, so a failed step
        # leaves a gap rather than reusing its step_index.
        self.current_index += 1

        try:
            cosmos_thoughts_container.upsert_item(thought_doc)
        except Exception as e:
            log_event(f"ThoughtTracker.add_thought failed: {e}", level="WARNING")
            return None

        return thought_id

    def complete_thought(self, thought_id, duration_ms):
        """Record the duration on an existing thought after the step finishes.

        Implemented as read-then-upsert of the full document. Does nothing
        when the tracker is disabled or ``thought_id`` is falsy (for example
        when the original add_thought() failed). Failures are logged and
        swallowed.
        """
        if not self.enabled or not thought_id:
            return

        try:
            thought_doc = cosmos_thoughts_container.read_item(
                item=thought_id,
                partition_key=self.user_id
            )
            thought_doc['duration_ms'] = duration_ms
            cosmos_thoughts_container.upsert_item(thought_doc)
        except Exception as e:
            log_event(f"ThoughtTracker.complete_thought failed: {e}", level="WARNING")

    def timed_thought(self, step_type, content, detail=None):
        """Convenience: add a thought and return a timer helper.

        The clock starts before the initial write, so the reported duration
        includes the time spent in add_thought().

        Usage:
            timer = tracker.timed_thought('search', 'Searching documents...')
            # ... do work ...
            timer.stop()
        """
        # perf_counter() is monotonic; time.time() can jump when the system
        # clock is adjusted and would then yield negative or bogus durations.
        start = time.perf_counter()
        thought_id = self.add_thought(step_type, content, detail)
        return _ThoughtTimer(self, thought_id, start)


class _ThoughtTimer:
    """Helper returned by ThoughtTracker.timed_thought() for auto-duration capture."""

    def __init__(self, tracker, thought_id, start_time):
        """Store the owning tracker, the thought id and the start reading.

        ``start_time`` must be a ``time.perf_counter()`` reading, because
        stop() subtracts it from another perf_counter() reading.
        """
        self._tracker = tracker
        self._thought_id = thought_id
        self._start = start_time

    def stop(self):
        """Stop timing, persist the duration on the thought, and return it.

        Returns:
            Elapsed time in whole milliseconds since the timer was created.
        """
        elapsed_ms = int((time.perf_counter() - self._start) * 1000)
        self._tracker.complete_thought(self._thought_id, elapsed_ms)
        return elapsed_ms


# ---------------------------------------------------------------------------
# CRUD helpers
# ---------------------------------------------------------------------------

def get_thoughts_for_message(conversation_id, message_id, user_id):
    """Fetch every thought recorded for one assistant message.

    The query is scoped to the ``user_id`` partition and sorted by
    ``step_index`` ascending. Any failure is logged as a warning and an
    empty list is returned instead of raising.
    """
    sql = (
        "SELECT * FROM c "
        "WHERE c.conversation_id = @conv_id "
        "AND c.message_id = @msg_id "
        "ORDER BY c.step_index ASC"
    )
    bindings = {"@conv_id": conversation_id, "@msg_id": message_id}
    try:
        pages = cosmos_thoughts_container.query_items(
            query=sql,
            parameters=[
                {"name": name, "value": value}
                for name, value in bindings.items()
            ],
            partition_key=user_id,
        )
        return list(pages)
    except Exception as e:
        log_event(f"get_thoughts_for_message failed: {e}", level="WARNING")
        return []


def get_pending_thoughts(conversation_id, user_id, window_minutes=5):
    """Return the thoughts of the most recently active message in a conversation.

    Used by the polling endpoint. Only thoughts whose ``timestamp`` falls
    within the last ``window_minutes`` minutes are considered. Among those,
    the ``message_id`` of the newest thought is selected and all recent
    thoughts for that message are returned.

    Note that "pending" is inferred purely from recency; no completion flag
    is checked.

    Args:
        conversation_id: Conversation whose thoughts are polled.
        user_id: Owning user; used as the Cosmos partition key.
        window_minutes: Look-back window in minutes (default 5).

    Returns:
        A list of thought documents sorted by ``step_index`` ascending, or an
        empty list when nothing recent exists or the query fails.
    """
    # Local import: the module-level import only brings in datetime/timezone.
    from datetime import timedelta

    try:
        cutoff = (
            datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
        ).isoformat()

        query = (
            "SELECT * FROM c "
            "WHERE c.conversation_id = @conv_id "
            "AND c.timestamp >= @since "
            "ORDER BY c.timestamp DESC"
        )
        params = [
            {"name": "@conv_id", "value": conversation_id},
            {"name": "@since", "value": cutoff},
        ]
        results = list(cosmos_thoughts_container.query_items(
            query=query,
            parameters=params,
            partition_key=user_id
        ))

        if not results:
            return []

        # Results are newest-first, so the first row identifies the message
        # that most recently produced a thought.
        latest_message_id = results[0].get('message_id')
        latest_thoughts = [
            t for t in results if t.get('message_id') == latest_message_id
        ]
        # Return in ascending step_index order
        latest_thoughts.sort(key=lambda t: t.get('step_index', 0))
        return latest_thoughts
    except Exception as e:
        log_event(f"get_pending_thoughts failed: {e}", level="WARNING")
        return []


def get_thoughts_for_conversation(conversation_id, user_id):
    """Fetch every thought belonging to a conversation, oldest first.

    The query runs within the ``user_id`` partition and orders by
    ``timestamp`` ascending. On any error a warning is logged and an empty
    list is returned.
    """
    sql = (
        "SELECT * FROM c "
        "WHERE c.conversation_id = @conv_id "
        "ORDER BY c.timestamp ASC"
    )
    try:
        pages = cosmos_thoughts_container.query_items(
            query=sql,
            parameters=[{"name": "@conv_id", "value": conversation_id}],
            partition_key=user_id,
        )
        return list(pages)
    except Exception as e:
        log_event(f"get_thoughts_for_conversation failed: {e}", level="WARNING")
        return []


def archive_thoughts_for_conversation(conversation_id, user_id):
    """Move a conversation's thoughts into the archive container.

    Runs in two passes: every thought is first upserted into the archive
    container with an added ``archived_at`` UTC timestamp, and only after
    all copies succeed are the originals deleted. Any exception stops the
    process and is logged as a warning.
    """
    try:
        originals = get_thoughts_for_conversation(conversation_id, user_id)

        # Pass 1: copy each document, stamping it at the moment it is archived.
        for doc in originals:
            cosmos_archived_thoughts_container.upsert_item(
                {**doc, 'archived_at': datetime.now(timezone.utc).isoformat()}
            )

        # Pass 2: remove the originals from the live container.
        for doc in originals:
            cosmos_thoughts_container.delete_item(
                item=doc['id'],
                partition_key=user_id,
            )
    except Exception as e:
        log_event(f"archive_thoughts_for_conversation failed: {e}", level="WARNING")


def delete_thoughts_for_conversation(conversation_id, user_id):
    """Remove every thought stored for a conversation.

    Thoughts are looked up via get_thoughts_for_conversation() and deleted
    one by one from the ``user_id`` partition. The first exception aborts
    the remaining deletions and is logged as a warning.
    """
    try:
        doomed = get_thoughts_for_conversation(conversation_id, user_id)
        for doc_id in (doc['id'] for doc in doomed):
            cosmos_thoughts_container.delete_item(
                item=doc_id,
                partition_key=user_id,
            )
    except Exception as e:
        log_event(f"delete_thoughts_for_conversation failed: {e}", level="WARNING")


def delete_thoughts_for_message(message_id, user_id):
    """Delete all thoughts associated with a specific assistant message.

    Only the ``id`` of each matching document is fetched, since that and the
    partition key are all delete_item() needs. Any failure is logged as a
    warning and swallowed; deletions after the failing one are not attempted.

    Args:
        message_id: Assistant message whose thoughts should be removed.
        user_id: Owning user; used as the Cosmos partition key.
    """
    try:
        # Project just the id instead of SELECT * to avoid transferring full
        # documents that are about to be deleted.
        query = (
            "SELECT c.id FROM c "
            "WHERE c.message_id = @msg_id"
        )
        params = [
            {"name": "@msg_id", "value": message_id},
        ]
        # Materialize the ids before deleting so the result set is not being
        # paged through while its documents are removed.
        results = list(cosmos_thoughts_container.query_items(
            query=query,
            parameters=params,
            partition_key=user_id
        ))
        for thought in results:
            cosmos_thoughts_container.delete_item(
                item=thought['id'],
                partition_key=user_id
            )
    except Exception as e:
        log_event(f"delete_thoughts_for_message failed: {e}", level="WARNING")
Loading