Skip to content

Commit a221160

Browse files
committed
Add checkpointStore
1 parent 5880943 commit a221160

15 files changed

Lines changed: 1012 additions & 134 deletions

File tree

api/v1/dataflow_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ type DataFlowSpec struct {
6666
// ImagePullSecrets is a list of references to secrets in the same namespace to use for pulling the processor image from a private registry.
6767
// +optional
6868
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
69+
70+
// CheckpointPersistence enables persisting the source checkpoint (lastReadID, lastReadChangeTime) to a ConfigMap.
71+
// When enabled, polling sources (PostgreSQL, ClickHouse, Trino) resume from the last committed position after restart, reducing duplicates.
72+
// Default: true. Set to false to disable.
73+
// +optional
74+
CheckpointPersistence *bool `json:"checkpointPersistence,omitempty"`
6975
}
7076

7177
// SourceSpec defines the source configuration

cmd/processor/main.go

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"os/signal"
2626
"strings"
2727
"syscall"
28+
"time"
2829

2930
"github.com/prometheus/client_golang/prometheus/promhttp"
3031
"go.uber.org/zap"
@@ -34,6 +35,7 @@ import (
3435
"sigs.k8s.io/controller-runtime/pkg/metrics"
3536

3637
dataflowv1 "github.com/dataflow-operator/dataflow/api/v1"
38+
"github.com/dataflow-operator/dataflow/internal/checkpoint"
3739
"github.com/dataflow-operator/dataflow/internal/constants"
3840
"github.com/dataflow-operator/dataflow/internal/logkeys"
3941
_ "github.com/dataflow-operator/dataflow/internal/metrics" // Register metrics
@@ -77,8 +79,24 @@ func main() {
7779
os.Exit(1)
7880
}
7981

82+
// Set up the checkpoint store if persistence is enabled
83+
var procOpts []processor.ProcessorOption
84+
// CheckpointPersistence defaults to true when nil
85+
if (spec.CheckpointPersistence == nil || *spec.CheckpointPersistence) && name != "" && namespace != "" {
86+
configMapName := "dataflow-" + name + "-checkpoint"
87+
store, err := checkpoint.NewConfigMapStore(namespace, configMapName)
88+
if err != nil {
89+
logger.Error(err, "Failed to create checkpoint store, continuing without persistence")
90+
} else {
91+
ctx := context.Background()
92+
store.Start(ctx)
93+
defer store.Stop()
94+
procOpts = append(procOpts, processor.WithCheckpointStore(store))
95+
}
96+
}
97+
8098
// Create processor
81-
proc, err := processor.NewProcessorWithLoggerAndMetadata(&spec, logger, namespace, name)
99+
proc, err := processor.NewProcessorWithOptions(&spec, logger, namespace, name, procOpts...)
82100
if err != nil {
83101
logger.Error(err, "Failed to create processor")
84102
os.Exit(1)
@@ -110,22 +128,31 @@ func main() {
110128
}()
111129

112130
// Wait for signal or error
131+
var procErr error
113132
select {
114133
case sig := <-sigChan:
115134
logger.Info("Received signal, shutting down", "signal", sig)
116135
cancel()
117-
// Wait for processor to finish
118-
if err := <-errChan; err != nil {
119-
logger.Error(err, "Processor exited with error")
120-
os.Exit(1)
136+
procErr = <-errChan
137+
if procErr != nil {
138+
logger.Error(procErr, "Processor exited with error")
121139
}
122-
case err := <-errChan:
123-
if err != nil {
124-
logger.Error(err, "Processor error")
140+
// Flush checkpoint before exit
141+
flushCtx, flushCancel := context.WithTimeout(context.Background(), 10*time.Second)
142+
if err := proc.FlushCheckpoint(flushCtx); err != nil {
143+
logger.Error(err, "Failed to flush checkpoint")
144+
}
145+
flushCancel()
146+
case procErr = <-errChan:
147+
if procErr != nil {
148+
logger.Error(procErr, "Processor error")
125149
os.Exit(1)
126150
}
127151
}
128152

153+
if procErr != nil {
154+
os.Exit(1)
155+
}
129156
logger.Info("Processor stopped successfully")
130157
}
131158

config/rbac/role.yaml

Lines changed: 0 additions & 79 deletions
This file was deleted.

0 commit comments

Comments
 (0)