-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreamfleet.go
More file actions
359 lines (295 loc) · 11.9 KB
/
streamfleet.go
File metadata and controls
359 lines (295 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
package streamfleet
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/redis/go-redis/v9"
)
// DefaultReceiverStreamGcInterval is the default interval at which receiver stream garbage collections are run.
// This applies to clients and servers if no specific interval is set.
const DefaultReceiverStreamGcInterval = 1 * time.Minute

// curTaskEncVer is the encoding version written into the "enc_version" field of encoded tasks.
const curTaskEncVer = "1"

// curTaskNotifEncVer is the encoding version written into the "enc_version" field of encoded task notifications.
const curTaskNotifEncVer = "1"

// TaskMaxPendingTime is the maximum time a task can sit in a worker's pending list before it is reclaimable by other workers.
// A task's pending time is refreshed by healthy clients every TaskUpdatePendingInterval.
// This duration is always kept higher than TaskUpdatePendingInterval so that healthy tasks are never reclaimed.
const TaskMaxPendingTime = 1 * time.Minute

// TaskUpdatePendingInterval is the interval at which the pending time of a task is updated.
const TaskUpdatePendingInterval = 10 * time.Second

// KeyPrefix is the prefix used by Streamfleet for all Redis keys, including work queue streams.
// The underlying stream key for a queue will be KeyPrefix + the queue's key.
const KeyPrefix = "streamfleet:"

// The sentinel errors below carry constant messages, so they are built with
// errors.New rather than fmt.Errorf (there is nothing to format). Callers
// should compare against them with errors.Is.

// ErrNoQueues is returned when running a server or instantiating a client without specifying any queue keys.
var ErrNoQueues = errors.New("streamfleet: tried to run a server or instantiate a client that had no configured queue keys")

// ErrTaskCanceled is returned when a task has been canceled.
var ErrTaskCanceled = errors.New("streamfleet: task canceled")

