Skip to content

Commit abe19c7

Browse files
committed
Fix metrics and autoupdate
1 parent 18e2aa7 commit abe19c7

9 files changed

Lines changed: 518 additions & 8 deletions

File tree

cmd/processor/main.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,35 @@ import (
2020
"context"
2121
"encoding/json"
2222
"flag"
23+
"net/http"
2324
"os"
2425
"os/signal"
2526
"strings"
2627
"syscall"
2728

29+
"github.com/prometheus/client_golang/prometheus/promhttp"
2830
"go.uber.org/zap"
2931
"go.uber.org/zap/zapcore"
3032
ctrl "sigs.k8s.io/controller-runtime"
3133
zaprctrl "sigs.k8s.io/controller-runtime/pkg/log/zap"
34+
"sigs.k8s.io/controller-runtime/pkg/metrics"
3235

3336
dataflowv1 "github.com/dataflow-operator/dataflow/api/v1"
3437
"github.com/dataflow-operator/dataflow/internal/constants"
3538
"github.com/dataflow-operator/dataflow/internal/logkeys"
39+
_ "github.com/dataflow-operator/dataflow/internal/metrics" // Register metrics
3640
"github.com/dataflow-operator/dataflow/internal/processor"
3741
)
3842

3943
func main() {
4044
var specPath string
4145
var namespace string
4246
var name string
47+
var metricsPort string
4348
flag.StringVar(&specPath, "spec-path", "/etc/dataflow/spec.json", "Path to DataFlow spec JSON file")
4449
flag.StringVar(&namespace, "namespace", "", "Namespace of the DataFlow resource")
4550
flag.StringVar(&name, "name", "", "Name of the DataFlow resource")
51+
flag.StringVar(&metricsPort, "metrics-port", ":9090", "Address for the metrics HTTP server")
4652
opts := zaprctrl.Options{
4753
Development: true,
4854
}
@@ -82,6 +88,16 @@ func main() {
8288
ctx, cancel := context.WithCancel(context.Background())
8389
defer cancel()
8490

91+
// Start metrics HTTP server (must be before proc.Start so /metrics is available from the start)
92+
mux := http.NewServeMux()
93+
mux.Handle("/metrics", promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{}))
94+
metricsServer := &http.Server{Addr: metricsPort, Handler: mux}
95+
go func() {
96+
if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
97+
logger.Error(err, "Metrics server exited")
98+
}
99+
}()
100+
85101
// Signal handling for graceful shutdown
86102
sigChan := make(chan os.Signal, constants.DefaultSingleValueChannelBufferSize)
87103
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

cmd/processor/metrics_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
Copyright 2024.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"io"
21+
"net/http"
22+
"net/http/httptest"
23+
"strings"
24+
"testing"
25+
26+
"github.com/prometheus/client_golang/prometheus/promhttp"
27+
"sigs.k8s.io/controller-runtime/pkg/metrics"
28+
29+
_ "github.com/dataflow-operator/dataflow/internal/metrics" // Register metrics
30+
)
31+
32+
func TestMetricsEndpoint(t *testing.T) {
33+
mux := http.NewServeMux()
34+
mux.Handle("/metrics", promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{}))
35+
server := httptest.NewServer(mux)
36+
defer server.Close()
37+
38+
resp, err := http.Get(server.URL + "/metrics")
39+
if err != nil {
40+
t.Fatalf("GET /metrics: %v", err)
41+
}
42+
defer resp.Body.Close()
43+
44+
if resp.StatusCode != http.StatusOK {
45+
t.Errorf("GET /metrics status = %d, want 200", resp.StatusCode)
46+
}
47+
48+
// Read body
49+
body, err := io.ReadAll(resp.Body)
50+
if err != nil {
51+
t.Fatalf("Read body: %v", err)
52+
}
53+
content := string(body)
54+
55+
// Should contain dataflow_ metric definitions (from internal/metrics init)
56+
if !strings.Contains(content, "dataflow_") {
57+
t.Errorf("metrics output should contain dataflow_ metrics, got: %s", content)
58+
}
59+
}

config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@ kind: ClusterRole
44
metadata:
55
name: manager-role
66
rules:
7+
- apiGroups:
8+
- ""
9+
resources:
10+
- pods
11+
verbs:
12+
- get
13+
- list
14+
- watch
715
- apiGroups:
816
- ""
917
resources:

internal/controller/dataflow_controller.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"time"
2929

3030
crand "crypto/rand"
31+
"crypto/sha256"
3132

3233
appsv1 "k8s.io/api/apps/v1"
3334
corev1 "k8s.io/api/core/v1"
@@ -320,7 +321,7 @@ func (r *DataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
320321
}
321322

