diff --git a/queue.go b/queue.go index cd6e2a8..5014d47 100644 --- a/queue.go +++ b/queue.go @@ -1,6 +1,7 @@ package main import ( + "container/list" "errors" "github.com/joncrlsn/dque" log "github.com/sirupsen/logrus" @@ -18,7 +19,7 @@ type ConfirmationQueue struct { msgQueue *dque.DQue mutex sync.Mutex emptyCond *sync.Cond - inMemory [][]byte + inMemory *list.List } var ( @@ -35,6 +36,7 @@ func ItemBuilder() interface{} { return &MessageStruct{} } +// Init initializes the queue func (cq *ConfirmationQueue) Init() *ConfirmationQueue { // Set the attributes viper.SetDefault("queue_directory", "/tmp/shoveler-queue") @@ -56,6 +58,7 @@ func (cq *ConfirmationQueue) Init() *ConfirmationQueue { cq.emptyCond = sync.NewCond(&cq.mutex) // Start the metrics goroutine + cq.inMemory = list.New() go cq.queueMetrics() return cq @@ -64,7 +67,7 @@ func (cq *ConfirmationQueue) Init() *ConfirmationQueue { func (cq *ConfirmationQueue) Size() int { cq.mutex.Lock() defer cq.mutex.Unlock() - return len(cq.inMemory) + cq.msgQueue.SizeUnsafe() + return cq.inMemory.Len() + cq.msgQueue.SizeUnsafe() } // queueMetrics updates the queue size prometheus metric @@ -90,9 +93,9 @@ func (cq *ConfirmationQueue) Enqueue(msg []byte) { cq.mutex.Lock() defer cq.mutex.Unlock() // Check size of in memory queue - if len(cq.inMemory) < MaxInMemory { + if cq.inMemory.Len() < MaxInMemory { // Add to in memory queue - cq.inMemory = append(cq.inMemory, msg) + cq.inMemory.PushBack(msg) } else { // Add to on disk queue err := cq.msgQueue.Enqueue(&MessageStruct{Message: msg}) @@ -106,15 +109,14 @@ func (cq *ConfirmationQueue) Enqueue(msg []byte) { // dequeueLocked dequeues a message, assuming the queue has already been locked func (cq *ConfirmationQueue) dequeueLocked() ([]byte, error) { // Check if we have a message available in the queue - if len(cq.inMemory) == 0 { + if cq.inMemory.Len() == 0 { return nil, ErrEmpty } - // We know we have something to dequeue - toReturn := cq.inMemory[0] - cq.inMemory = cq.inMemory[1:] + // Remove the first element and get the value + toReturn := cq.inMemory.Remove(cq.inMemory.Front()).([]byte) // See if we have anything on the on-disk - for len(cq.inMemory) < MaxInMemory { + for cq.inMemory.Len() < MaxInMemory { // Dequeue something from the on disk msgStruct, err := cq.msgQueue.Dequeue() if err == dque.ErrEmpty { @@ -122,7 +124,7 @@ func (cq *ConfirmationQueue) dequeueLocked() ([]byte, error) { break } // Add the new message to the back of the in memory queue - cq.inMemory = append(cq.inMemory, msgStruct.(*MessageStruct).Message) + cq.inMemory.PushBack(msgStruct.(*MessageStruct).Message) } return toReturn, nil }