Evaluation: STT #571
base: main
Conversation
📝 Walkthrough
Adds end-to-end STT evaluation: DB migration and models; API routes/docs for audio upload, datasets, runs, and feedback; CRUD/services and Gemini batch provider for batch transcription; storage/MIME utilities and signed-URL limits; cron polling; tests and dependency/export updates.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant API as "STT API"
    participant Services as "STT Services / CRUD"
    participant Storage as "Cloud Storage"
    participant Gemini as "Gemini Batch Provider"
    participant DB as Database
    Client->>API: POST /evaluations/stt/files/audio (file)
    API->>Services: validate_audio_file
    Services->>Storage: upload file -> s3_url
    Services-->>API: AudioUploadResponse
    Client->>API: POST /evaluations/stt/datasets (name + sample s3_urls)
    API->>Services: upload_stt_dataset
    Services->>DB: insert EvaluationDataset + STTSample records
    Services-->>API: STTDatasetPublic
    Client->>API: POST /evaluations/stt/runs (dataset_id, providers)
    API->>Services: create_stt_run -> create STTResult records
    Services->>Storage: generate signed URLs for samples
    Services->>Gemini: create batch (upload JSONL)
    Gemini-->>Services: provider_batch_id / status
    Services->>DB: update run (batch_job_id, status=processing)
    Services-->>API: STTEvaluationRunPublic
    loop Cron
        Services->>Gemini: poll batch status
        alt completed
            Gemini->>Services: download results
            Services->>DB: update STTResult transcriptions/status
            Services->>DB: update run status -> completed
        else still_processing
            Note over Services: await next cycle
        end
    end
    Client->>API: PATCH /evaluations/stt/results/{id} (is_correct, comment)
    API->>Services: update_human_feedback
    Services->>DB: update result (is_correct, comment)
    Services-->>API: STTResultPublic
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 16
🤖 Fix all issues with AI agents
In `@backend/app/api/routes/stt_evaluations/dataset.py`:
- Around line 51-66: Remove the extra sample_count argument when constructing
STTDatasetPublic: in the return block that creates STTDatasetPublic (using
symbols STTDatasetPublic, dataset, samples, dataset_metadata), delete the
sample_count=len(samples) entry so the model matches the schema (sample count is
available via dataset.dataset_metadata) and avoid passing an unsupported field
to Pydantic.
In `@backend/app/api/routes/stt_evaluations/evaluation.py`:
- Around line 67-98: The code currently fetches samples with
get_samples_by_dataset_id(..., limit=10000) which truncates datasets >10k and
makes total_items (used in create_stt_run) drift; replace the single call with a
paginated fetch using get_samples_by_dataset_id to iterate pages until no more
samples, accumulate all samples (update the samples variable) and recompute
sample_count from the final samples length before calling create_stt_run (so
total_items = len(samples) * len(run_create.providers)), then call
create_stt_results with the full samples list; reference
get_samples_by_dataset_id, create_stt_run, create_stt_results, run_create,
samples, sample_count and total_items when implementing the fix.
In `@backend/app/core/batch/gemini.py`:
- Around line 58-67: The __init__ method in the Gemini batch provider is missing
an explicit return type which violates strict mypy rules; update the signature
of the __init__ method (the one that initializes self._client and self._model
and uses DEFAULT_MODEL) to include an explicit "-> None" return annotation so it
reads as a constructor that returns None, leaving the body unchanged.
- Around line 150-166: Add a small helper function named
_normalize_state(state_obj) that safely converts Gemini state values to a
normalized string (handle None, strings, Enum members, objects with
.name/.value, and test mocks) and use it in get_batch_status (where
batch_job.state is read) and any other comparisons (e.g., checks against
_FAILED_STATES and BatchJobState.SUCCEEDED.value); specifically, replace direct
uses of batch_job.state.name with state = _normalize_state(batch_job.state) and
ensure the helper returns the same casing/format as entries in _FAILED_STATES
and BatchJobState.SUCCEEDED.value so the membership and equality checks work
reliably in production and tests.
In `@backend/app/core/storage_utils.py`:
- Line 85: The call to storage.put uses the keyword file_path which doesn't
match the abstract CloudStorage.put signature (expects filepath); update the
invocation in storage.put(source=upload_file, file_path=file_path) to use the
correct parameter name (storage.put(source=upload_file, filepath=file_path)) so
it matches CloudStorage.put; alternatively, ensure all implementations of
CloudStorage.put use the same parameter name filepath, but the quickest fix is
changing the call to use filepath to avoid the TypeError.
In `@backend/app/crud/stt_evaluations/batch.py`:
- Around line 119-134: The batch job config is missing the stt_provider key so
process_completed_stt_batch can't match non-default models; when calling
start_batch_job (where model is derived from run.providers or DEFAULT_MODEL and
model_path/batch_provider are created) include "stt_provider": model in the
config argument (e.g., add stt_provider: model alongside the existing "model"
entry) so batch_job.config["stt_provider"] is available for result matching.
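A minimal sketch of the change described above, assuming the `start_batch_job` call shape and variable names referenced in the prompt (the actual signature in `batch.py` may differ):

```python
# Hypothetical sketch: carry the provider in the batch job config so that
# process_completed_stt_batch can later match results via
# batch_job.config["stt_provider"], including for non-default models.
model = (run.providers or [DEFAULT_MODEL])[0]  # assumed derivation of `model`

batch_job = start_batch_job(
    session=session,
    provider=batch_provider,
    config={
        "model": model,
        "model_path": model_path,
        "stt_provider": model,  # new key consumed during result matching
    },
)
```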
In `@backend/app/crud/stt_evaluations/cron.py`:
- Around line 105-191: poll_stt_run currently returns failure results in-memory
but doesn't persist run status to DB in some failure paths; update run status to
avoid leaving runs stuck in "processing". Specifically: when run.batch_job_id is
missing (inside poll_stt_run) call update_stt_run(session, run_id=run.id,
status="failed", error_message="No batch job found"); when get_batch_job returns
None do the same with error_message="Batch job not found"; and in the except
Exception block call update_stt_run(session, run_id=run.id, status="failed",
error_message=str(e)) before returning. Keep existing successful and
batch-failed update logic (process_completed_stt_batch and the existing update
for provider failures).
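A rough sketch of the failure paths described above, using the function and argument names from the prompt; the surrounding control flow of `poll_stt_run` is assumed:

```python
# Inside poll_stt_run (sketch): persist a failed status instead of returning
# in-memory failure results and leaving the run stuck in "processing".
if not run.batch_job_id:
    update_stt_run(session, run_id=run.id, status="failed",
                   error_message="No batch job found")
    return

batch_job = get_batch_job(session, run.batch_job_id)  # assumed call shape
if batch_job is None:
    update_stt_run(session, run_id=run.id, status="failed",
                   error_message="Batch job not found")
    return

try:
    ...  # existing polling / completed-batch handling stays unchanged
except Exception as e:
    update_stt_run(session, run_id=run.id, status="failed", error_message=str(e))
    return
```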
- Around line 194-310: When an exception occurs in process_completed_stt_batch
you must mark all pending STTResult rows for this run (and the same
stt_provider) as failed and update the EvaluationRun to a failed state before
re-raising; inside the except block (in process_completed_stt_batch) query
STTResult where evaluation_run_id == run.id, provider == stt_provider, and
status == STTResultStatus.PENDING.value, iterate those results and call
update_stt_result(session=session, result_id=result.id,
status=STTResultStatus.FAILED.value, error_message=str(e)) for each, then call
update_stt_run(session=session, run_id=run.id, status="failed",
error_message=str(e)) and only then re-raise the exception.
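A sketch of that `except` block, assuming SQLModel-style `session.exec(select(...))` and the model/enum names used elsewhere in this PR:

```python
except Exception as e:
    # Mark results that never received a transcription as failed...
    pending = session.exec(
        select(STTResult).where(
            STTResult.evaluation_run_id == run.id,
            STTResult.provider == stt_provider,
            STTResult.status == STTResultStatus.PENDING.value,
        )
    ).all()
    for result in pending:
        update_stt_result(
            session=session,
            result_id=result.id,
            status=STTResultStatus.FAILED.value,
            error_message=str(e),
        )
    # ...then fail the run itself before propagating the error.
    update_stt_run(session=session, run_id=run.id, status="failed", error_message=str(e))
    raise
```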
In `@backend/app/crud/stt_evaluations/result.py`:
- Around line 196-242: The update_stt_result function currently selects
STTResult only by result_id which permits cross-tenant writes; modify
update_stt_result to accept org_id and project_id parameters (e.g., add org_id:
int and project_id: int), and change the query (the select(STTResult).where(...)
call) to include filters for STTResult.org_id == org_id and STTResult.project_id
== project_id in addition to STTResult.id == result_id; update the docstring and
type hints accordingly and update all callers to pass the appropriate org_id and
project_id so updates are scoped to the correct tenant.
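A hedged sketch of the tenant-scoped signature; the column names (`org_id`, `project_id`) are taken from the prompt and may differ from the actual model:

```python
def update_stt_result(
    *,
    session: Session,
    result_id: int,
    org_id: int,
    project_id: int,
    **updates: Any,
) -> STTResult | None:
    """Update an STT result, scoped to the caller's organization and project."""
    statement = select(STTResult).where(
        STTResult.id == result_id,
        STTResult.org_id == org_id,
        STTResult.project_id == project_id,
    )
    result = session.exec(statement).first()
    if result is None:
        return None
    # Apply updates and commit exactly as the existing implementation does.
    for field, value in updates.items():
        setattr(result, field, value)
    session.add(result)
    session.commit()
    session.refresh(result)
    return result
```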
In `@backend/app/crud/stt_evaluations/run.py`:
- Around line 180-233: The update_stt_run function currently queries
EvaluationRun by run_id only, allowing cross-tenant or wrong-type updates;
change the select in update_stt_run to filter by run_id plus the caller's
org_id, project_id and EvaluationRun.type == "stt" (or the appropriate
enum/constant) and require passing org_id and project_id into update_stt_run
signature (or accept a context object) so callers supply those values; update
all call sites to pass the org_id/project_id and ensure only matching rows are
updated, then keep the same attribute-setting/commit workflow.
In `@backend/app/models/evaluation.py`:
- Around line 220-228: The providers field currently sets its comment inside
sa_column=Column(...), but per file conventions JSONB/metadata columns should
use sa_column_kwargs for column metadata; update the SQLField declaration for
providers (the providers: list[str] | None = SQLField(...)) to remove the
comment from the Column(...) and instead pass it via
sa_column_kwargs={"comment": "List of STT/TTS providers used (e.g.,
['gemini-2.5-pro'])"}, keeping nullable=True and JSONB in sa_column or using
sa_column=Column(JSONB, nullable=True) without a comment.
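One way to express this, assuming a recent SQLModel that supports `sa_type` together with `sa_column_kwargs` (exact Field options vary by version):

```python
providers: list[str] | None = SQLField(
    default=None,
    sa_type=JSONB,
    nullable=True,
    sa_column_kwargs={
        "comment": "List of STT/TTS providers used (e.g., ['gemini-2.5-pro'])"
    },
)
```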
In `@backend/app/models/stt_evaluation.py`:
- Around line 318-328: The providers field on STTEvaluationRunCreate must
disallow empty lists and avoid a mutable default: change the Field(...) for
providers in class STTEvaluationRunCreate to use default_factory that returns
["gemini-2.5-pro"] (instead of a mutable list literal) and add a minimum-items
constraint (e.g., min_items=1) so an empty providers list is rejected before
batch submission.
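A sketch of the field declaration; `min_length` is the Pydantic v2 spelling of the constraint (on Pydantic v1 it would be `min_items`), and the base class is an assumption:

```python
class STTEvaluationRunCreate(BaseModel):  # assumed base class
    providers: list[str] = Field(
        default_factory=lambda: ["gemini-2.5-pro"],  # avoids a shared mutable default
        min_length=1,  # reject an empty providers list before batch submission
    )
```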
In `@backend/app/services/stt_evaluations/gemini/client.py`:
- Around line 24-32: The __init__ method in the Gemini client is missing an
explicit return type; update the method signature for def __init__(self,
api_key: str) to include -> None (i.e., def __init__(self, api_key: str) ->
None:) to satisfy the typing guideline, leaving the body that assigns
self._api_key and creates self._client unchanged.
In `@backend/app/services/stt_evaluations/gemini/files.py`:
- Around line 54-61: The __init__ constructor for the files manager is missing
an explicit return type; update the signature of def __init__(self, client:
genai.Client) in the files manager class to include the return type annotation
-> None so the constructor is fully typed per project guidelines.
In `@backend/app/tests/api/routes/test_stt_evaluation.py`:
- Around line 67-102: The STTDatasetPublic Pydantic model is missing the
sample_count field so values passed by the route (dataset creation in the
handler that sets sample_count=len(samples)) are dropped; add a field
declaration sample_count: int to the STTDatasetPublic model (in
backend/app/models/stt_evaluation.py) so the API responses for create/get
include sample_count, and run tests to verify the dataset response now contains
the sample_count value.
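A minimal sketch of the model change, with the other existing fields elided:

```python
from pydantic import BaseModel

class STTDatasetPublic(BaseModel):  # other existing fields omitted in this sketch
    id: int
    name: str
    sample_count: int  # newly declared so sample_count=len(samples) is not dropped
```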
In `@backend/app/tests/core/batch/test_gemini.py`:
- Around line 65-77: The mock_batch_job.state is incorrectly configured using
the name parameter in MagicMock, which sets the internal repr name rather than
the .name attribute. When the actual code in gemini.py accesses
batch_job.state.name, it will return an auto-generated MagicMock instead of the
string "JOB_STATE_PENDING", causing the assertion for result["provider_status"]
to fail. Fix this by first creating mock_batch_job.state as a plain MagicMock
object, then explicitly setting its .name attribute to "JOB_STATE_PENDING" as a
separate assignment statement.
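The fix is a known `unittest.mock` gotcha: passing `name=` to the `MagicMock` constructor configures the mock's repr, not a `.name` attribute. A small self-contained illustration:

```python
from unittest.mock import MagicMock

# Wrong: name= sets the mock's repr, so .name is itself an auto-generated mock.
wrong_state = MagicMock(name="JOB_STATE_PENDING")

# Right: create the mock first, then assign .name explicitly.
mock_state = MagicMock()
mock_state.name = "JOB_STATE_PENDING"

mock_batch_job = MagicMock()
mock_batch_job.state = mock_state

assert mock_batch_job.state.name == "JOB_STATE_PENDING"
```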
🧹 Nitpick comments (9)
backend/app/core/cloud/storage.py (1)
216-228: Good security hardening for signed URL expiry. The 24-hour cap prevents excessively long-lived signed URLs, which is a sound security practice.
Consider logging when the expiry is capped, so callers are aware their requested expiry was reduced:
💡 Optional: Log when expiry is capped
```diff
 # Cap expiry at maximum allowed value to prevent excessively long-lived URLs
+if expires_in > self.MAX_SIGNED_URL_EXPIRY:
+    logger.debug(
+        f"[AmazonCloudStorage.get_signed_url] Capping expiry from {expires_in}s to {self.MAX_SIGNED_URL_EXPIRY}s"
+    )
 expires_in = min(expires_in, self.MAX_SIGNED_URL_EXPIRY)
```
backend/app/core/storage_utils.py (1)

166-166: Consider using a standard MIME type for JSONL. `application/jsonl` is not a registered IANA MIME type. Common alternatives are `application/x-ndjson` or `application/jsonlines`. While this may not cause immediate issues, using a non-standard type could affect interoperability with external systems that validate content types.

backend/app/services/stt_evaluations/audio.py (2)

173-178: Content-type fallback may produce incorrect MIME types. The fallback `f"audio/{extension}"` will produce `audio/m4a` for M4A files, but the canonical MIME type is `audio/mp4`. Consider using the `EXTENSION_TO_MIME` mapping from constants for accuracy.
♻️ Proposed fix
```diff
+from app.services.stt_evaluations.constants import (
+    EXTENSION_TO_MIME,
     MAX_FILE_SIZE_BYTES,
     MIME_TO_EXTENSION,
     SUPPORTED_AUDIO_FORMATS,
 )
```
Then update the return statement:
```diff
 return AudioUploadResponse(
     s3_url=s3_url,
     filename=file.filename or new_filename,
     size_bytes=size_bytes,
-    content_type=file.content_type or f"audio/{extension}",
+    content_type=file.content_type or EXTENSION_TO_MIME.get(extension, f"audio/{extension}"),
 )
```

160-166: Returning `size_bytes=0` may be misleading to API consumers. If both `storage.get_file_size_kb()` fails and `file.size` is `None`, the response will contain `size_bytes=0`, which could be misinterpreted as an empty file. Consider logging a warning when falling back to 0, or documenting this behavior.
♻️ Proposed enhancement
```diff
 try:
     size_kb = storage.get_file_size_kb(s3_url)
     size_bytes = int(size_kb * 1024)
-except Exception:
+except Exception as size_err:
     # If we can't get size from S3, use the upload file size
     size_bytes = file.size or 0
+    if size_bytes == 0:
+        logger.warning(
+            f"[upload_audio_file] Could not determine file size | "
+            f"s3_url: {s3_url}, error: {size_err}"
+        )
```
backend/app/api/routes/stt_evaluations/files.py (1)

25-42: Misleading underscore prefix on `_session` parameter. The `_session` parameter is actually used (passed to `upload_audio_file` at line 37), but the underscore prefix conventionally indicates an unused variable. Consider renaming to `session` for clarity.
Suggested fix
```diff
 def upload_audio(
-    _session: SessionDep,
+    session: SessionDep,
     auth_context: AuthContextDep,
     file: UploadFile = File(..., description="Audio file to upload"),
 ) -> APIResponse[AudioUploadResponse]:
     """Upload an audio file for STT evaluation."""
     logger.info(
         f"[upload_audio] Uploading audio file | "
         f"project_id: {auth_context.project_.id}, filename: {file.filename}"
     )
     result = upload_audio_file(
-        session=_session,
+        session=session,
         file=file,
         project_id=auth_context.project_.id,
     )
```
backend/app/services/stt_evaluations/dataset.py (1)

60-66: Object store upload failure is silently ignored. The `_upload_samples_to_object_store` function returns `None` on failure (line 157), and the workflow continues creating the dataset without the backup CSV. While this provides graceful degradation, it may lead to unnoticed data loss. Consider whether this should be a harder failure or at least emit a higher-severity log.
Consider upgrading log level for visibility
```diff
 except Exception as e:
-    logger.warning(
+    logger.error(
         f"[_upload_samples_to_object_store] Failed to upload | {e}",
         exc_info=True,
     )
     return None
```
backend/app/api/routes/stt_evaluations/result.py (2)

43-62: Redundant existence check before update. The `get_stt_result_by_id` check at lines 44-52 is redundant because `update_human_feedback` (from the CRUD layer at `backend/app/crud/stt_evaluations/result.py` lines 268-276) already performs the same check and raises `HTTPException(status_code=404)` if the result is not found.
Remove redundant check
```diff
 def update_result_feedback(
     _session: SessionDep,
     auth_context: AuthContextDep,
     result_id: int,
     feedback: STTFeedbackUpdate = Body(...),
 ) -> APIResponse[STTResultPublic]:
     """Update human feedback on an STT result."""
     logger.info(
         f"[update_result_feedback] Updating feedback | "
         f"result_id: {result_id}, is_correct: {feedback.is_correct}"
     )
-    # Verify result exists and belongs to this project
-    existing = get_stt_result_by_id(
-        session=_session,
-        result_id=result_id,
-        org_id=auth_context.organization_.id,
-        project_id=auth_context.project_.id,
-    )
-
-    if not existing:
-        raise HTTPException(status_code=404, detail="Result not found")
-
     # Update feedback
     result = update_human_feedback(
```

64-81: Consider extracting result-to-public conversion to reduce duplication. The manual construction of `STTResultPublic` is duplicated in both endpoints. Consider extracting a helper function or using Pydantic's `model_validate` if the model supports it.
Extract helper function
```python
def _to_public(result: STTResult) -> STTResultPublic:
    return STTResultPublic(
        id=result.id,
        transcription=result.transcription,
        provider=result.provider,
        status=result.status,
        score=result.score,
        is_correct=result.is_correct,
        comment=result.comment,
        error_message=result.error_message,
        stt_sample_id=result.stt_sample_id,
        evaluation_run_id=result.evaluation_run_id,
        organization_id=result.organization_id,
        project_id=result.project_id,
        inserted_at=result.inserted_at,
        updated_at=result.updated_at,
    )
```
Also applies to: 107-124

backend/app/crud/stt_evaluations/result.py (1)

328-350: Minor: simplify count mapping with `dict(rows)`. Ruff flagged the comprehension as unnecessary.
♻️ Suggested refactor
```diff
-    return {status: count for status, count in rows}
+    return dict(rows)
```
```python
# Create run record
run = create_stt_run(
    session=_session,
    run_name=run_create.run_name,
    dataset_id=run_create.dataset_id,
    dataset_name=dataset.name,
    org_id=auth_context.organization_.id,
    project_id=auth_context.project_.id,
    providers=run_create.providers,
    language=run_create.language or dataset.language,
    total_items=sample_count * len(run_create.providers),
)

# Get samples for the dataset
samples, _ = get_samples_by_dataset_id(
    session=_session,
    dataset_id=run_create.dataset_id,
    org_id=auth_context.organization_.id,
    project_id=auth_context.project_.id,
    limit=10000,  # Get all samples
)

# Create result records for each sample and provider
results = create_stt_results(
    session=_session,
    samples=samples,
    evaluation_run_id=run.id,
    org_id=auth_context.organization_.id,
    project_id=auth_context.project_.id,
    providers=run_create.providers,
)
```
Avoid truncating datasets >10k samples; total_items may drift.
get_samples_by_dataset_id(..., limit=10000) silently drops samples when datasets exceed 10k, but total_items is computed from sample_count. That yields partial result creation and inaccurate totals.
✅ Suggested fix (paginate + recompute totals)
- samples, _ = get_samples_by_dataset_id(
- session=_session,
- dataset_id=run_create.dataset_id,
- org_id=auth_context.organization_.id,
- project_id=auth_context.project_.id,
- limit=10000, # Get all samples
- )
+ samples: list[STTSample] = []
+ offset = 0
+ page_size = 1000
+ while True:
+ page, total = get_samples_by_dataset_id(
+ session=_session,
+ dataset_id=run_create.dataset_id,
+ org_id=auth_context.organization_.id,
+ project_id=auth_context.project_.id,
+ limit=page_size,
+ offset=offset,
+ )
+ samples.extend(page)
+ if len(samples) >= total:
+ break
+ offset += len(page)
+
+ # Keep totals consistent with what we actually process
+ run = update_stt_run(
+ session=_session,
+ run_id=run.id,
+ status=run.status,
+ score=run.score,
+ error_message=run.error_message,
+ object_store_url=run.object_store_url,
+ batch_job_id=run.batch_job_id,
+ )
🤖 Prompt for AI Agents
In `@backend/app/api/routes/stt_evaluations/evaluation.py` around lines 67 - 98,
The code currently fetches samples with get_samples_by_dataset_id(...,
limit=10000) which truncates datasets >10k and makes total_items (used in
create_stt_run) drift; replace the single call with a paginated fetch using
get_samples_by_dataset_id to iterate pages until no more samples, accumulate all
samples (update the samples variable) and recompute sample_count from the final
samples length before calling create_stt_run (so total_items = len(samples) *
len(run_create.providers)), then call create_stt_results with the full samples
list; reference get_samples_by_dataset_id, create_stt_run, create_stt_results,
run_create, samples, sample_count and total_items when implementing the fix.
```python
try:
    batch_job = self._client.batches.get(name=batch_id)
    state = batch_job.state.name if batch_job.state else "UNKNOWN"

    result: dict[str, Any] = {
        "provider_status": state,
        # Gemini uses the same batch name to fetch results
        "provider_output_file_id": batch_id,
    }

    if state in _FAILED_STATES:
        result["error_message"] = f"Batch {state}"

    logger.info(
        f"[get_batch_status] Gemini batch status | batch_id={batch_id} | "
        f"status={state}"
    )
```
🧩 Analysis chain (verification scripts run against ProjectTech4DevAI/kaapi-backend; outputs collapsed)
```bash
cat -n backend/app/core/batch/gemini.py | head -220 | tail -80
head -100 backend/app/core/batch/gemini.py
rg "batch_job.state" backend/app/core/batch/gemini.py -A 2 -B 2
grep -n "def " backend/app/core/batch/gemini.py | head -20
sed -n '134,176p' backend/app/core/batch/gemini.py
sed -n '1,50p' backend/app/core/batch/gemini.py
sed -n '68,130p' backend/app/core/batch/gemini.py
sed -n '200,210p' backend/app/core/batch/gemini.py
find backend -type f -name "*test*gemini*" -o -name "*gemini*test*" | head -10
cat backend/app/tests/core/batch/test_gemini.py
rg "batch_job\.state" backend/app/core/batch/gemini.py -n
sed -n '100,120p' backend/app/core/batch/gemini.py
python3 << 'EOF'
# Check if there are any other state-related comparisons or usages
import re
with open('backend/app/core/batch/gemini.py', 'r') as f:
    content = f.read()
lines = content.split('\n')
# Find all references to 'state' variable in context
for i, line in enumerate(lines, 1):
    if 'state' in line.lower() and i > 130:
        if '_FAILED_STATES' in line or 'BatchJobState' in line or 'state in' in line or 'state !=' in line or 'state =' in line:
            print(f"Line {i}: {line}")
EOF
sed -n '150,210p' backend/app/core/batch/gemini.py | cat -n
sed -n '134,180p' backend/app/core/batch/gemini.py | grep -E "def |logger\.|->|:"
grep -A5 "mock_batch_job.state = " backend/app/tests/core/batch/test_gemini.py | head -20
```
Add _normalize_state() helper to safely handle state objects from Gemini API responses.
The code currently assumes batch_job.state.name always returns a proper string, but Gemini API responses may return states as strings, enums, or mock objects in test scenarios. This causes CI failures in comparisons at if state in _FAILED_STATES (line 160) and if state != BatchJobState.SUCCEEDED.value (line 204). Implement the suggested normalization helper to defensively handle all input types.
✅ Suggested fix
+ `@staticmethod`
+ def _normalize_state(state: Any) -> str:
+ if isinstance(state, str):
+ return state
+ if isinstance(state, Enum):
+ return state.name
+ name = getattr(state, "name", None)
+ if isinstance(name, str):
+ return name
+ value = getattr(state, "value", None)
+ if isinstance(value, str):
+ return value
+ return "UNKNOWN"
@@
- state = batch_job.state.name if batch_job.state else "UNKNOWN"
+ state = self._normalize_state(batch_job.state) if batch_job.state else "UNKNOWN"
@@
- state = batch_job.state.name if batch_job.state else "UNKNOWN"
+ state = self._normalize_state(batch_job.state) if batch_job.state else "UNKNOWN"
🤖 Prompt for AI Agents
In `@backend/app/core/batch/gemini.py` around lines 150 - 166, Add a small helper
function named _normalize_state(state_obj) that safely converts Gemini state
values to a normalized string (handle None, strings, Enum members, objects with
.name/.value, and test mocks) and use it in get_batch_status (where
batch_job.state is read) and any other comparisons (e.g., checks against
_FAILED_STATES and BatchJobState.SUCCEEDED.value); specifically, replace direct
uses of batch_job.state.name with state = _normalize_state(batch_job.state) and
ensure the helper returns the same casing/format as entries in _FAILED_STATES
and BatchJobState.SUCCEEDED.value so the membership and equality checks work
reliably in production and tests.
```diff
 # Upload to object store
-destination = storage.put(source=csv_file, file_path=file_path)
+destination = storage.put(source=upload_file, file_path=file_path)
```
🧩 Analysis chain (verification scripts; outputs collapsed)
```bash
# Find CloudStorage class and its put() method signature
cd /
ast-grep --pattern $'def put(self, source: $_, $_: $_) -> $_:
  $$$
'
# Check line 85 in storage_utils.py with context
sed -n '80,90p' backend/app/core/storage_utils.py
# Check the function containing line 85
sed -n '65,95p' backend/app/core/storage_utils.py
# Find the function signature for the function containing line 85
sed -n '50,75p' backend/app/core/storage_utils.py
# Get the function definition line
sed -n '40,55p' backend/app/core/storage_utils.py
```
Fix parameter name mismatch in storage.put() call.
Line 85 calls storage.put(source=upload_file, file_path=file_path) using a keyword argument, but the abstract CloudStorage.put() method signature defines the parameter as filepath (without underscore). Since the parameter is typed as CloudStorage (the abstract base class), using the mismatched parameter name will cause a TypeError at runtime. Either rename the parameter in all implementations to match the abstract method, or change the call to use the abstract method's parameter name.
🤖 Prompt for AI Agents
In `@backend/app/core/storage_utils.py` at line 85, The call to storage.put uses
the keyword file_path which doesn't match the abstract CloudStorage.put
signature (expects filepath); update the invocation in
storage.put(source=upload_file, file_path=file_path) to use the correct
parameter name (storage.put(source=upload_file, filepath=file_path)) so it
matches CloudStorage.put; alternatively, ensure all implementations of
CloudStorage.put use the same parameter name filepath, but the quickest fix is
changing the call to use filepath to avoid the TypeError.
```python
def __init__(self, client: genai.Client):
    """Initialize files manager.

    Args:
        client: Gemini client instance
    """
    self._client = client
```
🧩 Analysis chain (verification scripts; outputs collapsed)
```bash
# Check if the file exists
find . -path "*/backend/app/services/stt_evaluations/gemini/files.py" -type f
# If file exists, read lines 54-61 to verify the snippet
if [ -f "backend/app/services/stt_evaluations/gemini/files.py" ]; then
  sed -n '54,61p' backend/app/services/stt_evaluations/gemini/files.py
fi
# Check how other __init__ methods in backend/app/services are typed
rg "def __init__" backend/app/services --type py -A 2 | head -50
```
Add explicit None return type for __init__.
The __init__ method is missing the return type annotation. Per the coding guidelines requiring type hints on all function parameters and return values, add -> None to keep the constructor signature fully typed.
- def __init__(self, client: genai.Client):
+ def __init__(self, client: genai.Client) -> None:
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-def __init__(self, client: genai.Client):
+def __init__(self, client: genai.Client) -> None:
     """Initialize files manager.

     Args:
         client: Gemini client instance
     """
     self._client = client
```
🤖 Prompt for AI Agents
In `@backend/app/services/stt_evaluations/gemini/files.py` around lines 54 - 61,
The __init__ constructor for the files manager is missing an explicit return
type; update the signature of def __init__(self, client: genai.Client) in the
files manager class to include the return type annotation -> None so the
constructor is fully typed per project guidelines.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/app/api/routes/stt_evaluations/evaluation.py`:
- Around line 110-113: The log is referencing a non-existent key 'batch_jobs' in
batch_result so it always prints an empty set; update the logger.info in
start_stt_evaluation to reference the real key (e.g., use
batch_result.get('jobs', {}).keys() if the batch job entries are under 'jobs')
or, if you want to show all returned fields, use list(batch_result.keys())
instead of batch_result.get('batch_jobs', {}).keys() so the log prints the
actual batch info alongside run.id.
🧹 Nitpick comments (1)
backend/app/crud/evaluations/cron.py (1)
127-130: Consider clarifying the failure count semantics in org-level errors. When an org-level exception is caught (e.g., if `poll_all_pending_stt_evaluations` throws), `total_failed` is incremented by 1. However, this single increment may not accurately represent the actual number of text + STT runs that were in progress for that organization. This was arguably the same issue before STT integration, but now it's more pronounced since two subsystems are being polled. This is a minor semantic inconsistency in the reporting, not a functional bug, so it can be addressed later if more precise failure counting is needed.
```python
logger.info(
    f"[start_stt_evaluation] STT evaluation batch submitted | "
    f"run_id: {run.id}, batch_jobs: {batch_result.get('batch_jobs', {}).keys()}"
)
```
Fix batch log field name.
batch_result doesn’t contain batch_jobs, so the log always prints an empty set.
🛠️ Suggested fix
- logger.info(
- f"[start_stt_evaluation] STT evaluation batch submitted | "
- f"run_id: {run.id}, batch_jobs: {batch_result.get('batch_jobs', {}).keys()}"
- )
+ logger.info(
+ f"[start_stt_evaluation] STT evaluation batch submitted | "
+ f"run_id: {run.id}, batch_job_id: {batch_result.get('batch_job_id')}, "
+ f"provider_batch_id: {batch_result.get('provider_batch_id')}"
+ )
📝 Committable suggestion
```diff
 logger.info(
     f"[start_stt_evaluation] STT evaluation batch submitted | "
-    f"run_id: {run.id}, batch_jobs: {batch_result.get('batch_jobs', {}).keys()}"
+    f"run_id: {run.id}, batch_job_id: {batch_result.get('batch_job_id')}, "
+    f"provider_batch_id: {batch_result.get('provider_batch_id')}"
 )
```
🤖 Prompt for AI Agents
In `@backend/app/api/routes/stt_evaluations/evaluation.py` around lines 110 - 113,
The log is referencing a non-existent key 'batch_jobs' in batch_result so it
always prints an empty set; update the logger.info in start_stt_evaluation to
reference the real key (e.g., use batch_result.get('jobs', {}).keys() if the
batch job entries are under 'jobs') or, if you want to show all returned fields,
use list(batch_result.keys()) instead of batch_result.get('batch_jobs',
{}).keys() so the log prints the actual batch info alongside run.id.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/app/core/batch/gemini.py`:
- Around line 285-325: The upload_file method is setting mime_type="jsonl" in
the types.UploadFileConfig passed to self._client.files.upload; change that
mime_type to "text/plain" to ensure Gemini Files API accepts the JSONL file.
Locate the upload_file function and the call to self._client.files.upload (and
the types.UploadFileConfig instantiation) and replace mime_type="jsonl" with
mime_type="text/plain", keeping the rest (display_name, file path/tmp_path
handling, and cleanup) unchanged.
🧹 Nitpick comments (2)
backend/app/models/stt_evaluation.py (1)
15-16: Consider making SUPPORTED_STT_PROVIDERS a constant tuple or frozenset. Using a list for a constant that shouldn't be modified could lead to accidental mutation. A tuple or frozenset would be safer.
Suggested change
```diff
 # Supported STT providers for evaluation
-SUPPORTED_STT_PROVIDERS = ["gemini-2.5-pro"]
+SUPPORTED_STT_PROVIDERS = frozenset({"gemini-2.5-pro"})
```
Note: If you change to `frozenset`, update the error message in the validator (line 328) to use `sorted(SUPPORTED_STT_PROVIDERS)` for consistent ordering.

backend/app/services/stt_evaluations/gemini/client.py (1)

95-113: Consider caching or limiting the model list call in `validate_connection`. The `list(self._client.models.list())` call fetches all available models, which could be slow or resource-intensive. For connection validation, checking just one model or using a lighter API call might be more efficient.
Alternative approach
```diff
 def validate_connection(self) -> bool:
     """Validate that the client can connect to Gemini.

     Returns:
         bool: True if connection is valid
     """
     try:
-        # List models to verify connection
-        models = list(self._client.models.list())
+        # Fetch first model to verify connection (lighter than full list)
+        models = self._client.models.list()
+        first_model = next(iter(models), None)
         logger.info(
             f"[validate_connection] Connection validated | "
-            f"available_models_count: {len(models)}"
+            f"connection_verified: {first_model is not None}"
         )
         return True
```
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/app/tests/services/stt_evaluations/test_audio.py`:
- Around line 22-378: Add missing type annotations to all test methods (e.g.,
change def test_valid_mp3_file(self) to def test_valid_mp3_file(self: Self) ->
None) across the file so every test method has parameter and return type hints;
then remove the duplicated helpers TestValidateAudioFile._create_upload_file and
TestUploadAudioFile._create_upload_file and replace them with a single shared
factory fixture in the module (e.g., a pytest fixture named upload_file_factory)
that returns a callable to produce MagicMock UploadFile instances used by
validate_audio_file and upload_audio_file tests; update tests to call the
fixture instead of the class-level helper.
🧹 Nitpick comments (3)
backend/app/tests/services/stt_evaluations/test_audio.py (1)
139-150: Consolidate UploadFile creation via a factory fixture.
`_create_upload_file` is duplicated (Line 139 and Line 286). Please replace these helpers with a shared pytest factory fixture to align with the test fixture guideline and reduce duplication.
♻️ Suggested fixture-based factory (apply in this file)
```diff
+from collections.abc import Callable
@@
+@pytest.fixture()
+def upload_file_factory() -> Callable[..., UploadFile]:
+    def _factory(
+        filename: str | None = "test.mp3",
+        content_type: str | None = "audio/mpeg",
+        size: int | None = 1024,
+    ) -> UploadFile:
+        mock_file = MagicMock(spec=UploadFile)
+        mock_file.filename = filename
+        mock_file.content_type = content_type
+        mock_file.size = size
+        return mock_file
+    return _factory
@@
-    def test_valid_mp3_file(self):
+    def test_valid_mp3_file(self, upload_file_factory: Callable[..., UploadFile]) -> None:
         """Test validation of valid MP3 file."""
-        file = self._create_upload_file(filename="test.mp3")
+        file = upload_file_factory(filename="test.mp3")
```
As per coding guidelines, "Use factory pattern for test fixtures in backend/app/tests/".

backend/app/tests/core/test_storage_utils.py (2)

88-107: Use factory-style fixtures for storage mocks. Returning a factory keeps instance creation explicit per test and aligns with the test-fixture convention in this repo.
♻️ Proposed refactor (apply similarly to other mock_storage fixtures)
```diff
 @pytest.fixture
 def mock_storage(self):
-    storage = MagicMock()
-    storage.put.return_value = "s3://bucket/test/file.txt"
-    return storage
+    def _factory():
+        storage = MagicMock()
+        storage.put.return_value = "s3://bucket/test/file.txt"
+        return storage
+    return _factory
@@
-    def test_successful_upload(self, mock_storage):
+    def test_successful_upload(self, mock_storage):
         content = b"test content"
-        result = upload_to_object_store(
-            storage=mock_storage,
+        storage = mock_storage()
+        result = upload_to_object_store(
+            storage=storage,
             content=content,
             filename="test.txt",
             subdirectory="uploads",
             content_type="text/plain",
         )
```
As per coding guidelines, "Use factory pattern for test fixtures in backend/app/tests/".

22-25: Add type hints to test and fixture signatures. Repo guidelines require parameter and return annotations on all functions; please apply this across all test methods and fixtures in this file.
🧩 Example pattern (apply broadly)
```diff
+from typing import Self
@@
-    def test_mp3_url(self):
+    def test_mp3_url(self: Self) -> None:
         """Test MIME detection for MP3 files."""
         url = "https://bucket.s3.amazonaws.com/audio/test.mp3"
         assert get_mime_from_url(url) == "audio/mpeg"
```
```bash
#!/bin/bash
python - <<'PY'
import ast, pathlib
path = pathlib.Path("backend/app/tests/core/test_storage_utils.py")
tree = ast.parse(path.read_text())
def check(fn):
    missing = [a.arg for a in fn.args.args if a.annotation is None]
    if fn.returns is None:
        missing.append("return")
    if missing:
        print(f"{fn.name} @ line {fn.lineno} missing: {missing}")
for node in ast.walk(tree):
    if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
        check(node)
PY
```
As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
```python
    def test_mp3_extension(self):
        """Test extracting MP3 extension."""
        assert get_extension_from_filename("audio.mp3") == "mp3"

    def test_wav_extension(self):
        """Test extracting WAV extension."""
        assert get_extension_from_filename("audio.wav") == "wav"

    def test_flac_extension(self):
        """Test extracting FLAC extension."""
        assert get_extension_from_filename("audio.flac") == "flac"

    def test_m4a_extension(self):
        """Test extracting M4A extension."""
        assert get_extension_from_filename("audio.m4a") == "m4a"

    def test_ogg_extension(self):
        """Test extracting OGG extension."""
        assert get_extension_from_filename("audio.ogg") == "ogg"

    def test_webm_extension(self):
        """Test extracting WEBM extension."""
        assert get_extension_from_filename("audio.webm") == "webm"

    def test_uppercase_extension(self):
        """Test that uppercase extensions are normalized to lowercase."""
        assert get_extension_from_filename("audio.MP3") == "mp3"
        assert get_extension_from_filename("audio.WAV") == "wav"

    def test_mixed_case_extension(self):
        """Test mixed case extensions."""
        assert get_extension_from_filename("audio.Mp3") == "mp3"

    def test_empty_filename(self):
        """Test empty filename returns None."""
        assert get_extension_from_filename("") is None

    def test_none_filename(self):
        """Test None filename returns None."""
        assert get_extension_from_filename(None) is None

    def test_no_extension(self):
        """Test filename without extension returns None."""
        assert get_extension_from_filename("audiofile") is None

    def test_multiple_dots(self):
        """Test filename with multiple dots."""
        assert get_extension_from_filename("audio.backup.mp3") == "mp3"

    def test_hidden_file_with_extension(self):
        """Test hidden file with extension."""
        assert get_extension_from_filename(".audio.mp3") == "mp3"

    def test_path_with_filename(self):
        """Test full path with filename."""
        assert get_extension_from_filename("/path/to/audio.mp3") == "mp3"


class TestGetExtensionFromContentType:
    """Test cases for get_extension_from_content_type function."""

    def test_audio_mpeg(self):
        """Test audio/mpeg content type."""
        assert get_extension_from_content_type("audio/mpeg") == "mp3"

    def test_audio_mp3(self):
        """Test audio/mp3 content type."""
        assert get_extension_from_content_type("audio/mp3") == "mp3"

    def test_audio_wav(self):
        """Test audio/wav content type."""
        assert get_extension_from_content_type("audio/wav") == "wav"

    def test_audio_x_wav(self):
        """Test audio/x-wav content type."""
        assert get_extension_from_content_type("audio/x-wav") == "wav"

    def test_audio_wave(self):
        """Test audio/wave content type."""
        assert get_extension_from_content_type("audio/wave") == "wav"

    def test_audio_flac(self):
        """Test audio/flac content type."""
        assert get_extension_from_content_type("audio/flac") == "flac"

    def test_audio_mp4(self):
        """Test audio/mp4 content type (m4a)."""
        assert get_extension_from_content_type("audio/mp4") == "m4a"

    def test_audio_ogg(self):
        """Test audio/ogg content type."""
        assert get_extension_from_content_type("audio/ogg") == "ogg"

    def test_audio_webm(self):
        """Test audio/webm content type."""
        assert get_extension_from_content_type("audio/webm") == "webm"

    def test_uppercase_content_type(self):
        """Test uppercase content type is normalized."""
        assert get_extension_from_content_type("AUDIO/MPEG") == "mp3"

    def test_empty_content_type(self):
        """Test empty content type returns None."""
        assert get_extension_from_content_type("") is None

    def test_none_content_type(self):
        """Test None content type returns None."""
        assert get_extension_from_content_type(None) is None

    def test_unknown_content_type(self):
        """Test unknown content type returns None."""
        assert get_extension_from_content_type("application/octet-stream") is None


class TestValidateAudioFile:
    """Test cases for validate_audio_file function."""

    def _create_upload_file(
        self,
        filename: str | None = "test.mp3",
        content_type: str | None = "audio/mpeg",
        size: int | None = 1024,
    ) -> UploadFile:
        """Create a mock UploadFile for testing."""
        mock_file = MagicMock(spec=UploadFile)
        mock_file.filename = filename
        mock_file.content_type = content_type
        mock_file.size = size
        return mock_file

    def test_valid_mp3_file(self):
        """Test validation of valid MP3 file."""
        file = self._create_upload_file(filename="test.mp3")
        result = validate_audio_file(file)
        assert result == "mp3"

    def test_valid_wav_file(self):
        """Test validation of valid WAV file."""
        file = self._create_upload_file(filename="test.wav")
        result = validate_audio_file(file)
        assert result == "wav"

    def test_valid_flac_file(self):
        """Test validation of valid FLAC file."""
        file = self._create_upload_file(filename="test.flac")
        result = validate_audio_file(file)
        assert result == "flac"

    def test_valid_m4a_file(self):
        """Test validation of valid M4A file."""
        file = self._create_upload_file(filename="test.m4a")
        result = validate_audio_file(file)
        assert result == "m4a"

    def test_valid_ogg_file(self):
        """Test validation of valid OGG file."""
        file = self._create_upload_file(filename="test.ogg")
        result = validate_audio_file(file)
        assert result == "ogg"

    def test_valid_webm_file(self):
        """Test validation of valid WEBM file."""
        file = self._create_upload_file(filename="test.webm")
        result = validate_audio_file(file)
        assert result == "webm"

    def test_missing_filename(self):
        """Test validation fails when filename is missing."""
        file = self._create_upload_file(filename=None)
        with pytest.raises(AudioValidationError) as exc_info:
            validate_audio_file(file)
        assert "Filename is required" in str(exc_info.value)

    def test_empty_filename(self):
        """Test validation fails when filename is empty."""
        file = self._create_upload_file(filename="")
        with pytest.raises(AudioValidationError) as exc_info:
            validate_audio_file(file)
        assert "Filename is required" in str(exc_info.value)

    def test_unsupported_format(self):
        """Test validation fails for unsupported format."""
        file = self._create_upload_file(filename="test.txt")
        with pytest.raises(AudioValidationError) as exc_info:
            validate_audio_file(file)
        assert "Unsupported audio format" in str(exc_info.value)

    def test_extension_from_content_type_fallback(self):
        """Test fallback to content type when filename has no extension."""
        file = self._create_upload_file(filename="audiofile", content_type="audio/mpeg")
        result = validate_audio_file(file)
        assert result == "mp3"

    def test_file_too_large(self):
        """Test validation fails when file is too large."""
        file = self._create_upload_file(
            filename="test.mp3",
            size=MAX_FILE_SIZE_BYTES + 1,
        )
        with pytest.raises(AudioValidationError) as exc_info:
            validate_audio_file(file)
        assert "File too large" in str(exc_info.value)

    def test_file_at_max_size(self):
        """Test validation passes when file is exactly at max size."""
        file = self._create_upload_file(
            filename="test.mp3",
            size=MAX_FILE_SIZE_BYTES,
        )
        result = validate_audio_file(file)
        assert result == "mp3"

    def test_file_with_no_size(self):
        """Test validation passes when file size is not available."""
        file = self._create_upload_file(filename="test.mp3", size=None)
        result = validate_audio_file(file)
        assert result == "mp3"


class TestValidateS3AudioUrl:
    """Test cases for validate_s3_audio_url function."""

    def test_valid_s3_mp3_url(self):
        """Test valid S3 MP3 URL."""
        url = "s3://bucket/audio/test.mp3"
        assert validate_s3_audio_url(url) is True

    def test_valid_s3_wav_url(self):
        """Test valid S3 WAV URL."""
        url = "s3://bucket/audio/test.wav"
        assert validate_s3_audio_url(url) is True

    def test_valid_s3_flac_url(self):
        """Test valid S3 FLAC URL."""
        url = "s3://bucket/audio/test.flac"
        assert validate_s3_audio_url(url) is True

    def test_empty_url(self):
        """Test empty URL returns False."""
        assert validate_s3_audio_url("") is False

    def test_none_url(self):
        """Test None URL returns False."""
        assert validate_s3_audio_url(None) is False

    def test_https_url(self):
        """Test HTTPS URL returns False."""
        url = "https://bucket.s3.amazonaws.com/audio/test.mp3"
        assert validate_s3_audio_url(url) is False

    def test_s3_url_with_invalid_extension(self):
        """Test S3 URL with invalid extension returns False."""
        url = "s3://bucket/file.txt"
        assert validate_s3_audio_url(url) is False

    def test_s3_url_with_no_extension(self):
        """Test S3 URL with no extension returns False."""
        url = "s3://bucket/audiofile"
        assert validate_s3_audio_url(url) is False


class TestUploadAudioFile:
    """Test cases for upload_audio_file function."""

    def _create_upload_file(
        self,
        filename: str = "test.mp3",
        content_type: str = "audio/mpeg",
        size: int = 1024,
    ) -> UploadFile:
        """Create a mock UploadFile for testing."""
        mock_file = MagicMock(spec=UploadFile)
        mock_file.filename = filename
        mock_file.content_type = content_type
        mock_file.size = size
        return mock_file

    @patch("app.services.stt_evaluations.audio.get_cloud_storage")
    def test_successful_upload(self, mock_get_storage):
        """Test successful audio file upload."""
        mock_storage = MagicMock()
        mock_storage.put.return_value = "s3://bucket/stt/audio/test.mp3"
        mock_storage.get_file_size_kb.return_value = 1.0
        mock_get_storage.return_value = mock_storage

        mock_session = MagicMock()
        file = self._create_upload_file()

        result = upload_audio_file(
            session=mock_session,
            file=file,
            project_id=1,
        )

        assert result.s3_url == "s3://bucket/stt/audio/test.mp3"
        assert result.filename == "test.mp3"
        assert result.size_bytes == 1024
        assert result.content_type == "audio/mpeg"

    @patch("app.services.stt_evaluations.audio.get_cloud_storage")
    def test_upload_validation_error(self, mock_get_storage):
        """Test upload fails on validation error."""
        from app.core.exception_handlers import HTTPException

        mock_session = MagicMock()
        file = self._create_upload_file(filename="test.txt")

        with pytest.raises(HTTPException) as exc_info:
            upload_audio_file(
                session=mock_session,
                file=file,
                project_id=1,
            )

        assert exc_info.value.status_code == 400
        assert "Unsupported audio format" in str(exc_info.value.detail)

    @patch("app.services.stt_evaluations.audio.get_cloud_storage")
    def test_upload_storage_error(self, mock_get_storage):
        """Test upload handles storage errors."""
        from app.core.exception_handlers import HTTPException

        mock_storage = MagicMock()
        mock_storage.put.side_effect = Exception("S3 connection failed")
        mock_get_storage.return_value = mock_storage

        mock_session = MagicMock()
        file = self._create_upload_file()

        with pytest.raises(HTTPException) as exc_info:
            upload_audio_file(
                session=mock_session,
                file=file,
                project_id=1,
            )

        assert exc_info.value.status_code == 500
        assert "Failed to upload audio file" in str(exc_info.value.detail)

    @patch("app.services.stt_evaluations.audio.get_cloud_storage")
    def test_upload_uses_file_size_on_s3_error(self, mock_get_storage):
        """Test upload uses file.size when S3 size retrieval fails."""
        mock_storage = MagicMock()
        mock_storage.put.return_value = "s3://bucket/stt/audio/test.mp3"
        mock_storage.get_file_size_kb.side_effect = Exception("Failed to get size")
        mock_get_storage.return_value = mock_storage

        mock_session = MagicMock()
        file = self._create_upload_file(size=2048)

        result = upload_audio_file(
            session=mock_session,
            file=file,
            project_id=1,
        )

        assert result.size_bytes == 2048
```
🧩 Analysis chain (verification scripts; outputs collapsed)
```bash
#!/bin/bash
# Check for test methods and their signatures in the file
rg -nP 'def (test_|_create)[^(]*\([^)]*\)\s*:' backend/app/tests/services/stt_evaluations/test_audio.py | head -30
# Get full context of a few test methods to see current annotations
sed -n '22,30p' backend/app/tests/services/stt_evaluations/test_audio.py
sed -n '139,150p' backend/app/tests/services/stt_evaluations/test_audio.py
sed -n '286,297p' backend/app/tests/services/stt_evaluations/test_audio.py
# Count occurrences of _create_upload_file method
rg -c '_create_upload_file' backend/app/tests/services/stt_evaluations/test_audio.py
rg -B2 'def _create_upload_file' backend/app/tests/services/stt_evaluations/test_audio.py
```
Add type hints to all test methods and consolidate fixture creation using a factory pattern.
Test methods throughout this file lack type hints. Add -> None return annotations and parameter annotations (e.g., self: Self) to all test methods per the typing requirement.
Additionally, the _create_upload_file helper is duplicated across TestValidateAudioFile and TestUploadAudioFile classes. Extract this into a shared factory fixture in the test module to follow the factory pattern guideline for test fixtures in backend/app/tests/.
🤖 Prompt for AI Agents
In `@backend/app/tests/services/stt_evaluations/test_audio.py` around lines 22 -
378, Add missing type annotations to all test methods (e.g., change def
test_valid_mp3_file(self) to def test_valid_mp3_file(self: Self) -> None) across
the file so every test method has parameter and return type hints; then remove
the duplicated helpers TestValidateAudioFile._create_upload_file and
TestUploadAudioFile._create_upload_file and replace them with a single shared
factory fixture in the module (e.g., a pytest fixture named upload_file_factory)
that returns a callable to produce MagicMock UploadFile instances used by
validate_audio_file and upload_audio_file tests; update tests to call the
fixture instead of the class-level helper.
Summary
Target issue is #533
Checklist
Before submitting a pull request, please ensure that you mark these tasks.
Run `fastapi run --reload app/main.py` or `docker compose up` in the repository root and test.
Notes
New Features
Functional Requirements Testing