322323
// Create or update Deployment
323-
if err := r.createOrUpdateDeployment(ctx, req, &dataflow); err != nil {
324+
if err := r.createOrUpdateDeployment(ctx, req, &dataflow, resolvedSpec); err != nil {
324325
log.Error(err, "failed to create or update Deployment")
325326
if r.Recorder != nil {
326327
r.Recorder.Eventf(&dataflow, corev1.EventTypeWarning, "DeploymentFailed", "Failed to create or update Deployment: %v", err)
@@ -478,13 +479,25 @@ func (r *DataFlowReconciler) processorImageFor(dataflow *dataflowv1.DataFlow) st
478479
return r.processorImage
479480
}
480481

482+
// specHashAnnotation is the pod-template annotation key that carries a hash of
// the resolved DataFlow spec. When the spec content changes the hash changes,
// altering the pod template and so triggering a Deployment rollout.
const specHashAnnotation = "dataflow.dataflow.io/spec-hash"
485+
481486
// createOrUpdateDeployment creates or updates Deployment for the processor.
482-
func (r *DataFlowReconciler) createOrUpdateDeployment(ctx context.Context, req ctrl.Request, dataflow *dataflowv1.DataFlow) error {
487+
func (r *DataFlowReconciler) createOrUpdateDeployment(ctx context.Context, req ctrl.Request, dataflow *dataflowv1.DataFlow, resolvedSpec *dataflowv1.DataFlowSpec) error {
483488
log := log.FromContext(ctx)
484489

485490
deploymentName := fmt.Sprintf("dataflow-%s", dataflow.Name)
486491
configMapName := fmt.Sprintf("dataflow-%s-spec", dataflow.Name)
487492

493+
// Compute spec hash so pod template changes when ConfigMap content changes, triggering rollout.
494+
specJSON, err := json.Marshal(resolvedSpec)
495+
if err != nil {
496+
return fmt.Errorf("failed to marshal spec for hash: %w", err)
497+
}
498+
hash := sha256.Sum256(specJSON)
499+
specHash := hex.EncodeToString(hash[:])
500+
488501
labels := map[string]string{
489502
"app": "dataflow-processor",
490503
"dataflow.dataflow.io/name": dataflow.Name,
@@ -506,6 +519,9 @@ func (r *DataFlowReconciler) createOrUpdateDeployment(ctx context.Context, req c
506519
Template: corev1.PodTemplateSpec{
507520
ObjectMeta: metav1.ObjectMeta{
508521
Labels: labels,
522+
Annotations: map[string]string{
523+
specHashAnnotation: specHash,
524+
},
509525
},
510526
Spec: corev1.PodSpec{
511527
TerminationGracePeriodSeconds: ptr.To(int64(30)),
@@ -522,6 +538,9 @@ func (r *DataFlowReconciler) createOrUpdateDeployment(ctx context.Context, req c
522538
Env: []corev1.EnvVar{
523539
{Name: "LOG_LEVEL", Value: processorLogLevel()},
524540
},
541+
Ports: []corev1.ContainerPort{
542+
{Name: "metrics", ContainerPort: 9090, Protocol: corev1.ProtocolTCP},
543+
},
525544
Lifecycle: &corev1.Lifecycle{
526545
PreStop: &corev1.LifecycleHandler{
527546
Exec: &corev1.ExecAction{
@@ -567,7 +586,7 @@ func (r *DataFlowReconciler) createOrUpdateDeployment(ctx context.Context, req c
567586

568587
// Check if Deployment exists
569588
existing := &appsv1.Deployment{}
570-
err := r.Get(ctx, types.NamespacedName{Name: deploymentName, Namespace: req.Namespace}, existing)
589+
err = r.Get(ctx, types.NamespacedName{Name: deploymentName, Namespace: req.Namespace}, existing)
571590
if err != nil && apierrors.IsNotFound(err) {
572591
// Ensure finalizer before creating first child so deletion is coordinated
573592
if err := r.ensureDataFlowFinalizer(ctx, req); err != nil {

internal/controller/dataflow_controller_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,10 @@ func TestDataFlowReconciler_Reconcile_CreateDeployment(t *testing.T) {
520520
err = fakeClient.Get(ctx, deploymentName, &deployment)
521521
assert.NoError(t, err, "Deployment should be created")
522522
assert.Equal(t, "dataflow-test-dataflow", deployment.Name)
523+
assert.Contains(t, deployment.Spec.Template.Annotations, specHashAnnotation,
524+
"Deployment pod template should have spec-hash annotation for ConfigMap change detection")
525+
assert.NotEmpty(t, deployment.Spec.Template.Annotations[specHashAnnotation],
526+
"spec-hash annotation should be non-empty")
523527
require.Len(t, deployment.Spec.Template.Spec.Containers, 1)
524528
assert.Equal(t, version.DefaultProcessorImage(), deployment.Spec.Template.Spec.Containers[0].Image, "default processor image should match controller")
525529
var hasLogLevel bool
@@ -1139,6 +1143,88 @@ func TestCreateOrUpdateDeployment_UpdateWhenSpecChanged(t *testing.T) {
11391143
"Deployment NodeSelector should reflect updated DataFlow spec")
11401144
}
11411145

1146+
// TestCreateOrUpdateDeployment_UpdateWhenSpecContentChanged verifies that when DataFlow spec content
1147+
// changes (e.g. Kafka brokers, SecretRef), the spec-hash annotation changes and Deployment is updated,
1148+
// triggering a pod restart for the new config.
1149+
func TestCreateOrUpdateDeployment_UpdateWhenSpecContentChanged(t *testing.T) {
1150+
scheme := runtime.NewScheme()
1151+
require.NoError(t, dataflowv1.AddToScheme(scheme))
1152+
require.NoError(t, clientgoscheme.AddToScheme(scheme))
1153+
1154+
fakeRecorder := record.NewFakeRecorder(10)
1155+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
1156+
reconciler := NewDataFlowReconciler(fakeClient, scheme, fakeRecorder)
1157+
1158+
ctx := context.Background()
1159+
dataflow := &dataflowv1.DataFlow{
1160+
TypeMeta: metav1.TypeMeta{
1161+
APIVersion: "dataflow.dataflow.io/v1",
1162+
Kind: "DataFlow",
1163+
},
1164+
ObjectMeta: metav1.ObjectMeta{
1165+
Name: "test-dataflow",
1166+
Namespace: "default",
1167+
},
1168+
Spec: dataflowv1.DataFlowSpec{
1169+
Source: dataflowv1.SourceSpec{
1170+
Type: "kafka",
1171+
Kafka: &dataflowv1.KafkaSourceSpec{
1172+
Brokers: []string{"localhost:9092"},
1173+
Topic: "test-topic",
1174+
ConsumerGroup: "test-group",
1175+
},
1176+
},
1177+
Sink: dataflowv1.SinkSpec{
1178+
Type: "kafka",
1179+
Kafka: &dataflowv1.KafkaSinkSpec{Brokers: []string{"localhost:9092"}, Topic: "output-topic"},
1180+
},
1181+
},
1182+
}
1183+
require.NoError(t, fakeClient.Create(ctx, dataflow))
1184+
1185+
req := ctrl.Request{
1186+
NamespacedName: types.NamespacedName{Name: "test-dataflow", Namespace: "default"},
1187+
}
1188+
1189+
// First reconcile — Deployment created
1190+
_, _ = reconciler.Reconcile(ctx, req)
1191+
drainRecorderEvents(fakeRecorder)
1192+
1193+
deploymentName := types.NamespacedName{Name: "dataflow-test-dataflow", Namespace: "default"}
1194+
var deployment appsv1.Deployment
1195+
require.NoError(t, fakeClient.Get(ctx, deploymentName, &deployment))
1196+
hashBefore := deployment.Spec.Template.Annotations[specHashAnnotation]
1197+
require.NotEmpty(t, hashBefore, "initial Deployment should have spec-hash annotation")
1198+
1199+
// Change DataFlow spec content (Kafka brokers) — same ConfigMap name, but content changes
1200+
require.NoError(t, fakeClient.Get(ctx, req.NamespacedName, dataflow))
1201+
dataflow.Spec.Source.Kafka.Brokers = []string{"kafka-1:9092", "kafka-2:9092"}
1202+
require.NoError(t, fakeClient.Update(ctx, dataflow))
1203+
1204+
// Second reconcile — Deployment should be updated (spec-hash changed)
1205+
_, _ = reconciler.Reconcile(ctx, req)
1206+
1207+
var deploymentUpdatedCount int
1208+
for {
1209+
select {
1210+
case e := <-fakeRecorder.Events:
1211+
if strings.Contains(e, "DeploymentUpdated") {
1212+
deploymentUpdatedCount++
1213+
}
1214+
default:
1215+
goto done
1216+
}
1217+
}
1218+
done:
1219+
assert.Equal(t, 1, deploymentUpdatedCount,
1220+
"expected DeploymentUpdated event when spec content (Kafka brokers) changed")
1221+
1222+
require.NoError(t, fakeClient.Get(ctx, deploymentName, &deployment))
1223+
hashAfter := deployment.Spec.Template.Annotations[specHashAnnotation]
1224+
assert.NotEqual(t, hashBefore, hashAfter,
1225+
"spec-hash should change when Kafka brokers change, triggering pod restart")
1226+
}
1227+
11421228
// TestCreateOrUpdateDeployment_RetryOnConflict verifies that when Deployment Update returns 409 Conflict,
11431229
// the controller retries and succeeds on the next attempt (no extra rollout: spec comparison skips redundant Update).
11441230
func TestCreateOrUpdateDeployment_RetryOnConflict(t *testing.T) {

0 commit comments

Comments
 (0)