Skip to content

Commit bf21997

Browse files
committed
Add k8s events logs
1 parent a7db9f1 commit bf21997

1 file changed

Lines changed: 31 additions & 5 deletions

File tree

internal/controller/dataflow_controller.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,21 @@ import (
5959
// then removes this finalizer so deletion can complete.
6060
const DataFlowFinalizer = "dataflow.dataflow.io/finalizer"
6161

62+
// dataflowRefForEvent returns a minimal DataFlow object for use as involvedObject in events
63+
// when the full object is unavailable (e.g. FailedGet). Implements runtime.Object for EventRecorder.
64+
func dataflowRefForEvent(namespace, name string) *dataflowv1.DataFlow {
65+
return &dataflowv1.DataFlow{
66+
TypeMeta: metav1.TypeMeta{
67+
APIVersion: dataflowv1.GroupVersion.String(),
68+
Kind: "DataFlow",
69+
},
70+
ObjectMeta: metav1.ObjectMeta{
71+
Namespace: namespace,
72+
Name: name,
73+
},
74+
}
75+
}
76+
6277
// DataFlowReconciler reconciles a DataFlow object
6378
type DataFlowReconciler struct {
6479
client.Client
@@ -72,6 +87,9 @@ type DataFlowReconciler struct {
7287
}
7388

7489
func NewDataFlowReconciler(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder) *DataFlowReconciler {
90+
if recorder == nil {
91+
ctrl.Log.WithName("dataflow-controller").Info("EventRecorder is nil, Kubernetes events will not be emitted")
92+
}
7593
// Processor image: from env or same image and version as operator (set at build time via ldflags).
7694
processorImage := version.DefaultProcessorImage()
7795
if img := os.Getenv("PROCESSOR_IMAGE"); img != "" {
@@ -236,12 +254,10 @@ func (r *DataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
236254
if err := r.Get(ctx, req.NamespacedName, &dataflow); err != nil {
237255
log.Error(err, "unable to fetch DataFlow")
238256
if r.Recorder != nil && !apierrors.IsNotFound(err) {
239-
ref := &dataflowv1.DataFlow{}
240-
ref.APIVersion = dataflowv1.GroupVersion.String()
241-
ref.Kind = "DataFlow"
242-
ref.Namespace = req.Namespace
243-
ref.Name = req.Name
257+
// Use minimal object reference for event (we don't have UID/ResourceVersion since Get failed)
258+
ref := dataflowRefForEvent(req.Namespace, req.Name)
244259
r.Recorder.Event(ref, corev1.EventTypeWarning, "FailedGet", "Unable to fetch DataFlow")
260+
log.V(1).Info("Emitted Kubernetes event", "reason", "FailedGet", "object", req.NamespacedName)
245261
}
246262
return ctrl.Result{}, client.IgnoreNotFound(err)
247263
}
@@ -266,11 +282,13 @@ func (r *DataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
266282
log.Error(err, "failed to cleanup resources")
267283
if r.Recorder != nil {
268284
r.Recorder.Eventf(&dataflow, corev1.EventTypeWarning, "CleanupFailed", "Failed to cleanup resources: %v", err)
285+
log.V(1).Info("Emitted Kubernetes event", "reason", "CleanupFailed", "object", req.NamespacedName)
269286
}
270287
return ctrl.Result{}, err
271288
}
272289
if r.Recorder != nil {
273290
r.Recorder.Event(&dataflow, corev1.EventTypeNormal, "ResourcesDeleted", "Deleted Deployment and ConfigMap")
291+
log.V(1).Info("Emitted Kubernetes event", "reason", "ResourcesDeleted", "object", req.NamespacedName)
274292
}
275293

276294
if err := r.updateStatusWithRetry(ctx, req, func(df *dataflowv1.DataFlow) {
@@ -293,6 +311,7 @@ func (r *DataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
293311
log.Error(err, "failed to resolve secrets")
294312
if r.Recorder != nil {
295313
r.Recorder.Eventf(&dataflow, corev1.EventTypeWarning, "SecretResolutionFailed", "Failed to resolve secrets: %v", err)
314+
log.V(1).Info("Emitted Kubernetes event", "reason", "SecretResolutionFailed", "object", req.NamespacedName)
296315
}
297316
updateErr := r.updateStatusWithRetry(ctx, req, func(df *dataflowv1.DataFlow) {
298317
df.Status.Phase = "Error"
@@ -309,6 +328,7 @@ func (r *DataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
309328
log.Error(err, "failed to create or update ConfigMap")
310329
if r.Recorder != nil {
311330
r.Recorder.Eventf(&dataflow, corev1.EventTypeWarning, "ConfigMapFailed", "Failed to create or update ConfigMap: %v", err)
331+
log.V(1).Info("Emitted Kubernetes event", "reason", "ConfigMapFailed", "object", req.NamespacedName)
312332
}
313333
updateErr := r.updateStatusWithRetry(ctx, req, func(df *dataflowv1.DataFlow) {
314334
df.Status.Phase = "Error"
@@ -325,6 +345,7 @@ func (r *DataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
325345
log.Error(err, "failed to create or update Deployment")
326346
if r.Recorder != nil {
327347
r.Recorder.Eventf(&dataflow, corev1.EventTypeWarning, "DeploymentFailed", "Failed to create or update Deployment: %v", err)
348+
log.V(1).Info("Emitted Kubernetes event", "reason", "DeploymentFailed", "object", req.NamespacedName)
328349
}
329350
updateErr := r.updateStatusWithRetry(ctx, req, func(df *dataflowv1.DataFlow) {
330351
df.Status.Phase = "Error"
@@ -383,6 +404,7 @@ func (r *DataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
383404
log.Error(err, "unable to update DataFlow status")
384405
if r.Recorder != nil {
385406
r.Recorder.Eventf(&dataflow, corev1.EventTypeWarning, "StatusUpdateFailed", "Unable to update DataFlow status: %v", err)
407+
log.V(1).Info("Emitted Kubernetes event", "reason", "StatusUpdateFailed", "object", req.NamespacedName)
386408
}
387409
// Don't return error if context was canceled or timed out, just requeue
388410
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
@@ -449,6 +471,7 @@ func (r *DataFlowReconciler) createOrUpdateConfigMap(ctx context.Context, req ct
449471
log.Info("Created ConfigMap", "name", configMapName)
450472
if r.Recorder != nil {
451473
r.Recorder.Eventf(&df, corev1.EventTypeNormal, "ConfigMapCreated", "Created ConfigMap %s", configMapName)
474+
log.V(1).Info("Emitted Kubernetes event", "reason", "ConfigMapCreated", "object", configMapName)
452475
}
453476
} else if err != nil {
454477
return fmt.Errorf("failed to get ConfigMap: %w", err)
@@ -461,6 +484,7 @@ func (r *DataFlowReconciler) createOrUpdateConfigMap(ctx context.Context, req ct
461484
log.Info("Updated ConfigMap", "name", configMapName)
462485
if r.Recorder != nil {
463486
r.Recorder.Eventf(&df, corev1.EventTypeNormal, "ConfigMapUpdated", "Updated ConfigMap %s", configMapName)
487+
log.V(1).Info("Emitted Kubernetes event", "reason", "ConfigMapUpdated", "object", configMapName)
464488
}
465489
}
466490

@@ -599,6 +623,7 @@ func (r *DataFlowReconciler) createOrUpdateDeployment(ctx context.Context, req c
599623
log.Info("Created Deployment", "name", deploymentName)
600624
if r.Recorder != nil {
601625
r.Recorder.Eventf(dataflow, corev1.EventTypeNormal, "DeploymentCreated", "Created Deployment %s", deploymentName)
626+
log.V(1).Info("Emitted Kubernetes event", "reason", "DeploymentCreated", "object", deploymentName)
602627
}
603628
return nil
604629
}
@@ -631,6 +656,7 @@ func (r *DataFlowReconciler) createOrUpdateDeployment(ctx context.Context, req c
631656
log.Info("Updated Deployment", "name", deploymentName)
632657
if r.Recorder != nil {
633658
r.Recorder.Eventf(dataflow, corev1.EventTypeNormal, "DeploymentUpdated", "Updated Deployment %s", deploymentName)
659+
log.V(1).Info("Emitted Kubernetes event", "reason", "DeploymentUpdated", "object", deploymentName)
634660
}
635661
return nil
636662
}

0 commit comments

Comments
 (0)