diff --git a/modules/clickhouse/schema_routing_test.go b/modules/clickhouse/schema_routing_test.go index 9c28549..97ae23a 100644 --- a/modules/clickhouse/schema_routing_test.go +++ b/modules/clickhouse/schema_routing_test.go @@ -100,6 +100,7 @@ func (c *schemaProxyClient) Start(_ context.Context) error { return nil } func (c *schemaProxyClient) Stop(_ context.Context) error { return nil } func (c *schemaProxyClient) URL() string { return c.url } func (c *schemaProxyClient) RegisterToken() string { return c.token } +func (c *schemaProxyClient) Ready() bool { return true } func (c *schemaProxyClient) Invalidate() {} func (c *schemaProxyClient) RevokeToken() {} func (c *schemaProxyClient) ClickHouseDatasources() []string { diff --git a/pkg/app/refresh_test.go b/pkg/app/refresh_test.go index 631d2fa..33ff027 100644 --- a/pkg/app/refresh_test.go +++ b/pkg/app/refresh_test.go @@ -325,6 +325,7 @@ func (f *fakeProxyClient) Start(_ context.Context) error { return func (f *fakeProxyClient) Stop(_ context.Context) error { return nil } func (f *fakeProxyClient) URL() string { return "" } func (f *fakeProxyClient) RegisterToken() string { return "" } +func (f *fakeProxyClient) Ready() bool { return true } func (f *fakeProxyClient) Invalidate() {} func (f *fakeProxyClient) RevokeToken() {} func (f *fakeProxyClient) Discover(_ context.Context) error { return nil } diff --git a/pkg/cli/server.go b/pkg/cli/server.go index eb7df6a..4956d0e 100644 --- a/pkg/cli/server.go +++ b/pkg/cli/server.go @@ -211,7 +211,44 @@ func checkServerHealth(ctx context.Context) error { return nil } -func waitForServerHealth(ctx context.Context, timeout time.Duration) error { +// checkServerReady probes the server's /ready endpoint, which reports 200 only +// once the proxy layer has completed initial datasource discovery (or is +// blocked on user auth). /health returns 200 as soon as the HTTP listener is +// up, so readiness — not liveness — is what 'panda init' and 'panda server +// start' should wait on before claiming the server is usable. +func checkServerReady(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } + + cfg, err := config.LoadClient(cfgFile) + if err != nil { + return &serverHealthConfigError{err: err} + } + + readyURL := strings.TrimRight(cfg.ServerURL(), "/") + "/ready" + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, readyURL, nil) + if err != nil { + return &serverHealthRequestError{err: err} + } + + client := &http.Client{Timeout: 5 * time.Second} + + resp, err := client.Do(req) + if err != nil { + return &serverHealthRequestError{err: err} + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return &serverHealthStatusError{statusCode: resp.StatusCode} + } + + return nil +} + +func waitForServerReady(ctx context.Context, timeout time.Duration) error { if ctx == nil { ctx = context.Background() } @@ -228,14 +265,14 @@ func waitForServerHealth(ctx context.Context, timeout time.Duration) error { var lastErr error for { - err := checkServerHealth(waitCtx) + err := checkServerReady(waitCtx) if err == nil { return nil } var configErr *serverHealthConfigError if errors.As(err, &configErr) { - return fmt.Errorf("cannot check server health: %w", err) + return fmt.Errorf("cannot check server readiness: %w", err) } lastErr = err @@ -244,7 +281,7 @@ func waitForServerHealth(ctx context.Context, timeout time.Duration) error { if !now.Before(nextProgressAt) { remaining := max(time.Until(deadline).Round(time.Second), 0) - fmt.Printf("Still waiting for server to become healthy... (%s remaining)\n", remaining) + fmt.Printf("Still waiting for server to become ready... (%s remaining)\n", remaining) nextProgressAt = now.Add(serverHealthProgressInterval) } @@ -260,13 +297,13 @@ func waitForServerHealth(ctx context.Context, timeout time.Duration) error { if errors.Is(waitCtx.Err(), context.DeadlineExceeded) { return fmt.Errorf( - "server did not become healthy within %s (last check: %v). Check logs with 'panda server logs'", + "server did not become ready within %s (last check: %v). Check logs with 'panda server logs'", timeout, lastErr, ) } - return fmt.Errorf("server health wait canceled: %w", waitCtx.Err()) + return fmt.Errorf("server readiness wait canceled: %w", waitCtx.Err()) case <-timer.C: } } @@ -277,9 +314,9 @@ func runComposeAndWait(ctx context.Context, composeFile string, composeArgs []st return err } - fmt.Println("Waiting for server to become healthy...") + fmt.Println("Waiting for server to become ready...") - if err := waitForServerHealth(ctx, timeout); err != nil { + if err := waitForServerReady(ctx, timeout); err != nil { return err } diff --git a/pkg/cli/server_test.go b/pkg/cli/server_test.go index d0dfaf4..7166c1f 100644 --- a/pkg/cli/server_test.go +++ b/pkg/cli/server_test.go @@ -72,17 +72,17 @@ func TestComposeOverrideFile(t *testing.T) { }) } -func TestWaitForServerHealthSucceedsAfterTemporaryFailure(t *testing.T) { +func TestWaitForServerReadySucceedsAfterTemporaryFailure(t *testing.T) { var attempts atomic.Int32 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/health" { + if r.URL.Path != "/ready" { http.NotFound(w, r) return } if attempts.Add(1) == 1 { - http.Error(w, "starting", http.StatusServiceUnavailable) + http.Error(w, "not ready", http.StatusServiceUnavailable) return } @@ -93,20 +93,20 @@ func TestWaitForServerHealthSucceedsAfterTemporaryFailure(t *testing.T) { setClientConfig(t, server.URL) setServerHealthWaitIntervals(t, 5*time.Millisecond, time.Hour) - err := waitForServerHealth(context.Background(), 200*time.Millisecond) + err := waitForServerReady(context.Background(), 200*time.Millisecond) require.NoError(t, err) assert.GreaterOrEqual(t, attempts.Load(), int32(2)) } -func TestWaitForServerHealthTimesOutWithLogsHint(t *testing.T) { +func TestWaitForServerReadyTimesOutWithLogsHint(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/health" { + if r.URL.Path != "/ready" { http.NotFound(w, r) return } - http.Error(w, "starting", http.StatusServiceUnavailable) + http.Error(w, "not ready", http.StatusServiceUnavailable) })) defer server.Close() @@ -115,18 +115,19 @@ func TestWaitForServerHealthTimesOutWithLogsHint(t *testing.T) { var err error output := captureStdout(t, func() { - err = waitForServerHealth(context.Background(), 200*time.Millisecond) + err = waitForServerReady(context.Background(), 200*time.Millisecond) }) require.Error(t, err) - assert.Contains(t, err.Error(), "server did not become healthy within") + assert.Contains(t, err.Error(), "server did not become ready within") assert.Contains(t, err.Error(), "panda server logs") - assert.Contains(t, output, "Still waiting for server to become healthy...") + assert.Contains(t, output, "Still waiting for server to become ready...") } func TestRunServerRestartLogsAndWaitsForHealth(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/health" { + // /health gates the restart/liveness precheck; /ready gates the wait. + if r.URL.Path != "/health" && r.URL.Path != "/ready" { http.NotFound(w, r) return } @@ -170,14 +171,15 @@ func TestRunServerRestartLogsAndWaitsForHealth(t *testing.T) { assert.Equal(t, []string{"restart"}, runnerArgs) assertContainsInOrder(t, output, "Restarting server...", - "Waiting for server to become healthy...", + "Waiting for server to become ready...", "Server ready.", ) } func TestRunServerStartLogsAndWaitsForHealth(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/health" { + // /health gates the restart/liveness precheck; /ready gates the wait. + if r.URL.Path != "/health" && r.URL.Path != "/ready" { http.NotFound(w, r) return } @@ -221,7 +223,7 @@ func TestRunServerStartLogsAndWaitsForHealth(t *testing.T) { assert.Equal(t, []string{"up", "-d", "--force-recreate"}, runnerArgs) assertContainsInOrder(t, output, "Starting server...", - "Waiting for server to become healthy...", + "Waiting for server to become ready...", "Server ready.", ) } diff --git a/pkg/embedding/remote.go b/pkg/embedding/remote.go index b24c1be..9321142 100644 --- a/pkg/embedding/remote.go +++ b/pkg/embedding/remote.go @@ -66,6 +66,20 @@ type RemoteEmbedder struct { invalidateFn func() localCache cache.Cache model string + progressFn func(completed, total int) +} + +// OnProgress registers a callback invoked during EmbedBatch with the number of +// documents embedded so far and the total in the batch. It enables +// document-level progress reporting for index builds. +func (e *RemoteEmbedder) OnProgress(fn func(completed, total int)) { + e.progressFn = fn +} + +func (e *RemoteEmbedder) reportProgress(completed, total int) { + if e.progressFn != nil { + e.progressFn(completed, total) + } } // Compile-time interface check. @@ -199,6 +213,8 @@ func (e *RemoteEmbedder) EmbedBatch(texts []string) ([][]float32, error) { } } + e.reportProgress(localHits, len(texts)) + // Collect indices that still need embedding. var remoteIndices []int @@ -260,28 +276,29 @@ func (e *RemoteEmbedder) EmbedBatch(texts []string) ([][]float32, error) { } } - if len(missItems) == 0 { - continue - } - - e.log.WithFields(logrus.Fields{ - "total": len(batchIndices), - "cached": len(batchIndices) - len(missItems), - "misses": len(missItems), - }).Info("Proxy cache stats") + if len(missItems) > 0 { + e.log.WithFields(logrus.Fields{ + "total": len(batchIndices), + "cached": len(batchIndices) - len(missItems), + "misses": len(missItems), + }).Info("Proxy cache stats") + + resp, err := e.callEmbed(missItems) + if err != nil { + return nil, fmt.Errorf("embedding batch %d/%d: %w", batchNum, totalBatches, err) + } - resp, err := e.callEmbed(missItems) - if err != nil { - return nil, fmt.Errorf("embedding batch %d/%d: %w", batchNum, totalBatches, err) - } + for _, result := range resp.Results { + for _, idx := range hashToIndices[result.Hash] { + vectors[idx] = result.Vector + } - for _, result := range resp.Results { - for _, idx := range hashToIndices[result.Hash] { - vectors[idx] = result.Vector + e.queueLocalCache(toCache, result.Hash, result.Vector) } - - e.queueLocalCache(toCache, result.Hash, result.Vector) } + + // batchEnd is the cumulative count of remote items processed so far. + e.reportProgress(localHits+batchEnd, len(texts)) } // Phase 3: persist newly fetched vectors to local cache. diff --git a/pkg/embedding/remote_test.go b/pkg/embedding/remote_test.go index 877b632..d4912c0 100644 --- a/pkg/embedding/remote_test.go +++ b/pkg/embedding/remote_test.go @@ -123,6 +123,46 @@ func TestRemoteEmbedder_EmbedBatch_AllMisses(t *testing.T) { assert.Equal(t, fakeVectors["gamma"], vectors[2]) } +func TestRemoteEmbedder_EmbedBatch_ReportsProgress(t *testing.T) { + t.Parallel() + + texts := []string{"alpha", "beta", "gamma"} + + srv := newMockProxy(t, + func(w http.ResponseWriter, r *http.Request) { + var req embedRequest + require.NoError(t, json.NewDecoder(r.Body).Decode(&req)) + + results := make([]embedResult, 0, len(req.Items)) + for _, item := range req.Items { + results = append(results, embedResult{Hash: item.Hash, Vector: []float32{1, 0, 0}}) + } + + w.Header().Set("Content-Type", "application/json") + require.NoError(t, json.NewEncoder(w).Encode(embedResponse{Model: "test-model", Results: results})) + }, + func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(embedCheckResponse{}) + }, + ) + + embedder := NewRemote(logrus.New(), srv.URL, func() string { return "" }, nil, nil, "") + + var calls, lastDone, lastTotal int + embedder.OnProgress(func(completed, total int) { + calls++ + lastDone, lastTotal = completed, total + }) + + _, err := embedder.EmbedBatch(texts) + require.NoError(t, err) + + assert.Positive(t, calls, "OnProgress should be invoked during a batch embed") + assert.Equal(t, 3, lastDone, "final progress should report every document done") + assert.Equal(t, 3, lastTotal) +} + func TestRemoteEmbedder_EmbedBatch_AllCached(t *testing.T) { t.Parallel() diff --git a/pkg/proxy/client.go b/pkg/proxy/client.go index 636f749..9719b9d 100644 --- a/pkg/proxy/client.go +++ b/pkg/proxy/client.go @@ -10,6 +10,7 @@ import ( "net/url" "strings" "sync" + "sync/atomic" "time" "github.com/sirupsen/logrus" @@ -114,6 +115,17 @@ type proxyClient struct { datasources *DatasourcesResponse stopCh chan struct{} stopped bool + + // discovered reports whether at least one datasource discovery has + // succeeded. It gates server readiness: until the first successful + // discovery the server has no datasources to serve. + discovered atomic.Bool + + // authPending reports whether discovery is blocked waiting for the user to + // authenticate. In that state the server is up and cannot do better until + // the user runs `panda auth login`, so it counts as ready — otherwise a + // yet-to-auth user would wedge readiness at 503 forever. + authPending atomic.Bool } // AuthModeClientCredentials is the ClientConfig.AuthMode value for the @@ -492,12 +504,30 @@ func (c *proxyClient) Discover(ctx context.Context) error { c.log.WithError(err).Debug("Proxy rejected token; invalidating and retrying") c.tokenSource.Invalidate() - return c.discoverOnce(ctx) + err = c.discoverOnce(ctx) + } + + switch { + case err == nil: + c.discovered.Store(true) + c.authPending.Store(false) + case errors.Is(err, ErrAuthenticationRequired): + // The server is up but cannot discover until the user authenticates. + // Treat this as ready so the init/start wait does not block on a state + // only the user can resolve. + c.authPending.Store(true) } return err } +// Ready reports whether the client has completed at least one successful +// datasource discovery, or is blocked waiting for the user to authenticate +// (in which case the server is up and can do no better unattended). +func (c *proxyClient) Ready() bool { + return c.discovered.Load() || c.authPending.Load() +} + // discoverOnce performs a single /datasources fetch. func (c *proxyClient) discoverOnce(ctx context.Context) error { url := fmt.Sprintf("%s/datasources", c.cfg.URL) diff --git a/pkg/proxy/client_credentials_test.go b/pkg/proxy/client_credentials_test.go index 2386350..5847ce7 100644 --- a/pkg/proxy/client_credentials_test.go +++ b/pkg/proxy/client_credentials_test.go @@ -240,3 +240,56 @@ func TestClickHouseQueryRetriesOn401(t *testing.T) { t.Fatalf("expected 2 mints (initial + re-mint on 401), got %d", got) } } + +func TestReadyFlipsAfterSuccessfulDiscovery(t *testing.T) { + t.Parallel() + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/datasources" { + http.NotFound(w, r) + return + } + + _ = json.NewEncoder(w).Encode(map[string]any{"clickhouse": []string{"xatu"}}) + })) + t.Cleanup(srv.Close) + + // No auth configured (local-proxy style): discovery should just succeed. + client := NewClient(logrus.New(), ClientConfig{URL: srv.URL}).(*proxyClient) + + if client.Ready() { + t.Fatal("Ready() should be false before the first discovery") + } + + if err := client.Discover(context.Background()); err != nil { + t.Fatalf("Discover error = %v", err) + } + + if !client.Ready() { + t.Fatal("Ready() should be true after a successful discovery") + } +} + +func TestReadyTrueWhenBlockedOnAuth(t *testing.T) { + t.Parallel() + + // The proxy rejects every token, so discovery can never succeed without the + // user (re)authenticating. The server is up and can do no better unattended, + // so it must report ready rather than wedge the init/start wait at 503. + issuer := newFakeIssuer(t, 3600) + proxy := newFakeProxy(t, func(string) bool { return false }) + + client := newClientCredentialsClient(issuer.server.URL, proxy.URL) + + if client.Ready() { + t.Fatal("Ready() should be false before any discovery attempt") + } + + if err := client.Discover(context.Background()); err == nil { + t.Fatal("expected discovery to fail with authentication required") + } + + if !client.Ready() { + t.Fatal("Ready() should be true when discovery is blocked waiting for auth") + } +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 4877a87..000466c 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -26,6 +26,11 @@ type Service interface { // URL returns the proxy URL. URL() string + // Ready reports whether the proxy layer has completed at least one + // successful datasource discovery. Until then there are no datasources to + // serve, so server readiness should be gated on it. + Ready() bool + // RegisterToken returns the current access token for server-to-proxy // requests, or NoAuthToken when no bearer token is required. RegisterToken() string diff --git a/pkg/proxy/router.go b/pkg/proxy/router.go index d845da9..e9a0926 100644 --- a/pkg/proxy/router.go +++ b/pkg/proxy/router.go @@ -171,6 +171,24 @@ func (r *routerClient) RegisterToken() string { return primary.RegisterToken() } +// Ready reports readiness across the wrapped clients. When an external +// (primary) proxy is configured, readiness tracks that proxy, since it owns the +// datasources users expect. With only local proxies, readiness requires every +// local client to have discovered at least once. +func (r *routerClient) Ready() bool { + if r.primary != nil { + return r.primary.client.Ready() + } + + for i := range r.routes { + if !r.routes[i].client.Ready() { + return false + } + } + + return true +} + // Invalidate drops the primary proxy's cached token for primary-only requests. func (r *routerClient) Invalidate() { primary := r.Primary() diff --git a/pkg/proxy/router_test.go b/pkg/proxy/router_test.go index 66d468c..004206d 100644 --- a/pkg/proxy/router_test.go +++ b/pkg/proxy/router_test.go @@ -183,6 +183,55 @@ func TestRouterWithOnlyLocalProxyHasNoPrimary(t *testing.T) { } } +func TestRouterReady(t *testing.T) { + t.Parallel() + + log := logrus.New() + log.SetOutput(io.Discard) + + t.Run("tracks the external primary and ignores locals", func(t *testing.T) { + t.Parallel() + + hosted := &fakeRouterClient{url: "https://hosted.example"} + local := &fakeRouterClient{url: "http://local.example", ready: true} + + router := NewRouter(log, []ClientRoute{ + {Name: "hosted", Client: hosted}, + {Name: "local", Client: local, Local: true}, + }) + + if router.Ready() { + t.Fatal("Ready() = true, want false while the primary has not discovered") + } + + hosted.ready = true + if !router.Ready() { + t.Fatal("Ready() = false, want true once the primary has discovered") + } + }) + + t.Run("local-only requires every local to be ready", func(t *testing.T) { + t.Parallel() + + first := &fakeRouterClient{url: "http://a.example", ready: true} + second := &fakeRouterClient{url: "http://b.example"} + + router := NewRouter(log, []ClientRoute{ + {Name: "a", Client: first, Local: true}, + {Name: "b", Client: second, Local: true}, + }) + + if router.Ready() { + t.Fatal("Ready() = true, want false while a local proxy has not discovered") + } + + second.ready = true + if !router.Ready() { + t.Fatal("Ready() = false, want true once all local proxies have discovered") + } + }) +} + func TestRouterStartsStopsAndDiscoversAllClients(t *testing.T) { t.Parallel() @@ -273,6 +322,7 @@ type fakeRouterClient struct { ethnode bool embedding bool model string + ready bool starts int stops int @@ -296,6 +346,7 @@ func (f *fakeRouterClient) URL() string { return f.url } func (f *fakeRouterClient) RegisterToken() string { return f.token } +func (f *fakeRouterClient) Ready() bool { return f.ready } func (f *fakeRouterClient) Invalidate() {} func (f *fakeRouterClient) RevokeToken() {} diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index a821c9a..bea0db4 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -679,6 +679,15 @@ func (s *server) RegisterToken() string { return NoAuthToken } +// Ready reports whether the embedded proxy server has finished starting. It +// satisfies the proxy.Service readiness contract for in-process proxies. +func (s *server) Ready() bool { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.started +} + // Invalidate is a no-op: the embedded proxy issues no bearer tokens. func (s *server) Invalidate() { } diff --git a/pkg/searchruntime/runtime.go b/pkg/searchruntime/runtime.go index 6a491f8..1b0164d 100644 --- a/pkg/searchruntime/runtime.go +++ b/pkg/searchruntime/runtime.go @@ -104,6 +104,22 @@ func Build( runtime.embedder = embedder + // Log document-level embedding progress so operators can watch the index + // build advance in `panda server logs`. The embedder reports documents as + // it works, attributed to whichever stage is currently building. + currentStage := "" + setStage := func(stage string) { currentStage = stage } + + embedder.OnProgress(func(completed, total int) { + log.WithFields(logrus.Fields{ + "stage": currentStage, + "embedded": completed, + "total": total, + }).Info("Embedding search index") + }) + + setStage("examples") + examples := resource.GetQueryExamples(moduleRegistry) exampleCount := 0 for _, cat := range examples { @@ -134,6 +150,7 @@ func Build( return runtime, nil } + setStage("runbooks") log.WithField("runbooks", runbookReg.Count()).Info("Building runbook search index") runbookIndex, err := resource.NewRunbookIndex(log, embedder, runbookReg.All()) @@ -179,6 +196,7 @@ func Build( case eipReg.Count() == 0: log.Warn("No EIPs found, EIP search will be disabled") default: + setStage("EIPs") log.WithField("eips", eipReg.Count()).Info("Building EIP search index") eipIndex, indexErr := resource.NewEIPIndex(log, embedder, eipReg.All()) @@ -203,6 +221,8 @@ func Build( "constants": specsReg.ConstantCount(), }).Info("Building consensus specs search index") + setStage("consensus specs") + specsIndex, indexErr := resource.NewConsensusSpecIndex(log, embedder, specsReg.AllSpecs(), specsReg.AllConstants()) if indexErr != nil { log.WithError(indexErr).Warn("Failed to build consensus specs index — specs search disabled") diff --git a/pkg/server/operations_ethnode_test.go b/pkg/server/operations_ethnode_test.go index 348d32d..e52a37a 100644 --- a/pkg/server/operations_ethnode_test.go +++ b/pkg/server/operations_ethnode_test.go @@ -112,6 +112,7 @@ func (p *ethNodeOperationProxy) Start(_ context.Context) error { return nil } func (p *ethNodeOperationProxy) Stop(_ context.Context) error { return nil } func (p *ethNodeOperationProxy) URL() string { return p.url } func (p *ethNodeOperationProxy) RegisterToken() string { return proxy.NoAuthToken } +func (p *ethNodeOperationProxy) Ready() bool { return true } func (p *ethNodeOperationProxy) Invalidate() {} func (p *ethNodeOperationProxy) RevokeToken() {} func (p *ethNodeOperationProxy) ClickHouseDatasources() []string { diff --git a/pkg/server/proxy_routing_test.go b/pkg/server/proxy_routing_test.go index f8b7964..68b8198 100644 --- a/pkg/server/proxy_routing_test.go +++ b/pkg/server/proxy_routing_test.go @@ -250,6 +250,7 @@ func (c *routingProxyClient) Start(_ context.Context) error { return nil } func (c *routingProxyClient) Stop(_ context.Context) error { return nil } func (c *routingProxyClient) URL() string { return c.url } func (c *routingProxyClient) RegisterToken() string { return c.token } +func (c *routingProxyClient) Ready() bool { return true } func (c *routingProxyClient) Invalidate() {} func (c *routingProxyClient) RevokeToken() {} func (c *routingProxyClient) ClickHouseDatasources() []string { diff --git a/pkg/server/server.go b/pkg/server/server.go index 7950d7d..e1d6017 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -357,6 +357,17 @@ func (s *service) buildHTTPHandler(routes map[string]http.Handler) http.Handler _, _ = w.Write([]byte("ok")) }) r.Get("/ready", func(w http.ResponseWriter, _ *http.Request) { + // Gate readiness on the proxy layer having completed initial datasource + // discovery (or being blocked on user auth). Until then /api/v1/datasources + // would return an empty list, so reporting "ready" too early makes + // 'panda datasources' look empty while the server is still bootstrapping. + if s.proxyService == nil || !s.proxyService.Ready() { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte("not ready")) + + return + } + w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ready")) })