diff --git a/openviking/retrieve/hierarchical_retriever.py b/openviking/retrieve/hierarchical_retriever.py index 1fbe4d468..7802a9a4f 100644 --- a/openviking/retrieve/hierarchical_retriever.py +++ b/openviking/retrieve/hierarchical_retriever.py @@ -9,6 +9,7 @@ import heapq import logging +import asyncio import math import time from datetime import datetime @@ -129,7 +130,7 @@ async def retrieve( query_vector = None sparse_query_vector = None if self.embedder: - result: EmbedResult = self.embedder.embed(query.query, is_query=True) + result: EmbedResult = await asyncio.to_thread(self.embedder.embed, query.query, True) query_vector = result.dense_vector sparse_query_vector = result.sparse_vector @@ -168,7 +169,7 @@ async def retrieve( ) # Step 3: Merge starting points - starting_points = self._merge_starting_points( + starting_points = await self._merge_starting_points( query.query, root_uris, global_results, @@ -178,7 +179,7 @@ async def retrieve( # 从 global_results 中提取 level 2 的文件作为初始候选者 initial_candidates = [r for r in global_results if r.get("level", 2) == 2] - initial_candidates = self._prepare_initial_candidates( + initial_candidates = await self._prepare_initial_candidates( query.query, initial_candidates, mode=mode, @@ -247,7 +248,7 @@ async def _global_vector_search( telemetry.count("vector.scanned", len(results)) return results - def _rerank_scores( + async def _rerank_scores( self, query: str, documents: List[str], @@ -258,7 +259,7 @@ def _rerank_scores( return fallback_scores try: - scores = self._rerank_client.rerank_batch(query, documents) + scores = await asyncio.to_thread(self._rerank_client.rerank_batch, query, documents) except Exception as e: logger.warning( "[HierarchicalRetriever] Rerank failed, fallback to vector scores: %s", e @@ -279,7 +280,7 @@ def _rerank_scores( normalized_scores.append(fallback) return normalized_scores - def _merge_starting_points( + async def _merge_starting_points( self, query: str, root_uris: List[str], @@ -301,7 +302,7 @@ def _merge_starting_points( ] if self._rerank_client and mode == RetrieverMode.THINKING: docs = [str(r.get("abstract", "")) for r in global_results] - query_scores = self._rerank_scores(query, docs, default_scores) + query_scores = await self._rerank_scores(query, docs, default_scores) for i, r in enumerate(global_results): # 只添加非 level 2 的项目到起始点 if r.get("level", 2) != 2: @@ -322,7 +323,7 @@ def _merge_starting_points( return points - def _prepare_initial_candidates( + async def _prepare_initial_candidates( self, query: str, global_results: List[Dict[str, Any]], @@ -339,7 +340,7 @@ def _prepare_initial_candidates( ] if self._rerank_client and mode == RetrieverMode.THINKING: docs = [str(r.get("abstract", "")) for r in initial_candidates] - query_scores = self._rerank_scores(query, docs, default_scores) + query_scores = await self._rerank_scores(query, docs, default_scores) else: query_scores = default_scores @@ -442,7 +443,7 @@ def passes_threshold(score: float) -> bool: ] if self._rerank_client and mode == RetrieverMode.THINKING: documents = [str(r.get("abstract", "")) for r in results] - query_scores = self._rerank_scores(query, documents, query_scores) + query_scores = await self._rerank_scores(query, documents, query_scores) for r, score in zip(results, query_scores): uri = r.get("uri", "") diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index fbefca0ed..011e2fc25 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -791,11 +791,12 @@ async def abstract( """Read directory's L0 summary (.abstract.md).""" self._ensure_access(uri, ctx) path = self._uri_to_path(uri, ctx=ctx) - info = self.agfs.stat(path) + info = await asyncio.to_thread(self.agfs.stat, path) if not info.get("isDir"): raise ValueError(f"{uri} is not a directory") file_path = f"{path}/.abstract.md" - content_bytes = self._handle_agfs_read(self.agfs.read(file_path)) + content_bytes = await asyncio.to_thread(self.agfs.read, file_path) + content_bytes = self._handle_agfs_read(content_bytes) if self._encryptor: real_ctx = self._ctx_or_default(ctx) @@ -811,11 +812,12 @@ async def overview( """Read directory's L1 overview (.overview.md).""" self._ensure_access(uri, ctx) path = self._uri_to_path(uri, ctx=ctx) - info = self.agfs.stat(path) + info = await asyncio.to_thread(self.agfs.stat, path) if not info.get("isDir"): raise ValueError(f"{uri} is not a directory") file_path = f"{path}/.overview.md" - content_bytes = self._handle_agfs_read(self.agfs.read(file_path)) + content_bytes = await asyncio.to_thread(self.agfs.read, file_path) + content_bytes = self._handle_agfs_read(content_bytes) if self._encryptor: real_ctx = self._ctx_or_default(ctx) @@ -1380,7 +1382,7 @@ async def _update_vector_store_uris( ) -> None: """Update URIs in vector store (when moving files). - Preserves vector data and updates URI-derived identifiers without regenerating embeddings. + Preserves vector data, only updates uri and parent_uri fields, no need to regenerate embeddings. """ vector_store = self._get_vector_store() if not vector_store: @@ -1392,11 +1394,13 @@ async def _update_vector_store_uris( for uri in uris: try: new_uri = uri.replace(old_base_uri, new_base_uri, 1) + new_parent_uri = VikingURI(new_uri).parent.uri await vector_store.update_uri_mapping( ctx=self._ctx_or_default(ctx), uri=uri, new_uri=new_uri, + new_parent_uri=new_parent_uri, levels=levels, ) logger.debug(f"[VikingFS] Updated URI: {uri} -> {new_uri}") @@ -1496,7 +1500,8 @@ async def _read_relation_table( """Read .relations.json.""" table_path = f"{dir_path}/.relations.json" try: - content = self._handle_agfs_read(self.agfs.read(table_path)) + content = await asyncio.to_thread(self.agfs.read, table_path) + content = self._handle_agfs_read(content) content = await self._decrypt_content(content, ctx=ctx) data = json.loads(content.decode("utf-8")) except FileNotFoundError: diff --git a/openviking/storage/viking_vector_index_backend.py b/openviking/storage/viking_vector_index_backend.py index 22e4604bf..582d88b3f 100644 --- a/openviking/storage/viking_vector_index_backend.py +++ b/openviking/storage/viking_vector_index_backend.py @@ -307,7 +307,8 @@ async def query( f"[_SingleAccountBackend.query] Applied account filter, final filter={filter}" ) - return self._adapter.query( + return await asyncio.to_thread( + self._adapter.query, query_vector=query_vector, sparse_query_vector=sparse_query_vector, filter=filter,