diff --git a/pkg/embedding/probe.go b/pkg/embedding/probe.go new file mode 100644 index 0000000..c0a048b --- /dev/null +++ b/pkg/embedding/probe.go @@ -0,0 +1,59 @@ +package embedding + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "time" +) + +// probeTimeout bounds the v2 capability probe so a slow or unreachable proxy +// falls back to v1 quickly rather than stalling startup. +const probeTimeout = 15 * time.Second + +// ProbeV2 reports whether the proxy exposes the versioned /v2/embedding route +// and, if so, the embedding model it currently advertises. It POSTs an empty +// check request to /v2/embedding/check; a 200 response means v2 is available and +// its model is returned. Any non-200 status or transport error yields +// ("", false), signalling the caller to fall back to the legacy /embed routes. +func ProbeV2(ctx context.Context, proxyURL string, tokenFn func() string) (string, bool) { + body, err := json.Marshal(embedCheckRequest{Hashes: []string{}}) + if err != nil { + return "", false + } + + req, err := http.NewRequestWithContext( + ctx, http.MethodPost, proxyURL+"/v2/embedding/check", bytes.NewReader(body), + ) + if err != nil { + return "", false + } + + req.Header.Set("Content-Type", "application/json") + + if tokenFn != nil { + if token := tokenFn(); token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + } + + client := &http.Client{Timeout: probeTimeout} + + resp, err := client.Do(req) + if err != nil { + return "", false + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return "", false + } + + var checkResp embedCheckResponse + if err := json.NewDecoder(resp.Body).Decode(&checkResp); err != nil { + return "", false + } + + return checkResp.Model, true +} diff --git a/pkg/embedding/remote.go b/pkg/embedding/remote.go index 9321142..562fa6e 100644 --- a/pkg/embedding/remote.go +++ b/pkg/embedding/remote.go @@ -22,18 +22,21 @@ const ( maxBatchSize = 500 ) -// embedCheckRequest is the request payload for the proxy /embed/check endpoint. +// embedCheckRequest is the request payload for the proxy embed-check endpoint. type embedCheckRequest struct { Model string `json:"model"` Hashes []string `json:"hashes"` } -// embedCheckResponse is the response from /embed/check. +// embedCheckResponse is the response from the embed-check endpoint. Model is +// populated by the v2 route (and empty on v1) so callers can observe which +// embedding model the proxy is currently serving. type embedCheckResponse struct { + Model string `json:"model"` Cached []embedResult `json:"cached"` } -// embedRequest is the request payload for the proxy /embed endpoint. +// embedRequest is the request payload for the proxy embed endpoint. type embedRequest struct { Items []embedItem `json:"items"` } @@ -44,10 +47,13 @@ type embedItem struct { Text string `json:"text"` } -// embedResponse is the response payload from the proxy /embed endpoint. +// embedResponse is the response payload from the proxy embed endpoint. Both v1 +// and v2 advertise the serving model; v2 additionally reports its fixed output +// dimensionality. type embedResponse struct { - Results []embedResult `json:"results"` - Model string `json:"model"` + Results []embedResult `json:"results"` + Model string `json:"model"` + Dimensions int `json:"dimensions"` } // embedResult is a single embedding result. @@ -56,8 +62,11 @@ type embedResult struct { Vector []float32 `json:"vector"` } -// RemoteEmbedder implements Embedder by calling the proxy's /embed endpoint. +// RemoteEmbedder implements Embedder by calling the proxy's embed endpoint. // An optional local cache avoids round-trips to the proxy on warm restarts. +// When v2 is set it targets the versioned /v2/embedding routes (fp32 at a fixed +// dimensionality, model advertised per response); otherwise it uses the legacy +// /embed routes. type RemoteEmbedder struct { log logrus.FieldLogger proxyURL string @@ -66,26 +75,14 @@ type RemoteEmbedder struct { invalidateFn func() localCache cache.Cache model string + v2 bool progressFn func(completed, total int) } -// OnProgress registers a callback invoked during EmbedBatch with the number of -// documents embedded so far and the total in the batch. It enables -// document-level progress reporting for index builds. -func (e *RemoteEmbedder) OnProgress(fn func(completed, total int)) { - e.progressFn = fn -} - -func (e *RemoteEmbedder) reportProgress(completed, total int) { - if e.progressFn != nil { - e.progressFn(completed, total) - } -} - // Compile-time interface check. var _ Embedder = (*RemoteEmbedder)(nil) -// NewRemote creates a new RemoteEmbedder that calls the proxy's /embed endpoint. +// NewRemote creates a RemoteEmbedder that calls the proxy's legacy /embed routes. // tokenFn is called on each request to get the current auth token, and // invalidateFn drops the cached token so a 401/403 can be retried with a fresh // one (it may be nil to disable the retry). @@ -98,6 +95,20 @@ func NewRemote( invalidateFn func(), localCache cache.Cache, model string, +) *RemoteEmbedder { + return NewRemoteWithEndpoint(log, proxyURL, tokenFn, invalidateFn, localCache, model, false) +} + +// NewRemoteWithEndpoint creates a RemoteEmbedder targeting either the v2 +// (/v2/embedding) routes when v2 is true, or the legacy /embed routes otherwise. +func NewRemoteWithEndpoint( + log logrus.FieldLogger, + proxyURL string, + tokenFn func() string, + invalidateFn func(), + localCache cache.Cache, + model string, + v2 bool, ) *RemoteEmbedder { return &RemoteEmbedder{ log: log.WithField("component", "remote-embedder"), @@ -107,9 +118,24 @@ func NewRemote( invalidateFn: invalidateFn, localCache: localCache, model: model, + v2: v2, } } +// Model returns the embedding model this embedder is keyed to. Index builds use +// it to tag the embedding space they were built in, so a later change to the +// proxy's served model can be detected and trigger a re-index. +func (e *RemoteEmbedder) Model() string { + return e.model +} + +// OnProgress registers a callback invoked during EmbedBatch with the number of +// documents embedded so far and the total in the batch. It enables +// document-level progress reporting for index builds. +func (e *RemoteEmbedder) OnProgress(fn func(completed, total int)) { + e.progressFn = fn +} + // Embed returns the L2-normalized embedding vector for a single text string. func (e *RemoteEmbedder) Embed(text string) ([]float32, error) { vectors, err := e.EmbedBatch([]string{text}) @@ -328,6 +354,28 @@ func (e *RemoteEmbedder) Close() error { return nil } +func (e *RemoteEmbedder) reportProgress(completed, total int) { + if e.progressFn != nil { + e.progressFn(completed, total) + } +} + +func (e *RemoteEmbedder) embedPath() string { + if e.v2 { + return "/v2/embedding" + } + + return "/embed" +} + +func (e *RemoteEmbedder) checkPath() string { + if e.v2 { + return "/v2/embedding/check" + } + + return "/embed/check" +} + func (e *RemoteEmbedder) localCacheKey(textHash string) string { return e.model + ":" + textHash } @@ -345,7 +393,7 @@ func (e *RemoteEmbedder) queueLocalCache(toCache map[string][]byte, textHash str toCache[e.localCacheKey(textHash)] = data } -// embedDirect sends all items to /embed without checking the cache first. +// embedDirect sends all items to the embed route without checking the cache first. func (e *RemoteEmbedder) embedDirect( texts []string, hashes []string, @@ -418,7 +466,7 @@ func (e *RemoteEmbedder) checkCached(hashes []string) ([]embedResult, error) { return nil, fmt.Errorf("marshaling check request: %w", err) } - resp, err := e.doWithAuthRetry("/embed/check", reqBody) + resp, err := e.doWithAuthRetry(e.checkPath(), reqBody) if err != nil { return nil, fmt.Errorf("calling embed check: %w", err) } @@ -444,7 +492,7 @@ func (e *RemoteEmbedder) callEmbed(items []embedItem) (*embedResponse, error) { return nil, fmt.Errorf("marshaling embed request: %w", err) } - resp, err := e.doWithAuthRetry("/embed", reqBody) + resp, err := e.doWithAuthRetry(e.embedPath(), reqBody) if err != nil { return nil, fmt.Errorf("calling proxy embed: %w", err) } diff --git a/pkg/proxy/embedding.go b/pkg/proxy/embedding.go index d0892f8..6c43248 100644 --- a/pkg/proxy/embedding.go +++ b/pkg/proxy/embedding.go @@ -59,6 +59,22 @@ type EmbedResult struct { Vector []float32 `json:"vector"` } +// EmbedV2Response is the response payload from the /v2/embedding endpoint. It +// advertises the model and dimensions so clients can detect a model change and +// re-index. Vectors are fp32. +type EmbedV2Response struct { + Model string `json:"model"` + Dimensions int `json:"dimensions"` + Results []EmbedResult `json:"results"` +} + +// EmbedV2CheckResponse is the response from /v2/embedding/check. It advertises +// the model so clients can detect a model change. +type EmbedV2CheckResponse struct { + Model string `json:"model"` + Cached []EmbedResult `json:"cached"` +} + // EmbeddingService handles embedding requests using a remote API with caching. type EmbeddingService struct { log logrus.FieldLogger @@ -68,15 +84,31 @@ type EmbeddingService struct { apiURL string client *http.Client costPerToken float64 + // dimensions, when > 0, requests a fixed output dimensionality from the + // embedding API (Matryoshka truncation). 0 leaves it unset (native dims). + dimensions int } -// NewEmbeddingService creates a new EmbeddingService. -// If costPerToken is 0, the service fetches pricing from the API's /models endpoint. +// NewEmbeddingService creates a new EmbeddingService with native output +// dimensionality. If costPerToken is 0, the service fetches pricing from the +// API's /models endpoint. func NewEmbeddingService( log logrus.FieldLogger, c cache.Cache, apiKey, model, apiURL string, costPerToken float64, +) *EmbeddingService { + return NewEmbeddingServiceWithDimensions(log, c, apiKey, model, apiURL, costPerToken, 0) +} + +// NewEmbeddingServiceWithDimensions creates a new EmbeddingService that requests +// a fixed output dimensionality from the embedding API when dimensions > 0. +func NewEmbeddingServiceWithDimensions( + log logrus.FieldLogger, + c cache.Cache, + apiKey, model, apiURL string, + costPerToken float64, + dimensions int, ) *EmbeddingService { svcLog := log.WithField("component", "embedding-service") normalizedURL := strings.TrimRight(apiURL, "/") @@ -103,6 +135,7 @@ func NewEmbeddingService( apiURL: normalizedURL, client: httpClient, costPerToken: costPerToken, + dimensions: dimensions, } } @@ -111,6 +144,24 @@ func (s *EmbeddingService) Model() string { return s.model } +// Dimensions returns the configured output dimensionality, or 0 for native. +func (s *EmbeddingService) Dimensions() int { + return s.dimensions +} + +// cacheKeyPrefix returns the cache-key namespace for this service's vectors. +// It folds the requested dimensionality into the key when set (> 0), so two +// services on the same model id but different output dimensions never collide. +// Native-dimension services (dimensions == 0, i.e. v1) keep the legacy +// {model}: prefix unchanged, so existing cached vectors stay valid. +func (s *EmbeddingService) cacheKeyPrefix() string { + if s.dimensions > 0 { + return s.model + ":" + strconv.Itoa(s.dimensions) + ":" + } + + return s.model + ":" +} + // Embed computes embeddings for the given items, using the cache where possible. // Uncached items are sent to the upstream API in sub-batches of maxEmbedBatchSize. func (s *EmbeddingService) Embed(ctx context.Context, items []EmbedItem) (*EmbedResponse, error) { @@ -124,11 +175,11 @@ func (s *EmbeddingService) Embed(ctx context.Context, items []EmbedItem) (*Embed s.log.WithField("items", len(items)).Info("Embed request received") - // Build cache keys: {model}:{hash}. + // Build cache keys: {model}:{hash} (or {model}:{dims}:{hash} when dims set). cacheKeys := make([]string, len(items)) for i, item := range items { - cacheKeys[i] = s.model + ":" + item.Hash + cacheKeys[i] = s.cacheKeyPrefix() + item.Hash } // Check cache for existing vectors. @@ -266,7 +317,7 @@ func (s *EmbeddingService) CheckCached(ctx context.Context, hashes []string) ([] cacheKeys := make([]string, len(hashes)) for i, h := range hashes { - cacheKeys[i] = s.model + ":" + h + cacheKeys[i] = s.cacheKeyPrefix() + h } cached, err := s.cache.GetMulti(ctx, cacheKeys) @@ -312,6 +363,9 @@ func (s *EmbeddingService) Close() error { type openRouterRequest struct { Model string `json:"model"` Input []string `json:"input"` + // Dimensions requests a fixed output size (Matryoshka). Omitted when 0 so + // callers using native dimensionality send a byte-identical request. + Dimensions int `json:"dimensions,omitempty"` } // openRouterResponse is the response body from the OpenRouter embeddings API. @@ -334,8 +388,9 @@ type openRouterEmbedding struct { func (s *EmbeddingService) callEmbeddingAPI(ctx context.Context, texts []string) ([][]float32, *openRouterUsage, error) { reqBody := openRouterRequest{ - Model: s.model, - Input: texts, + Model: s.model, + Input: texts, + Dimensions: s.dimensions, } body, err := json.Marshal(reqBody) diff --git a/pkg/proxy/embedding_v2_test.go b/pkg/proxy/embedding_v2_test.go new file mode 100644 index 0000000..73923c8 --- /dev/null +++ b/pkg/proxy/embedding_v2_test.go @@ -0,0 +1,139 @@ +package proxy + +import ( + "context" + "encoding/json" + "math" + "net/http" + "net/http/httptest" + "sync" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ethpandaops/panda/pkg/cache" +) + +// newRecordingOpenRouterServer mimics the OpenRouter /v1/embeddings endpoint and +// records the dimensions field from the most recent request, so tests can assert +// what the embedding service sent upstream. +func newRecordingOpenRouterServer(t *testing.T, lastDims *int, mu *sync.Mutex) *httptest.Server { + t.Helper() + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/embeddings", r.URL.Path) + + var req openRouterRequest + require.NoError(t, json.NewDecoder(r.Body).Decode(&req)) + + mu.Lock() + *lastDims = req.Dimensions + mu.Unlock() + + data := make([]openRouterEmbedding, 0, len(req.Input)) + for i := range req.Input { + data = append(data, openRouterEmbedding{Index: i, Embedding: []float32{0.3, 0.4, 0.5}}) + } + + w.Header().Set("Content-Type", "application/json") + require.NoError(t, json.NewEncoder(w).Encode(openRouterResponse{Data: data})) + })) + t.Cleanup(srv.Close) + + return srv +} + +func TestEmbeddingServiceV2_SendsDimensions(t *testing.T) { + t.Parallel() + + var ( + lastDims int + mu sync.Mutex + ) + + mockAPI := newRecordingOpenRouterServer(t, &lastDims, &mu) + + svc := NewEmbeddingServiceWithDimensions( + logrus.New(), cache.NewInMemory(0), + "test-api-key", "google/gemini-embedding-2", mockAPI.URL+"/v1", 0.01, 1536, + ) + + assert.Equal(t, "google/gemini-embedding-2", svc.Model()) + assert.Equal(t, 1536, svc.Dimensions()) + + resp, err := svc.Embed(context.Background(), []EmbedItem{{Hash: "aaa", Text: "hello"}}) + require.NoError(t, err) + require.Len(t, resp.Results, 1) + assert.Equal(t, "google/gemini-embedding-2", resp.Model) + + mu.Lock() + assert.Equal(t, 1536, lastDims, "v2 service must request dimensions=1536 upstream") + mu.Unlock() + + // Vector is L2-normalized. + var norm float64 + for _, v := range resp.Results[0].Vector { + norm += float64(v) * float64(v) + } + + assert.InDelta(t, 1.0, math.Sqrt(norm), 1e-6) +} + +func TestEmbeddingServiceV1_OmitsDimensions(t *testing.T) { + t.Parallel() + + var ( + lastDims = -1 + mu sync.Mutex + ) + + mockAPI := newRecordingOpenRouterServer(t, &lastDims, &mu) + + // v1 constructor → dimensions must stay 0 and be omitted from the request. + svc := NewEmbeddingService( + logrus.New(), cache.NewInMemory(0), + "test-api-key", "openai/text-embedding-3-small", mockAPI.URL+"/v1", 0.01, + ) + + assert.Equal(t, 0, svc.Dimensions()) + + _, err := svc.Embed(context.Background(), []EmbedItem{{Hash: "aaa", Text: "hello"}}) + require.NoError(t, err) + + mu.Lock() + assert.Equal(t, 0, lastDims, "v1 request must omit dimensions (decodes to 0)") + mu.Unlock() +} + +func TestEmbeddingServiceV2_CheckCached(t *testing.T) { + t.Parallel() + + var ( + lastDims int + mu sync.Mutex + ) + + mockAPI := newRecordingOpenRouterServer(t, &lastDims, &mu) + memCache := cache.NewInMemory(0) + + svc := NewEmbeddingServiceWithDimensions( + logrus.New(), memCache, + "test-api-key", "google/gemini-embedding-2", mockAPI.URL+"/v1", 0.01, 1536, + ) + + // Nothing cached yet. + results, err := svc.CheckCached(context.Background(), []string{"aaa"}) + require.NoError(t, err) + assert.Empty(t, results) + + // Embed populates the cache, then the same hash is a hit. + _, err = svc.Embed(context.Background(), []EmbedItem{{Hash: "aaa", Text: "hello"}}) + require.NoError(t, err) + + results, err = svc.CheckCached(context.Background(), []string{"aaa"}) + require.NoError(t, err) + require.Len(t, results, 1) + assert.Equal(t, "aaa", results[0].Hash) +} diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index bea0db4..31c668a 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -72,6 +72,7 @@ type server struct { ethNodeHandler *handlers.EthNodeHandler benchmarkoorHandler *handlers.BenchmarkoorHandler embeddingService *EmbeddingService + embeddingServiceV2 *EmbeddingService githubHandler *handlers.GitHubHandler autodiscoverHTTPClient *http.Client @@ -211,6 +212,24 @@ func newServer(log logrus.FieldLogger, cfg ServerConfig, hostURL, port string) ( ) } + // Create v2 embedding service if configured (independent model + dimensions). + if cfg.EmbeddingV2 != nil { + embCacheV2, err := buildEmbeddingCache(cfg.EmbeddingV2.Cache) + if err != nil { + return nil, fmt.Errorf("creating v2 embedding cache: %w", err) + } + + s.embeddingServiceV2 = NewEmbeddingServiceWithDimensions( + log, + embCacheV2, + cfg.EmbeddingV2.APIKey, + cfg.EmbeddingV2.Model, + cfg.EmbeddingV2.APIURL, + 0, + cfg.EmbeddingV2.Dimensions, + ) + } + // Create GitHub handler if configured. if cfg.GitHub != nil && cfg.GitHub.Token != "" { s.githubTriggerLimiter = NewRateLimiter(log, RateLimiterConfig{ @@ -277,6 +296,11 @@ func (s *server) registerRoutes() { s.mux.Method(http.MethodPost, "/embed/check", s.metricsMiddleware(chain(http.HandlerFunc(s.handleEmbedCheck)))) } + if s.embeddingServiceV2 != nil { + s.mux.Method(http.MethodPost, "/v2/embedding", s.metricsMiddleware(chain(http.HandlerFunc(s.handleEmbedV2)))) + s.mux.Method(http.MethodPost, "/v2/embedding/check", s.metricsMiddleware(chain(http.HandlerFunc(s.handleEmbedCheckV2)))) + } + // Authenticated routes. if s.clickhouseHandler != nil { s.handleSubtreeRoute("/clickhouse", s.metricsMiddleware(chain(s.clickhouseHandler))) @@ -515,6 +539,77 @@ func (s *server) handleEmbedCheck(w http.ResponseWriter, r *http.Request) { } } +// handleEmbedV2 embeds items via the v2 embedding service. The response +// advertises the model and dimensions; vectors are fp32. The model behind this +// route is config-swappable, and the advertised model lets clients detect a +// change and re-index. +func (s *server) handleEmbedV2(w http.ResponseWriter, r *http.Request) { + var req EmbedRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, fmt.Sprintf("invalid request body: %v", err), http.StatusBadRequest) + + return + } + + if len(req.Items) > maxEmbedItems { + http.Error(w, fmt.Sprintf("too many items: %d exceeds maximum of %d", len(req.Items), maxEmbedItems), http.StatusBadRequest) + + return + } + + resp, err := s.embeddingServiceV2.Embed(r.Context(), req.Items) + if err != nil { + s.log.WithError(err).Error("V2 embedding request failed") + http.Error(w, fmt.Sprintf("embedding failed: %v", err), http.StatusInternalServerError) + + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + out := EmbedV2Response{ + Model: resp.Model, + Dimensions: s.embeddingServiceV2.Dimensions(), + Results: resp.Results, + } + + if err := json.NewEncoder(w).Encode(out); err != nil { + s.log.WithError(err).Error("Failed to encode v2 embedding response") + } +} + +// handleEmbedCheckV2 returns cached v2 vectors for the given hashes without +// embedding new content. The response advertises the model. +func (s *server) handleEmbedCheckV2(w http.ResponseWriter, r *http.Request) { + var req EmbedCheckRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, fmt.Sprintf("invalid request body: %v", err), http.StatusBadRequest) + + return + } + + results, err := s.embeddingServiceV2.CheckCached(r.Context(), req.Hashes) + if err != nil { + s.log.WithError(err).Error("V2 embed check failed") + http.Error(w, fmt.Sprintf("embed check failed: %v", err), http.StatusInternalServerError) + + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + out := EmbedV2CheckResponse{ + Model: s.embeddingServiceV2.Model(), + Cached: results, + } + + if err := json.NewEncoder(w).Encode(out); err != nil { + s.log.WithError(err).Error("Failed to encode v2 embed check response") + } +} + // handleBranding returns the success_page config as JSON, or 204 if not configured. func (s *server) handleBranding(w http.ResponseWriter, _ *http.Request) { if s.cfg.Auth.SuccessPage == nil { @@ -647,6 +742,13 @@ func (s *server) Stop(ctx context.Context) error { } } + // Close v2 embedding service. + if s.embeddingServiceV2 != nil { + if err := s.embeddingServiceV2.Close(); err != nil { + s.log.WithError(err).Warn("Error closing v2 embedding service") + } + } + // Shutdown HTTP server. if s.httpSrv != nil { if err := s.httpSrv.Shutdown(ctx); err != nil { @@ -913,18 +1015,27 @@ func (s *server) EthNodeDatasourceInfo() []types.DatasourceInfo { return ethNodeDatasourceInfo(s.EthNodeAvailable()) } -// EmbeddingAvailable returns true if the embedding service is configured. +// EmbeddingAvailable returns true if either the v1 or the v2 embedding service +// is configured. A proxy that only configures embedding_v2 still offers +// embedding (via /v2/embedding), and the search runtime discovers v2 by probe. func (s *server) EmbeddingAvailable() bool { - return s.embeddingService != nil + return s.embeddingService != nil || s.embeddingServiceV2 != nil } -// EmbeddingModel returns the configured embedding model name. +// EmbeddingModel returns the advertised embedding model name. It prefers the v1 +// model — what /embed serves and what unspecified/legacy clients get — and falls +// through to the v2 model so a v2-only proxy still advertises a non-empty model. +// v2-capable clients discover the v2 model from the /v2/embedding response. func (s *server) EmbeddingModel() string { - if s.embeddingService == nil { - return "" + if s.embeddingService != nil { + return s.embeddingService.Model() + } + + if s.embeddingServiceV2 != nil { + return s.embeddingServiceV2.Model() } - return s.embeddingService.Model() + return "" } func advertisedURLs(listenAddr string) (string, string) { diff --git a/pkg/proxy/server_config.go b/pkg/proxy/server_config.go index ef2b485..c608148 100644 --- a/pkg/proxy/server_config.go +++ b/pkg/proxy/server_config.go @@ -48,9 +48,14 @@ type ServerConfig struct { // Metrics holds Prometheus metrics configuration. Metrics MetricsConfig `yaml:"metrics"` - // Embedding holds optional embedding API configuration. + // Embedding holds optional embedding API configuration (v1 route: /embed). Embedding *EmbeddingConfig `yaml:"embedding,omitempty"` + // EmbeddingV2 holds optional configuration for the v2 embedding route + // (/v2/embedding). The route's contract is fixed (fp32, model advertised in + // the response); the model itself is swappable here without touching v1. + EmbeddingV2 *EmbeddingConfig `yaml:"embedding_v2,omitempty"` + // GitHub holds optional GitHub API configuration for triggering workflows. GitHub *GitHubAPIConfig `yaml:"github,omitempty"` } @@ -309,6 +314,10 @@ type EmbeddingConfig struct { // APIURL is the base URL of the embedding API (default: "https://openrouter.ai/api/v1"). APIURL string `yaml:"api_url,omitempty"` + // Dimensions, when > 0, requests a fixed output dimensionality (Matryoshka) + // from the embedding API. Used by the v2 embedding route; ignored by v1. + Dimensions int `yaml:"dimensions,omitempty"` + // Cache holds embedding cache configuration. Cache EmbeddingCacheConfig `yaml:"cache"` } @@ -407,6 +416,25 @@ func (c *ServerConfig) ApplyDefaults() { } } + // Embedding v2 defaults. + if c.EmbeddingV2 != nil { + if c.EmbeddingV2.Model == "" { + c.EmbeddingV2.Model = "google/gemini-embedding-2" + } + + if c.EmbeddingV2.APIURL == "" { + c.EmbeddingV2.APIURL = "https://openrouter.ai/api/v1" + } + + if c.EmbeddingV2.Dimensions == 0 { + c.EmbeddingV2.Dimensions = 1536 + } + + if c.EmbeddingV2.Cache.Backend == "" { + c.EmbeddingV2.Cache.Backend = "memory" + } + } + // ClickHouse defaults. for i := range c.ClickHouse { if len(c.ClickHouse[i].Variants) > 0 { @@ -480,6 +508,17 @@ func (c *ServerConfig) Validate() error { } } + // Validate embedding v2 config. + if c.EmbeddingV2 != nil { + if c.EmbeddingV2.APIKey == "" { + return fmt.Errorf("embedding_v2.api_key is required when embedding_v2 is configured") + } + + if c.EmbeddingV2.Cache.Backend == "redis" && c.EmbeddingV2.Cache.RedisURL == "" { + return fmt.Errorf("embedding_v2.cache.redis_url is required when cache backend is 'redis'") + } + } + // Validate at least one datasource is configured. if len(c.ClickHouse) == 0 && len(c.Prometheus) == 0 && len(c.Loki) == 0 && c.EthNode == nil && len(c.Benchmarkoor) == 0 { return fmt.Errorf("at least one datasource (clickhouse, prometheus, loki, ethnode, or benchmarkoor) must be configured") diff --git a/pkg/searchruntime/runtime.go b/pkg/searchruntime/runtime.go index 1b0164d..c602450 100644 --- a/pkg/searchruntime/runtime.go +++ b/pkg/searchruntime/runtime.go @@ -75,9 +75,22 @@ func Build( return runtime, nil } + // Prefer the versioned /v2/embedding route (fp32 @ a fixed dimensionality, + // model advertised per response). Fall back to the legacy /embed routes when + // the proxy does not expose v2, so a new server still works against an older + // or self-hosted proxy. The model is whatever the chosen route serves: v2's + // advertised model, or v1's configured model. + tokenFn := func() string { return proxyService.RegisterToken() } + model := proxyService.EmbeddingModel() + useV2 := false + + if v2Model, ok := embedding.ProbeV2(ctx, proxyService.URL(), tokenFn); ok && v2Model != "" { + model = v2Model + useV2 = true + } - log.WithField("model", model). + log.WithFields(logrus.Fields{"model": model, "v2": useV2}). Info("Using remote embedder via proxy") var localCache cache.Cache @@ -93,13 +106,14 @@ func Build( } } - embedder := embedding.NewRemote( + embedder := embedding.NewRemoteWithEndpoint( log, proxyService.URL(), - func() string { return proxyService.RegisterToken() }, + tokenFn, proxyService.Invalidate, localCache, model, + useV2, ) runtime.embedder = embedder