-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathoptions_queue.go
More file actions
79 lines (70 loc) · 2.57 KB
/
options_queue.go
File metadata and controls
79 lines (70 loc) · 2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package frame
import (
"context"
"errors"
"fmt"
"strings"
_ "github.com/pitabwire/natspubsub" // required for NATS pubsub driver registration
_ "gocloud.dev/pubsub/mempubsub" // required for in-memory pubsub driver registration
"github.com/pitabwire/frame/data"
"github.com/pitabwire/frame/queue"
)
// WithRegisterPublisher returns an Option that registers a publisher under
// reference, publishing to queueURL.
//
// When applied, the option first calls registerPlugin("queue") and then checks
// its arguments: a reference that is blank after trimming whitespace, or a
// queueURL rejected by data.DSN(...).Valid(), is recorded as a startup error
// and nothing else is scheduled. Otherwise a publisher startup hook is queued
// that calls AddPublisher on the service's queue manager; if that call fails
// the error is logged and recorded as a startup error.
func WithRegisterPublisher(reference string, queueURL string) Option {
	return func(_ context.Context, s *Service) {
		s.registerPlugin("queue")

		// Bad input is surfaced as a startup error rather than a panic.
		switch {
		case strings.TrimSpace(reference) == "":
			s.AddStartupError(errors.New("publisher reference cannot be empty"))
			return
		case !data.DSN(queueURL).Valid():
			s.AddStartupError(fmt.Errorf("publisher queueURL is invalid: %s", queueURL))
			return
		}

		// The queue manager is initialized only after all options have been
		// applied, so the real registration is deferred to the pre-start phase.
		// Publishers must be registered before subscribers (for the mem:// driver).
		register := func(ctx context.Context, svc *Service) {
			err := svc.QueueManager().AddPublisher(ctx, reference, queueURL)
			if err == nil {
				return
			}

			entry := svc.Log(ctx).
				WithError(err).
				WithField("publisher_ref", reference).
				WithField("publisher_url", queueURL)
			entry.Error("Failed to register publisher")

			svc.AddStartupError(err)
		}
		s.AddPublisherStartup(register)
	}
}
// WithRegisterSubscriber returns an Option that registers a subscription under
// reference, consuming from queueURL, with the supplied handlers.
//
// When applied, the option calls registerPlugin("queue") and then checks its
// arguments: a reference that is blank after trimming whitespace, or a
// queueURL rejected by data.DSN(...).Valid(), is recorded as a startup error
// and nothing else is scheduled. Otherwise a subscriber startup hook is queued
// that calls AddSubscriber on the service's queue manager, passing handlers
// through unchanged; if that call fails the error is logged and recorded as a
// startup error.
func WithRegisterSubscriber(reference string, queueURL string,
	handlers ...queue.SubscribeWorker) Option {
	return func(_ context.Context, s *Service) {
		// Register the queue plugin exactly as WithRegisterPublisher does, so a
		// service that only declares subscribers is treated the same way.
		// NOTE(review): assumes registerPlugin tolerates repeated calls, which
		// it already receives when several publishers are configured — confirm.
		s.registerPlugin("queue")

		// Validate inputs and report via startup errors instead of panicking.
		if strings.TrimSpace(reference) == "" {
			s.AddStartupError(errors.New("subscriber reference cannot be empty"))
			return
		}
		if !data.DSN(queueURL).Valid() {
			s.AddStartupError(fmt.Errorf("subscriber queueURL is invalid: %s", queueURL))
			return
		}

		// The queue manager is initialized only after all options have been
		// applied, so the real registration is deferred to the pre-start phase.
		// Subscribers must be registered after publishers (for the mem:// driver).
		s.AddSubscriberStartup(func(ctx context.Context, svc *Service) {
			err := svc.QueueManager().AddSubscriber(ctx, reference, queueURL, handlers...)
			if err != nil {
				svc.Log(ctx).WithError(err).
					WithField("subscriber_ref", reference).
					WithField("subscriber_url", queueURL).
					Error("Failed to register subscriber")
				svc.AddStartupError(err)
			}
		})
	}
}
// QueueManager returns the queue manager held by the service.
//
// NOTE(review): comments elsewhere in this file say the manager is initialized
// only after options are applied, so this presumably returns nil if called
// earlier — confirm before relying on it during option application.
func (s *Service) QueueManager() queue.Manager {
	return s.queueManager
}