// ErrTaskExpired is returned when a task could not be handled before its expiration time.
var ErrTaskExpired = errors.New("streamfleet: task expired")
// Task is a task to be processed by a worker.
// It is sent to the queue, and then processed by a worker.
// The only required field is Data.
// Do not construct directly, use client enqueue methods.
type Task struct {
	// Id is the task's unique ID.
	Id string

	// ClientId is the ID of the client where the task originated.
	ClientId string

	// Data is the task's underlying payload.
	// Required, but technically can be empty.
	Data string

	// MaxRetries is the task's maximum retry count.
	// If a task is retried more than this many times, it will be removed from the queue, and optionally send a canceled signal.
	// If 0, the task will be retried until success.
	MaxRetries int

	// ExpiresTs is the timestamp when the task expires.
	// If a worker receives the task past this timestamp, it will discard it, and optionally send a canceled signal.
	// If zero (omitted), the task never expires; a zero value is encoded as 0 in the stream message.
	// Only millisecond precision survives encoding.
	// Precision below 1 second is not guaranteed to be enforced immediately by clients; it may take up to 1 second for expirations to be notified.
	ExpiresTs time.Time

	// RetryDelay is the duration to delay retrying the task.
	// It is encoded in whole milliseconds, so any sub-millisecond part is truncated.
	RetryDelay time.Duration

	// sendNotifications reports whether status notifications for this task should be sent by the worker handling it.
	// This is set when the task is enqueued based on which enqueue method is used.
	sendNotifications bool

	// retries is the number of times the task has been retried so far.
	retries int
}
// TaskOpt are options for creating a task.
// All fields are optional. The zero value produces a task that is retried
// until success, never expires, and is retried without delay.
type TaskOpt struct {
	// MaxRetries is the maximum number of retries for this task.
	// If omitted or 0, the task will be retried until success or expiration.
	MaxRetries int

	// ExpiresTs is the timestamp when the task expires.
	// If a worker receives the task past this timestamp, it will discard it.
	// If omitted (zero), the task will never expire.
	ExpiresTs time.Time

	// RetryDelay is the duration to delay retrying the task.
	// If omitted, there is no delay.
	RetryDelay time.Duration
}
// newTask builds a fresh Task carrying data and the settings from opt.
// The task gets a new ID from MustUuidV7 and is attributed to clientId.
// sendNotif controls whether the worker handling the task should emit
// status notifications for it.
func newTask(data string, clientId string, sendNotif bool, opt TaskOpt) *Task {
	task := new(Task)
	task.Id = MustUuidV7()
	task.ClientId = clientId
	task.Data = data
	task.MaxRetries = opt.MaxRetries
	task.ExpiresTs = opt.ExpiresTs
	task.RetryDelay = opt.RetryDelay
	task.sendNotifications = sendNotif
	// retries stays at its zero value: a brand-new task has not been retried.
	return task
}
// encode flattens the task into the field/value map stored in a Redis stream message.
// A zero ExpiresTs is written as 0 (no expiry); otherwise it is written as Unix
// milliseconds. RetryDelay is written in whole milliseconds.
func (t *Task) encode() map[string]any {
	expiresMs := int64(0)
	if !t.ExpiresTs.IsZero() {
		expiresMs = t.ExpiresTs.UnixMilli()
	}

	fields := make(map[string]any, 9)
	fields["enc_version"] = curTaskEncVer
	fields["id"] = t.Id
	fields["client_id"] = t.ClientId
	fields["data"] = t.Data
	fields["max_retries"] = t.MaxRetries
	fields["expires_ts"] = expiresMs
	fields["retry_delay"] = t.RetryDelay.Milliseconds()
	fields["send_notif"] = t.sendNotifications
	fields["retries"] = t.retries
	return fields
}
var (
	// ErrMissingOrMalformedEncodingVersion is returned when an encoded message read
	// from a Redis stream has no "enc_version" field, or that field cannot be parsed.
	ErrMissingOrMalformedEncodingVersion = errors.New("streamfleet: missing or malformed enc_version in encoded message")

	// ErrUnsupportedEncodingVersion is returned when an encoded message read from a
	// Redis stream carries an "enc_version" this node does not know how to decode.
	ErrUnsupportedEncodingVersion = errors.New("streamfleet: unsupported enc_version in encoded message, this node may be running an outdated version of Streamfleet")
)
// decodeTask decodes a task from a raw Redis stream message (the inverse of Task.encode).
//
// Returns ErrMissingOrMalformedEncodingVersion if the version field is missing or malformed.
// Returns ErrUnsupportedEncodingVersion if the version field is unsupported.
// Returns a descriptive error if any field required by the encoding version is
// missing, is not a string, or (for numeric fields) is not a valid integer.
// Such a message is corrupt, and decoding it must not panic or silently turn,
// say, max_retries into 0 ("retry forever").
func decodeTask(msg map[string]any) (*Task, error) {
	// The comma-ok assertion covers both a missing/nil field and a non-string
	// value, either of which would otherwise panic.
	verStr, ok := msg["enc_version"].(string)
	if !ok {
		return nil, ErrMissingOrMalformedEncodingVersion
	}
	ver, err := strconv.ParseInt(verStr, 10, 64)
	if err != nil {
		return nil, ErrMissingOrMalformedEncodingVersion
	}

	switch ver {
	case 1:
		// The readers below record only the first failure in fieldErr and keep
		// returning zero values, so every field can be read and checked once.
		var fieldErr error
		str := func(key string) string {
			s, ok := msg[key].(string)
			if !ok && fieldErr == nil {
				fieldErr = fmt.Errorf("streamfleet: missing or malformed %q field in encoded task", key)
			}
			return s
		}
		num := func(key string) int64 {
			// A missing field yields s == "", which also fails ParseInt.
			s, ok := msg[key].(string)
			n, err := strconv.ParseInt(s, 10, 64)
			if (!ok || err != nil) && fieldErr == nil {
				fieldErr = fmt.Errorf("streamfleet: missing or malformed %q field in encoded task", key)
			}
			return n
		}

		task := &Task{
			Id:                str("id"),
			ClientId:          str("client_id"),
			Data:              str("data"),
			MaxRetries:        int(num("max_retries")),
			RetryDelay:        time.Duration(num("retry_delay")) * time.Millisecond,
			sendNotifications: str("send_notif") != "0",
			retries:           int(num("retries")),
		}
		// 0 is the encoding of "never expires"; leave ExpiresTs as the zero time.
		if expiresMs := num("expires_ts"); expiresMs != 0 {
			task.ExpiresTs = time.UnixMilli(expiresMs)
		}
		if fieldErr != nil {
			return nil, fieldErr
		}
		return task, nil
	default:
		return nil, ErrUnsupportedEncodingVersion
	}
}
// TaskHandle is a handle to a task that was enqueued by a client.
// The task may not have completed yet; call Wait to block until it has.
type TaskHandle struct {
	// Id is the task's unique ID.
	Id string

	// NOTE(review): presumably the task's expiration timestamp; it is not read
	// by Wait — confirm its use where handles are created.
	expTs time.Time

	// hasResult records whether Wait has already received and cached the outcome.
	hasResult bool

	// result is the cached outcome; meaningful only once hasResult is true.
	result error

	// resultChan is written to and closed when the task is completed.
	// A nil value means the task completed successfully; a non-nil value is the
	// failure (or cancellation) error.
	resultChan chan error
}

