Skip to content

Commit

Permalink
Change from slice to container list
Browse files Browse the repository at this point in the history
  • Loading branch information
djw8605 committed Dec 17, 2021
1 parent 83379e3 commit 71ff69a
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"container/list"
"errors"
"github.com/joncrlsn/dque"
log "github.com/sirupsen/logrus"
Expand All @@ -18,7 +19,7 @@ type ConfirmationQueue struct {
msgQueue *dque.DQue
mutex sync.Mutex
emptyCond *sync.Cond
inMemory [][]byte
inMemory *list.List
}

var (
Expand All @@ -35,6 +36,7 @@ func ItemBuilder() interface{} {
return &MessageStruct{}
}

// Init initializes the queue
func (cq *ConfirmationQueue) Init() *ConfirmationQueue {
// Set the attributes
viper.SetDefault("queue_directory", "/tmp/shoveler-queue")
Expand All @@ -56,6 +58,7 @@ func (cq *ConfirmationQueue) Init() *ConfirmationQueue {
cq.emptyCond = sync.NewCond(&cq.mutex)

// Start the metrics goroutine
cq.inMemory = list.New()
go cq.queueMetrics()
return cq

Expand All @@ -64,7 +67,7 @@ func (cq *ConfirmationQueue) Init() *ConfirmationQueue {
func (cq *ConfirmationQueue) Size() int {
cq.mutex.Lock()
defer cq.mutex.Unlock()
return len(cq.inMemory) + cq.msgQueue.SizeUnsafe()
return cq.inMemory.Len() + cq.msgQueue.SizeUnsafe()
}

// queueMetrics updates the queue size prometheus metric
Expand All @@ -90,9 +93,9 @@ func (cq *ConfirmationQueue) Enqueue(msg []byte) {
cq.mutex.Lock()
defer cq.mutex.Unlock()
// Check size of in memory queue
if len(cq.inMemory) < MaxInMemory {
if cq.inMemory.Len() < MaxInMemory {
// Add to in memory queue
cq.inMemory = append(cq.inMemory, msg)
cq.inMemory.PushBack(msg)
} else {
// Add to on disk queue
err := cq.msgQueue.Enqueue(&MessageStruct{Message: msg})
Expand All @@ -106,23 +109,22 @@ func (cq *ConfirmationQueue) Enqueue(msg []byte) {
// dequeueLocked dequeues a message, assuming the queue has already been locked
func (cq *ConfirmationQueue) dequeueLocked() ([]byte, error) {
// Check if we have a message available in the queue
if len(cq.inMemory) == 0 {
if cq.inMemory.Len() == 0 {
return nil, ErrEmpty
}
// We know we have something to dequeue
toReturn := cq.inMemory[0]
cq.inMemory = cq.inMemory[1:]
// Remove the first element and get the value
toReturn := cq.inMemory.Remove(cq.inMemory.Front()).([]byte)

// See if we have anything on the on-disk
for len(cq.inMemory) < MaxInMemory {
for cq.inMemory.Len() < MaxInMemory {
// Dequeue something from the on disk
msgStruct, err := cq.msgQueue.Dequeue()
if err == dque.ErrEmpty {
// Queue is empty
break
}
// Add the new message to the back of the in memory queue
cq.inMemory = append(cq.inMemory, msgStruct.(*MessageStruct).Message)
cq.inMemory.PushBack(msgStruct.(*MessageStruct).Message)
}
return toReturn, nil
}
Expand Down

0 comments on commit 71ff69a

Please sign in to comment.