Skip to content

Commit 5454f52

Browse files
committed
Update show metrics
1 parent d55d5be commit 5454f52

4 files changed

Lines changed: 196 additions & 18 deletions

File tree

internal/gui/api_handler.go

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -328,13 +328,64 @@ func (h *APIHandler) handleMetrics(w http.ResponseWriter, r *http.Request, parts
328328
return
329329
}
330330

331-
metrics := map[string]interface{}{
332-
"namespace": namespace,
333-
"name": name,
334-
"metrics": map[string]interface{}{},
331+
if h.server.operatorMetricsURL == "" {
332+
metrics := map[string]interface{}{
333+
"namespace": namespace,
334+
"name": name,
335+
"metrics": map[string]interface{}{},
336+
}
337+
json.NewEncoder(w).Encode(metrics)
338+
return
339+
}
340+
341+
metricsURL := strings.TrimSuffix(h.server.operatorMetricsURL, "/") + "/metrics"
342+
req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, metricsURL, nil)
343+
if err != nil {
344+
h.server.logger.Error(err, "Failed to create metrics request")
345+
http.Error(w, "Failed to create metrics request", http.StatusInternalServerError)
346+
return
347+
}
348+
349+
resp, err := http.DefaultClient.Do(req)
350+
if err != nil {
351+
h.server.logger.Error(err, "Failed to fetch metrics from operator")
352+
http.Error(w, "Failed to fetch metrics from operator", http.StatusBadGateway)
353+
return
335354
}
355+
defer resp.Body.Close()
336356

337-
json.NewEncoder(w).Encode(metrics)
357+
if resp.StatusCode != http.StatusOK {
358+
h.server.logger.Info("Operator metrics returned non-200", "status", resp.StatusCode)
359+
http.Error(w, fmt.Sprintf("Operator metrics returned %d", resp.StatusCode), http.StatusBadGateway)
360+
return
361+
}
362+
363+
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
364+
filterPrometheusByDataFlow(resp.Body, w, namespace, name)
365+
}
366+
367+
// filterPrometheusByDataFlow reads Prometheus text format from src and writes to dst
368+
// only lines for dataflow_* metrics with namespace and name labels matching the given DataFlow.
369+
func filterPrometheusByDataFlow(src io.Reader, dst io.Writer, namespace, name string) {
370+
nsMatch := `namespace="` + namespace + `"`
371+
nameMatch := `name="` + name + `"`
372+
scanner := bufio.NewScanner(src)
373+
const maxLineSize = 64 * 1024
374+
buf := make([]byte, 0, maxLineSize)
375+
scanner.Buffer(buf, maxLineSize)
376+
377+
for scanner.Scan() {
378+
line := scanner.Text()
379+
if strings.HasPrefix(line, "#") {
380+
if strings.Contains(line, "dataflow_") {
381+
fmt.Fprintln(dst, line)
382+
}
383+
continue
384+
}
385+
if strings.HasPrefix(line, "dataflow_") && strings.Contains(line, nsMatch) && strings.Contains(line, nameMatch) {
386+
fmt.Fprintln(dst, line)
387+
}
388+
}
338389
}
339390

