Add v2 S3 original storage and hybrid search #229
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- Auto-generated by API Schema Diff workflow |
Code Review
This pull request introduces v2-only raw original document storage (S3) and original chunk indexing, alongside a new hybrid search endpoint (/hybrid-search) that queries both extracted memories and original document chunks. It also enhances security in the scanner enricher by escaping untrusted code blocks and validating symbol types and languages against allowlists. Feedback on these changes highlights three key areas: wrapping synchronous database/service search calls in asyncio.to_thread within the async FastAPI route handler to prevent blocking the event loop, utilizing a try...finally block in the batch ingest workflow to safely cancel and clean up background original storage tasks in case of extraction failures, and normalizing the language string to lowercase before validating it against the allowlist.
I am having trouble creating individual review comments.
src/api/routes/v2/memory.py (356-367)
The route handler hybrid_search_memory_v2 is an async def function, but it calls memory_v1._search_profile and memory_v1._search_temporal synchronously. These functions perform database/service I/O and will block the FastAPI event loop, violating the rule to wrap synchronous database or service I/O calls in asyncio.to_thread.
memory_results: list[SourceRecord] = []
if "profile" in req.domains:
    profile_results = await asyncio.to_thread(
        memory_v1._search_profile, pipeline, user_id
    )
    memory_results.extend(profile_results)
if "temporal" in req.domains:
    temporal_results = await asyncio.to_thread(
        memory_v1._search_temporal,
        pipeline,
        req.query,
        user_id,
        memory_top_k,
    )
    memory_results.extend(temporal_results)
References
- In FastAPI async def route handlers, wrap synchronous database or service I/O calls in asyncio.to_thread to prevent blocking the event loop.
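A minimal, self-contained sketch of why the wrapper matters. `slow_search` is a hypothetical stand-in for a synchronous call such as `memory_v1._search_temporal`; none of the names below come from the PR. The heartbeat coroutine keeps ticking only because the blocking call runs on a worker thread instead of on the event loop.

```python
import asyncio
import time


def slow_search(query: str) -> list[str]:
    """Hypothetical synchronous search; time.sleep stands in for blocking I/O."""
    time.sleep(0.2)
    return [f"hit for {query}"]


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    """Record a timestamp roughly every 20 ms while the event loop is free."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def search_without_blocking(query: str) -> tuple[list[str], int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # The blocking call runs in a worker thread, so heartbeat keeps running.
    results = await asyncio.to_thread(slow_search, query)
    stop.set()
    await beat
    return results, len(ticks)


results, tick_count = asyncio.run(search_without_blocking("s3"))
```

Calling `slow_search(query)` directly inside the coroutine would hold the event loop for the full 0.2 s, and the heartbeat would not record any ticks during that window.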
src/api/routes/v2/workflows.py (291-316)
Wrapping _execute in asyncio.create_task here is redundant since it is immediately awaited. More importantly, if the extraction task fails, original_task is left dangling (leaked) in the Temporal workflow. Using a try...finally block ensures that the background original_task is properly cancelled and cleaned up if any part of the item execution fails.
async def _run_item(index: int, item: Dict[str, Any]):
    item_payload = dict(item)
    item_payload["user_id"] = (
        item_payload.get("user_id") or payload["user_id"]
    )
    for key in (
        "original_storage_enabled",
        "original_storage_fail_closed",
        "original_storage_timeout_seconds",
        "original_config",
    ):
        if key in payload and key not in item_payload:
            item_payload[key] = payload[key]
    original_task = _start_original_task(job_id, item_payload)
    try:
        item_result = await _execute(
            "memory_run_pipeline_activity",
            {**item_payload, **billing_activity},
            item_timeout,
        )
        item_result["original_storage"] = await _await_original_task(
            original_task,
            item_payload,
        )
        original_task = None
        return index, item_result
    finally:
        if original_task and not original_task.done():
            original_task.cancel()
            # Awaiting a cancelled task raises asyncio.CancelledError, which is a
            # BaseException on Python 3.8+, so "except Exception" alone misses it.
            try:
                await original_task
            except (asyncio.CancelledError, Exception):
                pass
src/scanner/enricher.py (357)
The language value should be normalized to lowercase before checking against _ALLOWED_LANGUAGES to prevent false negatives for capitalized language names (e.g., 'Python').
language=_allowlist(language.lower() if language else "", _ALLOWED_LANGUAGES, "python"),
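A small sketch of the normalization, assuming a helper with the same `(value, allowed, default)` shape as the `_allowlist` call above. The helper body and the contents of `ALLOWED_LANGUAGES` are illustrative assumptions, not the project's code. This version lowercases inside the helper rather than at the call site, and also strips surrounding whitespace, which the review comment does not ask for.

```python
from typing import FrozenSet, Optional

# Illustrative values only; the real allowlist lives in src/scanner/enricher.py.
ALLOWED_LANGUAGES: FrozenSet[str] = frozenset({"python", "typescript", "go"})


def allowlist(value: Optional[str], allowed: FrozenSet[str], default: str) -> str:
    """Return the normalized value if it is on the allowlist, else the default."""
    normalized = (value or "").strip().lower()
    return normalized if normalized in allowed else default


capitalized = allowlist("Python", ALLOWED_LANGUAGES, "python")
upper = allowlist("GO", ALLOWED_LANGUAGES, "python")
untrusted = allowlist("<script>", ALLOWED_LANGUAGES, "python")
missing = allowlist(None, ALLOWED_LANGUAGES, "python")
```

Normalizing in one place means every caller gets the same behavior; the trade-off is that the helper then only suits allowlists whose entries are themselves lowercase.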
@greptileai review this PR
@greptileai review
| Filename | Overview |
|---|---|
| src/api/routes/v2/workflows.py | Adds parallel original-storage task to both single-item and batch ingest workflows. Single-item workflow leaks the original_task asyncio Task when a non-high-effort activity fails — the task is never cancelled in the except handler, causing background Temporal activity dispatch for dead-lettered jobs. |
| src/api/routes/v2/memory.py | Adds hybrid-search endpoint and original-storage config injection. The hybrid_search endpoint calls get_retrieval_pipeline() before the try/except block without a require_ready dependency guard, producing unstructured 500s if the pipeline is not ready. |
| src/storage/original.py | New module implementing S3 upload and vector-store chunk indexing for raw originals. Deterministic S3 keys and vector IDs make the operation replay-safe. Well-structured with bounded concurrency controls. |
| src/api/routes/v2/activities.py | Adds memory_store_original_activity Temporal activity that delegates to preserve_original. Straightforward; correctly registers the activity in ALL_ACTIVITIES. |
| src/api/schemas.py | Adds HybridSearchRequest and HybridSearchResponse Pydantic models. Domain validation and field constraints follow the same pattern as the existing SearchRequest. |
| src/config/settings.py | Adds ~30 new settings fields for original storage and hybrid search, all with sensible defaults. Also adds aws_session_token for temporary-credential support. |
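The replay-safety noted for src/storage/original.py depends on deriving identifiers from the content rather than from time or randomness, so a retried activity overwrites the same objects instead of creating duplicates. A rough sketch of that idea follows; the key layout, the namespace, and both function names are assumptions for illustration and are not taken from the module.

```python
import hashlib
import uuid

# Hypothetical namespace; any fixed UUID works as long as it never changes.
CHUNK_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "example/original-chunks")


def original_s3_key(user_id: str, content: bytes) -> str:
    """Derive the object key from the content hash, so retries hit the same key."""
    digest = hashlib.sha256(content).hexdigest()
    return f"originals/{user_id}/{digest}"


def chunk_vector_id(s3_key: str, chunk_index: int) -> str:
    """Derive a stable vector ID from the object key and the chunk position."""
    return str(uuid.uuid5(CHUNK_NAMESPACE, f"{s3_key}#{chunk_index}"))


first_key = original_s3_key("user-1", b"hello world")
replayed_key = original_s3_key("user-1", b"hello world")
first_id = chunk_vector_id(first_key, 0)
replayed_id = chunk_vector_id(replayed_key, 0)
next_id = chunk_vector_id(first_key, 1)
```

With identifiers like these, an upload followed by an upsert can be repeated after a worker crash without leaving orphaned or duplicated chunks.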
Reviews (2): Last reviewed commit: "Address Greptile review feedback"
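The task leak described in the workflows.py row can be reproduced and fixed in plain asyncio. The sketch below is not Temporal workflow code, and `store_original`, `extract`, and `run_item` are hypothetical stand-ins. It shows a background task being cancelled and awaited in a `finally` block when the main step fails, so nothing is left running afterwards.

```python
import asyncio


async def store_original() -> str:
    """Hypothetical background task standing in for original storage."""
    await asyncio.sleep(10)
    return "stored"


async def extract() -> dict:
    """Hypothetical main step that fails after yielding to the event loop once."""
    await asyncio.sleep(0)
    raise RuntimeError("extraction failed")


async def run_item() -> dict:
    original_task = asyncio.create_task(store_original())
    try:
        result = await extract()
        result["original_storage"] = await original_task
        return result
    finally:
        if not original_task.done():
            original_task.cancel()
            try:
                await original_task
            except asyncio.CancelledError:
                # Expected: awaiting a cancelled task raises CancelledError.
                pass


async def main() -> tuple[str, int]:
    try:
        await run_item()
    except RuntimeError as exc:
        # Count tasks still pending besides this one; a leak would show up here.
        current = asyncio.current_task()
        leftover = [t for t in asyncio.all_tasks() if t is not current]
        return str(exc), len(leftover)
    return "ok", 0


message, leftover_count = asyncio.run(main())
```

Without the `finally` block, `store_original` would still be pending when `main` inspects `asyncio.all_tasks()`, which mirrors the dangling dispatch the review flags for dead-lettered jobs.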
Summary
/v2/memory/hybrid-search returning extracted memory hits plus indexed original chunks
Testing
.venv/bin/python -m pytest tests/unit/test_original_storage.py tests/api/test_v2_hybrid_search.py
.venv/bin/python -m pytest tests/api/test_memory_versioning.py tests/api/test_dependencies_and_routes.py tests/integration/test_retrieval_pipeline.py
.venv/bin/python -m pytest tests/api
.venv/bin/python -m ruff check src/storage/original.py src/api/routes/v2/memory.py src/api/routes/v2/workflows.py src/api/routes/v2/activities.py src/api/schemas.py src/config/settings.py tests/unit/test_original_storage.py tests/api/test_v2_hybrid_search.py
.venv/bin/python -m py_compile src/storage/original.py src/api/routes/v2/memory.py src/api/routes/v2/workflows.py src/api/routes/v2/activities.py src/api/schemas.py src/config/settings.py