-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.go
More file actions
122 lines (110 loc) · 3.22 KB
/
queue.go
File metadata and controls
122 lines (110 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Package goqueue provides a concurrency-limited FIFO work queue.
//
// A Queue executes submitted functions while ensuring that no more than a
// configured number run concurrently. Additional functions are queued and
// executed in first-in-first-out (FIFO) order as running functions complete.
//
// The zero value of Queue is not usable. Use NewQueue to construct a Queue.
package goqueue
import (
"container/list"
"context"
"fmt"
)
// Queue represents a concurrency-limited FIFO work queue.
//
// A Queue guarantees that at most maxActive functions are running at any
// given time. If the limit has been reached, additional functions submitted
// with Add are placed in a backlog and executed in submission order.
//
// All mutable state lives in a single queueState value that is passed through
// the st channel: a method receives the state, updates it, and sends it back,
// so only one goroutine can hold the state at a time. This is what makes
// Queue safe for concurrent use by multiple goroutines.
//
// A Queue must be created with NewQueue; the zero value has a nil st channel,
// so every method would block forever on it.
type Queue struct {
// maxActive is the upper bound on concurrently running functions. It is set
// once by NewQueue and never modified afterwards.
maxActive int
// st carries the one and only copy of the queue's mutable state. NewQueue
// creates it with capacity 1 and seeds it with the initial state, so the
// goroutine that receives from st owns the state until it sends it back.
st chan queueState
}
// queueState is the mutable state of a Queue. Exactly one copy exists at a
// time; it is either parked in Queue.st or held by the goroutine that most
// recently received it from Queue.st.
type queueState struct {
// active is the number of worker goroutines currently running functions.
// Add increments it when it starts a worker; a worker decrements it when it
// finds the backlog empty and exits.
active int
// backlog holds the functions waiting for a free worker, oldest first.
// Every element is a func(context.Context) value.
backlog *list.List
// idle is nil until Idle is called. Once non-nil it is closed when active
// drops to zero (or immediately, if Idle is called while active is already
// zero), and Add resets it to nil when the queue next leaves the idle state.
idle chan struct{}
}
// NewQueue returns a Queue that runs at most maxActive functions at a time.
//
// maxActive must be at least 1. For any smaller value NewQueue returns a nil
// Queue together with a non-nil error describing the rejected limit.
func NewQueue(maxActive int) (*Queue, error) {
	if maxActive < 1 {
		// Name the function that was actually called so the message can be
		// traced back to its source.
		return nil, fmt.Errorf("goqueue.NewQueue called with nonpositive limit (%d)", maxActive)
	}

	// The state channel has room for exactly one value and is seeded with the
	// initial state, so it always holds either the state or nothing (while a
	// method is working on it).
	q := &Queue{maxActive: maxActive, st: make(chan queueState, 1)}
	q.st <- queueState{backlog: list.New()}
	return q, nil
}
// Add submits f to the Queue for execution.
//
// If fewer than maxActive functions are currently running, f starts
// immediately in a new goroutine. Otherwise f is appended to the backlog and
// is started, in FIFO order, by the next worker goroutine that finishes its
// current function. Add does not wait for f to start or to finish.
//
// ctx is the context f receives when it runs, regardless of whether f starts
// immediately or first waits in the backlog. Add itself never inspects ctx:
// cancelling ctx while f is waiting does not remove f from the backlog.
//
// f must be non-nil and must not panic. If f panics, the behavior of the
// Queue is undefined.
func (q *Queue) Add(ctx context.Context, f func(context.Context)) {
	st := <-q.st
	if st.active == q.maxActive {
		// Every slot is busy, so park f. The worker that later dequeues it
		// calls it with that worker's own context, so f is wrapped here to
		// keep the ctx it was submitted with. The wrapper deliberately
		// ignores its argument and keeps the backlog element type
		// func(context.Context).
		st.backlog.PushBack(func(context.Context) { f(ctx) })
		q.st <- st
		return
	}
	if st.active == 0 {
		// The queue is leaving the idle state. Any idle channel stored in the
		// state has already been closed, so drop it; the next call to Idle
		// will allocate a fresh one for the new busy period.
		st.idle = nil
	}
	st.active++
	q.st <- st

	go func() {
		for {
			f(ctx)

			st := <-q.st
			if st.backlog.Len() == 0 {
				// Nothing is waiting: give up this worker's slot. The last
				// worker to leave signals anyone waiting on Idle.
				if st.active--; st.active == 0 && st.idle != nil {
					close(st.idle)
				}
				q.st <- st
				return
			}
			// Keep the slot and reuse this goroutine for the oldest waiting
			// function. Backlog entries carry their own context (see the
			// wrapper above), so passing this worker's ctx to them is
			// harmless.
			f = st.backlog.Remove(st.backlog.Front()).(func(context.Context))
			q.st <- st
		}
	}()
}
// Idle reports, through a channel, when the Queue has no work left.
//
// The returned channel is closed as soon as no function is running. Workers
// only give up their slot after finding the backlog empty, so "nothing
// running" also means "nothing waiting". If the Queue is already idle when
// Idle is called, the channel comes back already closed.
//
// Repeated calls return the same channel until the Queue has been idle and a
// later Add makes it busy again; the first Idle call after that gets a new
// channel.
func (q *Queue) Idle() <-chan struct{} {
	state := <-q.st

	if state.idle == nil {
		// First request since the queue last became busy (or ever): create
		// the notification channel lazily.
		done := make(chan struct{})
		if state.active == 0 {
			// Nothing is running, so there is nothing to wait for.
			close(done)
		}
		state.idle = done
	}

	// Hand the (possibly updated) state back before returning our copy of
	// the channel.
	q.st <- state
	return state.idle
}
// BacklogLen reports how many submitted functions are still waiting for a
// free slot.
//
// Functions that are currently running are not counted. The value is a
// snapshot: it may be out of date by the time the caller looks at it.
func (q *Queue) BacklogLen() int64 {
	state := <-q.st
	waiting := state.backlog.Len()
	q.st <- state
	return int64(waiting)
}