Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions openviking/retrieve/hierarchical_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import heapq
import logging
import asyncio
import math
import time
from datetime import datetime
Expand Down Expand Up @@ -129,7 +130,7 @@ async def retrieve(
query_vector = None
sparse_query_vector = None
if self.embedder:
result: EmbedResult = self.embedder.embed(query.query, is_query=True)
result: EmbedResult = await asyncio.to_thread(self.embedder.embed, query.query, True)
query_vector = result.dense_vector
sparse_query_vector = result.sparse_vector

Expand Down Expand Up @@ -168,7 +169,7 @@ async def retrieve(
)

# Step 3: Merge starting points
starting_points = self._merge_starting_points(
starting_points = await self._merge_starting_points(
query.query,
root_uris,
global_results,
Expand All @@ -178,7 +179,7 @@ async def retrieve(
# 从 global_results 中提取 level 2 的文件作为初始候选者
initial_candidates = [r for r in global_results if r.get("level", 2) == 2]

initial_candidates = self._prepare_initial_candidates(
initial_candidates = await self._prepare_initial_candidates(
query.query,
initial_candidates,
mode=mode,
Expand Down Expand Up @@ -247,7 +248,7 @@ async def _global_vector_search(
telemetry.count("vector.scanned", len(results))
return results

def _rerank_scores(
async def _rerank_scores(
self,
query: str,
documents: List[str],
Expand All @@ -258,7 +259,7 @@ def _rerank_scores(
return fallback_scores

try:
scores = self._rerank_client.rerank_batch(query, documents)
scores = await asyncio.to_thread(self._rerank_client.rerank_batch, query, documents)
except Exception as e:
logger.warning(
"[HierarchicalRetriever] Rerank failed, fallback to vector scores: %s", e
Expand All @@ -279,7 +280,7 @@ def _rerank_scores(
normalized_scores.append(fallback)
return normalized_scores

def _merge_starting_points(
async def _merge_starting_points(
self,
query: str,
root_uris: List[str],
Expand All @@ -301,7 +302,7 @@ def _merge_starting_points(
]
if self._rerank_client and mode == RetrieverMode.THINKING:
docs = [str(r.get("abstract", "")) for r in global_results]
query_scores = self._rerank_scores(query, docs, default_scores)
query_scores = await self._rerank_scores(query, docs, default_scores)
for i, r in enumerate(global_results):
# 只添加非 level 2 的项目到起始点
if r.get("level", 2) != 2:
Expand All @@ -322,7 +323,7 @@ def _merge_starting_points(

return points

def _prepare_initial_candidates(
async def _prepare_initial_candidates(
self,
query: str,
global_results: List[Dict[str, Any]],
Expand All @@ -339,7 +340,7 @@ def _prepare_initial_candidates(
]
if self._rerank_client and mode == RetrieverMode.THINKING:
docs = [str(r.get("abstract", "")) for r in initial_candidates]
query_scores = self._rerank_scores(query, docs, default_scores)
query_scores = await self._rerank_scores(query, docs, default_scores)
else:
query_scores = default_scores

Expand Down Expand Up @@ -442,7 +443,7 @@ def passes_threshold(score: float) -> bool:
]
if self._rerank_client and mode == RetrieverMode.THINKING:
documents = [str(r.get("abstract", "")) for r in results]
query_scores = self._rerank_scores(query, documents, query_scores)
query_scores = await self._rerank_scores(query, documents, query_scores)

for r, score in zip(results, query_scores):
uri = r.get("uri", "")
Expand Down
17 changes: 11 additions & 6 deletions openviking/storage/viking_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,11 +791,12 @@ async def abstract(
"""Read directory's L0 summary (.abstract.md)."""
self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
info = self.agfs.stat(path)
info = await asyncio.to_thread(self.agfs.stat, path)
if not info.get("isDir"):
raise ValueError(f"{uri} is not a directory")
file_path = f"{path}/.abstract.md"
content_bytes = self._handle_agfs_read(self.agfs.read(file_path))
content_bytes = await asyncio.to_thread(self.agfs.read, file_path)
content_bytes = self._handle_agfs_read(content_bytes)

if self._encryptor:
real_ctx = self._ctx_or_default(ctx)
Expand All @@ -811,11 +812,12 @@ async def overview(
"""Read directory's L1 overview (.overview.md)."""
self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
info = self.agfs.stat(path)
info = await asyncio.to_thread(self.agfs.stat, path)
if not info.get("isDir"):
raise ValueError(f"{uri} is not a directory")
file_path = f"{path}/.overview.md"
content_bytes = self._handle_agfs_read(self.agfs.read(file_path))
content_bytes = await asyncio.to_thread(self.agfs.read, file_path)
content_bytes = self._handle_agfs_read(content_bytes)

if self._encryptor:
real_ctx = self._ctx_or_default(ctx)
Expand Down Expand Up @@ -1380,7 +1382,7 @@ async def _update_vector_store_uris(
) -> None:
"""Update URIs in vector store (when moving files).

Preserves vector data and updates URI-derived identifiers without regenerating embeddings.
Preserves vector data, only updates uri and parent_uri fields, no need to regenerate embeddings.
"""
vector_store = self._get_vector_store()
if not vector_store:
Expand All @@ -1392,11 +1394,13 @@ async def _update_vector_store_uris(
for uri in uris:
try:
new_uri = uri.replace(old_base_uri, new_base_uri, 1)
new_parent_uri = VikingURI(new_uri).parent.uri

await vector_store.update_uri_mapping(
ctx=self._ctx_or_default(ctx),
uri=uri,
new_uri=new_uri,
new_parent_uri=new_parent_uri,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Bug] (blocking) `update_uri_mapping()` in `openviking/storage/viking_vector_index_backend.py` still has the signature `(ctx, uri, new_uri, levels=None)`. Passing `new_parent_uri=` here will raise `TypeError` as soon as the mv/rename path hits this branch. If this parent-URI rewrite is needed, the callee and its tests need to be updated in the same PR; otherwise this hunk should be removed from the async-blocking fix.

levels=levels,
)
logger.debug(f"[VikingFS] Updated URI: {uri} -> {new_uri}")
Expand Down Expand Up @@ -1496,7 +1500,8 @@ async def _read_relation_table(
"""Read .relations.json."""
table_path = f"{dir_path}/.relations.json"
try:
content = self._handle_agfs_read(self.agfs.read(table_path))
content = await asyncio.to_thread(self.agfs.read, table_path)
content = self._handle_agfs_read(content)
content = await self._decrypt_content(content, ctx=ctx)
data = json.loads(content.decode("utf-8"))
except FileNotFoundError:
Expand Down
3 changes: 2 additions & 1 deletion openviking/storage/viking_vector_index_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ async def query(
f"[_SingleAccountBackend.query] Applied account filter, final filter={filter}"
)

return self._adapter.query(
return await asyncio.to_thread(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Bug] (blocking) This now calls `asyncio.to_thread(...)`, but this file still does not import `asyncio`. The first query/search call on this branch will raise `NameError: name 'asyncio' is not defined`, so the fix will fail before it can offload anything.

self._adapter.query,
query_vector=query_vector,
sparse_query_vector=sparse_query_vector,
filter=filter,
Expand Down