-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtypes.go
More file actions
129 lines (115 loc) · 3.02 KB
/
types.go
File metadata and controls
129 lines (115 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package delta
import (
"fmt"
"sync"
"time"
)
// Msg is a message persisted in a stream.
//
// Msg values are delivered to subscribers and can be replied to when they
// originate from a live subscription.
type Msg struct {
mq *MQ
MessageId uint64
Topic string
Payload []byte
At time.Time
}
// Reply publishes a reply message to this message's inbox topic.
//
// It returns an error when the message was not delivered from a live MQ.
func (m *Msg) Reply(payload []byte) (Msg, error) {
if m.mq == nil {
return Msg{}, fmt.Errorf("message does not support reply")
}
reply := Msg{
Topic: fmt.Sprintf("_inbox.%d", m.MessageId),
Payload: payload,
}
return m.mq.write(reply)
}
// Publication is the result of a publish operation.
//
// For PublishAsync, wait on Done and then inspect Err.
type Publication struct {
Msg
Err error
done chan struct{}
}
// Done returns a channel that is closed when the publish operation finishes.
func (p *Publication) Done() <-chan struct{} {
return p.done
}
// Subscription represents an active topic subscription.
//
// Messages are delivered on Chan until Unsubscribe is called.
type Subscription struct {
id string
topic string
Unsubscribe func()
closeOnce sync.Once
closed bool
notifyChan chan Msg
doneChan chan struct{} // closed when the subscription is closed; lets notify() abort without holding the write lock
notifyMu sync.RWMutex
mu sync.Mutex
}
func (s *Subscription) close() {
// Signal doneChan first, without holding the write lock, so that any
// notify() call currently blocked in its select can wake up and release
// the read lock. Only then acquire the write lock to mark closed and
// close the data channel.
s.closeOnce.Do(func() {
close(s.doneChan)
s.notifyMu.Lock()
defer s.notifyMu.Unlock()
s.closed = true
close(s.notifyChan)
})
}
// Topic returns the subscription's normalized topic pattern.
func (s *Subscription) Topic() string {
return s.topic
}
// Id returns the unique identifier for this subscription.
func (s *Subscription) Id() string {
return s.id
}
func (s *Subscription) notify(m Msg) {
s.notifyMu.RLock()
defer s.notifyMu.RUnlock()
if s.closed {
return
}
// Use a select so that a concurrent close() can signal doneChan and unblock
// this send without needing to acquire the write lock while we hold the read
// lock, which would otherwise cause a deadlock.
select {
case s.notifyChan <- m:
case <-s.doneChan:
}
}
func (s *Subscription) tryNotify(m Msg) (written bool) {
s.notifyMu.RLock()
defer s.notifyMu.RUnlock()
if s.closed {
return false
}
select {
case s.notifyChan <- m:
return true
default:
return false
}
}
// Chan returns the channel used to deliver subscription messages.
func (s *Subscription) Chan() <-chan Msg {
return s.notifyChan
}
// Next blocks until the next message arrives or the subscription closes.
//
// The boolean return value is false when the subscription is closed.
func (s *Subscription) Next() (Msg, bool) {
m, ok := <-s.notifyChan
return m, ok
}