Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/clickhouse/schema_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (c *schemaProxyClient) Start(_ context.Context) error { return nil }
func (c *schemaProxyClient) Stop(_ context.Context) error { return nil }
func (c *schemaProxyClient) URL() string { return c.url }
func (c *schemaProxyClient) RegisterToken() string { return c.token }
func (c *schemaProxyClient) Ready() bool { return true }
func (c *schemaProxyClient) Invalidate() {}
func (c *schemaProxyClient) RevokeToken() {}
func (c *schemaProxyClient) ClickHouseDatasources() []string {
Expand Down
1 change: 1 addition & 0 deletions pkg/app/refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (f *fakeProxyClient) Start(_ context.Context) error { return
func (f *fakeProxyClient) Stop(_ context.Context) error { return nil }
func (f *fakeProxyClient) URL() string { return "" }
func (f *fakeProxyClient) RegisterToken() string { return "" }
func (f *fakeProxyClient) Ready() bool { return true }
func (f *fakeProxyClient) Invalidate() {}
func (f *fakeProxyClient) RevokeToken() {}
func (f *fakeProxyClient) Discover(_ context.Context) error { return nil }
Expand Down
53 changes: 45 additions & 8 deletions pkg/cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,44 @@ func checkServerHealth(ctx context.Context) error {
return nil
}

func waitForServerHealth(ctx context.Context, timeout time.Duration) error {
// checkServerReady probes the server's /ready endpoint, which reports 200 only
// once the proxy layer has completed initial datasource discovery (or is
// blocked on user auth). /health returns 200 as soon as the HTTP listener is
// up, so readiness — not liveness — is what 'panda init' and 'panda server
// start' should wait on before claiming the server is usable.
func checkServerReady(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}

cfg, err := config.LoadClient(cfgFile)
if err != nil {
return &serverHealthConfigError{err: err}
}

readyURL := strings.TrimRight(cfg.ServerURL(), "/") + "/ready"

req, err := http.NewRequestWithContext(ctx, http.MethodGet, readyURL, nil)
if err != nil {
return &serverHealthRequestError{err: err}
}

client := &http.Client{Timeout: 5 * time.Second}

resp, err := client.Do(req)
if err != nil {
return &serverHealthRequestError{err: err}
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
return &serverHealthStatusError{statusCode: resp.StatusCode}
}

return nil
}

func waitForServerReady(ctx context.Context, timeout time.Duration) error {
if ctx == nil {
ctx = context.Background()
}
Expand All @@ -228,14 +265,14 @@ func waitForServerHealth(ctx context.Context, timeout time.Duration) error {
var lastErr error

for {
err := checkServerHealth(waitCtx)
err := checkServerReady(waitCtx)
if err == nil {
return nil
}

var configErr *serverHealthConfigError
if errors.As(err, &configErr) {
return fmt.Errorf("cannot check server health: %w", err)
return fmt.Errorf("cannot check server readiness: %w", err)
}

lastErr = err
Expand All @@ -244,7 +281,7 @@ func waitForServerHealth(ctx context.Context, timeout time.Duration) error {
if !now.Before(nextProgressAt) {
remaining := max(time.Until(deadline).Round(time.Second), 0)

fmt.Printf("Still waiting for server to become healthy... (%s remaining)\n", remaining)
fmt.Printf("Still waiting for server to become ready... (%s remaining)\n", remaining)
nextProgressAt = now.Add(serverHealthProgressInterval)
}

Expand All @@ -260,13 +297,13 @@ func waitForServerHealth(ctx context.Context, timeout time.Duration) error {

if errors.Is(waitCtx.Err(), context.DeadlineExceeded) {
return fmt.Errorf(
"server did not become healthy within %s (last check: %v). Check logs with 'panda server logs'",
"server did not become ready within %s (last check: %v). Check logs with 'panda server logs'",
timeout,
lastErr,
)
}

return fmt.Errorf("server health wait canceled: %w", waitCtx.Err())
return fmt.Errorf("server readiness wait canceled: %w", waitCtx.Err())
case <-timer.C:
}
}
Expand All @@ -277,9 +314,9 @@ func runComposeAndWait(ctx context.Context, composeFile string, composeArgs []st
return err
}

fmt.Println("Waiting for server to become healthy...")
fmt.Println("Waiting for server to become ready...")

if err := waitForServerHealth(ctx, timeout); err != nil {
if err := waitForServerReady(ctx, timeout); err != nil {
return err
}

Expand Down
30 changes: 16 additions & 14 deletions pkg/cli/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ func TestComposeOverrideFile(t *testing.T) {
})
}

func TestWaitForServerHealthSucceedsAfterTemporaryFailure(t *testing.T) {
func TestWaitForServerReadySucceedsAfterTemporaryFailure(t *testing.T) {
var attempts atomic.Int32

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/health" {
if r.URL.Path != "/ready" {
http.NotFound(w, r)
return
}

if attempts.Add(1) == 1 {
http.Error(w, "starting", http.StatusServiceUnavailable)
http.Error(w, "not ready", http.StatusServiceUnavailable)
return
}

Expand All @@ -93,20 +93,20 @@ func TestWaitForServerHealthSucceedsAfterTemporaryFailure(t *testing.T) {
setClientConfig(t, server.URL)
setServerHealthWaitIntervals(t, 5*time.Millisecond, time.Hour)

err := waitForServerHealth(context.Background(), 200*time.Millisecond)
err := waitForServerReady(context.Background(), 200*time.Millisecond)

require.NoError(t, err)
assert.GreaterOrEqual(t, attempts.Load(), int32(2))
}

func TestWaitForServerHealthTimesOutWithLogsHint(t *testing.T) {
func TestWaitForServerReadyTimesOutWithLogsHint(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/health" {
if r.URL.Path != "/ready" {
http.NotFound(w, r)
return
}

http.Error(w, "starting", http.StatusServiceUnavailable)
http.Error(w, "not ready", http.StatusServiceUnavailable)
}))
defer server.Close()

Expand All @@ -115,18 +115,19 @@ func TestWaitForServerHealthTimesOutWithLogsHint(t *testing.T) {

var err error
output := captureStdout(t, func() {
err = waitForServerHealth(context.Background(), 200*time.Millisecond)
err = waitForServerReady(context.Background(), 200*time.Millisecond)
})

require.Error(t, err)
assert.Contains(t, err.Error(), "server did not become healthy within")
assert.Contains(t, err.Error(), "server did not become ready within")
assert.Contains(t, err.Error(), "panda server logs")
assert.Contains(t, output, "Still waiting for server to become healthy...")
assert.Contains(t, output, "Still waiting for server to become ready...")
}

func TestRunServerRestartLogsAndWaitsForHealth(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/health" {
// /health gates the restart/liveness precheck; /ready gates the wait.
if r.URL.Path != "/health" && r.URL.Path != "/ready" {
http.NotFound(w, r)
return
}
Expand Down Expand Up @@ -170,14 +171,15 @@ func TestRunServerRestartLogsAndWaitsForHealth(t *testing.T) {
assert.Equal(t, []string{"restart"}, runnerArgs)
assertContainsInOrder(t, output,
"Restarting server...",
"Waiting for server to become healthy...",
"Waiting for server to become ready...",
"Server ready.",
)
}

func TestRunServerStartLogsAndWaitsForHealth(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/health" {
// /health gates the restart/liveness precheck; /ready gates the wait.
if r.URL.Path != "/health" && r.URL.Path != "/ready" {
http.NotFound(w, r)
return
}
Expand Down Expand Up @@ -221,7 +223,7 @@ func TestRunServerStartLogsAndWaitsForHealth(t *testing.T) {
assert.Equal(t, []string{"up", "-d", "--force-recreate"}, runnerArgs)
assertContainsInOrder(t, output,
"Starting server...",
"Waiting for server to become healthy...",
"Waiting for server to become ready...",
"Server ready.",
)
}
Expand Down
53 changes: 35 additions & 18 deletions pkg/embedding/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ type RemoteEmbedder struct {
invalidateFn func()
localCache cache.Cache
model string
progressFn func(completed, total int)
}

// OnProgress registers a callback invoked during EmbedBatch with the number of
// documents embedded so far and the total in the batch. It enables
// document-level progress reporting for index builds.
func (e *RemoteEmbedder) OnProgress(fn func(completed, total int)) {
e.progressFn = fn
}

func (e *RemoteEmbedder) reportProgress(completed, total int) {
if e.progressFn != nil {
e.progressFn(completed, total)
}
}

// Compile-time interface check.
Expand Down Expand Up @@ -199,6 +213,8 @@ func (e *RemoteEmbedder) EmbedBatch(texts []string) ([][]float32, error) {
}
}

e.reportProgress(localHits, len(texts))

// Collect indices that still need embedding.
var remoteIndices []int

Expand Down Expand Up @@ -260,28 +276,29 @@ func (e *RemoteEmbedder) EmbedBatch(texts []string) ([][]float32, error) {
}
}

if len(missItems) == 0 {
continue
}

e.log.WithFields(logrus.Fields{
"total": len(batchIndices),
"cached": len(batchIndices) - len(missItems),
"misses": len(missItems),
}).Info("Proxy cache stats")
if len(missItems) > 0 {
e.log.WithFields(logrus.Fields{
"total": len(batchIndices),
"cached": len(batchIndices) - len(missItems),
"misses": len(missItems),
}).Info("Proxy cache stats")

resp, err := e.callEmbed(missItems)
if err != nil {
return nil, fmt.Errorf("embedding batch %d/%d: %w", batchNum, totalBatches, err)
}

resp, err := e.callEmbed(missItems)
if err != nil {
return nil, fmt.Errorf("embedding batch %d/%d: %w", batchNum, totalBatches, err)
}
for _, result := range resp.Results {
for _, idx := range hashToIndices[result.Hash] {
vectors[idx] = result.Vector
}

for _, result := range resp.Results {
for _, idx := range hashToIndices[result.Hash] {
vectors[idx] = result.Vector
e.queueLocalCache(toCache, result.Hash, result.Vector)
}

e.queueLocalCache(toCache, result.Hash, result.Vector)
}

// batchEnd is the cumulative count of remote items processed so far.
e.reportProgress(localHits+batchEnd, len(texts))
}

// Phase 3: persist newly fetched vectors to local cache.
Expand Down
40 changes: 40 additions & 0 deletions pkg/embedding/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,46 @@ func TestRemoteEmbedder_EmbedBatch_AllMisses(t *testing.T) {
assert.Equal(t, fakeVectors["gamma"], vectors[2])
}

func TestRemoteEmbedder_EmbedBatch_ReportsProgress(t *testing.T) {
t.Parallel()

texts := []string{"alpha", "beta", "gamma"}

srv := newMockProxy(t,
func(w http.ResponseWriter, r *http.Request) {
var req embedRequest
require.NoError(t, json.NewDecoder(r.Body).Decode(&req))

results := make([]embedResult, 0, len(req.Items))
for _, item := range req.Items {
results = append(results, embedResult{Hash: item.Hash, Vector: []float32{1, 0, 0}})
}

w.Header().Set("Content-Type", "application/json")
require.NoError(t, json.NewEncoder(w).Encode(embedResponse{Model: "test-model", Results: results}))
},
func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(embedCheckResponse{})
},
)

embedder := NewRemote(logrus.New(), srv.URL, func() string { return "" }, nil, nil, "")

var calls, lastDone, lastTotal int
embedder.OnProgress(func(completed, total int) {
calls++
lastDone, lastTotal = completed, total
})

_, err := embedder.EmbedBatch(texts)
require.NoError(t, err)

assert.Positive(t, calls, "OnProgress should be invoked during a batch embed")
assert.Equal(t, 3, lastDone, "final progress should report every document done")
assert.Equal(t, 3, lastTotal)
}

func TestRemoteEmbedder_EmbedBatch_AllCached(t *testing.T) {
t.Parallel()

Expand Down
32 changes: 31 additions & 1 deletion pkg/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -114,6 +115,17 @@ type proxyClient struct {
datasources *DatasourcesResponse
stopCh chan struct{}
stopped bool

// discovered reports whether at least one datasource discovery has
// succeeded. It gates server readiness: until the first successful
// discovery the server has no datasources to serve.
discovered atomic.Bool

// authPending reports whether discovery is blocked waiting for the user to
// authenticate. In that state the server is up and cannot do better until
// the user runs `panda auth login`, so it counts as ready — otherwise a
// yet-to-auth user would wedge readiness at 503 forever.
authPending atomic.Bool
}

// AuthModeClientCredentials is the ClientConfig.AuthMode value for the
Expand Down Expand Up @@ -492,12 +504,30 @@ func (c *proxyClient) Discover(ctx context.Context) error {
c.log.WithError(err).Debug("Proxy rejected token; invalidating and retrying")
c.tokenSource.Invalidate()

return c.discoverOnce(ctx)
err = c.discoverOnce(ctx)
}

switch {
case err == nil:
c.discovered.Store(true)
c.authPending.Store(false)
case errors.Is(err, ErrAuthenticationRequired):
// The server is up but cannot discover until the user authenticates.
// Treat this as ready so the init/start wait does not block on a state
// only the user can resolve.
c.authPending.Store(true)
}

return err
}

// Ready reports whether the client has completed at least one successful
// datasource discovery, or is blocked waiting for the user to authenticate
// (in which case the server is up and can do no better unattended).
func (c *proxyClient) Ready() bool {
return c.discovered.Load() || c.authPending.Load()
}

// discoverOnce performs a single /datasources fetch.
func (c *proxyClient) discoverOnce(ctx context.Context) error {
url := fmt.Sprintf("%s/datasources", c.cfg.URL)
Expand Down
Loading
Loading