340391
func (h *APIHandler) handleStatus(w http.ResponseWriter, r *http.Request, parts []string) {

internal/gui/api_handler_test.go

Lines changed: 128 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,21 @@ import (
3434
var ctx = context.Background()
3535

3636
func setupTestServer() (*Server, error) {
37+
return setupTestServerWithMetricsURL("")
38+
}
39+
40+
func setupTestServerWithMetricsURL(operatorMetricsURL string) (*Server, error) {
3741
scheme := runtime.NewScheme()
3842
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
3943
utilruntime.Must(dataflowv1.AddToScheme(scheme))
4044

4145
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
4246

4347
server := &Server{
44-
client: fakeClient,
45-
k8sClient: nil, // not needed for dataflow CRUD tests
46-
logger: ctrl.Log.WithName("test"),
48+
client: fakeClient,
49+
k8sClient: nil, // not needed for dataflow CRUD tests
50+
logger: ctrl.Log.WithName("test"),
51+
operatorMetricsURL: operatorMetricsURL,
4752
}
4853

4954
return server, nil
@@ -208,6 +213,126 @@ func TestAPIHandler_DeleteDataFlow(t *testing.T) {
208213
}
209214
}
210215

216+
func TestAPIHandler_Metrics_RequiresNamespaceAndName(t *testing.T) {
217+
server, err := setupTestServer()
218+
if err != nil {
219+
t.Fatalf("Failed to setup test server: %v", err)
220+
}
221+
handler := NewAPIHandler(server)
222+
223+
for _, tc := range []struct {
224+
query string
225+
}{
226+
{"?"},
227+
{"?namespace=default"},
228+
{"?name=foo"},
229+
} {
230+
req := httptest.NewRequest("GET", "/metrics"+tc.query, nil)
231+
w := httptest.NewRecorder()
232+
handler.ServeHTTP(w, req)
233+
if w.Code != http.StatusBadRequest {
234+
t.Errorf("query %q: expected 400, got %d", tc.query, w.Code)
235+
}
236+
}
237+
}
238+
239+
func TestAPIHandler_Metrics_StubWhenNoOperatorURL(t *testing.T) {
240+
server, err := setupTestServer()
241+
if err != nil {
242+
t.Fatalf("Failed to setup test server: %v", err)
243+
}
244+
handler := NewAPIHandler(server)
245+
246+
req := httptest.NewRequest("GET", "/metrics?namespace=default&name=myflow", nil)
247+
w := httptest.NewRecorder()
248+
handler.ServeHTTP(w, req)
249+
250+
if w.Code != http.StatusOK {
251+
t.Errorf("Expected 200, got %d: %s", w.Code, w.Body.String())
252+
}
253+
if w.Header().Get("Content-Type") != "application/json" {
254+
t.Errorf("Expected application/json, got %s", w.Header().Get("Content-Type"))
255+
}
256+
var out map[string]interface{}
257+
if err := json.NewDecoder(w.Body).Decode(&out); err != nil {
258+
t.Fatalf("Failed to decode: %v", err)
259+
}
260+
if out["namespace"] != "default" || out["name"] != "myflow" {
261+
t.Errorf("Expected namespace=default, name=myflow, got %v", out)
262+
}
263+
metrics, ok := out["metrics"].(map[string]interface{})
264+
if !ok || len(metrics) != 0 {
265+
t.Errorf("Expected empty metrics map, got %v", out["metrics"])
266+
}
267+
}
268+
269+
func TestAPIHandler_Metrics_ProxyFromOperator(t *testing.T) {
270+
promOutput := `# HELP dataflow_messages_received_total Total messages received
271+
# TYPE dataflow_messages_received_total counter
272+
dataflow_messages_received_total{namespace="default",name="myflow",source_type="kafka"} 100
273+
dataflow_messages_received_total{namespace="other",name="otherflow",source_type="kafka"} 50
274+
# HELP go_goroutines Number of goroutines
275+
# TYPE go_goroutines gauge
276+
go_goroutines 10
277+
`
278+
mockOperator := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
279+
if r.URL.Path != "/metrics" {
280+
http.NotFound(w, r)
281+
return
282+
}
283+
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
284+
w.Write([]byte(promOutput))
285+
}))
286+
defer mockOperator.Close()
287+
288+
server, err := setupTestServerWithMetricsURL(mockOperator.URL)
289+
if err != nil {
290+
t.Fatalf("Failed to setup test server: %v", err)
291+
}
292+
handler := NewAPIHandler(server)
293+
294+
req := httptest.NewRequest("GET", "/metrics?namespace=default&name=myflow", nil)
295+
w := httptest.NewRecorder()
296+
handler.ServeHTTP(w, req)
297+
298+
if w.Code != http.StatusOK {
299+
t.Errorf("Expected 200, got %d: %s", w.Code, w.Body.String())
300+
}
301+
body := w.Body.String()
302+
if !strings.Contains(body, `dataflow_messages_received_total{namespace="default",name="myflow"`) {
303+
t.Errorf("Expected filtered metrics for default/myflow, got: %s", body)
304+
}
305+
if strings.Contains(body, `namespace="other"`) {
306+
t.Errorf("Should not include metrics from other namespace: %s", body)
307+
}
308+
if strings.Contains(body, "go_goroutines") {
309+
t.Errorf("Should not include non-dataflow metrics: %s", body)
310+
}
311+
}
312+
313+
func TestFilterPrometheusByDataFlow(t *testing.T) {
314+
input := `# HELP dataflow_foo A metric
315+
# TYPE dataflow_foo counter
316+
dataflow_foo{namespace="ns1",name="n1"} 1
317+
dataflow_foo{namespace="ns2",name="n2"} 2
318+
# HELP go_goroutines Goroutines
319+
# TYPE go_goroutines gauge
320+
go_goroutines 5
321+
`
322+
var buf bytes.Buffer
323+
filterPrometheusByDataFlow(strings.NewReader(input), &buf, "ns1", "n1")
324+
out := buf.String()
325+
if !strings.Contains(out, `dataflow_foo{namespace="ns1",name="n1"} 1`) {
326+
t.Errorf("Expected ns1/n1 metric: %s", out)
327+
}
328+
if strings.Contains(out, `namespace="ns2"`) {
329+
t.Errorf("Should not include ns2: %s", out)
330+
}
331+
if strings.Contains(out, "go_goroutines") {
332+
t.Errorf("Should not include go_goroutines: %s", out)
333+
}
334+
}
335+
211336
// TestScannerReadsLongLogLine verifies that a scanner with an increased buffer
212337
// reads lines longer than 64 KB without "token too long" error.
213338
func TestScannerReadsLongLogLine(t *testing.T) {

internal/gui/server.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,12 @@ func (h staticHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
8585

8686
// Server represents the web server for the GUI.
8787
type Server struct {
88-
bindAddr string
89-
httpServer *http.Server
90-
client client.Client
91-
k8sClient *kubernetes.Clientset
92-
logger logr.Logger
88+
bindAddr string
89+
httpServer *http.Server
90+
client client.Client
91+
k8sClient *kubernetes.Clientset
92+
logger logr.Logger
93+
operatorMetricsURL string
9394
}
9495

9596
// NewServer creates a new GUI server.
@@ -130,10 +131,11 @@ func NewServer(bindAddr, kubeconfig string) (*Server, error) {
130131
}
131132

132133
server := &Server{
133-
bindAddr: bindAddr,
134-
client: k8sClient,
135-
k8sClient: clientset,
136-
logger: logger,
134+
bindAddr: bindAddr,
135+
client: k8sClient,
136+
k8sClient: clientset,
137+
logger: logger,
138+
operatorMetricsURL: os.Getenv("OPERATOR_METRICS_URL"),
137139
}
138140

139141
mux := http.NewServeMux()

web/src/api/client.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export async function getStatus(namespace, name) {
8686
}
8787

8888
export async function getMetrics(namespace, name) {
89-
return request(
89+
return requestText(
9090
`/metrics?namespace=${encodeURIComponent(namespace)}&name=${encodeURIComponent(name)}`
9191
)
9292
}

0 commit comments

Comments
 (0)