Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package autoscaler

import (
"fmt"
"time"

"github.com/akhani18/QueueReader/poller"
"github.com/akhani18/QueueReader/queue"
)

const (
MAX = 10
)

type Autoscaler struct {
ps []*poller.Poller
uc chan int
minP int
maxP int
sqs *queue.Queue
}

// New
func New(q *queue.Queue, minPollers int, maxPollers int) *Autoscaler {
utilChan := make(chan int, 100)
pollers := []*poller.Poller{poller.New(1, utilChan)}

a := &Autoscaler{
ps: pollers,
uc: utilChan,
minP: minPollers,
maxP: maxPollers,
sqs: q,
}

return a
}

// Start
func (a *Autoscaler) Start() {
// start monitor in a goroutine
go a.monitor()

// Start first poller
go a.ps[0].Start(a.sqs)
}

// scaleUp by 1 poller
func (a *Autoscaler) scaleUp() {
numPollers := len(a.ps)

if numPollers < a.maxP {
newPoller := poller.New(numPollers+1, a.uc)
a.ps = append(a.ps, newPoller)

go newPoller.Start(a.sqs)
} else {
fmt.Println("Can't scaleup, reached max num pollers")
}
}

// scaleDown
func (a *Autoscaler) scaleDown() {
numPollers := len(a.ps)

if numPollers > a.minP {
delPoller := a.ps[numPollers-1]
delPoller.Stop()

a.ps = a.ps[:numPollers-1]
} else {
fmt.Println("Can't scaledown, reached min num pollers")
}
}

// monitor
func (a *Autoscaler) monitor() {
ticker := time.NewTicker(60 * time.Second)
sum := 0
count := 0

for {
select {
case t := <-ticker.C:
fmt.Println("Time for autoscaling: ", t)
if count != 0 {
avg := float64(sum) / float64(count)

if avg <= 0.75*MAX {
fmt.Println("Performing scale down at ", t)
a.scaleDown()
} else if avg >= MAX {
fmt.Println("Performing scale up at ", t)
a.scaleUp()
} else {
fmt.Println("No autoscaling required")
}
sum = 0
count = 0
}

case numMsg := <-a.uc:
sum += numMsg
count++
//fmt.Printf("Accumulated messages %d, for count %d\n", sum, count)
}

}
}
19 changes: 15 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"os"
"sync"

"github.com/akhani18/QueueReader/autoscaler"
"github.com/akhani18/QueueReader/queue"
)

func main() {
qName := flag.String("n", "", "Name of the queue used for communication.")
region := flag.String("r", "us-west-2", "AWS region where the queue exists.")
longPollDurationSecs := flag.Int64("l", 20, "Long poll duration for SQS pollers (0 - 20 seconds).")
numPollers := flag.Int("p", 1, "Number of concurrent pollers (1 - 20).")
//numPollers := flag.Int("p", 1, "Number of concurrent pollers (1 - 20).")

flag.Parse()

Expand All @@ -29,13 +30,13 @@ func main() {
os.Exit(1)
}

if *numPollers < 1 || *numPollers > 20 {
/*if *numPollers < 1 || *numPollers > 20 {
flag.PrintDefaults()
log.Println("Number of concurrent pollers should be between 1 and 20.")
os.Exit(1)
}
}*/

var wg sync.WaitGroup
/*var wg sync.WaitGroup
wg.Add(*numPollers)

for i := 1; i <= *numPollers; i++ {
Expand All @@ -44,5 +45,15 @@ func main() {
go poller.Run()
}

wg.Wait()*/

sqs := queue.New(qName, region, *longPollDurationSecs)
autos := autoscaler.New(sqs, 1, 20)

var wg sync.WaitGroup
wg.Add(1)

go autos.Start()

wg.Wait()
}
61 changes: 61 additions & 0 deletions poller/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package poller

import (
"log"

"github.com/akhani18/QueueReader/queue"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
)

func (p *Poller) poll(q *queue.Queue) {
// Receive Messages.
result, err := q.SQSClient.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: q.URL,
MaxNumberOfMessages: aws.Int64(10), // Batch receive upto 10 msgs
WaitTimeSeconds: aws.Int64(q.WaitTimeSeconds), // Long polling for 20 secs
MessageAttributeNames: aws.StringSlice([]string{
"All",
}),
AttributeNames: aws.StringSlice([]string{
"All",
}),
})

if err != nil {
log.Printf("Poller#%d: Error: %s", p.pollerID, err.Error())
return
}

// Process Messages
log.Printf("Poller#%d: Received %d messages.\n", p.pollerID, len(result.Messages))
p.utilizationChan <- len(result.Messages)

//log.Printf("Poller#%d: Received %d messages.\n", p.pollerID, 10)
//p.utilizationChan <- 10

for _, msg := range result.Messages {
go p.process(msg, q)
}
}

// Batch Delete
func (p *Poller) process(m *sqs.Message, q *queue.Queue) {
// Process
/*log.Printf("Poller#%d: Message Id: %s\n", p.pollerID, *m.MessageId)
log.Printf("Poller#%d: Message ReceiveCount: %s\n", p.pollerID, *m.Attributes["ApproximateReceiveCount"])
log.Printf("Poller#%d: Message Payload: %s\n", p.pollerID, *m.Body)*/

// Delete
_, err := q.SQSClient.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: q.URL,
ReceiptHandle: m.ReceiptHandle,
})

if err != nil {
log.Printf("Poller#%d: Error: %s", p.pollerID, err.Error())
return
}

//log.Printf("Poller#%d: Successfully deleted message Id: %s\n", p.pollerID, *m.MessageId)
}
44 changes: 44 additions & 0 deletions poller/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package poller

import (
"fmt"
"log"
"time"

"github.com/akhani18/QueueReader/queue"
)

type Poller struct {
pollerID int
utilizationChan chan<- int
stop chan bool
}

func New(id int, utilChan chan<- int) *Poller {
return &Poller{
pollerID: id,
utilizationChan: utilChan,
stop: make(chan bool),
}
}

func (p *Poller) Start(q *queue.Queue) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
log.Printf("Poller#%d: Polling queue %s with a 20 sec long-poll and 1 sec period.\n", p.pollerID, *q.URL)

for {
select {
case <-ticker.C:
p.poll(q)

case <-p.stop:
fmt.Printf("Poller#%d: Stopping...\n", p.pollerID)
return
}
}
}

func (p *Poller) Stop() {
p.stop <- true
}
2 changes: 2 additions & 0 deletions queue/process.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package queue

/*
import (
"log"
"sync"
Expand Down Expand Up @@ -70,3 +71,4 @@ func (p *Poller) process(m *sqs.Message, wg *sync.WaitGroup) {

log.Printf("Poller#%d: Successfully deleted message Id: %s\n", p.pollerID, *m.MessageId)
}
*/
17 changes: 11 additions & 6 deletions queue/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
)

func NewPoller(id int, qName *string, region *string, longPollDurationSec int64) *Poller {
type Queue struct {
URL *string
WaitTimeSeconds int64
SQSClient *sqs.SQS
}

func New(qName *string, region *string, longPollDurationSec int64) *Queue {
sess, _ := session.NewSession(&aws.Config{
Region: region,
})
Expand All @@ -27,10 +33,9 @@ func NewPoller(id int, qName *string, region *string, longPollDurationSec int64)

qURL := result.QueueUrl

return &Poller{
pollerID: id,
queueURL: qURL,
waitDurationSec: longPollDurationSec,
sqsClient: svc,
return &Queue{
URL: qURL,
WaitTimeSeconds: longPollDurationSec,
SQSClient: svc,
}
}