// Wait blocks until the task completes and returns its outcome.
// Later calls return the same outcome without blocking.
// If the task was canceled, the error will be ErrTaskCanceled.
// If the task expired before being processed, the error will be ErrTaskExpired.
//
// Important: Do not call this method concurrently, otherwise it may panic.
func (t *TaskHandle) Wait() error {
	if !t.hasResult {
		// The channel is closed after its single value is delivered, so a
		// second receive would yield nil instead of the real outcome. Cache it.
		t.result = <-t.resultChan
		t.hasResult = true
	}
	return t.result
}
// queuedTask pairs a decoded Task with information about where it was read from.
// NOTE(review): field meanings below are inferred from their names — confirm
// against the code that populates this struct.
type queuedTask struct {
	// Presumably the full Redis stream key the task was read from.
	Stream string
	// Presumably the user-facing queue key (without KeyPrefix).
	Queue string
	// The decoded task.
	Task *Task
	// Presumably the Redis stream entry ID of the message.
	RedisId string
}
// mkRecvHeartbeatKey returns the key of the sorted set holding receiver
// heartbeats for the given queue: KeyPrefix + queueKey + ":receivers".
func mkRecvHeartbeatKey(queueKey string) string {
	const suffix = ":receivers"
	return KeyPrefix + queueKey + suffix
}

// mkRecvStreamKey returns the key of the receiver stream with the given ID:
// KeyPrefix + "receiver:" + id.
func mkRecvStreamKey(id string) string {
	const infix = "receiver:"
	return KeyPrefix + infix + id
}

const (
	// receiverStreamHeartbeatTimeout is how long a receiver stream may go without
	// a heartbeat before the garbage collector removes it.
	receiverStreamHeartbeatTimeout = 10 * time.Minute

	// receiverStreamHeartbeatInterval is how often clients emit heartbeats.
	receiverStreamHeartbeatInterval = 10 * time.Second
)
// runReceiverStreamGc runs the receiver stream garbage collector for the given queues.
//
// For each queue, it reads the queue's heartbeat sorted set and selects receiver IDs
// whose score is at or below a cutoff of now - receiverStreamHeartbeatTimeout (expressed
// in Unix milliseconds). It deletes each selected receiver's stream, destroying the
// stream's consumer groups first, and then removes the IDs from the heartbeat set.
//
// A receiver whose stream no longer exists is simply dropped from the set. This means
// a run that failed part-way (streams deleted, set not yet trimmed) is safely retried
// by the next run. The first Redis error aborts the run and is returned wrapped.
func runReceiverStreamGc(ctx context.Context, client redis.UniversalClient, queueKeys []string) error {
	for _, queue := range queueKeys {
		setKey := mkRecvHeartbeatKey(queue)

		// Find receiver IDs whose last heartbeat was too long ago.
		timeoutCutoff := time.Now().Add(-receiverStreamHeartbeatTimeout)
		ids, err := client.ZRangeByScore(ctx, setKey, &redis.ZRangeBy{
			// Explicit lower bound; don't rely on Redis parsing an empty string as 0.
			Min: "-inf",
			Max: strconv.FormatInt(timeoutCutoff.UnixMilli(), 10),
		}).Result()
		if err != nil {
			return fmt.Errorf(`streamfleet: gc: failed to get receiver stream IDs whose heartbeats timed out: %w`, err)
		}
		if len(ids) == 0 {
			continue
		}

		// Remove receiver streams.
		for _, id := range ids {
			recvKey := mkRecvStreamKey(id)

			// XINFO GROUPS fails on a missing key. Without this check, an ID whose stream
			// was already deleted would make every future GC run fail on it.
			exists, err := client.Exists(ctx, recvKey).Result()
			if err != nil {
				return fmt.Errorf(`streamfleet: gc: failed to check whether receiver stream with key "%s" exists: %w`, recvKey, err)
			}
			if exists == 0 {
				continue
			}

			// Get groups and delete them.
			groups, err := client.XInfoGroups(ctx, recvKey).Result()
			if err != nil {
				return fmt.Errorf(`streamfleet: gc: failed to get groups on stream with key "%s" that is pending deletion: %w`, recvKey, err)
			}
			for _, group := range groups {
				if err := client.XGroupDestroy(ctx, recvKey, group.Name).Err(); err != nil {
					return fmt.Errorf(`streamfleet: gc: failed to delete group "%s" on stream with key "%s" that is pending deletion: %w`, group.Name, recvKey, err)
				}
			}

			// Delete only the receiver stream key. The bare receiver ID is not a key
			// owned by Streamfleet, so it must not be passed to DEL.
			if err := client.Del(ctx, recvKey).Err(); err != nil {
				return fmt.Errorf(`streamfleet: gc: failed to delete receiver stream with key "%s": %w`, recvKey, err)
			}
		}

		if err := client.ZRem(ctx, setKey, ids).Err(); err != nil {
			return fmt.Errorf(`streamfleet: gc: failed to remove items from receiver stream heartbeat set with key "%s": %w`, setKey, err)
		}
	}

	return nil
}
// TaskNotificationType identifies the kind of status notification sent about a task.
// Its string value is what is written to the "type" field of an encoded notification.
type TaskNotificationType string

