Skip to content

Commit 5880943

Browse files
committed
Update clickhouse retry
1 parent 21fc42b commit 5880943

4 files changed

Lines changed: 221 additions & 10 deletions

File tree

internal/connectors/clickhouse.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,12 @@ func (c *ClickHouseSourceConnector) readRows(ctx context.Context, msgChan chan *
153153
}
154154

155155
c.logger.V(1).Info("Executing ClickHouse query", "query", query, "table", c.config.Table)
156-
rows, err := conn.QueryContext(ctx, query)
156+
var rows *sql.Rows
157+
err := retry.OnRetryableClickHouse(ctx, 3, 1*time.Second, func() error {
158+
var qerr error
159+
rows, qerr = conn.QueryContext(ctx, query)
160+
return qerr
161+
})
157162
if err != nil {
158163
c.logger.Error(err, "Failed to execute ClickHouse query", "query", query, "table", c.config.Table)
159164
return
@@ -799,7 +804,7 @@ func (c *ClickHouseSinkConnector) Write(ctx context.Context, messages <-chan *ty
799804
batchCtx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
800805
defer cancel()
801806
}
802-
if err := retry.OnTimeout(batchCtx, retry.DefaultMaxAttempts, retry.DefaultInitialBackoff, func() error {
807+
if err := retry.OnRetryableClickHouse(batchCtx, retry.ClickHouseMaxAttempts, retry.ClickHouseInitialBackoff, func() error {
803808
return flushBatch(batchCtx, toFlush)
804809
}); err != nil {
805810
return err

internal/processor/processor.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ func NewProcessorWithLoggerAndMetadata(spec *v1.DataFlowSpec, logger logr.Logger
156156
func (p *Processor) Start(ctx context.Context) error {
157157
p.logger.Info("Starting processor")
158158

159-
// Connect to source
160-
if err := p.source.Connect(ctx); err != nil {
159+
// Connect to source with retry on transient errors (connection refused, etc.)
160+
if err := p.connectSourceWithRetry(ctx); err != nil {
161161
p.logger.Error(err, "Failed to connect to source")
162162
return fmt.Errorf("failed to connect to source: %w", err)
163163
}
@@ -172,9 +172,9 @@ func (p *Processor) Start(ctx context.Context) error {
172172
defer p.sink.Close()
173173
p.logger.Info("Connected to sink")
174174

175-
// Connect to error sink if specified
175+
// Connect to error sink if specified (with retry on transient errors)
176176
if p.errorSink != nil {
177-
if err := p.errorSink.Connect(ctx); err != nil {
177+
if err := p.connectConnectorWithRetry(ctx, p.errorSink); err != nil {
178178
p.logger.Error(err, "Failed to connect to error sink")
179179
return fmt.Errorf("failed to connect to error sink: %w", err)
180180
}
@@ -201,23 +201,62 @@ func (p *Processor) Start(ctx context.Context) error {
201201
return p.writeMessages(ctx, processedChan)
202202
}
203203

204+
// connectSourceWithRetry connects to source, retrying on transient errors (connection refused,
205+
// timeout, etc.) until success or context cancellation. Prevents pod restart on temporary backend unavailability.
206+
func (p *Processor) connectSourceWithRetry(ctx context.Context) error {
207+
const (
208+
initialBackoff = 30 * time.Second
209+
maxBackoff = 5 * time.Minute
210+
)
211+
backoff := initialBackoff
212+
for {
213+
err := p.source.Connect(ctx)
214+
if err == nil {
215+
return nil
216+
}
217+
if !retry.IsRetryableTransient(err) {
218+
return err
219+
}
220+
p.logger.Info("Transient source connection error, retrying later",
221+
"error", err.Error(),
222+
"backoff", backoff.String())
223+
select {
224+
case <-ctx.Done():
225+
return ctx.Err()
226+
case <-time.After(backoff):
227+
if backoff < maxBackoff {
228+
backoff *= 2
229+
if backoff > maxBackoff {
230+
backoff = maxBackoff
231+
}
232+
}
233+
}
234+
}
235+
}
236+
204237
// connectSinkWithRetry connects to sink, retrying on transient errors (connection refused,
205238
// HTTP 500, etc.) until success or context cancellation. Prevents pod restart on temporary backend unavailability.
206239
func (p *Processor) connectSinkWithRetry(ctx context.Context) error {
240+
return p.connectConnectorWithRetry(ctx, p.sink)
241+
}
242+
243+
// connectConnectorWithRetry connects a sink connector with retry on transient errors.
244+
// Used for the main sink, the error sink, and route sinks.
245+
func (p *Processor) connectConnectorWithRetry(ctx context.Context, sink connectors.SinkConnector) error {
207246
const (
208247
initialBackoff = 30 * time.Second
209248
maxBackoff = 5 * time.Minute
210249
)
211250
backoff := initialBackoff
212251
for {
213-
err := p.sink.Connect(ctx)
252+
err := sink.Connect(ctx)
214253
if err == nil {
215254
return nil
216255
}
217-
if !retry.IsRetryableForTrino(err) {
256+
if !retry.IsRetryableTransient(err) {
218257
return err
219258
}
220-
p.logger.Info("Transient sink connection error, retrying later",
259+
p.logger.Info("Transient connector connection error, retrying later",
221260
"error", err.Error(),
222261
"backoff", backoff.String())
223262
select {
@@ -549,7 +588,7 @@ func (p *Processor) writeMessages(ctx context.Context, messages <-chan *types.Me
549588
metadataConnector.SetMetadata(p.namespace, p.name)
550589
}
551590

552-
if err := routeSink.Connect(ctx); err != nil {
591+
if err := p.connectConnectorWithRetry(ctx, routeSink); err != nil {
553592
p.logger.Error(err, "Failed to connect to route sink", "condition", cond)
554593
return
555594
}

internal/retry/retry.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ const TrinoMaxAttempts = 5
1919
// TrinoInitialBackoff is the initial backoff for Trino retries (worker may need time to recover).
2020
const TrinoInitialBackoff = 2 * time.Second
2121

22+
// Retry parameters for ClickHouse operations.
const (
	// ClickHouseMaxAttempts is the total number of attempts made for a
	// ClickHouse batch write before giving up.
	ClickHouseMaxAttempts = 5

	// ClickHouseInitialBackoff is the wait before the first ClickHouse retry.
	ClickHouseInitialBackoff = 2 * time.Second
)
27+
2228
// IsTimeoutError returns true if err is or wraps context.DeadlineExceeded,
2329
// or if the error message indicates a timeout (e.g. from drivers).
2430
func IsTimeoutError(err error) bool {
@@ -34,6 +40,76 @@ func IsTimeoutError(err error) bool {
3440
strings.Contains(msg, "i/o timeout")
3541
}
3642

43+
// IsRetryableTransient returns true for generic transient errors (connection refused, timeout, HTTP 5xx).
44+
// Used by connectSourceWithRetry and connectSinkWithRetry for all connector types.
45+
func IsRetryableTransient(err error) bool {
46+
if err == nil {
47+
return false
48+
}
49+
if IsTimeoutError(err) {
50+
return true
51+
}
52+
lower := strings.ToLower(err.Error())
53+
return strings.Contains(lower, "connection refused") ||
54+
strings.Contains(lower, "connect timeout") ||
55+
strings.Contains(lower, "connection reset") ||
56+
strings.Contains(lower, "status 503") ||
57+
strings.Contains(lower, "status 502") ||
58+
strings.Contains(lower, "internal server error") ||
59+
strings.Contains(lower, "http/500") ||
60+
strings.Contains(lower, "bad gateway") ||
61+
strings.Contains(lower, "service temporarily unavailable") ||
62+
(strings.Contains(lower, "temporary") && strings.Contains(lower, "unavailable"))
63+
}
64+
65+
// IsTransientClickHouseError returns true if err looks like a transient ClickHouse error
66+
// (TOO_MANY_PARTS, memory limit, connection refused, etc.).
67+
func IsTransientClickHouseError(err error) bool {
68+
if err == nil {
69+
return false
70+
}
71+
if IsRetryableTransient(err) {
72+
return true
73+
}
74+
lower := strings.ToLower(err.Error())
75+
return strings.Contains(lower, "too_many_parts") ||
76+
strings.Contains(lower, "too many parts") ||
77+
strings.Contains(lower, "memory_limit_exceeded") ||
78+
strings.Contains(lower, "memory limit") ||
79+
strings.Contains(lower, "connection reset") ||
80+
(strings.Contains(lower, "temporary") && strings.Contains(lower, "unavailable"))
81+
}
82+
83+
// IsRetryableForClickHouse returns true if the error is retryable for ClickHouse batch writes.
84+
func IsRetryableForClickHouse(err error) bool {
85+
return IsTransientClickHouseError(err)
86+
}
87+
88+
// OnRetryableClickHouse runs op and retries when it returns a transient ClickHouse error.
89+
func OnRetryableClickHouse(ctx context.Context, maxAttempts int, initialBackoff time.Duration, op func() error) error {
90+
var lastErr error
91+
backoff := initialBackoff
92+
for attempt := 0; attempt < maxAttempts; attempt++ {
93+
lastErr = op()
94+
if lastErr == nil {
95+
return nil
96+
}
97+
if !IsRetryableForClickHouse(lastErr) {
98+
return lastErr
99+
}
100+
if attempt == maxAttempts-1 {
101+
return lastErr
102+
}
103+
select {
104+
case <-ctx.Done():
105+
return ctx.Err()
106+
case <-time.After(backoff):
107+
backoff *= 2
108+
}
109+
}
110+
return lastErr
111+
}
112+
37113
// IsTransientTrinoError returns true if err looks like a transient Trino error
38114
// (TOO_MANY_REQUESTS_FAILED, worker overload/crash, "transient", "retry your query",
39115
// HTTP 500/502/503 from proxy/load balancer, connection refused to backend, etc.).

internal/retry/retry_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,34 @@ func TestOnTimeout_ContextCanceled(t *testing.T) {
111111
}
112112
}
113113

114+
func TestIsRetryableTransient(t *testing.T) {
115+
tests := []struct {
116+
name string
117+
err error
118+
want bool
119+
}{
120+
{"nil", nil, false},
121+
{"timeout", errors.New("i/o timeout"), true},
122+
{"connection refused", errors.New("connection refused"), true},
123+
{"connect timeout", errors.New("connect timeout"), true},
124+
{"connection reset", errors.New("connection reset by peer"), true},
125+
{"status 503", errors.New("status 503: Service Unavailable"), true},
126+
{"status 502", errors.New("status 502: Bad Gateway"), true},
127+
{"internal server error", errors.New("Internal Server Error"), true},
128+
{"http/500", errors.New("HTTP/500"), true},
129+
{"bad gateway", errors.New("502 bad gateway"), true},
130+
{"service temporarily unavailable", errors.New("503 Service Temporarily Unavailable"), true},
131+
{"permanent error", errors.New("syntax error"), false},
132+
}
133+
for _, tt := range tests {
134+
t.Run(tt.name, func(t *testing.T) {
135+
if got := IsRetryableTransient(tt.err); got != tt.want {
136+
t.Errorf("IsRetryableTransient() = %v, want %v", got, tt.want)
137+
}
138+
})
139+
}
140+
}
141+
114142
func TestIsTransientTrinoError(t *testing.T) {
115143
tests := []struct {
116144
name string
@@ -238,3 +266,66 @@ func TestOnRetryableTrino_ConnectionRefusedThenSuccess(t *testing.T) {
238266
t.Errorf("expected 2 calls (connection refused then success), got %d", calls)
239267
}
240268
}
269+
270+
func TestIsTransientClickHouseError(t *testing.T) {
271+
tests := []struct {
272+
name string
273+
err error
274+
want bool
275+
}{
276+
{"nil", nil, false},
277+
{"connection refused", errors.New("connect: connection refused"), true},
278+
{"connect timeout", errors.New("connect timeout"), true},
279+
{"TOO_MANY_PARTS", errors.New("DB::Exception: Too many parts"), true},
280+
{"too many parts", errors.New("too many parts in total"), true},
281+
{"memory_limit_exceeded", errors.New("Memory limit exceeded"), true},
282+
{"memory limit", errors.New("Memory limit: would use 1.00 GiB"), true},
283+
{"connection reset", errors.New("connection reset by peer"), true},
284+
{"status 503", errors.New("status 503: Service Unavailable"), true},
285+
{"status 502", errors.New("status 502: Bad Gateway"), true},
286+
{"timeout", errors.New("i/o timeout"), true},
287+
{"permanent error", errors.New("syntax error at position 5"), false},
288+
}
289+
for _, tt := range tests {
290+
t.Run(tt.name, func(t *testing.T) {
291+
if got := IsTransientClickHouseError(tt.err); got != tt.want {
292+
t.Errorf("IsTransientClickHouseError() = %v, want %v", got, tt.want)
293+
}
294+
})
295+
}
296+
}
297+
298+
func TestOnRetryableClickHouse_TransientThenSuccess(t *testing.T) {
299+
ctx := context.Background()
300+
calls := 0
301+
transientErr := errors.New("DB::Exception: Too many parts")
302+
err := OnRetryableClickHouse(ctx, 3, 5*time.Millisecond, func() error {
303+
calls++
304+
if calls < 2 {
305+
return transientErr
306+
}
307+
return nil
308+
})
309+
if err != nil {
310+
t.Errorf("OnRetryableClickHouse() err = %v, want nil", err)
311+
}
312+
if calls != 2 {
313+
t.Errorf("expected 2 calls (retry then success), got %d", calls)
314+
}
315+
}
316+
317+
func TestOnRetryableClickHouse_PermanentErrorNoRetry(t *testing.T) {
318+
ctx := context.Background()
319+
wantErr := errors.New("syntax error at position 5")
320+
calls := 0
321+
err := OnRetryableClickHouse(ctx, 3, 10*time.Millisecond, func() error {
322+
calls++
323+
return wantErr
324+
})
325+
if !errors.Is(err, wantErr) {
326+
t.Errorf("OnRetryableClickHouse() err = %v, want %v", err, wantErr)
327+
}
328+
if calls != 1 {
329+
t.Errorf("expected 1 call (no retry on permanent error), got %d", calls)
330+
}
331+
}

0 commit comments

Comments
 (0)