diff --git a/deploy/helm/charts/platform/components/operator/templates/manager-rbac.yaml b/deploy/helm/charts/platform/components/operator/templates/manager-rbac.yaml index dd86309f418a..bae96a422a04 100644 --- a/deploy/helm/charts/platform/components/operator/templates/manager-rbac.yaml +++ b/deploy/helm/charts/platform/components/operator/templates/manager-rbac.yaml @@ -330,6 +330,7 @@ rules: - dynamographdeploymentscalingadapters - dynamomodels - dynamoworkermetadatas + - snapshots verbs: - create - delete diff --git a/deploy/operator/config/rbac/role.yaml b/deploy/operator/config/rbac/role.yaml index cde4403d847b..5ec2ac826ca9 100644 --- a/deploy/operator/config/rbac/role.yaml +++ b/deploy/operator/config/rbac/role.yaml @@ -227,6 +227,7 @@ rules: - dynamographdeployments - dynamographdeploymentscalingadapters - dynamomodels + - snapshots verbs: - create - delete diff --git a/deploy/operator/internal/controller/checkpoint_snapshot.go b/deploy/operator/internal/controller/checkpoint_snapshot.go new file mode 100644 index 000000000000..df75a0cf51f7 --- /dev/null +++ b/deploy/operator/internal/controller/checkpoint_snapshot.go @@ -0,0 +1,127 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package controller + +import ( + "context" + "fmt" + + nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" + snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// checkpointSnapshotFieldManager is the Server-Side Apply field owner for Snapshots. 
+const checkpointSnapshotFieldManager = "dynamo-checkpoint-controller"
+
+// snapshotName returns the deterministic Snapshot name for a checkpoint ID: the ID prefixed
+// with "snapshot-".
+func snapshotName(checkpointID string) string {
+	return "snapshot-" + checkpointID
+}
+
+// findSourcePod returns the pod that the checkpoint Job controls, or a NotFound error if the Job
+// has not created one yet (callers use client.IgnoreNotFound to requeue).
+//
+// A Job that retries can control several pods at once, and this function does not control the
+// order in which they are listed. To avoid handing back a dead pod while a live one exists, a
+// pod that is neither being deleted nor in a finished phase (Succeeded or Failed) is preferred.
+// When every Job-controlled pod is finished or terminating, the first one listed is returned, so
+// a Job whose pod already completed still resolves to that pod.
+//
+// A failure to list pods is returned wrapped with the Job's namespace and name.
+func (r *CheckpointReconciler) findSourcePod(ctx context.Context, job *batchv1.Job) (*corev1.Pod, error) {
+	var pods corev1.PodList
+	if err := r.List(ctx, &pods,
+		client.InNamespace(job.Namespace),
+		client.MatchingLabels{batchv1.JobNameLabel: job.Name},
+	); err != nil {
+		return nil, fmt.Errorf("listing pods for job %s/%s: %w", job.Namespace, job.Name, err)
+	}
+
+	var fallback *corev1.Pod
+	for i := range pods.Items {
+		pod := &pods.Items[i]
+		// The label selector only narrows the list; the controller owner reference decides.
+		if !metav1.IsControlledBy(pod, job) {
+			continue
+		}
+		finished := pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
+		if pod.DeletionTimestamp.IsZero() && !finished {
+			return pod, nil
+		}
+		// Remember the first controlled pod in case no live pod turns up.
+		if fallback == nil {
+			fallback = pod
+		}
+	}
+	if fallback != nil {
+		return fallback, nil
+	}
+	return nil, apierrors.NewNotFound(corev1.Resource("pods"), job.Name)
+}
+
+// ensureSnapshot makes sure this checkpoint's Snapshot exists. It looks up the Snapshot named
+// after checkpointID: if one exists and is controlled by ckpt, nothing is done; if the lookup
+// returns an error (including a name already held by a Snapshot that ckpt does not control),
+// that error is returned; otherwise a Snapshot referencing sourcePodName is built and applied
+// with ckpt as its controller owner.
+func (r *CheckpointReconciler) ensureSnapshot(ctx context.Context, ckpt *nvidiacomv1alpha1.DynamoCheckpoint, checkpointID, sourcePodName string) error {
+	owned, err := r.findOwnedSnapshot(ctx, ckpt, snapshotName(checkpointID))
+	if err != nil {
+		return err
+	}
+	if owned {
+		return nil
+	}
+	return r.applySnapshot(ctx, ckpt, buildSnapshot(ckpt, checkpointID, sourcePodName))
+}
+
+// findOwnedSnapshot reports whether this checkpoint's Snapshot already exists and is controlled
+// by ckpt. It returns a terminal Forbidden error (and emits an event) when a Snapshot with the
+// same name exists but is not controlled by ckpt, whether it belongs to another controller or
+// has no controller at all; (false, nil) means none exists yet.
+func (r *CheckpointReconciler) findOwnedSnapshot(ctx context.Context, ckpt *nvidiacomv1alpha1.DynamoCheckpoint, name string) (bool, error) {
+	key := client.ObjectKey{Namespace: ckpt.Namespace, Name: name}
+	var snap nvidiacomv1alpha1.Snapshot
+	if getErr := r.Get(ctx, key, &snap); getErr != nil {
+		// A missing Snapshot is the ordinary "not created yet" answer, not a failure.
+		return false, client.IgnoreNotFound(getErr)
+	}
+	if !metav1.IsControlledBy(&snap, ckpt) {
+		// The name is taken by a Snapshot this checkpoint does not control. Report it as
+		// Forbidden, which controller_common.IgnoreIntermediateError treats as terminal:
+		// retrying cannot change who holds the name.
+		reason := fmt.Errorf("exists but is not owned by checkpoint %q", ckpt.Name)
+		resource := nvidiacomv1alpha1.GroupVersion.WithResource("snapshots").GroupResource()
+		conflict := apierrors.NewForbidden(resource, name, reason)
+		r.Recorder.Event(ckpt, corev1.EventTypeWarning, "SnapshotCreateFailed", conflict.Error())
+		return false, conflict
+	}
+	return true, nil
+}
+
+// buildSnapshot assembles the Snapshot a checkpoint asks for: named after the checkpoint ID,
+// placed in ckpt's namespace, labeled with the ID, and pointing at sourcePodName as its source
+// pod. The object is only constructed here; nothing is sent to the API server.
+func buildSnapshot(ckpt *nvidiacomv1alpha1.DynamoCheckpoint, checkpointID, sourcePodName string) *nvidiacomv1alpha1.Snapshot {
+	spec := nvidiacomv1alpha1.SnapshotSpec{CheckpointID: checkpointID}
+	spec.Source.PodRef.Name = sourcePodName
+
+	meta := metav1.ObjectMeta{
+		Name:      snapshotName(checkpointID),
+		Namespace: ckpt.Namespace,
+		Labels:    map[string]string{snapshotprotocol.CheckpointIDLabel: checkpointID},
+	}
+	// The type information is filled in explicitly on the object that gets applied.
+	kind := metav1.TypeMeta{
+		APIVersion: nvidiacomv1alpha1.GroupVersion.String(),
+		Kind:       "Snapshot",
+	}
+	return &nvidiacomv1alpha1.Snapshot{TypeMeta: kind, ObjectMeta: meta, Spec: spec}
+}
+
+// applySnapshot sets ckpt as the controller owner of snap and applies snap via Server-Side Apply.
+// It records a Warning event when the apply fails and a Normal event when it succeeds.
+func (r *CheckpointReconciler) applySnapshot(ctx context.Context, ckpt *nvidiacomv1alpha1.DynamoCheckpoint, snap *nvidiacomv1alpha1.Snapshot) error {
+	if refErr := ctrl.SetControllerReference(ckpt, snap, r.Scheme()); refErr != nil {
+		return refErr
+	}
+
+	// Apply under the checkpoint controller's field manager, forcing ownership of the fields.
+	patchErr := r.Patch(ctx, snap, client.Apply,
+		client.FieldOwner(checkpointSnapshotFieldManager),
+		client.ForceOwnership,
+	)
+	if patchErr != nil {
+		r.Recorder.Event(ckpt, corev1.EventTypeWarning, "SnapshotCreateFailed", patchErr.Error())
+		return patchErr
+	}
+
+	r.Recorder.Eventf(ckpt, corev1.EventTypeNormal, "SnapshotCreated", "Created Snapshot %s", snap.Name)
+	return nil
+}
+
+// updateFailedStatus records a terminal Snapshot error on the checkpoint: it sets the phase to
+// Failed and stores the error text in the status message. No event is emitted here; the warning
+// event is raised where the failure happens, underneath ensureSnapshot. Only the phase and
+// message are written, so the JobCreated condition is left alone (the Job was created; only the
+// Snapshot failed). A failed status write is logged rather than returned.
+func (r *CheckpointReconciler) updateFailedStatus(ctx context.Context, ckpt *nvidiacomv1alpha1.DynamoCheckpoint, err error) {
+	ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseFailed
+	ckpt.Status.Message = fmt.Sprintf("snapshot creation failed: %v", err)
+
+	writeErr := r.Status().Update(ctx, ckpt)
+	if writeErr == nil {
+		return
+	}
+	log.FromContext(ctx).Error(writeErr, "failed to update DynamoCheckpoint status after snapshot failure")
+}
diff --git a/deploy/operator/internal/controller/checkpoint_snapshot_test.go b/deploy/operator/internal/controller/checkpoint_snapshot_test.go
new file mode 100644
index 000000000000..877d8e4a72a0
--- /dev/null
+++ b/deploy/operator/internal/controller/checkpoint_snapshot_test.go
@@ -0,0 +1,153 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+	"context"
+	"testing"
+
+	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// newCheckpointJob returns a Job fixture with the given name, placed in testNamespace and
+// carrying the fixed UID "job-uid" so that owner references built from it can be matched.
+func newCheckpointJob(name string) *batchv1.Job {
+	return &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: testNamespace, UID: types.UID("job-uid")},
+	}
+}
+
+// podNameFromJob derives the test source-pod name for a checkpoint Job by appending "-pod" to
+// the Job name.
+func podNameFromJob(jobName string) string {
+	return jobName + "-pod"
+}
+
+// newOwnedPod returns a Pod fixture named podName in testNamespace. It carries the Job-name
+// label for job and a controller owner reference built from job's name and UID.
+func newOwnedPod(podName string, job *batchv1.Job) *corev1.Pod {
+	return &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      podName,
+			Namespace: testNamespace,
+			Labels:    map[string]string{batchv1.JobNameLabel: job.Name},
+			OwnerReferences: []metav1.OwnerReference{{
+				APIVersion: "batch/v1",
+				Kind:       "Job",
+				Name:       job.Name,
+				UID:        job.UID,
+				Controller: ptr.To(true),
+			}},
+		},
+	}
+}
+
+// newOwnedCheckpoint returns a test checkpoint in the Creating phase with the fixed UID
+// "ckpt-uid", so that ownership checks against it have a UID to compare.
+func newOwnedCheckpoint() *nvidiacomv1alpha1.DynamoCheckpoint {
+	ckpt := makeTestCheckpoint(nvidiacomv1alpha1.DynamoCheckpointPhaseCreating)
+	ckpt.UID = types.UID("ckpt-uid")
+	return ckpt
+}
+
+// TestFindSourcePod_ReturnsJobOwnedPod checks that a pod labeled for and controlled by the Job
+// is the one findSourcePod returns.
+func TestFindSourcePod_ReturnsJobOwnedPod(t *testing.T) {
+	job := newCheckpointJob(defaultCheckpointJobName)
+	pod := newOwnedPod("worker-xyz", job)
+	r := makeCheckpointReconciler(checkpointTestScheme(), job, pod)
+
+	got, err := r.findSourcePod(context.Background(), job)
+	require.NoError(t, err)
+	require.NotNil(t, got)
+	assert.Equal(t, "worker-xyz", got.Name)
+}
+
+// TestFindSourcePod_NotCreatedReturnsNotFound checks that, with no pod present, findSourcePod
+// returns a nil pod and a NotFound error that client.IgnoreNotFound reduces to nil.
+func TestFindSourcePod_NotCreatedReturnsNotFound(t *testing.T) {
+	job := newCheckpointJob(defaultCheckpointJobName)
+	r := makeCheckpointReconciler(checkpointTestScheme(), job)
+
+	got, err := r.findSourcePod(context.Background(), job)
+	require.Error(t, err)
+	assert.True(t, apierrors.IsNotFound(err))
+	assert.Nil(t, got)
+	assert.NoError(t, client.IgnoreNotFound(err))
+}
+
+// TestFindSourcePod_IgnoresPodNotOwnedByJob checks that a pod carrying the Job's name label but
+// whose controller owner reference has a different UID is not returned.
+func TestFindSourcePod_IgnoresPodNotOwnedByJob(t *testing.T) {
+	job := newCheckpointJob(defaultCheckpointJobName)
+	other := newOwnedPod("stray", &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: job.Name, UID: types.UID("different-uid")}})
+	r := makeCheckpointReconciler(checkpointTestScheme(), job, other)
+
+	_, err := r.findSourcePod(context.Background(), job)
+	assert.True(t, apierrors.IsNotFound(err))
+}
+
+// TestEnsureSnapshot_CreatesWhenAbsent checks that ensureSnapshot creates the Snapshot with the
+// expected checkpoint ID and source pod name, controlled by the checkpoint.
+func TestEnsureSnapshot_CreatesWhenAbsent(t *testing.T) {
+	ckpt := newOwnedCheckpoint()
+	r := makeCheckpointReconciler(checkpointTestScheme(), ckpt)
+
+	require.NoError(t, r.ensureSnapshot(context.Background(), ckpt, testHash, "worker-xyz"))
+
+	snap := &nvidiacomv1alpha1.Snapshot{}
+	require.NoError(t, r.Get(context.Background(),
+		client.ObjectKey{Namespace: testNamespace, Name: snapshotName(testHash)}, snap))
+	assert.Equal(t, testHash, snap.Spec.CheckpointID)
+	assert.Equal(t, "worker-xyz", snap.Spec.Source.PodRef.Name)
+	assert.True(t, metav1.IsControlledBy(snap, ckpt), "snapshot must be controlled by the checkpoint")
+}
+
+// TestEnsureSnapshot_NoopWhenAlreadyOwned checks that calling ensureSnapshot twice succeeds and
+// leaves exactly one Snapshot in the namespace.
+func TestEnsureSnapshot_NoopWhenAlreadyOwned(t *testing.T) {
+	ckpt := newOwnedCheckpoint()
+	r := makeCheckpointReconciler(checkpointTestScheme(), ckpt)
+	require.NoError(t, r.ensureSnapshot(context.Background(), ckpt, testHash, "worker-xyz"))
+
+	// Second call is a no-op (already owned by us).
+	require.NoError(t, r.ensureSnapshot(context.Background(), ckpt, testHash, "worker-xyz"))
+
+	var snaps nvidiacomv1alpha1.SnapshotList
+	require.NoError(t, r.List(context.Background(), &snaps, client.InNamespace(testNamespace)))
+	assert.Len(t, snaps.Items, 1)
+}
+
+// TestEnsureSnapshot_ErrorsWhenNotOwned seeds a Snapshot with the expected name but no owner
+// references and checks that ensureSnapshot fails with a Forbidden error.
+func TestEnsureSnapshot_ErrorsWhenNotOwned(t *testing.T) {
+	ckpt := newOwnedCheckpoint()
+	foreign := &nvidiacomv1alpha1.Snapshot{
+		ObjectMeta: metav1.ObjectMeta{Name: snapshotName(testHash), Namespace: testNamespace},
+		Spec: nvidiacomv1alpha1.SnapshotSpec{
+			CheckpointID: testHash,
+			Source:       nvidiacomv1alpha1.SnapshotSource{PodRef: nvidiacomv1alpha1.PodReference{Name: "someone-else"}},
+		},
+	}
+	r := makeCheckpointReconciler(checkpointTestScheme(), ckpt, foreign)
+
+	err := r.ensureSnapshot(context.Background(), ckpt, testHash, "worker-xyz")
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "not owned by checkpoint")
+	// Must be terminal (Forbidden) so the capture fails instead of requeuing forever.
+	assert.True(t, apierrors.IsForbidden(err))
+}
+
+// TestUpdateFailedStatus_MarksCheckpointFailed checks the in-memory checkpoint after
+// updateFailedStatus: the phase is Failed and the message mentions the snapshot failure.
+func TestUpdateFailedStatus_MarksCheckpointFailed(t *testing.T) {
+	ckpt := newOwnedCheckpoint()
+	r := makeCheckpointReconciler(checkpointTestScheme(), ckpt)
+
+	r.updateFailedStatus(context.Background(), ckpt, assert.AnError)
+	assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed, ckpt.Status.Phase)
+	assert.Contains(t, ckpt.Status.Message, "snapshot creation failed")
+}
diff --git a/deploy/operator/internal/controller/dynamocheckpoint_controller.go b/deploy/operator/internal/controller/dynamocheckpoint_controller.go
index f4299e9e2394..4648befd56b6 100644
--- a/deploy/operator/internal/controller/dynamocheckpoint_controller.go
+++ b/deploy/operator/internal/controller/dynamocheckpoint_controller.go
@@ -331,6 +331,26 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac
 		return ctrl.Result{}, err
 	}
 