const (
	// TaskNotificationTypeCompleted is sent when a task is completed successfully.
	TaskNotificationTypeCompleted TaskNotificationType = "completed"

	// TaskNotificationTypeCanceled is sent when a task is canceled.
	TaskNotificationTypeCanceled TaskNotificationType = "canceled"

	// TaskNotificationTypeExpired is sent when a task expires.
	TaskNotificationTypeExpired TaskNotificationType = "expired"

	// TaskNotificationTypeError is sent when a task fails due to an error.
	// The notification should include the underlying error message.
	TaskNotificationTypeError TaskNotificationType = "error"
)
// TaskNotification is a notification sent about a task's status.
type TaskNotification struct {
	// TaskId is the ID of the task the notification is about.
	TaskId string

	// Type is the kind of notification.
	Type TaskNotificationType

	// ErrMsg is the error message when Type is TaskNotificationTypeError.
	// Otherwise, it is empty.
	ErrMsg string
}

// encode flattens the notification into the field/value map stored in a Redis stream message.
func (t *TaskNotification) encode() map[string]any {
	fields := make(map[string]any, 4)
	fields["enc_version"] = curTaskNotifEncVer
	fields["task_id"] = t.TaskId
	fields["type"] = string(t.Type)
	fields["err_msg"] = t.ErrMsg
	return fields
}
// decodeTaskNotification decodes a task notification from a raw Redis stream message
// (the inverse of TaskNotification.encode).
//
// Returns ErrMissingOrMalformedEncodingVersion if the version field is missing or malformed.
// Returns ErrUnsupportedEncodingVersion if the version field is unsupported.
// Returns a descriptive error if a field required by the encoding version is missing
// or is not a string, instead of panicking on a type assertion.
func decodeTaskNotification(msg map[string]any) (*TaskNotification, error) {
	// The comma-ok assertion covers both a missing/nil field and a non-string value.
	verStr, ok := msg["enc_version"].(string)
	if !ok {
		return nil, ErrMissingOrMalformedEncodingVersion
	}
	ver, err := strconv.ParseInt(verStr, 10, 64)
	if err != nil {
		return nil, ErrMissingOrMalformedEncodingVersion
	}

	switch ver {
	case 1:
		str := func(key string) (string, error) {
			s, ok := msg[key].(string)
			if !ok {
				return "", fmt.Errorf("streamfleet: missing or malformed %q field in encoded task notification", key)
			}
			return s, nil
		}

		taskId, err := str("task_id")
		if err != nil {
			return nil, err
		}
		typ, err := str("type")
		if err != nil {
			return nil, err
		}
		errMsg, err := str("err_msg")
		if err != nil {
			return nil, err
		}

		return &TaskNotification{
			TaskId: taskId,
			Type:   TaskNotificationType(typ),
			ErrMsg: errMsg,
		}, nil
	default:
		return nil, ErrUnsupportedEncodingVersion
	}
}