From 918b2d08e8c62ecca0b74b5e04955987161be5ed Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 18 Jun 2026 11:43:02 +1000 Subject: [PATCH 1/2] feat(proxy): add versioned /v2/embedding route + server v2 client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds /v2/embedding (+ /v2/embedding/check) to the proxy: serves a config-selected embedding model at a fixed dimensionality, returns fp32 vectors, and advertises the model in every response. Additive — /embed (v1) and its cache format are untouched, so existing clients are unaffected. The v2 cache key folds in dimensions ({model}:{dims}:{hash}); v1 keys are unchanged. The server search runtime prefers /v2/embedding (probed at startup) and falls back to the legacy /embed routes when the proxy lacks v2, so a new server still works against an older or self-hosted proxy. This is the cutover mechanism for switching the embedding model (e.g. text-embedding-3-large -> gemini-embedding-2 @ 1536): point embedding_v2 at the new model. A separate route + cache namespace means v1 clients keep using the old model and are never rolled. --- pkg/embedding/probe.go | 59 ++++++++++++++ pkg/embedding/remote.go | 96 +++++++++++++++++------ pkg/proxy/embedding.go | 69 ++++++++++++++-- pkg/proxy/embedding_v2_test.go | 139 +++++++++++++++++++++++++++++++++ pkg/proxy/server.go | 102 ++++++++++++++++++++++++ pkg/proxy/server_config.go | 41 +++++++++- pkg/searchruntime/runtime.go | 20 ++++- 7 files changed, 491 insertions(+), 35 deletions(-) create mode 100644 pkg/embedding/probe.go create mode 100644 pkg/proxy/embedding_v2_test.go diff --git a/pkg/embedding/probe.go b/pkg/embedding/probe.go new file mode 100644 index 00000000..c0a048b3 --- /dev/null +++ b/pkg/embedding/probe.go @@ -0,0 +1,59 @@ +package embedding + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "time" +) + +// probeTimeout bounds the v2 capability probe so a slow or unreachable proxy +// falls back to v1 quickly rather than stalling startup. +const probeTimeout = 15 * time.Second + +// ProbeV2 reports whether the proxy exposes the versioned /v2/embedding route +// and, if so, the embedding model it currently advertises. It POSTs an empty +// check request to /v2/embedding/check; a 200 response means v2 is available and +// its model is returned. Any non-200 status or transport error yields +// ("", false), signalling the caller to fall back to the legacy /embed routes. +func ProbeV2(ctx context.Context, proxyURL string, tokenFn func() string) (string, bool) { + body, err := json.Marshal(embedCheckRequest{Hashes: []string{}}) + if err != nil { + return "", false + } + + req, err := http.NewRequestWithContext( + ctx, http.MethodPost, proxyURL+"/v2/embedding/check", bytes.NewReader(body), + ) + if err != nil { + return "", false + } + + req.Header.Set("Content-Type", "application/json") + + if tokenFn != nil { + if token := tokenFn(); token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + } + + client := &http.Client{Timeout: probeTimeout} + + resp, err := client.Do(req) + if err != nil { + return "", false + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return "", false + } + + var checkResp embedCheckResponse + if err := json.NewDecoder(resp.Body).Decode(&checkResp); err != nil { + return "", false + } + + return checkResp.Model, true +} diff --git a/pkg/embedding/remote.go b/pkg/embedding/remote.go index 93211425..562fa6eb 100644 --- a/pkg/embedding/remote.go +++ b/pkg/embedding/remote.go @@ -22,18 +22,21 @@ const ( maxBatchSize = 500 ) -// embedCheckRequest is the request payload for the proxy /embed/check endpoint. +// embedCheckRequest is the request payload for the proxy embed-check endpoint. type embedCheckRequest struct { Model string `json:"model"` Hashes []string `json:"hashes"` } -// embedCheckResponse is the response from /embed/check. +// embedCheckResponse is the response from the embed-check endpoint. Model is +// populated by the v2 route (and empty on v1) so callers can observe which +// embedding model the proxy is currently serving. type embedCheckResponse struct { + Model string `json:"model"` Cached []embedResult `json:"cached"` } -// embedRequest is the request payload for the proxy /embed endpoint. +// embedRequest is the request payload for the proxy embed endpoint. type embedRequest struct { Items []embedItem `json:"items"` } @@ -44,10 +47,13 @@ type embedItem struct { Text string `json:"text"` } -// embedResponse is the response payload from the proxy /embed endpoint. +// embedResponse is the response payload from the proxy embed endpoint. Both v1 +// and v2 advertise the serving model; v2 additionally reports its fixed output +// dimensionality. type embedResponse struct { - Results []embedResult `json:"results"` - Model string `json:"model"` + Results []embedResult `json:"results"` + Model string `json:"model"` + Dimensions int `json:"dimensions"` } // embedResult is a single embedding result. @@ -56,8 +62,11 @@ type embedResult struct { Vector []float32 `json:"vector"` } -// RemoteEmbedder implements Embedder by calling the proxy's /embed endpoint. +// RemoteEmbedder implements Embedder by calling the proxy's embed endpoint. // An optional local cache avoids round-trips to the proxy on warm restarts. +// When v2 is set it targets the versioned /v2/embedding routes (fp32 at a fixed +// dimensionality, model advertised per response); otherwise it uses the legacy +// /embed routes. type RemoteEmbedder struct { log logrus.FieldLogger proxyURL string @@ -66,26 +75,14 @@ type RemoteEmbedder struct { invalidateFn func() localCache cache.Cache model string + v2 bool progressFn func(completed, total int) } -// OnProgress registers a callback invoked during EmbedBatch with the number of -// documents embedded so far and the total in the batch. It enables -// document-level progress reporting for index builds. -func (e *RemoteEmbedder) OnProgress(fn func(completed, total int)) { - e.progressFn = fn -} - -func (e *RemoteEmbedder) reportProgress(completed, total int) { - if e.progressFn != nil { - e.progressFn(completed, total) - } -} - // Compile-time interface check. var _ Embedder = (*RemoteEmbedder)(nil) -// NewRemote creates a new RemoteEmbedder that calls the proxy's /embed endpoint. +// NewRemote creates a RemoteEmbedder that calls the proxy's legacy /embed routes. // tokenFn is called on each request to get the current auth token, and // invalidateFn drops the cached token so a 401/403 can be retried with a fresh // one (it may be nil to disable the retry). @@ -98,6 +95,20 @@ func NewRemote( invalidateFn func(), localCache cache.Cache, model string, +) *RemoteEmbedder { + return NewRemoteWithEndpoint(log, proxyURL, tokenFn, invalidateFn, localCache, model, false) +} + +// NewRemoteWithEndpoint creates a RemoteEmbedder targeting either the v2 +// (/v2/embedding) routes when v2 is true, or the legacy /embed routes otherwise. +func NewRemoteWithEndpoint( + log logrus.FieldLogger, + proxyURL string, + tokenFn func() string, + invalidateFn func(), + localCache cache.Cache, + model string, + v2 bool, ) *RemoteEmbedder { return &RemoteEmbedder{ log: log.WithField("component", "remote-embedder"), @@ -107,9 +118,24 @@ func NewRemote( invalidateFn: invalidateFn, localCache: localCache, model: model, + v2: v2, } } +// Model returns the embedding model this embedder is keyed to. Index builds use +// it to tag the embedding space they were built in, so a later change to the +// proxy's served model can be detected and trigger a re-index. +func (e *RemoteEmbedder) Model() string { + return e.model +} + +// OnProgress registers a callback invoked during EmbedBatch with the number of +// documents embedded so far and the total in the batch. It enables +// document-level progress reporting for index builds. +func (e *RemoteEmbedder) OnProgress(fn func(completed, total int)) { + e.progressFn = fn +} + // Embed returns the L2-normalized embedding vector for a single text string. func (e *RemoteEmbedder) Embed(text string) ([]float32, error) { vectors, err := e.EmbedBatch([]string{text}) @@ -328,6 +354,28 @@ func (e *RemoteEmbedder) Close() error { return nil } +func (e *RemoteEmbedder) reportProgress(completed, total int) { + if e.progressFn != nil { + e.progressFn(completed, total) + } +} + +func (e *RemoteEmbedder) embedPath() string { + if e.v2 { + return "/v2/embedding" + } + + return "/embed" +} + +func (e *RemoteEmbedder) checkPath() string { + if e.v2 { + return "/v2/embedding/check" + } + + return "/embed/check" +} + func (e *RemoteEmbedder) localCacheKey(textHash string) string { return e.model + ":" + textHash } @@ -345,7 +393,7 @@ func (e *RemoteEmbedder) queueLocalCache(toCache map[string][]byte, textHash str toCache[e.localCacheKey(textHash)] = data } -// embedDirect sends all items to /embed without checking the cache first. +// embedDirect sends all items to the embed route without checking the cache first. func (e *RemoteEmbedder) embedDirect( texts []string, hashes []string, @@ -418,7 +466,7 @@ func (e *RemoteEmbedder) checkCached(hashes []string) ([]embedResult, error) { return nil, fmt.Errorf("marshaling check request: %w", err) } - resp, err := e.doWithAuthRetry("/embed/check", reqBody) + resp, err := e.doWithAuthRetry(e.checkPath(), reqBody) if err != nil { return nil, fmt.Errorf("calling embed check: %w", err) } @@ -444,7 +492,7 @@ func (e *RemoteEmbedder) callEmbed(items []embedItem) (*embedResponse, error) { return nil, fmt.Errorf("marshaling embed request: %w", err) } - resp, err := e.doWithAuthRetry("/embed", reqBody) + resp, err := e.doWithAuthRetry(e.embedPath(), reqBody) if err != nil { return nil, fmt.Errorf("calling proxy embed: %w", err) } diff --git a/pkg/proxy/embedding.go b/pkg/proxy/embedding.go index d0892f84..6c432483 100644 --- a/pkg/proxy/embedding.go +++ b/pkg/proxy/embedding.go @@ -59,6 +59,22 @@ type EmbedResult struct { Vector []float32 `json:"vector"` } +// EmbedV2Response is the response payload from the /v2/embedding endpoint. It +// advertises the model and dimensions so clients can detect a model change and +// re-index. Vectors are fp32. +type EmbedV2Response struct { + Model string `json:"model"` + Dimensions int `json:"dimensions"` + Results []EmbedResult `json:"results"` +} + +// EmbedV2CheckResponse is the response from /v2/embedding/check. It advertises +// the model so clients can detect a model change. +type EmbedV2CheckResponse struct { + Model string `json:"model"` + Cached []EmbedResult `json:"cached"` +} + // EmbeddingService handles embedding requests using a remote API with caching. type EmbeddingService struct { log logrus.FieldLogger @@ -68,15 +84,31 @@ type EmbeddingService struct { apiURL string client *http.Client costPerToken float64 + // dimensions, when > 0, requests a fixed output dimensionality from the + // embedding API (Matryoshka truncation). 0 leaves it unset (native dims). + dimensions int } -// NewEmbeddingService creates a new EmbeddingService. -// If costPerToken is 0, the service fetches pricing from the API's /models endpoint. +// NewEmbeddingService creates a new EmbeddingService with native output +// dimensionality. If costPerToken is 0, the service fetches pricing from the +// API's /models endpoint. func NewEmbeddingService( log logrus.FieldLogger, c cache.Cache, apiKey, model, apiURL string, costPerToken float64, +) *EmbeddingService { + return NewEmbeddingServiceWithDimensions(log, c, apiKey, model, apiURL, costPerToken, 0) +} + +// NewEmbeddingServiceWithDimensions creates a new EmbeddingService that requests +// a fixed output dimensionality from the embedding API when dimensions > 0. +func NewEmbeddingServiceWithDimensions( + log logrus.FieldLogger, + c cache.Cache, + apiKey, model, apiURL string, + costPerToken float64, + dimensions int, ) *EmbeddingService { svcLog := log.WithField("component", "embedding-service") normalizedURL := strings.TrimRight(apiURL, "/") @@ -103,6 +135,7 @@ func NewEmbeddingService( apiURL: normalizedURL, client: httpClient, costPerToken: costPerToken, + dimensions: dimensions, } } @@ -111,6 +144,24 @@ func (s *EmbeddingService) Model() string { return s.model } +// Dimensions returns the configured output dimensionality, or 0 for native. +func (s *EmbeddingService) Dimensions() int { + return s.dimensions +} + +// cacheKeyPrefix returns the cache-key namespace for this service's vectors. +// It folds the requested dimensionality into the key when set (> 0), so two +// services on the same model id but different output dimensions never collide. +// Native-dimension services (dimensions == 0, i.e. v1) keep the legacy +// {model}: prefix unchanged, so existing cached vectors stay valid. +func (s *EmbeddingService) cacheKeyPrefix() string { + if s.dimensions > 0 { + return s.model + ":" + strconv.Itoa(s.dimensions) + ":" + } + + return s.model + ":" +} + // Embed computes embeddings for the given items, using the cache where possible. // Uncached items are sent to the upstream API in sub-batches of maxEmbedBatchSize. func (s *EmbeddingService) Embed(ctx context.Context, items []EmbedItem) (*EmbedResponse, error) { @@ -124,11 +175,11 @@ func (s *EmbeddingService) Embed(ctx context.Context, items []EmbedItem) (*Embed s.log.WithField("items", len(items)).Info("Embed request received") - // Build cache keys: {model}:{hash}. + // Build cache keys: {model}:{hash} (or {model}:{dims}:{hash} when dims set). cacheKeys := make([]string, len(items)) for i, item := range items { - cacheKeys[i] = s.model + ":" + item.Hash + cacheKeys[i] = s.cacheKeyPrefix() + item.Hash } // Check cache for existing vectors. @@ -266,7 +317,7 @@ func (s *EmbeddingService) CheckCached(ctx context.Context, hashes []string) ([] cacheKeys := make([]string, len(hashes)) for i, h := range hashes { - cacheKeys[i] = s.model + ":" + h + cacheKeys[i] = s.cacheKeyPrefix() + h } cached, err := s.cache.GetMulti(ctx, cacheKeys) @@ -312,6 +363,9 @@ func (s *EmbeddingService) Close() error { type openRouterRequest struct { Model string `json:"model"` Input []string `json:"input"` + // Dimensions requests a fixed output size (Matryoshka). Omitted when 0 so + // callers using native dimensionality send a byte-identical request. + Dimensions int `json:"dimensions,omitempty"` } // openRouterResponse is the response body from the OpenRouter embeddings API. @@ -334,8 +388,9 @@ type openRouterEmbedding struct { func (s *EmbeddingService) callEmbeddingAPI(ctx context.Context, texts []string) ([][]float32, *openRouterUsage, error) { reqBody := openRouterRequest{ - Model: s.model, - Input: texts, + Model: s.model, + Input: texts, + Dimensions: s.dimensions, } body, err := json.Marshal(reqBody) diff --git a/pkg/proxy/embedding_v2_test.go b/pkg/proxy/embedding_v2_test.go new file mode 100644 index 00000000..73923c84 --- /dev/null +++ b/pkg/proxy/embedding_v2_test.go @@ -0,0 +1,139 @@ +package proxy + +import ( + "context" + "encoding/json" + "math" + "net/http" + "net/http/httptest" + "sync" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ethpandaops/panda/pkg/cache" +) + +// newRecordingOpenRouterServer mimics the OpenRouter /v1/embeddings endpoint and +// records the dimensions field from the most recent request, so tests can assert +// what the embedding service sent upstream. +func newRecordingOpenRouterServer(t *testing.T, lastDims *int, mu *sync.Mutex) *httptest.Server { + t.Helper() + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/embeddings", r.URL.Path) + + var req openRouterRequest + require.NoError(t, json.NewDecoder(r.Body).Decode(&req)) + + mu.Lock() + *lastDims = req.Dimensions + mu.Unlock() + + data := make([]openRouterEmbedding, 0, len(req.Input)) + for i := range req.Input { + data = append(data, openRouterEmbedding{Index: i, Embedding: []float32{0.3, 0.4, 0.5}}) + } + + w.Header().Set("Content-Type", "application/json") + require.NoError(t, json.NewEncoder(w).Encode(openRouterResponse{Data: data})) + })) + t.Cleanup(srv.Close) + + return srv +} + +func TestEmbeddingServiceV2_SendsDimensions(t *testing.T) { + t.Parallel() + + var ( + lastDims int + mu sync.Mutex + ) + + mockAPI := newRecordingOpenRouterServer(t, &lastDims, &mu) + + svc := NewEmbeddingServiceWithDimensions( + logrus.New(), cache.NewInMemory(0), + "test-api-key", "google/gemini-embedding-2", mockAPI.URL+"/v1", 0.01, 1536, + ) + + assert.Equal(t, "google/gemini-embedding-2", svc.Model()) + assert.Equal(t, 1536, svc.Dimensions()) + + resp, err := svc.Embed(context.Background(), []EmbedItem{{Hash: "aaa", Text: "hello"}}) + require.NoError(t, err) + require.Len(t, resp.Results, 1) + assert.Equal(t, "google/gemini-embedding-2", resp.Model) + + mu.Lock() + assert.Equal(t, 1536, lastDims, "v2 service must request dimensions=1536 upstream") + mu.Unlock() + + // Vector is L2-normalized. + var norm float64 + for _, v := range resp.Results[0].Vector { + norm += float64(v) * float64(v) + } + + assert.InDelta(t, 1.0, math.Sqrt(norm), 1e-6) +} + +func TestEmbeddingServiceV1_OmitsDimensions(t *testing.T) { + t.Parallel() + + var ( + lastDims = -1 + mu sync.Mutex + ) + + mockAPI := newRecordingOpenRouterServer(t, &lastDims, &mu) + + // v1 constructor → dimensions must stay 0 and be omitted from the request. + svc := NewEmbeddingService( + logrus.New(), cache.NewInMemory(0), + "test-api-key", "openai/text-embedding-3-small", mockAPI.URL+"/v1", 0.01, + ) + + assert.Equal(t, 0, svc.Dimensions()) + + _, err := svc.Embed(context.Background(), []EmbedItem{{Hash: "aaa", Text: "hello"}}) + require.NoError(t, err) + + mu.Lock() + assert.Equal(t, 0, lastDims, "v1 request must omit dimensions (decodes to 0)") + mu.Unlock() +} + +func TestEmbeddingServiceV2_CheckCached(t *testing.T) { + t.Parallel() + + var ( + lastDims int + mu sync.Mutex + ) + + mockAPI := newRecordingOpenRouterServer(t, &lastDims, &mu) + memCache := cache.NewInMemory(0) + + svc := NewEmbeddingServiceWithDimensions( + logrus.New(), memCache, + "test-api-key", "google/gemini-embedding-2", mockAPI.URL+"/v1", 0.01, 1536, + ) + + // Nothing cached yet. + results, err := svc.CheckCached(context.Background(), []string{"aaa"}) + require.NoError(t, err) + assert.Empty(t, results) + + // Embed populates the cache, then the same hash is a hit. + _, err = svc.Embed(context.Background(), []EmbedItem{{Hash: "aaa", Text: "hello"}}) + require.NoError(t, err) + + results, err = svc.CheckCached(context.Background(), []string{"aaa"}) + require.NoError(t, err) + require.Len(t, results, 1) + assert.Equal(t, "aaa", results[0].Hash) +} diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index bea0db40..615104c8 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -72,6 +72,7 @@ type server struct { ethNodeHandler *handlers.EthNodeHandler benchmarkoorHandler *handlers.BenchmarkoorHandler embeddingService *EmbeddingService + embeddingServiceV2 *EmbeddingService githubHandler *handlers.GitHubHandler autodiscoverHTTPClient *http.Client @@ -211,6 +212,24 @@ func newServer(log logrus.FieldLogger, cfg ServerConfig, hostURL, port string) ( ) } + // Create v2 embedding service if configured (independent model + dimensions). + if cfg.EmbeddingV2 != nil { + embCacheV2, err := buildEmbeddingCache(cfg.EmbeddingV2.Cache) + if err != nil { + return nil, fmt.Errorf("creating v2 embedding cache: %w", err) + } + + s.embeddingServiceV2 = NewEmbeddingServiceWithDimensions( + log, + embCacheV2, + cfg.EmbeddingV2.APIKey, + cfg.EmbeddingV2.Model, + cfg.EmbeddingV2.APIURL, + 0, + cfg.EmbeddingV2.Dimensions, + ) + } + // Create GitHub handler if configured. if cfg.GitHub != nil && cfg.GitHub.Token != "" { s.githubTriggerLimiter = NewRateLimiter(log, RateLimiterConfig{ @@ -277,6 +296,11 @@ func (s *server) registerRoutes() { s.mux.Method(http.MethodPost, "/embed/check", s.metricsMiddleware(chain(http.HandlerFunc(s.handleEmbedCheck)))) } + if s.embeddingServiceV2 != nil { + s.mux.Method(http.MethodPost, "/v2/embedding", s.metricsMiddleware(chain(http.HandlerFunc(s.handleEmbedV2)))) + s.mux.Method(http.MethodPost, "/v2/embedding/check", s.metricsMiddleware(chain(http.HandlerFunc(s.handleEmbedCheckV2)))) + } + // Authenticated routes. if s.clickhouseHandler != nil { s.handleSubtreeRoute("/clickhouse", s.metricsMiddleware(chain(s.clickhouseHandler))) @@ -515,6 +539,77 @@ func (s *server) handleEmbedCheck(w http.ResponseWriter, r *http.Request) { } } +// handleEmbedV2 embeds items via the v2 embedding service. The response +// advertises the model and dimensions; vectors are fp32. The model behind this +// route is config-swappable, and the advertised model lets clients detect a +// change and re-index. +func (s *server) handleEmbedV2(w http.ResponseWriter, r *http.Request) { + var req EmbedRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, fmt.Sprintf("invalid request body: %v", err), http.StatusBadRequest) + + return + } + + if len(req.Items) > maxEmbedItems { + http.Error(w, fmt.Sprintf("too many items: %d exceeds maximum of %d", len(req.Items), maxEmbedItems), http.StatusBadRequest) + + return + } + + resp, err := s.embeddingServiceV2.Embed(r.Context(), req.Items) + if err != nil { + s.log.WithError(err).Error("V2 embedding request failed") + http.Error(w, fmt.Sprintf("embedding failed: %v", err), http.StatusInternalServerError) + + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + out := EmbedV2Response{ + Model: resp.Model, + Dimensions: s.embeddingServiceV2.Dimensions(), + Results: resp.Results, + } + + if err := json.NewEncoder(w).Encode(out); err != nil { + s.log.WithError(err).Error("Failed to encode v2 embedding response") + } +} + +// handleEmbedCheckV2 returns cached v2 vectors for the given hashes without +// embedding new content. The response advertises the model. +func (s *server) handleEmbedCheckV2(w http.ResponseWriter, r *http.Request) { + var req EmbedCheckRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, fmt.Sprintf("invalid request body: %v", err), http.StatusBadRequest) + + return + } + + results, err := s.embeddingServiceV2.CheckCached(r.Context(), req.Hashes) + if err != nil { + s.log.WithError(err).Error("V2 embed check failed") + http.Error(w, fmt.Sprintf("embed check failed: %v", err), http.StatusInternalServerError) + + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + out := EmbedV2CheckResponse{ + Model: s.embeddingServiceV2.Model(), + Cached: results, + } + + if err := json.NewEncoder(w).Encode(out); err != nil { + s.log.WithError(err).Error("Failed to encode v2 embed check response") + } +} + // handleBranding returns the success_page config as JSON, or 204 if not configured. func (s *server) handleBranding(w http.ResponseWriter, _ *http.Request) { if s.cfg.Auth.SuccessPage == nil { @@ -647,6 +742,13 @@ func (s *server) Stop(ctx context.Context) error { } } + // Close v2 embedding service. + if s.embeddingServiceV2 != nil { + if err := s.embeddingServiceV2.Close(); err != nil { + s.log.WithError(err).Warn("Error closing v2 embedding service") + } + } + // Shutdown HTTP server. if s.httpSrv != nil { if err := s.httpSrv.Shutdown(ctx); err != nil { diff --git a/pkg/proxy/server_config.go b/pkg/proxy/server_config.go index ef2b4855..c608148a 100644 --- a/pkg/proxy/server_config.go +++ b/pkg/proxy/server_config.go @@ -48,9 +48,14 @@ type ServerConfig struct { // Metrics holds Prometheus metrics configuration. Metrics MetricsConfig `yaml:"metrics"` - // Embedding holds optional embedding API configuration. + // Embedding holds optional embedding API configuration (v1 route: /embed). Embedding *EmbeddingConfig `yaml:"embedding,omitempty"` + // EmbeddingV2 holds optional configuration for the v2 embedding route + // (/v2/embedding). The route's contract is fixed (fp32, model advertised in + // the response); the model itself is swappable here without touching v1. + EmbeddingV2 *EmbeddingConfig `yaml:"embedding_v2,omitempty"` + // GitHub holds optional GitHub API configuration for triggering workflows. GitHub *GitHubAPIConfig `yaml:"github,omitempty"` } @@ -309,6 +314,10 @@ type EmbeddingConfig struct { // APIURL is the base URL of the embedding API (default: "https://openrouter.ai/api/v1"). APIURL string `yaml:"api_url,omitempty"` + // Dimensions, when > 0, requests a fixed output dimensionality (Matryoshka) + // from the embedding API. Used by the v2 embedding route; ignored by v1. + Dimensions int `yaml:"dimensions,omitempty"` + // Cache holds embedding cache configuration. Cache EmbeddingCacheConfig `yaml:"cache"` } @@ -407,6 +416,25 @@ func (c *ServerConfig) ApplyDefaults() { } } + // Embedding v2 defaults. + if c.EmbeddingV2 != nil { + if c.EmbeddingV2.Model == "" { + c.EmbeddingV2.Model = "google/gemini-embedding-2" + } + + if c.EmbeddingV2.APIURL == "" { + c.EmbeddingV2.APIURL = "https://openrouter.ai/api/v1" + } + + if c.EmbeddingV2.Dimensions == 0 { + c.EmbeddingV2.Dimensions = 1536 + } + + if c.EmbeddingV2.Cache.Backend == "" { + c.EmbeddingV2.Cache.Backend = "memory" + } + } + // ClickHouse defaults. for i := range c.ClickHouse { if len(c.ClickHouse[i].Variants) > 0 { @@ -480,6 +508,17 @@ func (c *ServerConfig) Validate() error { } } + // Validate embedding v2 config. + if c.EmbeddingV2 != nil { + if c.EmbeddingV2.APIKey == "" { + return fmt.Errorf("embedding_v2.api_key is required when embedding_v2 is configured") + } + + if c.EmbeddingV2.Cache.Backend == "redis" && c.EmbeddingV2.Cache.RedisURL == "" { + return fmt.Errorf("embedding_v2.cache.redis_url is required when cache backend is 'redis'") + } + } + // Validate at least one datasource is configured. if len(c.ClickHouse) == 0 && len(c.Prometheus) == 0 && len(c.Loki) == 0 && c.EthNode == nil && len(c.Benchmarkoor) == 0 { return fmt.Errorf("at least one datasource (clickhouse, prometheus, loki, ethnode, or benchmarkoor) must be configured") diff --git a/pkg/searchruntime/runtime.go b/pkg/searchruntime/runtime.go index 1b0164df..c6024507 100644 --- a/pkg/searchruntime/runtime.go +++ b/pkg/searchruntime/runtime.go @@ -75,9 +75,22 @@ func Build( return runtime, nil } + // Prefer the versioned /v2/embedding route (fp32 @ a fixed dimensionality, + // model advertised per response). Fall back to the legacy /embed routes when + // the proxy does not expose v2, so a new server still works against an older + // or self-hosted proxy. The model is whatever the chosen route serves: v2's + // advertised model, or v1's configured model. + tokenFn := func() string { return proxyService.RegisterToken() } + model := proxyService.EmbeddingModel() + useV2 := false + + if v2Model, ok := embedding.ProbeV2(ctx, proxyService.URL(), tokenFn); ok && v2Model != "" { + model = v2Model + useV2 = true + } - log.WithField("model", model). + log.WithFields(logrus.Fields{"model": model, "v2": useV2}). Info("Using remote embedder via proxy") var localCache cache.Cache @@ -93,13 +106,14 @@ func Build( } } - embedder := embedding.NewRemote( + embedder := embedding.NewRemoteWithEndpoint( log, proxyService.URL(), - func() string { return proxyService.RegisterToken() }, + tokenFn, proxyService.Invalidate, localCache, model, + useV2, ) runtime.embedder = embedder From 7ad275f87bcefb7dc90ffc2a088291cb73fb6839 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 18 Jun 2026 12:03:42 +1000 Subject: [PATCH 2/2] fix(proxy): count v2 embedding service in EmbeddingAvailable/EmbeddingModel A proxy configured with only embedding_v2 (no v1 embedding) advertised embedding_available=false, so the search runtime's guard tripped before the v2 probe ran and silently disabled all search. Both accessors now consider embeddingServiceV2: available when either service exists; model prefers v1 (what /embed serves) and falls through to the v2 model so a v2-only proxy advertises a non-empty model. --- pkg/proxy/server.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index 615104c8..31c668aa 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -1015,18 +1015,27 @@ func (s *server) EthNodeDatasourceInfo() []types.DatasourceInfo { return ethNodeDatasourceInfo(s.EthNodeAvailable()) } -// EmbeddingAvailable returns true if the embedding service is configured. +// EmbeddingAvailable returns true if either the v1 or the v2 embedding service +// is configured. A proxy that only configures embedding_v2 still offers +// embedding (via /v2/embedding), and the search runtime discovers v2 by probe. func (s *server) EmbeddingAvailable() bool { - return s.embeddingService != nil + return s.embeddingService != nil || s.embeddingServiceV2 != nil } -// EmbeddingModel returns the configured embedding model name. +// EmbeddingModel returns the advertised embedding model name. It prefers the v1 +// model — what /embed serves and what unspecified/legacy clients get — and falls +// through to the v2 model so a v2-only proxy still advertises a non-empty model. +// v2-capable clients discover the v2 model from the /v2/embedding response. func (s *server) EmbeddingModel() string { - if s.embeddingService == nil { - return "" + if s.embeddingService != nil { + return s.embeddingService.Model() + } + + if s.embeddingServiceV2 != nil { + return s.embeddingServiceV2.Model() } - return s.embeddingService.Model() + return "" } func advertisedURLs(listenAddr string) (string, string) {