diff --git a/autoscaler/autoscaler.go b/autoscaler/autoscaler.go new file mode 100644 index 0000000..20515e9 --- /dev/null +++ b/autoscaler/autoscaler.go @@ -0,0 +1,109 @@ +package autoscaler + +import ( + "fmt" + "time" + + "github.com/akhani18/QueueReader/poller" + "github.com/akhani18/QueueReader/queue" +) + +const ( + MAX = 10 +) + +type Autoscaler struct { + ps []*poller.Poller + uc chan int + minP int + maxP int + sqs *queue.Queue +} + +// New +func New(q *queue.Queue, minPollers int, maxPollers int) *Autoscaler { + utilChan := make(chan int, 100) + pollers := []*poller.Poller{poller.New(1, utilChan)} + + a := &Autoscaler{ + ps: pollers, + uc: utilChan, + minP: minPollers, + maxP: maxPollers, + sqs: q, + } + + return a +} + +// Start +func (a *Autoscaler) Start() { + // start monitor in a goroutine + go a.monitor() + + // Start first poller + go a.ps[0].Start(a.sqs) +} + +// scaleUp by 1 poller +func (a *Autoscaler) scaleUp() { + numPollers := len(a.ps) + + if numPollers < a.maxP { + newPoller := poller.New(numPollers+1, a.uc) + a.ps = append(a.ps, newPoller) + + go newPoller.Start(a.sqs) + } else { + fmt.Println("Can't scaleup, reached max num pollers") + } +} + +// scaleDown +func (a *Autoscaler) scaleDown() { + numPollers := len(a.ps) + + if numPollers > a.minP { + delPoller := a.ps[numPollers-1] + delPoller.Stop() + + a.ps = a.ps[:numPollers-1] + } else { + fmt.Println("Can't scaledown, reached min num pollers") + } +} + +// monitor +func (a *Autoscaler) monitor() { + ticker := time.NewTicker(60 * time.Second) + sum := 0 + count := 0 + + for { + select { + case t := <-ticker.C: + fmt.Println("Time for autoscaling: ", t) + if count != 0 { + avg := float64(sum) / float64(count) + + if avg <= 0.75*MAX { + fmt.Println("Performing scale down at ", t) + a.scaleDown() + } else if avg >= MAX { + fmt.Println("Performing scale up at ", t) + a.scaleUp() + } else { + fmt.Println("No autoscaling required") + } + sum = 0 + count = 0 + } + + case numMsg := <-a.uc: + sum += numMsg + count++ + //fmt.Printf("Accumulated messages %d, for count %d\n", sum, count) + } + + } +} diff --git a/main.go b/main.go index 8b1f958..4fbad93 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "os" "sync" + "github.com/akhani18/QueueReader/autoscaler" "github.com/akhani18/QueueReader/queue" ) @@ -13,7 +14,7 @@ func main() { qName := flag.String("n", "", "Name of the queue used for communication.") region := flag.String("r", "us-west-2", "AWS region where the queue exists.") longPollDurationSecs := flag.Int64("l", 20, "Long poll duration for SQS pollers (0 - 20 seconds).") - numPollers := flag.Int("p", 1, "Number of concurrent pollers (1 - 20).") + //numPollers := flag.Int("p", 1, "Number of concurrent pollers (1 - 20).") flag.Parse() @@ -29,13 +30,13 @@ func main() { os.Exit(1) } - if *numPollers < 1 || *numPollers > 20 { + /*if *numPollers < 1 || *numPollers > 20 { flag.PrintDefaults() log.Println("Number of concurrent pollers should be between 1 and 20.") os.Exit(1) - } + }*/ - var wg sync.WaitGroup + /*var wg sync.WaitGroup wg.Add(*numPollers) for i := 1; i <= *numPollers; i++ { @@ -44,5 +45,15 @@ func main() { go poller.Run() } + wg.Wait()*/ + + sqs := queue.New(qName, region, *longPollDurationSecs) + autos := autoscaler.New(sqs, 1, 20) + + var wg sync.WaitGroup + wg.Add(1) + + go autos.Start() + wg.Wait() } diff --git a/poller/impl.go b/poller/impl.go new file mode 100644 index 0000000..5b561f1 --- /dev/null +++ b/poller/impl.go @@ -0,0 +1,61 @@ +package poller + +import ( + "log" + + "github.com/akhani18/QueueReader/queue" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" +) + +func (p *Poller) poll(q *queue.Queue) { + // Receive Messages. + result, err := q.SQSClient.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: q.URL, + MaxNumberOfMessages: aws.Int64(10), // Batch receive upto 10 msgs + WaitTimeSeconds: aws.Int64(q.WaitTimeSeconds), // Long polling for 20 secs + MessageAttributeNames: aws.StringSlice([]string{ + "All", + }), + AttributeNames: aws.StringSlice([]string{ + "All", + }), + }) + + if err != nil { + log.Printf("Poller#%d: Error: %s", p.pollerID, err.Error()) + return + } + + // Process Messages + log.Printf("Poller#%d: Received %d messages.\n", p.pollerID, len(result.Messages)) + p.utilizationChan <- len(result.Messages) + + //log.Printf("Poller#%d: Received %d messages.\n", p.pollerID, 10) + //p.utilizationChan <- 10 + + for _, msg := range result.Messages { + go p.process(msg, q) + } +} + +// Batch Delete +func (p *Poller) process(m *sqs.Message, q *queue.Queue) { + // Process + /*log.Printf("Poller#%d: Message Id: %s\n", p.pollerID, *m.MessageId) + log.Printf("Poller#%d: Message ReceiveCount: %s\n", p.pollerID, *m.Attributes["ApproximateReceiveCount"]) + log.Printf("Poller#%d: Message Payload: %s\n", p.pollerID, *m.Body)*/ + + // Delete + _, err := q.SQSClient.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: q.URL, + ReceiptHandle: m.ReceiptHandle, + }) + + if err != nil { + log.Printf("Poller#%d: Error: %s", p.pollerID, err.Error()) + return + } + + //log.Printf("Poller#%d: Successfully deleted message Id: %s\n", p.pollerID, *m.MessageId) +} diff --git a/poller/poller.go b/poller/poller.go new file mode 100644 index 0000000..1a5a46b --- /dev/null +++ b/poller/poller.go @@ -0,0 +1,44 @@ +package poller + +import ( + "fmt" + "log" + "time" + + "github.com/akhani18/QueueReader/queue" +) + +type Poller struct { + pollerID int + utilizationChan chan<- int + stop chan bool +} + +func New(id int, utilChan chan<- int) *Poller { + return &Poller{ + pollerID: id, + utilizationChan: utilChan, + stop: make(chan bool), + } +} + +func (p *Poller) Start(q *queue.Queue) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + log.Printf("Poller#%d: Polling queue %s with a 20 sec long-poll and 1 sec period.\n", p.pollerID, *q.URL) + + for { + select { + case <-ticker.C: + p.poll(q) + + case <-p.stop: + fmt.Printf("Poller#%d: Stopping...\n", p.pollerID) + return + } + } +} + +func (p *Poller) Stop() { + p.stop <- true +} diff --git a/queue/process.go b/queue/process.go index adad81e..13861cb 100644 --- a/queue/process.go +++ b/queue/process.go @@ -1,5 +1,6 @@ package queue +/* import ( "log" "sync" @@ -70,3 +71,4 @@ func (p *Poller) process(m *sqs.Message, wg *sync.WaitGroup) { log.Printf("Poller#%d: Successfully deleted message Id: %s\n", p.pollerID, *m.MessageId) } +*/ diff --git a/queue/setup.go b/queue/setup.go index 6ee3eb0..521b7f3 100644 --- a/queue/setup.go +++ b/queue/setup.go @@ -9,7 +9,13 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" ) -func NewPoller(id int, qName *string, region *string, longPollDurationSec int64) *Poller { +type Queue struct { + URL *string + WaitTimeSeconds int64 + SQSClient *sqs.SQS +} + +func New(qName *string, region *string, longPollDurationSec int64) *Queue { sess, _ := session.NewSession(&aws.Config{ Region: region, }) @@ -27,10 +33,9 @@ func NewPoller(id int, qName *string, region *string, longPollDurationSec int64) qURL := result.QueueUrl - return &Poller{ - pollerID: id, - queueURL: qURL, - waitDurationSec: longPollDurationSec, - sqsClient: svc, + return &Queue{ + URL: qURL, + WaitTimeSeconds: longPollDurationSec, + SQSClient: svc, } }