+	// Required step: create the Snapshot once the source pod exists. The checkpoint cannot
+	// reach Ready without it, so creation failure fails or requeues the capture.
+ checkpointID, err := checkpoint.CheckpointID(ckpt) + if err != nil { + return ctrl.Result{}, err + } + pod, err := r.findSourcePod(ctx, job) + if err != nil { + if client.IgnoreNotFound(err) == nil { + return ctrl.Result{RequeueAfter: time.Second}, nil + } + return ctrl.Result{}, err + } + if err := r.ensureSnapshot(ctx, ckpt, checkpointID, pod.Name); err != nil { + if commonController.IgnoreIntermediateError(err) != nil { + r.updateFailedStatus(ctx, ckpt, err) + } + return ctrl.Result{}, err + } + var lease *coordinationv1.Lease leaseKey := client.ObjectKey{Namespace: job.Namespace, Name: job.Name} lease = &coordinationv1.Lease{} diff --git a/deploy/operator/internal/controller/dynamocheckpoint_controller_test.go b/deploy/operator/internal/controller/dynamocheckpoint_controller_test.go index a9f1413599cc..fdb63a8f93f1 100644 --- a/deploy/operator/internal/controller/dynamocheckpoint_controller_test.go +++ b/deploy/operator/internal/controller/dynamocheckpoint_controller_test.go @@ -734,7 +734,7 @@ func TestCheckpointReconciler_FinalizeResourceCleansRetainedAutoCheckpointOnCRDe BasePath: "/checkpoints", }) require.NoError(t, err) - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) r.Config = cfg err = r.FinalizeResource(ctx, ckpt) @@ -759,7 +759,7 @@ func TestCheckpointReconciler_FinalizeResourceCleansRetainedAutoCheckpointOnCRDe Status: corev1.ConditionTrue, Message: "boom", }} - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) r.Config = cfg err = r.FinalizeResource(ctx, ckpt) @@ -786,7 +786,7 @@ func TestCheckpointReconciler_FinalizeResourceCleansRetainedAutoCheckpointOnCRDe Type: batchv1.JobComplete, Status: corev1.ConditionTrue, }} - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) r.Config = cfg 
require.NoError(t, r.FinalizeResource(ctx, ckpt)) @@ -806,11 +806,46 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { if name != "" { ckpt.Name = name } + ckpt.UID = types.UID("ckpt-uid") // required for Snapshot ownership ckpt.Status.IdentityHash = testHash ckpt.Status.JobName = jobName return ckpt } + t.Run("waits without creating a Snapshot until the source pod exists", func(t *testing.T) { + ckpt := makeCreatingCkpt(testHash, "job-no-pod") + job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: "job-no-pod", Namespace: testNamespace}} + + r := makeCheckpointReconciler(s, ckpt, job) // no Job-owned pod yet + result, err := r.handleCreating(ctx, ckpt) + require.NoError(t, err) + assert.Equal(t, time.Second, result.RequeueAfter) + + updated := &nvidiacomv1alpha1.DynamoCheckpoint{} + require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated)) + assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase) + + var snaps nvidiacomv1alpha1.SnapshotList + require.NoError(t, r.List(ctx, &snaps, client.InNamespace(testNamespace))) + assert.Empty(t, snaps.Items) + }) + + t.Run("creates the Snapshot once the source pod exists", func(t *testing.T) { + ckpt := makeCreatingCkpt(testHash, defaultCheckpointJobName) + job := newCheckpointJob(defaultCheckpointJobName) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod("worker-0", job)) + + _, err := r.handleCreating(ctx, ckpt) + require.NoError(t, err) + + snap := &nvidiacomv1alpha1.Snapshot{} + require.NoError(t, r.Get(ctx, + types.NamespacedName{Name: snapshotName(testHash), Namespace: testNamespace}, snap)) + assert.Equal(t, testHash, snap.Spec.CheckpointID) + assert.Equal(t, "worker-0", snap.Spec.Source.PodRef.Name) + assert.True(t, metav1.IsControlledBy(snap, ckpt)) + }) + t.Run("succeeded job transitions to Ready", func(t *testing.T) { ckpt := makeCreatingCkpt(testHash, defaultCheckpointJobName) job := &batchv1.Job{ @@ -827,7 +862,7 
@@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { }, } - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) _, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) @@ -846,7 +881,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { }, } - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) _, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) @@ -870,7 +905,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { } lease := makeCheckpointLease("job-missing-status-active-lease", time.Now(), 30) - r := makeCheckpointReconciler(s, ckpt, job, lease) + r := makeCheckpointReconciler(s, ckpt, job, lease, newOwnedPod(podNameFromJob(job.Name), job)) result, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) assert.Equal(t, time.Second, result.RequeueAfter) @@ -893,7 +928,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { }, }, } - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) _, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) @@ -919,7 +954,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { }, } - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) _, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) @@ -940,7 +975,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { Status: batchv1.JobStatus{Active: 1}, } - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) _, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) @@ -957,7 +992,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { Status: batchv1.JobStatus{Active: 1}, } - r := 
makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) _, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) @@ -983,7 +1018,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { }, } - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) _, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) @@ -999,7 +1034,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { Status: batchv1.JobStatus{Succeeded: 1}, } - r := makeCheckpointReconciler(s, ckpt, job) + r := makeCheckpointReconciler(s, ckpt, job, newOwnedPod(podNameFromJob(job.Name), job)) _, err := r.handleCreating(ctx, ckpt) require.NoError(t, err) diff --git a/deploy/operator/internal/controller_common/errors.go b/deploy/operator/internal/controller_common/errors.go new file mode 100644 index 000000000000..c77638ed529d --- /dev/null +++ b/deploy/operator/internal/controller_common/errors.go @@ -0,0 +1,36 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package controller_common
+
+import (
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+)
+
+// IgnoreIntermediateError filters err down to terminal Kubernetes API failures. An Invalid,
+// BadRequest, or Forbidden error will not resolve on retry and is returned unchanged; every
+// other error, including nil, is treated as intermediate (transient, retryable) and becomes nil.
+// The shape mirrors client.IgnoreNotFound, so a caller can write:
+//
+//	if controller_common.IgnoreIntermediateError(err) != nil { /* terminal: handle */ }
+//
+// and act only on terminal failures while letting transient ones requeue.
+func IgnoreIntermediateError(err error) error {
+	switch {
+	case apierrors.IsInvalid(err), apierrors.IsBadRequest(err), apierrors.IsForbidden(err):
+		return err
+	default:
+		return nil
+	}
+}
diff --git a/deploy/operator/internal/controller_common/errors_test.go b/deploy/operator/internal/controller_common/errors_test.go
new file mode 100644
index 000000000000..ee80f2e56ba1
--- /dev/null
+++ b/deploy/operator/internal/controller_common/errors_test.go
@@ -0,0 +1,54 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller_common
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// TestIgnoreIntermediateError checks both sides of the filter: Invalid, BadRequest, and Forbidden
+// errors must come back as errors, while Conflict, ServerTimeout, a plain non-API error, and nil
+// must come back as nil.
+func TestIgnoreIntermediateError(t *testing.T) {
+	gr := schema.GroupResource{Group: "nvidia.com", Resource: "snapshots"}
+
+	// Errors that will not resolve on retry.
+	terminal := map[string]error{
+		"invalid":    apierrors.NewInvalid(schema.GroupKind{Group: "nvidia.com", Kind: "Snapshot"}, "x", nil),
+		"badRequest": apierrors.NewBadRequest("bad"),
+		"forbidden":  apierrors.NewForbidden(gr, "x", errors.New("not owned")),
+	}
+	for name, err := range terminal {
+		t.Run("terminal/"+name, func(t *testing.T) {
+			assert.Error(t, IgnoreIntermediateError(err), "real errors must be returned")
+		})
+	}
+
+	// Errors the filter is expected to drop, plus the nil case.
+	intermediate := map[string]error{
+		"conflict": apierrors.NewConflict(gr, "x", errors.New("conflict")),
+		"timeout":  apierrors.NewServerTimeout(gr, "create", 1),
+		"plain":    errors.New("network blip"),
+		"nil":      nil,
+	}
+	for name, err := range intermediate {
+		t.Run("intermediate/"+name, func(t *testing.T) {
+			assert.NoError(t, IgnoreIntermediateError(err), "intermediate errors must be ignored")
+		})
+	}
+}