Skip to content

Commit b975fee

Browse files
committed
fix scheduler
* switched to using a scheduler for each occasion instead of one global scheduler for everything, this helps futur scheduled jobs not colliding with scheduled requests (reqscheduler) * fixed scheduler fetching > limit * fixed incoming scheduled requests only considering queue when determining wether to add to heap (queue) => incoming requests now check if earlier than latest heap request AND if they are the earliest request in the db
1 parent da85073 commit b975fee

5 files changed

Lines changed: 66 additions & 43 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ require (
1111
github.com/codeshelldev/gotl/pkg/pretty v0.0.10
1212
github.com/codeshelldev/gotl/pkg/query v0.0.4
1313
github.com/codeshelldev/gotl/pkg/request v0.0.8
14-
github.com/codeshelldev/gotl/pkg/scheduler v0.0.7
14+
github.com/codeshelldev/gotl/pkg/scheduler v0.0.9
1515
github.com/codeshelldev/gotl/pkg/server/http v0.0.3
1616
github.com/codeshelldev/gotl/pkg/stringutils v0.0.8
1717
github.com/codeshelldev/gotl/pkg/templating v0.0.4

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ github.com/codeshelldev/gotl/pkg/query v0.0.4 h1:o2Oagx/s1wfNMqkh6GfR6wpsIVOFSDP
1616
github.com/codeshelldev/gotl/pkg/query v0.0.4/go.mod h1:Bg3tFzFq9xButTw0BSfGQhSmfAnFDrJamOcnX6Io4m4=
1717
github.com/codeshelldev/gotl/pkg/request v0.0.8 h1:sVVt2ADOTgZrna7RsqThwMKxYCuxlBE80s7kV90rARg=
1818
github.com/codeshelldev/gotl/pkg/request v0.0.8/go.mod h1:ngE6/OksRIclheFGfqJ6/2lBpzCm9sPe4p5JfGIg5kg=
19-
github.com/codeshelldev/gotl/pkg/scheduler v0.0.7 h1:6D16m1/DndhkIvoYMc26ebc9SySy1UQMc7W4QifdvvM=
20-
github.com/codeshelldev/gotl/pkg/scheduler v0.0.7/go.mod h1:sXEpRxbDc/JAN8WDxxq5+UxJf2dOQpKJIZyvORjIJGM=
19+
github.com/codeshelldev/gotl/pkg/scheduler v0.0.9 h1:8IFSPmyQehogwwo9vo3aAo3NxVY5aR8Hzrb8E+hMrDU=
20+
github.com/codeshelldev/gotl/pkg/scheduler v0.0.9/go.mod h1:sXEpRxbDc/JAN8WDxxq5+UxJf2dOQpKJIZyvORjIJGM=
2121
github.com/codeshelldev/gotl/pkg/server/http v0.0.3 h1:3232uPB2CljzUJadyrME7p0DaOCGz+vPVfPjnS788SE=
2222
github.com/codeshelldev/gotl/pkg/server/http v0.0.3/go.mod h1:/asx7ViJtwlBvLgObjI/tejm6lNDN1/B+/6BPImqDfc=
2323
github.com/codeshelldev/gotl/pkg/stringutils v0.0.8 h1:VKIuEYLJARDmHyhAbcMy1TsdxPdzsKlbQvgr1G4QE7s=

internals/db/reqdb.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -214,16 +214,31 @@ func (s *RequestSchedulerDB) CleanupDones(threshold time.Duration) error {
214214
func (s *RequestSchedulerDB) FetchNext(amount int, within time.Duration) ([]*ScheduledRequest, error) {
215215
minRunAt := time.Now().Add(within).Unix()
216216

217-
rows, err := s.db.Query(`
218-
SELECT id, method, url, created_at, run_at, request_headers, request_body
219-
FROM scheduled_requests
220-
WHERE status = ? AND run_at <= ?
221-
ORDER BY run_at ASC
222-
LIMIT ?`,
223-
STATUS_PENDING,
224-
minRunAt,
225-
amount,
226-
)
217+
var err error
218+
var rows *sql.Rows
219+
220+
if within == 0 {
221+
rows, err = s.db.Query(`
222+
SELECT id, method, url, created_at, run_at, request_headers, request_body
223+
FROM scheduled_requests
224+
WHERE status = ?
225+
ORDER BY run_at ASC
226+
LIMIT ?`,
227+
STATUS_PENDING,
228+
amount,
229+
)
230+
} else {
231+
rows, err = s.db.Query(`
232+
SELECT id, method, url, created_at, run_at, request_headers, request_body
233+
FROM scheduled_requests
234+
WHERE status = ? AND run_at <= ?
235+
ORDER BY run_at ASC
236+
LIMIT ?`,
237+
STATUS_PENDING,
238+
minRunAt,
239+
amount,
240+
)
241+
}
227242

228243
if err != nil {
229244
return nil, err

internals/scheduler/reqscheduler.go

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package scheduler
22

33
import (
44
"bytes"
5+
"context"
56
"errors"
67
"io"
78
"net/http"
@@ -11,20 +12,33 @@ import (
1112
"github.com/codeshelldev/gotl/pkg/jsonutils"
1213
"github.com/codeshelldev/gotl/pkg/logger"
1314
"github.com/codeshelldev/gotl/pkg/request"
15+
scheduling "github.com/codeshelldev/gotl/pkg/scheduler"
1416
"github.com/codeshelldev/secured-signal-api/internals/db"
1517
"github.com/google/uuid"
1618
)
1719

1820
var rsdb *db.RequestSchedulerDB
1921

22+
var reqscheduler = scheduling.New()
23+
var cancel context.CancelFunc
24+
2025
const limit = 5
2126
const withinTime = 5 * time.Minute
2227

2328
const recoveryThreshold = 10 * time.Minute
2429

2530
const doneStaleThreshold = 24 * time.Hour
2631

32+
func Stop() {
33+
cancel()
34+
}
35+
2736
func StartRequestScheduler() {
37+
var ctx context.Context
38+
ctx, cancel = context.WithCancel(context.Background())
39+
40+
go reqscheduler.Run(ctx)
41+
2842
rsdb = db.NewRequestSchedulerDB()
2943

3044
rsdb.CleanupDones(doneStaleThreshold)
@@ -36,15 +50,15 @@ func StartRequestScheduler() {
3650
for range ticker.C {
3751
rsdb.RecoverStales(recoveryThreshold)
3852

39-
if scheduler.Len() < limit {
53+
if reqscheduler.Len() < limit {
4054
UpdateQueue()
4155
}
4256
}
4357
}()
4458
}
4559

4660
func UpdateQueue() {
47-
requests, _ := rsdb.FetchNext(limit, withinTime)
61+
requests, _ := rsdb.FetchNext(limit - reqscheduler.Len(), withinTime)
4862

4963
for _, req := range requests {
5064
AddToQueue(req)
@@ -55,24 +69,36 @@ func AddToQueue(req *db.ScheduledRequest) {
5569
rsdb.SetStatus(req.ID, db.STATUS_QUEUED)
5670
rsdb.Claim(req.ID)
5771

58-
scheduler.AddAt(req.RunAt, func() {
72+
reqscheduler.AddAtWithID(req.ID, req.RunAt, func() {
5973
HandleScheduledRequest(req)
6074
})
6175
}
6276

6377
func OnRequestScheduled(req *db.ScheduledRequest) {
64-
next, exists := scheduler.PeekTime()
78+
id, latest, exists := reqscheduler.Latest()
79+
80+
if !exists {
81+
return
82+
}
6583

66-
if exists {
67-
if req.RunAt.Before(next) {
68-
// add earliest job (current)
69-
AddToQueue(req)
84+
// get next request
85+
nexts, _ := rsdb.FetchNext(1, 0)
7086

71-
// remove latest job
72-
scheduler.Pop()
87+
if len(nexts) != 1 {
88+
return
89+
}
7390

74-
rsdb.SetStatus(req.ID, db.STATUS_PENDING)
75-
}
91+
// check if current request runs before the latest heap request
92+
// and if the next request in the db is also the current request
93+
if req.RunAt.Before(latest) && req.ID == nexts[0].ID {
94+
// add current job (earlier than latest)
95+
AddToQueue(req)
96+
97+
// remove latest job
98+
reqscheduler.Cancel(id)
99+
100+
// mark removed request as pending (not queued anymore)
101+
rsdb.SetStatus(id, db.STATUS_PENDING)
76102
}
77103
}
78104

internals/scheduler/scheduler.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,5 @@
11
package scheduler
22

3-
import (
4-
"context"
5-
6-
scheduling "github.com/codeshelldev/gotl/pkg/scheduler"
7-
)
8-
9-
var scheduler = scheduling.New()
10-
var cancel context.CancelFunc
11-
123
func Start() {
13-
var ctx context.Context
14-
ctx, cancel = context.WithCancel(context.Background())
15-
16-
go scheduler.Run(ctx)
17-
184
StartRequestScheduler()
19-
}
20-
21-
func Stop() {
22-
cancel()
235
}

0 commit comments

Comments
 (0)