diff --git a/backend/app/alembic/versions/043_add_score_trace_url_to_evaluation_run.py b/backend/app/alembic/versions/043_add_score_trace_url_to_evaluation_run.py new file mode 100644 index 000000000..19620401d --- /dev/null +++ b/backend/app/alembic/versions/043_add_score_trace_url_to_evaluation_run.py @@ -0,0 +1,32 @@ +"""Add score_trace_url to evaluation_run + +Revision ID: 043 +Revises: 042 +Create Date: 2026-01-24 19:34:46.763908 + +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel.sql.sqltypes + + +revision = "043" +down_revision = "042" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "evaluation_run", + sa.Column( + "score_trace_url", + sqlmodel.sql.sqltypes.AutoString(), + nullable=True, + comment="S3 URL where per-trace evaluation scores are stored", + ), + ) + + +def downgrade(): + op.drop_column("evaluation_run", "score_trace_url") diff --git a/backend/app/core/storage_utils.py b/backend/app/core/storage_utils.py index 63830d7d0..bf87033f0 100644 --- a/backend/app/core/storage_utils.py +++ b/backend/app/core/storage_utils.py @@ -84,10 +84,7 @@ def __init__(self, content: bytes): def upload_jsonl_to_object_store( - storage: CloudStorage, - results: list[dict], - filename: str, - subdirectory: str, + storage: CloudStorage, results: list[dict], filename: str, subdirectory: str ) -> str | None: """ Upload JSONL (JSON Lines) content to object store. 
@@ -114,8 +111,7 @@ def upload_jsonl_to_object_store(
     # Create file path
     file_path = Path(subdirectory) / filename
 
-    # Convert results to JSONL
-    jsonl_content = "\n".join([json.dumps(result) for result in results])
+    jsonl_content = json.dumps(results, ensure_ascii=False)
     content_bytes = jsonl_content.encode("utf-8")
 
     # Create UploadFile-like object
@@ -152,6 +148,37 @@ def upload_jsonl_to_object_store(
     return None
 
 
+def load_json_from_object_store(storage: CloudStorage, url: str) -> list | None:
+    logger.info(f"[load_json_from_object_store] Loading JSON from '{url}'")
+    try:
+        body = storage.stream(url)
+        content = body.read()
+
+        data = json.loads(content.decode("utf-8"))
+
+        logger.info(
+            f"[load_json_from_object_store] Download successful | "
+            f"url='{url}', size={len(content)} bytes"
+        )
+        return data
+    except CloudStorageError as e:
+        logger.warning(
+            f"[load_json_from_object_store] failed to load JSON from '{url}': {e}",
+        )
+        return None
+    except json.JSONDecodeError as e:
+        logger.warning(
+            f"[load_json_from_object_store] JSON decode error loading JSON from '{url}': {e}",
+        )
+        return None
+    except Exception as e:
+        logger.warning(
+            f"[load_json_from_object_store] unexpected error loading JSON from '{url}': {e}",
+            exc_info=True,
+        )
+        return None
+
+
 def generate_timestamped_filename(base_name: str, extension: str = "csv") -> str:
     """
     Generate a filename with timestamp. 
diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py index 6d17afe60..7e2b593b2 100644 --- a/backend/app/crud/evaluations/core.py +++ b/backend/app/crud/evaluations/core.py @@ -187,6 +187,7 @@ def update_evaluation_run( status: str | None = None, error_message: str | None = None, object_store_url: str | None = None, + score_trace_url: str | None = None, score: dict | None = None, embedding_batch_job_id: int | None = None, ) -> EvaluationRun: @@ -219,6 +220,8 @@ def update_evaluation_run( eval_run.score = score if embedding_batch_job_id is not None: eval_run.embedding_batch_job_id = embedding_batch_job_id + if score_trace_url is not None: + eval_run.score_trace_url = score_trace_url # Always update timestamp eval_run.updated_at = now() @@ -336,6 +339,8 @@ def save_score( Updated EvaluationRun instance, or None if not found """ from app.core.db import engine + from app.core.cloud.storage import get_cloud_storage + from app.core.storage_utils import upload_jsonl_to_object_store with Session(engine) as session: eval_run = get_evaluation_run_by_id( @@ -344,12 +349,60 @@ def save_score( organization_id=organization_id, project_id=project_id, ) - if eval_run: - update_evaluation_run(session=session, eval_run=eval_run, score=score) - logger.info( - f"[save_score] Saved score | evaluation_id={eval_run_id} | " - f"traces={len(score.get('traces', []))}" - ) + if not eval_run: + return None + + traces = score.get("traces", []) + summary_score = score.get("summary_scores", []) + score_trace_url = None + + if traces: + try: + storage = get_cloud_storage(session=session, project_id=project_id) + score_trace_url = upload_jsonl_to_object_store( + storage=storage, + results=traces, + filename=f"traces_{eval_run_id}.json", + subdirectory=f"evaluations/score/{eval_run_id}", + ) + if score_trace_url: + logger.info( + f"[save_score] uploaded traces to S3 | " + f"evaluation_id={eval_run_id} | url={score_trace_url} | " + f"traces_count={len(traces)}" + 
)
+                else:
+                    logger.warning(
+                        f"[save_score] failed to upload traces to S3, "
+                        f"falling back to DB storage | evaluation_id={eval_run_id}"
+                    )
+            except Exception as e:
+                logger.error(
+                    f"[save_score] Error uploading traces to S3: {e} | "
+                    f"evaluation_id={eval_run_id}",
+                    exc_info=True,
+                )
+
+        # If the trace data was uploaded to S3, only the summary scores are stored in the DB
+        # TODO: Evaluate whether this behaviour is needed or completely discard storing the data in the DB
+        if score_trace_url:
+            db_score = {"summary_scores": summary_score}
+        else:
+            # fallback to store data in db if failed to store in s3
+            db_score = score
+
+        update_evaluation_run(
+            session=session,
+            eval_run=eval_run,
+            score=db_score,
+            score_trace_url=score_trace_url,
+        )
+
+        logger.info(
+            f"[save_score] Saved score | evaluation_id={eval_run_id} | "
+            f"traces={len(score.get('traces', []))}"
+        )
+        return eval_run
diff --git a/backend/app/models/evaluation.py b/backend/app/models/evaluation.py
index 6ae4542fb..5aa8bf29d 100644
--- a/backend/app/models/evaluation.py
+++ b/backend/app/models/evaluation.py
@@ -250,6 +250,13 @@ class EvaluationRun(SQLModel, table=True):
         description="Object store URL of processed evaluation results for future reference",
         sa_column_kwargs={"comment": "S3 URL of processed evaluation results"},
     )
+    score_trace_url: str | None = SQLField(
+        default=None,
+        description="S3 URL where per-trace score data is stored",
+        sa_column_kwargs={
+            "comment": "S3 URL where per-trace evaluation scores are stored"
+        },
+    )
     total_items: int = SQLField(
         default=0,
         description="Total number of items evaluated (set during processing)",
diff --git a/backend/app/services/evaluations/evaluation.py b/backend/app/services/evaluations/evaluation.py
index 785eb02af..42f1f44f8 100644
--- a/backend/app/services/evaluations/evaluation.py
+++ b/backend/app/services/evaluations/evaluation.py
@@ -189,6 +189,9 @@ def get_evaluation_with_scores(
     Returns:
         Tuple of (EvaluationRun or None, error_message 
or None) """ + from app.core.cloud.storage import get_cloud_storage + from app.core.storage_utils import load_json_from_object_store + logger.info( f"[get_evaluation_with_scores] Fetching status for evaluation run | " f"evaluation_id={evaluation_id} | " @@ -227,9 +230,41 @@ def get_evaluation_with_scores( return eval_run, None # Check if we already have cached traces - has_cached_traces = eval_run.score is not None and "traces" in eval_run.score - if not resync_score and has_cached_traces: - return eval_run, None + has_cached_traces_s3 = eval_run.score_trace_url is not None + has_cached_traces_db = eval_run.score is not None and "traces" in eval_run.score + if not resync_score: + if has_cached_traces_s3: + try: + storage = get_cloud_storage(session=session, project_id=project_id) + traces = load_json_from_object_store( + storage=storage, url=eval_run.score_trace_url + ) + if traces is not None: + eval_run.score = { + "summary_scores": (eval_run.score or {}).get( + "summary_scores", [] + ), + "traces": traces, + } + logger.info( + f"[get_evaluation_with_scores] Loaded traces from S3 | " + f"evaluation_id={evaluation_id} | " + f"traces_count={len(traces)}" + ) + return eval_run, None + except Exception as e: + logger.error( + f"[get_evaluation_with_scores] Error loading traces from S3: {e} | " + f"evaluation_id={evaluation_id}", + exc_info=True, + ) + + if has_cached_traces_db: + logger.info( + f"[get_evaluation_with_scores] Returning traces from DB | " + f"evaluation_id={evaluation_id}" + ) + return eval_run, None langfuse = get_langfuse_client( session=session, @@ -289,4 +324,7 @@ def get_evaluation_with_scores( score=score, ) + if eval_run: + eval_run.score = score + return eval_run, None diff --git a/backend/app/tests/core/test_storage_utils.py b/backend/app/tests/core/test_storage_utils.py new file mode 100644 index 000000000..1df88168c --- /dev/null +++ b/backend/app/tests/core/test_storage_utils.py @@ -0,0 +1,98 @@ +"""Tests for storage_utils.py - upload and 
load functions for object store.""" + +import json +from io import BytesIO +from unittest.mock import MagicMock + +import pytest + +from app.core.cloud.storage import CloudStorageError +from app.core.storage_utils import ( + load_json_from_object_store, + upload_jsonl_to_object_store, +) + + +class TestUploadJsonlToObjectStore: + """Test uploading JSON content to object store.""" + + def test_upload_success(self) -> None: + """Verify successful upload returns URL and content is correct.""" + mock_storage = MagicMock() + mock_storage.put.return_value = "s3://bucket/path/traces.json" + + results = [{"trace_id": "t1", "score": 0.9}] + + url = upload_jsonl_to_object_store( + storage=mock_storage, + results=results, + filename="traces.json", + subdirectory="evaluations/score/70", + ) + + assert url == "s3://bucket/path/traces.json" + mock_storage.put.assert_called_once() + + # Verify content is valid JSON + call_args = mock_storage.put.call_args + upload_file = call_args.kwargs.get("source") + content = upload_file.file.read().decode("utf-8") + assert json.loads(content) == results + + def test_upload_returns_none_on_error(self) -> None: + """Verify returns None on CloudStorageError.""" + mock_storage = MagicMock() + mock_storage.put.side_effect = CloudStorageError("Upload failed") + + url = upload_jsonl_to_object_store( + storage=mock_storage, + results=[{"id": 1}], + filename="test.json", + subdirectory="test", + ) + + assert url is None + + +class TestLoadJsonFromObjectStore: + """Test loading JSON from object store.""" + + def test_load_success(self) -> None: + """Verify successful load returns parsed JSON.""" + mock_storage = MagicMock() + test_data = [{"id": 1, "value": "test"}] + mock_storage.stream.return_value = BytesIO( + json.dumps(test_data).encode("utf-8") + ) + + result = load_json_from_object_store( + storage=mock_storage, + url="s3://bucket/path/file.json", + ) + + assert result == test_data + 
mock_storage.stream.assert_called_once_with("s3://bucket/path/file.json") + + def test_load_returns_none_on_error(self) -> None: + """Verify returns None on CloudStorageError.""" + mock_storage = MagicMock() + mock_storage.stream.side_effect = CloudStorageError("Download failed") + + result = load_json_from_object_store( + storage=mock_storage, + url="s3://bucket/file.json", + ) + + assert result is None + + def test_load_returns_none_on_invalid_json(self) -> None: + """Verify returns None on invalid JSON content.""" + mock_storage = MagicMock() + mock_storage.stream.return_value = BytesIO(b"not valid json") + + result = load_json_from_object_store( + storage=mock_storage, + url="s3://bucket/file.json", + ) + + assert result is None diff --git a/backend/app/tests/crud/evaluations/test_score_storage.py b/backend/app/tests/crud/evaluations/test_score_storage.py new file mode 100644 index 000000000..6c455b7ad --- /dev/null +++ b/backend/app/tests/crud/evaluations/test_score_storage.py @@ -0,0 +1,110 @@ +"""Tests for save_score() S3 upload functionality.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from app.crud.evaluations.core import save_score +from app.models import EvaluationRun + + +class TestSaveScoreS3Upload: + """Test save_score() S3 upload functionality.""" + + @pytest.fixture + def mock_eval_run(self): + """Create a mock EvaluationRun.""" + eval_run = MagicMock(spec=EvaluationRun) + eval_run.id = 100 + return eval_run + + @patch("app.crud.evaluations.core.update_evaluation_run") + @patch("app.crud.evaluations.core.get_evaluation_run_by_id") + @patch("app.core.storage_utils.upload_jsonl_to_object_store") + @patch("app.core.cloud.storage.get_cloud_storage") + @patch("app.core.db.engine") + def test_uploads_traces_to_s3_and_stores_summary_only( + self, + mock_engine, + mock_get_storage, + mock_upload, + mock_get_eval, + mock_update, + mock_eval_run, + ) -> None: + """Verify traces uploaded to S3, only summary_scores stored in DB.""" + 
mock_get_eval.return_value = mock_eval_run + mock_get_storage.return_value = MagicMock() + mock_upload.return_value = "s3://bucket/traces.json" + + score = { + "summary_scores": [{"name": "accuracy", "avg": 0.9}], + "traces": [{"trace_id": "t1"}], + } + + save_score(eval_run_id=100, organization_id=1, project_id=1, score=score) + + # Verify upload was called with traces + mock_upload.assert_called_once() + assert mock_upload.call_args.kwargs["results"] == [{"trace_id": "t1"}] + + # Verify DB gets summary only, not traces + call_kwargs = mock_update.call_args.kwargs + assert call_kwargs["score"] == { + "summary_scores": [{"name": "accuracy", "avg": 0.9}] + } + assert call_kwargs["score_trace_url"] == "s3://bucket/traces.json" + + @patch("app.crud.evaluations.core.update_evaluation_run") + @patch("app.crud.evaluations.core.get_evaluation_run_by_id") + @patch("app.core.storage_utils.upload_jsonl_to_object_store") + @patch("app.core.cloud.storage.get_cloud_storage") + @patch("app.core.db.engine") + def test_fallback_to_db_when_s3_fails( + self, + mock_engine, + mock_get_storage, + mock_upload, + mock_get_eval, + mock_update, + mock_eval_run, + ) -> None: + """Verify full score stored in DB when S3 upload fails.""" + mock_get_eval.return_value = mock_eval_run + mock_get_storage.return_value = MagicMock() + mock_upload.return_value = None # S3 failed + + score = { + "summary_scores": [{"name": "accuracy", "avg": 0.9}], + "traces": [{"trace_id": "t1"}], + } + + save_score(eval_run_id=100, organization_id=1, project_id=1, score=score) + + # Full score stored in DB as fallback + call_kwargs = mock_update.call_args.kwargs + assert call_kwargs["score"] == score + assert call_kwargs["score_trace_url"] is None + + @patch("app.crud.evaluations.core.update_evaluation_run") + @patch("app.crud.evaluations.core.get_evaluation_run_by_id") + @patch("app.core.storage_utils.upload_jsonl_to_object_store") + @patch("app.core.cloud.storage.get_cloud_storage") + @patch("app.core.db.engine") 
+ def test_no_s3_upload_when_no_traces( + self, + mock_engine, + mock_get_storage, + mock_upload, + mock_get_eval, + mock_update, + mock_eval_run, + ) -> None: + """Verify S3 upload skipped when traces is empty.""" + mock_get_eval.return_value = mock_eval_run + + score = {"summary_scores": [{"name": "accuracy", "avg": 0.9}], "traces": []} + + save_score(eval_run_id=100, organization_id=1, project_id=1, score=score) + + mock_upload.assert_not_called() diff --git a/backend/app/tests/services/__init__.py b/backend/app/tests/services/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/app/tests/services/evaluations/__init__.py b/backend/app/tests/services/evaluations/__init__.py new file mode 100644 index 000000000..293031958 --- /dev/null +++ b/backend/app/tests/services/evaluations/__init__.py @@ -0,0 +1 @@ +"""Evaluation service tests.""" diff --git a/backend/app/tests/services/evaluations/test_evaluation_service_s3.py b/backend/app/tests/services/evaluations/test_evaluation_service_s3.py new file mode 100644 index 000000000..28a91615c --- /dev/null +++ b/backend/app/tests/services/evaluations/test_evaluation_service_s3.py @@ -0,0 +1,120 @@ +"""Tests for get_evaluation_with_scores() S3 retrieval.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from app.models import EvaluationRun +from app.services.evaluations.evaluation import get_evaluation_with_scores + + +class TestGetEvaluationWithScoresS3: + """Test get_evaluation_with_scores() S3 retrieval.""" + + @pytest.fixture + def completed_eval_run_with_s3(self): + """Completed eval run with S3 URL.""" + eval_run = MagicMock(spec=EvaluationRun) + eval_run.id = 100 + eval_run.status = "completed" + eval_run.score = {"summary_scores": [{"name": "accuracy", "avg": 0.9}]} + eval_run.score_trace_url = "s3://bucket/traces.json" + eval_run.dataset_name = "test_dataset" + eval_run.run_name = "test_run" + return eval_run + + @pytest.fixture + def 
completed_eval_run_with_db_traces(self): + """Completed eval run with traces in DB.""" + eval_run = MagicMock(spec=EvaluationRun) + eval_run.id = 101 + eval_run.status = "completed" + eval_run.score = { + "summary_scores": [{"name": "accuracy", "avg": 0.85}], + "traces": [{"trace_id": "db_trace"}], + } + eval_run.score_trace_url = None + return eval_run + + @patch("app.services.evaluations.evaluation.get_evaluation_run_by_id") + @patch("app.core.storage_utils.load_json_from_object_store") + @patch("app.core.cloud.storage.get_cloud_storage") + def test_loads_traces_from_s3( + self, mock_get_storage, mock_load, mock_get_eval, completed_eval_run_with_s3 + ) -> None: + """Verify traces loaded from S3 and score reconstructed.""" + mock_get_eval.return_value = completed_eval_run_with_s3 + mock_get_storage.return_value = MagicMock() + mock_load.return_value = [{"trace_id": "s3_trace"}] + + result, error = get_evaluation_with_scores( + session=MagicMock(), + evaluation_id=100, + organization_id=1, + project_id=1, + get_trace_info=True, + resync_score=False, + ) + + assert error is None + mock_load.assert_called_once() + assert result.score["traces"] == [{"trace_id": "s3_trace"}] + assert result.score["summary_scores"] == [{"name": "accuracy", "avg": 0.9}] + + @patch("app.services.evaluations.evaluation.get_evaluation_run_by_id") + @patch("app.core.cloud.storage.get_cloud_storage") + def test_returns_db_traces_when_no_s3_url( + self, mock_get_storage, mock_get_eval, completed_eval_run_with_db_traces + ) -> None: + """Verify DB traces returned when no S3 URL.""" + mock_get_eval.return_value = completed_eval_run_with_db_traces + + result, error = get_evaluation_with_scores( + session=MagicMock(), + evaluation_id=101, + organization_id=1, + project_id=1, + get_trace_info=True, + resync_score=False, + ) + + assert error is None + mock_get_storage.assert_not_called() + assert result.score["traces"] == [{"trace_id": "db_trace"}] + + 
@patch("app.services.evaluations.evaluation.save_score") + @patch("app.services.evaluations.evaluation.fetch_trace_scores_from_langfuse") + @patch("app.services.evaluations.evaluation.get_langfuse_client") + @patch("app.services.evaluations.evaluation.get_evaluation_run_by_id") + @patch("app.core.storage_utils.load_json_from_object_store") + @patch("app.core.cloud.storage.get_cloud_storage") + def test_resync_bypasses_cache_and_fetches_langfuse( + self, + mock_get_storage, + mock_load, + mock_get_eval, + mock_get_langfuse, + mock_fetch_langfuse, + mock_save_score, + completed_eval_run_with_s3, + ) -> None: + """Verify resync=True skips S3/DB and fetches from Langfuse.""" + mock_get_eval.return_value = completed_eval_run_with_s3 + mock_get_langfuse.return_value = MagicMock() + mock_fetch_langfuse.return_value = { + "summary_scores": [], + "traces": [{"trace_id": "new"}], + } + mock_save_score.return_value = completed_eval_run_with_s3 + + get_evaluation_with_scores( + session=MagicMock(), + evaluation_id=100, + organization_id=1, + project_id=1, + get_trace_info=True, + resync_score=True, + ) + + mock_load.assert_not_called() # S3 skipped + mock_fetch_langfuse.assert_called_once() # Langfuse called